From 01999df80c089406d5c68b1d6cd7ca62c6f99788 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 4 Sep 2024 18:16:43 +0300 Subject: [PATCH 01/13] First batch --- .github/spark_deployment/Dockerfile | 34 ++++++ .github/spark_deployment/build_and_push.sh | 20 ++++ .github/spark_deployment/docker-compose.yml | 66 +++++++++++ .github/spark_deployment/spark-defaults.conf | 44 +++++++ .github/workflows/pr_tests.yml | 112 ++++++++++++------ dbt_project.yml | 8 ++ .../.scripts/integration_test.sh | 2 +- integration_tests/dbt_project.yml | 1 + .../unit_tests/test_spend_data_actual.sql | 8 +- integration_tests/packages.yml | 4 +- macros/attribution_overview.sql | 14 +-- models/schema.yml | 2 +- models/snowplow_attribution_overview.sql | 2 +- packages.yml | 4 +- 14 files changed, 265 insertions(+), 56 deletions(-) create mode 100644 .github/spark_deployment/Dockerfile create mode 100755 .github/spark_deployment/build_and_push.sh create mode 100644 .github/spark_deployment/docker-compose.yml create mode 100644 .github/spark_deployment/spark-defaults.conf diff --git a/.github/spark_deployment/Dockerfile b/.github/spark_deployment/Dockerfile new file mode 100644 index 0000000..dab5720 --- /dev/null +++ b/.github/spark_deployment/Dockerfile @@ -0,0 +1,34 @@ +FROM openjdk:11-jre-slim + +# Set environment variables +ENV SPARK_VERSION=3.5.1 +ENV HADOOP_VERSION=3.3.4 +ENV ICEBERG_VERSION=1.4.2 +ENV AWS_SDK_VERSION=1.12.581 + +# Install necessary tools +RUN apt-get update && apt-get install -y curl wget procps rsync ssh + +# Download and install Spark +RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \ + rm spark-${SPARK_VERSION}-bin-hadoop3.tgz + +# Set Spark environment variables +ENV SPARK_HOME=/spark +ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin + +# Download necessary JARs +RUN mkdir -p /spark/jars && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \ + wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \ + wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar + +# Create directory for Spark events +RUN mkdir -p /tmp/spark-events + +WORKDIR /spark + +CMD ["bash"] \ No newline at end of file diff --git a/.github/spark_deployment/build_and_push.sh b/.github/spark_deployment/build_and_push.sh new file mode 100755 index 0000000..1be2b6d --- /dev/null +++ b/.github/spark_deployment/build_and_push.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Set variables +DOCKER_HUB_ORG="snowplow" +IMAGE_NAME="spark-s3-iceberg" +TAG="latest" + +# Build the image +echo "Building Docker image..." +docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG . + +# Log in to Docker Hub +echo "Logging in to Docker Hub..." +docker login + +# Push the image to Docker Hub +echo "Pushing image to Docker Hub..." 
+docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG + +echo "Image successfully built and pushed to Docker Hub" \ No newline at end of file diff --git a/.github/spark_deployment/docker-compose.yml b/.github/spark_deployment/docker-compose.yml new file mode 100644 index 0000000..2e8077b --- /dev/null +++ b/.github/spark_deployment/docker-compose.yml @@ -0,0 +1,66 @@ +version: '3' + +networks: + spark-network: + driver: bridge + +services: + spark-master: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"] + hostname: spark-master + ports: + - '8080:8080' + - '7077:7077' + environment: + - SPARK_LOCAL_IP=spark-master + - SPARK_MASTER_HOST=spark-master + - SPARK_MASTER_PORT=7077 + - SPARK_MASTER_OPTS="-Dspark.driver.memory=2g" + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + spark-worker: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"] + depends_on: + - spark-master + environment: + - SPARK_WORKER_CORES=2 + - SPARK_WORKER_MEMORY=4G + - SPARK_EXECUTOR_MEMORY=3G + - SPARK_LOCAL_IP=spark-worker + - SPARK_MASTER=spark://spark-master:7077 + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + thrift-server: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"] + ports: + - '10000:10000' + depends_on: + - spark-master + - spark-worker + environment: + - SPARK_LOCAL_IP=thrift-server + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network \ No newline at end of file diff --git a/.github/spark_deployment/spark-defaults.conf b/.github/spark_deployment/spark-defaults.conf new file mode 100644 index 0000000..9052a05 --- /dev/null +++ b/.github/spark_deployment/spark-defaults.conf @@ -0,0 +1,44 @@ +spark.master spark://spark-master:7077 + +spark.sql.warehouse.dir s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.glue.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog +spark.sql.catalog.glue.warehouse s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue.io-impl 
org.apache.iceberg.aws.s3.S3FileIO +spark.sql.defaultCatalog glue +spark.sql.catalog.glue.database dbt-spark-iceberg + +spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem +spark.hadoop.fs.s3a.access.key +spark.hadoop.fs.s3a.secret.key +spark.hadoop.fs.s3a.endpoint s3.eu-west-1.amazonaws.com +spark.hadoop.fs.s3a.path.style.access true +spark.hadoop.fs.s3a.region eu-west-1 +spark.hadoop.fs.s3a.aws.region eu-west-1 + +# Enabling AWS SDK V4 signing (required for regions launched after January 2014) +spark.hadoop.com.amazonaws.services.s3.enableV4 true +spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + +# Hive Metastore Configuration (using AWS Glue) +spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory + +# Thrift Server Configuration for better performance in concurrent environments +spark.sql.hive.thriftServer.singleSession false +spark.sql.hive.thriftServer.async true +# spark.sql.hive.thriftServer.maxWorkerThreads 100 +# spark.sql.hive.thriftServer.minWorkerThreads 50 +# spark.sql.hive.thriftServer.workerQueue.size 2000 + +# Memory and Performance Tuning +# spark.driver.memory 2g +# spark.executor.memory 3g +# spark.worker.memory 4g +spark.network.timeout 600s +spark.sql.broadcastTimeout 600s +spark.sql.adaptive.enabled true +spark.serializer org.apache.spark.serializer.KryoSerializer + +# Logging and Debugging +spark.eventLog.enabled true +spark.eventLog.dir /tmp/spark-events diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml index da3e623..cbbe71f 100644 --- a/.github/workflows/pr_tests.yml +++ b/.github/workflows/pr_tests.yml @@ -2,9 +2,6 @@ name: pr_tests on: pull_request: - branches: - - main - - 'release/**' concurrency: dbt_integration_tests @@ -42,13 +39,12 @@ env: SNOWFLAKE_TEST_WAREHOUSE: ${{ secrets.SNOWFLAKE_TEST_WAREHOUSE }} # Postgres Connection - POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }} POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }} POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }} - POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }} POSTGRES_TEST_DBNAME: ${{ secrets.POSTGRES_TEST_DBNAME }} + POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }} + POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }} - # Databricks Connection DATABRICKS_TEST_HOST: ${{ secrets.DATABRICKS_TEST_HOST }} DATABRICKS_TEST_HTTP_PATH: ${{ secrets.DATABRICKS_TEST_HTTP_PATH }} DATABRICKS_TEST_TOKEN: ${{ secrets.DATABRICKS_TEST_TOKEN }} @@ -63,33 +59,59 @@ jobs: # Run tests from integration_tests sub dir working-directory: ./integration_tests strategy: + fail-fast: false matrix: dbt_version: ["1.*"] - warehouse: ["bigquery", "snowflake", "databricks", "redshift"] # TODO: Add RS self-hosted runner - - # services: - # postgres: - # image: postgres:latest - # env: - # POSTGRES_DB: ${{ secrets.POSTGRES_TEST_DBNAME }} - # POSTGRES_USER: ${{ secrets.POSTGRES_TEST_USER }} - # POSTGRES_PASSWORD: ${{ secrets.POSTGRES_TEST_PASS }} - # # Set health checks to wait until postgres has started - # options: >- - # --health-cmd pg_isready - # --health-interval 10s - # --health-timeout 5s - # --health-retries 5 - # ports: - # # Maps tcp port 5432 on service container to the host - # - 5432:5432 + warehouse: ["postgres", "bigquery", "snowflake", "databricks", "redshift", "spark_iceberg"] # TODO: Add RS self-hosted runner + services: + postgres: + image: postgres:latest + env: + POSTGRES_DB: ${{ secrets.POSTGRES_TEST_DBNAME }} + 
POSTGRES_USER: ${{ secrets.POSTGRES_TEST_USER }} + POSTGRES_PASSWORD: ${{ secrets.POSTGRES_TEST_PASS }} + # Set health checks to wait until postgres has started + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + # Maps tcp port 5432 on service container to the host + - 5432:5432 steps: - name: Check out - uses: actions/checkout@v3 + uses: actions/checkout@v4 # Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX. # SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables + - name: Set SCHEMA_SUFFIX env + run: echo "SCHEMA_SUFFIX=$(echo ${DBT_VERSION%.*} | tr . _)" >> $GITHUB_ENV + env: + DBT_VERSION: ${{ matrix.dbt_version }} + - name: Configure Docker credentials + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: eu-west-1 + - name: Set warehouse variables + id: set_warehouse + run: | + WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1) + WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2) + echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV + echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV + echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT + echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT + # Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX. + # SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables - name: Set SCHEMA_SUFFIX env run: echo "SCHEMA_SUFFIX=$(echo ${DBT_VERSION%.*} | tr . _)" >> $GITHUB_ENV env: @@ -97,7 +119,8 @@ jobs: - name: Set DEFAULT_TARGET env run: | - echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV + echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV + - name: Python setup uses: actions/setup-python@v4 with: @@ -107,31 +130,46 @@ jobs: uses: actions/cache@v3 with: path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} restore-keys: | - ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} + # Install latest patch version. Upgrade if cache contains old patch version. 
- name: Install dependencies run: | - pip install --upgrade pip wheel setuptools - pip install -Iv dbt-${{ matrix.warehouse }}==${{ matrix.dbt_version }} --upgrade + pip install wheel setuptools + pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse != 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM != 'spark'}} - name: Install spark dependencies run: | pip install --upgrade pip wheel setuptools - pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade + pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse == 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + + - name: Install Docker Compose + run: | + sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose + sudo chmod +x /usr/local/bin/docker-compose + + - name: Build and start Spark cluster + working-directory: .github/workflows/spark_deployment + run: | + docker-compose up -d + echo "Waiting for Spark services to start..." + sleep 90 + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + - name: "Pre-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} + - name: Run tests - run: ./.scripts/integration_test.sh -d ${{ matrix.warehouse }} + run: ./.scripts/integration_test.sh -d ${{matrix.warehouse}} - # post_ci_cleanup sits in utils package - name: "Post-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} diff --git a/dbt_project.yml b/dbt_project.yml index 10d94c9..6c0bd8b 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -65,6 +65,14 @@ on-run-end: models: snowplow_attribution: + +file_format: > + {{ + var( + 'snowplow__datalake_file_format', + 'delta' if target.type != 'spark' else 'iceberg' + ) + }} + +incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}" +schema: "derived" +materialized: table manifest: diff --git a/integration_tests/.scripts/integration_test.sh b/integration_tests/.scripts/integration_test.sh index 035f66c..8ab9e6b 100755 --- a/integration_tests/.scripts/integration_test.sh +++ b/integration_tests/.scripts/integration_test.sh @@ -11,7 +11,7 @@ do done declare -a ATTRIBUTION_MODELS_TO_TEST=("last_touch" "shapley") -declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake", "redshift") +declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake", "redshift", "spark_iceberg") # set to lower case DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')" diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 07fcf0f..6945337 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -26,6 +26,7 @@ quoting: models: snowplow_attribution_integration_tests: + +materialized: table bind: false +schema: "snplw_attribution_int_tests" source: diff --git a/integration_tests/models/unit_tests/test_spend_data_actual.sql b/integration_tests/models/unit_tests/test_spend_data_actual.sql index f05ee9a..cc1097b 100644 --- a/integration_tests/models/unit_tests/test_spend_data_actual.sql +++ b/integration_tests/models/unit_tests/test_spend_data_actual.sql @@ -14,25 +14,23 @@ with spend_with_unique_keys as ( -- we need to dedupe as the join does the 
filtering, we can't group them upfront , campaign_spend as ( - select s.campaign, s.spend + select s.campaign, s.spend, row_number() over (partition by s.spend_id order by s.spend_tstamp) as row_num from spend_with_unique_keys s inner join {{ ref('snowplow_attribution_campaign_attributions') }} c on c.campaign = s.campaign and s.spend_tstamp < cv_tstamp and s.spend_tstamp > {{ snowplow_utils.timestamp_add('day', -90, 'cv_tstamp') }} where s.campaign is not null - qualify row_number() over (partition by s.spend_id order by s.spend_tstamp) = 1 ) , channel_spend as ( - select s.channel, s.spend + select s.channel, s.spend, row_number() over (partition by s.spend_id order by s.spend_tstamp) as row_num from spend_with_unique_keys s inner join {{ ref('snowplow_attribution_channel_attributions') }} c on c.channel = s.channel and s.spend_tstamp < cv_tstamp and s.spend_tstamp > {{ snowplow_utils.timestamp_add('day', -90, 'cv_tstamp') }} where s.channel is not null - qualify row_number() over (partition by s.spend_id order by s.spend_tstamp) = 1 ) @@ -41,6 +39,7 @@ with spend_with_unique_keys as ( select campaign as path, sum(spend) as spend from campaign_spend + where row_num = 1 group by 1 ) @@ -49,6 +48,7 @@ with spend_with_unique_keys as ( select channel as path, sum(spend) as spend from channel_spend + where row_num = 1 group by 1 ) diff --git a/integration_tests/packages.yml b/integration_tests/packages.yml index ec6faac..c055b30 100644 --- a/integration_tests/packages.yml +++ b/integration_tests/packages.yml @@ -1,4 +1,4 @@ packages: - local: ../ - - package: snowplow/snowplow_unified - version: [">=0.4.0", "<0.5.0"] + - git: "https://github.com/snowplow/dbt-snowplow-unified.git" + revision: "Release/snowplow-unified/0.5.0" \ No newline at end of file diff --git a/macros/attribution_overview.sql b/macros/attribution_overview.sql index d757198..35b997b 100644 --- a/macros/attribution_overview.sql +++ b/macros/attribution_overview.sql @@ -28,14 +28,12 @@ with spend_with_unique_keys as ( , campaign_spend as ( {% if var('snowplow__spend_source') != 'not defined' %} - select s.campaign, s.spend + select s.campaign, s.spend, row_number() over (partition by s.spend_id order by s.spend_tstamp) as row_num from spend_with_unique_keys s inner join {{ ref('snowplow_attribution_campaign_attributions') }} c on c.campaign = s.campaign and s.spend_tstamp < cv_tstamp and s.spend_tstamp > {{ snowplow_utils.timestamp_add('day', -90, 'cv_tstamp') }} - where s.campaign is not null - qualify row_number() over (partition by s.spend_id order by s.spend_tstamp) = 1 - + where s.campaign is not null {% else %} select true {% endif %} @@ -45,14 +43,12 @@ with spend_with_unique_keys as ( , channel_spend as ( {% if var('snowplow__spend_source') != 'not defined' %} - select s.channel, s.spend + select s.channel, s.spend, row_number() over (partition by s.spend_id order by s.spend_tstamp) as row_num from spend_with_unique_keys s inner join {{ ref('snowplow_attribution_channel_attributions') }} c on c.channel = s.channel and s.spend_tstamp < cv_tstamp and s.spend_tstamp > {{ snowplow_utils.timestamp_add('day', -90, 'cv_tstamp') }} - where s.channel is not null - qualify row_number() over (partition by s.spend_id order by s.spend_tstamp) = 1 - + where s.channel is not null {% else %} select true {% endif %} @@ -65,6 +61,7 @@ with spend_with_unique_keys as ( {% if var('snowplow__spend_source') != 'not defined' %} select campaign, sum(spend) as spend from campaign_spend + where row_num = 1 group by 1 {% else %} @@ -78,6 +75,7 
@@ with spend_with_unique_keys as ( {% if var('snowplow__spend_source') != 'not defined' %} select channel, sum(spend) as spend from channel_spend + where row_num = 1 group by 1 {% else %} diff --git a/models/schema.yml b/models/schema.yml index c599999..0de1c85 100644 --- a/models/schema.yml +++ b/models/schema.yml @@ -8,7 +8,7 @@ sources: - name: events - name: derived schema: "{{target.schema ~ '_derived'}}" - database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else var('snowplow__atomic_schema', 'atomic') }}" + # database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else var('snowplow__atomic_schema', 'atomic') }}" tables: - name: snowplow_unified_conversions description: An incremental table which contains all relevant fields for unique conversion events. diff --git a/models/snowplow_attribution_overview.sql b/models/snowplow_attribution_overview.sql index f3ce0ea..db654a4 100644 --- a/models/snowplow_attribution_overview.sql +++ b/models/snowplow_attribution_overview.sql @@ -10,7 +10,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 config( enabled=var('snowplow__enable_attribution_overview'), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), - materialized='view' + materialized= 'table' if target.type in ['spark'] else 'view' ) }} diff --git a/packages.yml b/packages.yml index 6da1d8c..8f3da37 100644 --- a/packages.yml +++ b/packages.yml @@ -1,3 +1,3 @@ packages: - - package: snowplow/snowplow_utils - version: [">=0.16.2", "<0.17.0"] + - git: "https://github.com/snowplow/dbt-snowplow-utils.git" + revision: "release/snowplow-utils/0.17" \ No newline at end of file From bf102dce7c07a7bafc3ad3d297d95029354190fd Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 4 Sep 2024 18:21:36 +0300 Subject: [PATCH 02/13] Update pr_tests.yml --- .github/workflows/pr_tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml index cbbe71f..0232f79 100644 --- a/.github/workflows/pr_tests.yml +++ b/.github/workflows/pr_tests.yml @@ -93,8 +93,8 @@ jobs: - name: Configure Docker credentials uses: docker/login-action@v2 with: - username: ${{ secrets.DOCKERHUB_USERNAME }} - password: ${{ secrets.DOCKERHUB_TOKEN }} + username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }} + password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }} - name: Configure AWS credentials uses: aws-actions/configure-aws-credentials@v1 with: From 1685353802a5b04a38f4adfb6adf982a7078e1a4 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 4 Sep 2024 18:24:07 +0300 Subject: [PATCH 03/13] Updated on the spark --- .github/workflows/pr_tests.yml | 2 +- .github/{ => workflows}/spark_deployment/Dockerfile | 0 .github/{ => workflows}/spark_deployment/build_and_push.sh | 0 .github/{ => workflows}/spark_deployment/docker-compose.yml | 0 .github/{ => workflows}/spark_deployment/spark-defaults.conf | 0 5 files changed, 1 insertion(+), 1 deletion(-) rename .github/{ => workflows}/spark_deployment/Dockerfile (100%) rename .github/{ => workflows}/spark_deployment/build_and_push.sh (100%) rename .github/{ => workflows}/spark_deployment/docker-compose.yml (100%) rename .github/{ => 
workflows}/spark_deployment/spark-defaults.conf (100%) diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml index 0232f79..615618d 100644 --- a/.github/workflows/pr_tests.yml +++ b/.github/workflows/pr_tests.yml @@ -62,7 +62,7 @@ jobs: fail-fast: false matrix: dbt_version: ["1.*"] - warehouse: ["postgres", "bigquery", "snowflake", "databricks", "redshift", "spark_iceberg"] # TODO: Add RS self-hosted runner + warehouse: ["bigquery", "snowflake", "databricks", "redshift", "spark_iceberg"] # TODO: Add RS self-hosted runner services: postgres: image: postgres:latest diff --git a/.github/spark_deployment/Dockerfile b/.github/workflows/spark_deployment/Dockerfile similarity index 100% rename from .github/spark_deployment/Dockerfile rename to .github/workflows/spark_deployment/Dockerfile diff --git a/.github/spark_deployment/build_and_push.sh b/.github/workflows/spark_deployment/build_and_push.sh similarity index 100% rename from .github/spark_deployment/build_and_push.sh rename to .github/workflows/spark_deployment/build_and_push.sh diff --git a/.github/spark_deployment/docker-compose.yml b/.github/workflows/spark_deployment/docker-compose.yml similarity index 100% rename from .github/spark_deployment/docker-compose.yml rename to .github/workflows/spark_deployment/docker-compose.yml diff --git a/.github/spark_deployment/spark-defaults.conf b/.github/workflows/spark_deployment/spark-defaults.conf similarity index 100% rename from .github/spark_deployment/spark-defaults.conf rename to .github/workflows/spark_deployment/spark-defaults.conf From 400c6d33780d957c690be7906720b975943ef9d1 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 4 Sep 2024 18:45:35 +0300 Subject: [PATCH 04/13] Update profiles.yml --- integration_tests/ci/profiles.yml | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/integration_tests/ci/profiles.yml b/integration_tests/ci/profiles.yml index 3171f13..93677df 100644 --- a/integration_tests/ci/profiles.yml +++ b/integration_tests/ci/profiles.yml @@ -67,12 +67,15 @@ integration_tests: token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}" threads: 1 - spark: + spark_iceberg: type: spark - method: odbc - driver: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}" - schema: "gh_sp_attrib_dbt_{{ env_var('SCHEMA_SUFFIX') }}" - host: "{{ env_var('DATABRICKS_TEST_HOST') }}" - token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}" - endpoint: "{{ env_var('DATABRICKS_TEST_ENDPOINT') }}" - threads: 4 + method: thrift + host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}" + port: 10000 + user: "{{ env_var('SPARK_USER', 'spark') }}" + schema: "{{ env_var('SPARK_SCHEMA', 'default') }}" + connect_retries: 5 + connect_timeout: 60 + threads: 1 + vars: + snowplow__datalake_file_format: iceberg From c1c4d2ac55dc05bc1ee035b406c60450f0fc3173 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Thu, 5 Sep 2024 15:20:14 +0300 Subject: [PATCH 05/13] Update profiles.yml --- integration_tests/ci/profiles.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/ci/profiles.yml b/integration_tests/ci/profiles.yml index 93677df..765e577 100644 --- a/integration_tests/ci/profiles.yml +++ b/integration_tests/ci/profiles.yml @@ -73,7 +73,7 @@ integration_tests: host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}" port: 10000 user: "{{ env_var('SPARK_USER', 'spark') }}" - schema: "{{ env_var('SPARK_SCHEMA', 'default') }}" + schema: "gh_sp_attrib_dbt_{{ env_var('SCHEMA_SUFFIX') }}" connect_retries: 5 connect_timeout: 60 
threads: 1 From 7547a318f5fc918aafa51d58f7c0c98bbdb0169f Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Thu, 5 Sep 2024 16:02:53 +0300 Subject: [PATCH 06/13] Update snowplow_attribution_overview.sql --- models/snowplow_attribution_overview.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/snowplow_attribution_overview.sql b/models/snowplow_attribution_overview.sql index db654a4..8254d47 100644 --- a/models/snowplow_attribution_overview.sql +++ b/models/snowplow_attribution_overview.sql @@ -10,7 +10,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 config( enabled=var('snowplow__enable_attribution_overview'), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), - materialized= 'table' if target.type in ['spark'] else 'view' + materialized= 'table' if target.type in ['spark','redshift'] else 'view' ) }} From 1e3be1e7a9f5eaa6e35364b2a4e7583d03b3f4b8 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Mon, 9 Sep 2024 10:10:56 +0300 Subject: [PATCH 07/13] Update docker-compose.yml --- .github/workflows/spark_deployment/docker-compose.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/spark_deployment/docker-compose.yml b/.github/workflows/spark_deployment/docker-compose.yml index 2e8077b..bd02f76 100644 --- a/.github/workflows/spark_deployment/docker-compose.yml +++ b/.github/workflows/spark_deployment/docker-compose.yml @@ -32,9 +32,9 @@ services: depends_on: - spark-master environment: - - SPARK_WORKER_CORES=2 - - SPARK_WORKER_MEMORY=4G - - SPARK_EXECUTOR_MEMORY=3G + # - SPARK_WORKER_CORES=2 + # - SPARK_WORKER_MEMORY=4G + # - SPARK_EXECUTOR_MEMORY=3G - SPARK_LOCAL_IP=spark-worker - SPARK_MASTER=spark://spark-master:7077 - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} From 9f2c0078f88c86cdc145437a18b513a3ae1331b9 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Mon, 23 Sep 2024 12:03:20 +0300 Subject: [PATCH 08/13] Update dbt_project.yml --- dbt_project.yml | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/dbt_project.yml b/dbt_project.yml index 6c0bd8b..f599afa 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -65,13 +65,7 @@ on-run-end: models: snowplow_attribution: - +file_format: > - {{ - var( - 'snowplow__datalake_file_format', - 'delta' if target.type != 'spark' else 'iceberg' - ) - }} + +file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}" +incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}" +schema: "derived" +materialized: table From 2c3080582b3154ac0bf2f7d2567dd76488e2f718 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Mon, 23 Sep 2024 12:08:40 +0300 Subject: [PATCH 09/13] Update pr_tests.yml --- .github/workflows/pr_tests.yml | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml index 615618d..ea566af 100644 --- a/.github/workflows/pr_tests.yml +++ b/.github/workflows/pr_tests.yml @@ -63,22 +63,6 @@ jobs: matrix: dbt_version: ["1.*"] warehouse: ["bigquery", "snowflake", "databricks", "redshift", "spark_iceberg"] # TODO: Add RS self-hosted runner - services: - postgres: - image: postgres:latest - env: - POSTGRES_DB: ${{ secrets.POSTGRES_TEST_DBNAME }} - POSTGRES_USER: ${{ secrets.POSTGRES_TEST_USER }} - POSTGRES_PASSWORD: ${{ secrets.POSTGRES_TEST_PASS }} - # Set health checks to wait until postgres has started - options: >- - --health-cmd pg_isready - --health-interval 10s - 
--health-timeout 5s - --health-retries 5 - ports: - # Maps tcp port 5432 on service container to the host - - 5432:5432 steps: - name: Check out From 64c72655af141bd183b87ac06d43e5062288f089 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Tue, 24 Sep 2024 11:39:10 +0300 Subject: [PATCH 10/13] Update profiles.yml --- integration_tests/ci/profiles.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/integration_tests/ci/profiles.yml b/integration_tests/ci/profiles.yml index 765e577..8f33337 100644 --- a/integration_tests/ci/profiles.yml +++ b/integration_tests/ci/profiles.yml @@ -77,5 +77,3 @@ integration_tests: connect_retries: 5 connect_timeout: 60 threads: 1 - vars: - snowplow__datalake_file_format: iceberg From 18601e9e332b5a17abe0ac0e6ec8cc3b24b8b6c1 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 25 Sep 2024 15:20:22 +0100 Subject: [PATCH 11/13] Add database again --- models/schema.yml | 1 + models/snowplow_attribution_overview.sql | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/models/schema.yml b/models/schema.yml index 0de1c85..3667b33 100644 --- a/models/schema.yml +++ b/models/schema.yml @@ -4,6 +4,7 @@ version: 2 sources: - name: atomic schema: atomic + database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else none }}" tables: - name: events - name: derived diff --git a/models/snowplow_attribution_overview.sql b/models/snowplow_attribution_overview.sql index 8254d47..db654a4 100644 --- a/models/snowplow_attribution_overview.sql +++ b/models/snowplow_attribution_overview.sql @@ -10,7 +10,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 config( enabled=var('snowplow__enable_attribution_overview'), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), - materialized= 'table' if target.type in ['spark','redshift'] else 'view' + materialized= 'table' if target.type in ['spark'] else 'view' ) }} From 4f2df6048a6f5b75173c5ae18ae3483bcad26559 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Thu, 26 Sep 2024 09:10:20 +0100 Subject: [PATCH 12/13] Update schema.yml --- models/schema.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/models/schema.yml b/models/schema.yml index 3667b33..2f48e43 100644 --- a/models/schema.yml +++ b/models/schema.yml @@ -4,12 +4,11 @@ version: 2 sources: - name: atomic schema: atomic - database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else none }}" tables: - name: events - name: derived schema: "{{target.schema ~ '_derived'}}" - # database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else var('snowplow__atomic_schema', 'atomic') }}" + database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else none }}" tables: - name: snowplow_unified_conversions description: An incremental table which contains all relevant fields for unique conversion events. 
From 4f534552fa5bf02ca694aeeded4f9d31427cd42a Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Tue, 1 Oct 2024 10:08:04 +0300 Subject: [PATCH 13/13] Update snowplow_attribution_overview.sql --- models/snowplow_attribution_overview.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/snowplow_attribution_overview.sql b/models/snowplow_attribution_overview.sql index db654a4..8254d47 100644 --- a/models/snowplow_attribution_overview.sql +++ b/models/snowplow_attribution_overview.sql @@ -10,7 +10,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 config( enabled=var('snowplow__enable_attribution_overview'), sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')), - materialized= 'table' if target.type in ['spark'] else 'view' + materialized= 'table' if target.type in ['spark','redshift'] else 'view' ) }}
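
The CI flow these patches introduce is: build/pull the Spark + Iceberg image, start the master, worker and Thrift server with docker-compose, then run the dbt integration tests against the new `spark_iceberg` target over Thrift on port 10000. A minimal local sketch of that flow follows. It is an illustration, not part of the patch series: it assumes Docker, docker-compose and dbt-spark[PyHive] are installed, that AWS credentials with access to the S3 warehouse and Glue catalog referenced in spark-defaults.conf are available, and that SCHEMA_SUFFIX (required by the `spark_iceberg` profile in integration_tests/ci/profiles.yml) is set; the placeholder values and the 90-second wait are illustrative.

#!/bin/bash
# Sketch: reproduce the spark_iceberg CI flow locally (assumptions noted above).
set -euo pipefail

# Assumed environment: credentials for the S3 warehouse / Glue catalog used in
# spark-defaults.conf, plus the schema suffix expected by the CI profile.
export AWS_ACCESS_KEY_ID="<your-key-id>"            # placeholder
export AWS_SECRET_ACCESS_KEY="<your-secret-key>"    # placeholder
export SCHEMA_SUFFIX="local_test"                    # CI derives this from the dbt version

# 1. Start the Spark master, worker and Thrift server defined in docker-compose.yml.
cd .github/workflows/spark_deployment
docker-compose up -d
echo "Waiting for the Thrift server to come up on port 10000..."
sleep 90    # the workflow waits 90 seconds; adjust for your machine

# 2. Run the package's integration tests against the spark_iceberg profile.
cd ../../../integration_tests
dbt deps
dbt run-operation post_ci_cleanup --target spark_iceberg    # pre-test schema cleanup, as in CI
./.scripts/integration_test.sh -d spark_iceberg
dbt run-operation post_ci_cleanup --target spark_iceberg    # post-test schema cleanup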