diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml
index da3e623..ea566af 100644
--- a/.github/workflows/pr_tests.yml
+++ b/.github/workflows/pr_tests.yml
@@ -2,9 +2,6 @@ name: pr_tests
 
 on:
   pull_request:
-    branches:
-      - main
-      - 'release/**'
 
 concurrency: dbt_integration_tests
 
@@ -42,13 +39,12 @@ env:
   SNOWFLAKE_TEST_WAREHOUSE: ${{ secrets.SNOWFLAKE_TEST_WAREHOUSE }}
 
   # Postgres Connection
-  POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
   POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }}
   POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }}
-  POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }}
   POSTGRES_TEST_DBNAME: ${{ secrets.POSTGRES_TEST_DBNAME }}
+  POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
+  POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }}
 
-  # Databricks Connection
   DATABRICKS_TEST_HOST: ${{ secrets.DATABRICKS_TEST_HOST }}
   DATABRICKS_TEST_HTTP_PATH: ${{ secrets.DATABRICKS_TEST_HTTP_PATH }}
   DATABRICKS_TEST_TOKEN: ${{ secrets.DATABRICKS_TEST_TOKEN }}
@@ -63,33 +59,43 @@ jobs:
         # Run tests from integration_tests sub dir
         working-directory: ./integration_tests
     strategy:
+      fail-fast: false
      matrix:
        dbt_version: ["1.*"]
-        warehouse: ["bigquery", "snowflake", "databricks", "redshift"] # TODO: Add RS self-hosted runner
-
-    # services:
-    #   postgres:
-    #     image: postgres:latest
-    #     env:
-    #       POSTGRES_DB: ${{ secrets.POSTGRES_TEST_DBNAME }}
-    #       POSTGRES_USER: ${{ secrets.POSTGRES_TEST_USER }}
-    #       POSTGRES_PASSWORD: ${{ secrets.POSTGRES_TEST_PASS }}
-    #     # Set health checks to wait until postgres has started
-    #     options: >-
-    #       --health-cmd pg_isready
-    #       --health-interval 10s
-    #       --health-timeout 5s
-    #       --health-retries 5
-    #     ports:
-    #       # Maps tcp port 5432 on service container to the host
-    #       - 5432:5432
+        warehouse: ["bigquery", "snowflake", "databricks", "redshift", "spark_iceberg"] # TODO: Add RS self-hosted runner
 
     steps:
      - name: Check out
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
 
      # Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX.
      # SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables
+      - name: Set SCHEMA_SUFFIX env
+        run: echo "SCHEMA_SUFFIX=$(echo ${DBT_VERSION%.*} | tr . _)" >> $GITHUB_ENV
+        env:
+          DBT_VERSION: ${{ matrix.dbt_version }}
+      - name: Configure Docker credentials
+        uses: docker/login-action@v2
+        with:
+          username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }}
+          password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }}
+      - name: Configure AWS credentials
+        uses: aws-actions/configure-aws-credentials@v1
+        with:
+          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+          aws-region: eu-west-1
+      - name: Set warehouse variables
+        id: set_warehouse
+        run: |
+          WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1)
+          WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2)
+          echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV
+          echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV
+          echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT
+          echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT
+      # Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX.
+      # SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables
      - name: Set SCHEMA_SUFFIX env
        run: echo "SCHEMA_SUFFIX=$(echo ${DBT_VERSION%.*} | tr . _)" >> $GITHUB_ENV
        env:
@@ -97,7 +103,8 @@ jobs:
 
      - name: Set DEFAULT_TARGET env
        run: |
-          echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV
+          echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV
+
      - name: Python setup
        uses: actions/setup-python@v4
        with:
@@ -107,31 +114,46 @@ jobs:
        uses: actions/cache@v3
        with:
          path: ~/.cache/pip
-          key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }}
+          key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}}
          restore-keys: |
-            ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }}
+            ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}}
 
+      # Install latest patch version. Upgrade if cache contains old patch version.
      - name: Install dependencies
        run: |
-          pip install --upgrade pip wheel setuptools
-          pip install -Iv dbt-${{ matrix.warehouse }}==${{ matrix.dbt_version }} --upgrade
+          pip install wheel setuptools
+          pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade
          dbt deps
-        if: ${{matrix.warehouse != 'spark'}}
+        if: ${{env.WAREHOUSE_PLATFORM != 'spark'}}
 
      - name: Install spark dependencies
        run: |
          pip install --upgrade pip wheel setuptools
-          pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade
+          pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade
          dbt deps
-        if: ${{matrix.warehouse == 'spark'}}
+        if: ${{env.WAREHOUSE_PLATFORM == 'spark'}}
+
+      - name: Install Docker Compose
+        run: |
+          sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
+          sudo chmod +x /usr/local/bin/docker-compose
+
+      - name: Build and start Spark cluster
+        working-directory: .github/workflows/spark_deployment
+        run: |
+          docker-compose up -d
+          echo "Waiting for Spark services to start..."
+          sleep 90
+        if: ${{env.WAREHOUSE_PLATFORM == 'spark'}}
+
      - name: "Pre-test: Drop ci schemas"
        run: |
-          dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }}
+          dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}}
+
      - name: Run tests
-        run: ./.scripts/integration_test.sh -d ${{ matrix.warehouse }}
+        run: ./.scripts/integration_test.sh -d ${{matrix.warehouse}}
 
-      # post_ci_cleanup sits in utils package
      - name: "Post-test: Drop ci schemas"
        run: |
-          dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }}
+          dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}}
diff --git a/.github/workflows/spark_deployment/Dockerfile b/.github/workflows/spark_deployment/Dockerfile
new file mode 100644
index 0000000..dab5720
--- /dev/null
+++ b/.github/workflows/spark_deployment/Dockerfile
@@ -0,0 +1,34 @@
+FROM openjdk:11-jre-slim
+
+# Set environment variables
+ENV SPARK_VERSION=3.5.1
+ENV HADOOP_VERSION=3.3.4
+ENV ICEBERG_VERSION=1.4.2
+ENV AWS_SDK_VERSION=1.12.581
+
+# Install necessary tools
+RUN apt-get update && apt-get install -y curl wget procps rsync ssh
+
+# Download and install Spark
+RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
+    tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
+    mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \
+    rm spark-${SPARK_VERSION}-bin-hadoop3.tgz
+
+# Set Spark environment variables
+ENV SPARK_HOME=/spark
+ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
+
+# Download necessary JARs
+RUN mkdir -p /spark/jars && \
+    wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \
+    wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \
+    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \
+    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar
+
+# Create directory for Spark events
+RUN mkdir -p /tmp/spark-events
+
+WORKDIR /spark
+
+CMD ["bash"]
\ No newline at end of file
diff --git a/.github/workflows/spark_deployment/build_and_push.sh b/.github/workflows/spark_deployment/build_and_push.sh
new file mode 100755
index 0000000..1be2b6d
--- /dev/null
+++ b/.github/workflows/spark_deployment/build_and_push.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+# Set variables
+DOCKER_HUB_ORG="snowplow"
+IMAGE_NAME="spark-s3-iceberg"
+TAG="latest"
+
+# Build the image
+echo "Building Docker image..."
+docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG .
+
+# Log in to Docker Hub
+echo "Logging in to Docker Hub..."
+docker login
+
+# Push the image to Docker Hub
+echo "Pushing image to Docker Hub..."
+docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG
+
+echo "Image successfully built and pushed to Docker Hub"
\ No newline at end of file
diff --git a/.github/workflows/spark_deployment/docker-compose.yml b/.github/workflows/spark_deployment/docker-compose.yml
new file mode 100644
index 0000000..bd02f76
--- /dev/null
+++ b/.github/workflows/spark_deployment/docker-compose.yml
@@ -0,0 +1,66 @@
+version: '3'
+
+networks:
+  spark-network:
+    driver: bridge
+
+services:
+  spark-master:
+    image: snowplow/spark-s3-iceberg:latest
+    command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"]
+    hostname: spark-master
+    ports:
+      - '8080:8080'
+      - '7077:7077'
+    environment:
+      - SPARK_LOCAL_IP=spark-master
+      - SPARK_MASTER_HOST=spark-master
+      - SPARK_MASTER_PORT=7077
+      - SPARK_MASTER_OPTS="-Dspark.driver.memory=2g"
+      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
+      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - AWS_REGION=eu-west-1
+      - AWS_DEFAULT_REGION=eu-west-1
+    volumes:
+      - ./spark-defaults.conf:/spark/conf/spark-defaults.conf
+    networks:
+      - spark-network
+
+  spark-worker:
+    image: snowplow/spark-s3-iceberg:latest
+    command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"]
+    depends_on:
+      - spark-master
+    environment:
+      # - SPARK_WORKER_CORES=2
+      # - SPARK_WORKER_MEMORY=4G
+      # - SPARK_EXECUTOR_MEMORY=3G
+      - SPARK_LOCAL_IP=spark-worker
+      - SPARK_MASTER=spark://spark-master:7077
+      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
+      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - AWS_REGION=eu-west-1
+      - AWS_DEFAULT_REGION=eu-west-1
+    volumes:
+      - ./spark-defaults.conf:/spark/conf/spark-defaults.conf
+    networks:
+      - spark-network
+
+  thrift-server:
+    image: snowplow/spark-s3-iceberg:latest
+    command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"]
+    ports:
+      - '10000:10000'
+    depends_on:
+      - spark-master
+      - spark-worker
+    environment:
+      - SPARK_LOCAL_IP=thrift-server
+      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
+      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - AWS_REGION=eu-west-1
+      - AWS_DEFAULT_REGION=eu-west-1
+    volumes:
+      - ./spark-defaults.conf:/spark/conf/spark-defaults.conf
+    networks:
+      - spark-network
\ No newline at end of file
diff --git a/.github/workflows/spark_deployment/spark-defaults.conf b/.github/workflows/spark_deployment/spark-defaults.conf
new file mode 100644
index 0000000..9052a05
--- /dev/null
+++ b/.github/workflows/spark_deployment/spark-defaults.conf
@@ -0,0 +1,44 @@
+spark.master                                     spark://spark-master:7077
+
+spark.sql.warehouse.dir                          s3a://dbt-spark-iceberg/github-integration-testing
+spark.sql.catalog.glue                           org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.glue.catalog-impl              org.apache.iceberg.aws.glue.GlueCatalog
+spark.sql.catalog.glue.warehouse                 s3a://dbt-spark-iceberg/github-integration-testing
+spark.sql.catalog.glue.io-impl                   org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.defaultCatalog                         glue
+spark.sql.catalog.glue.database                  dbt-spark-iceberg
+
+spark.hadoop.fs.s3a.impl                         org.apache.hadoop.fs.s3a.S3AFileSystem
+spark.hadoop.fs.s3a.access.key
+spark.hadoop.fs.s3a.secret.key
+spark.hadoop.fs.s3a.endpoint                     s3.eu-west-1.amazonaws.com
+spark.hadoop.fs.s3a.path.style.access            true
+spark.hadoop.fs.s3a.region                       eu-west-1
+spark.hadoop.fs.s3a.aws.region                   eu-west-1
+
+# Enabling AWS SDK V4 signing (required for regions launched after January 2014)
+spark.hadoop.com.amazonaws.services.s3.enableV4  true
+spark.hadoop.fs.s3a.aws.credentials.provider     org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+
+# Hive Metastore Configuration (using AWS Glue)
+spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
+
+# Thrift Server Configuration for better performance in concurrent environments
+spark.sql.hive.thriftServer.singleSession        false
+spark.sql.hive.thriftServer.async                true
+# spark.sql.hive.thriftServer.maxWorkerThreads   100
+# spark.sql.hive.thriftServer.minWorkerThreads   50
+# spark.sql.hive.thriftServer.workerQueue.size   2000
+
+# Memory and Performance Tuning
+# spark.driver.memory                            2g
+# spark.executor.memory                          3g
+# spark.worker.memory                            4g
+spark.network.timeout                            600s
+spark.sql.broadcastTimeout                       600s
+spark.sql.adaptive.enabled                       true
+spark.serializer                                 org.apache.spark.serializer.KryoSerializer
+
+# Logging and Debugging
+spark.eventLog.enabled                           true
+spark.eventLog.dir                               /tmp/spark-events
diff --git a/dbt_project.yml b/dbt_project.yml
index 10d94c9..f599afa 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -65,6 +65,8 @@ on-run-end:
 
 models:
   snowplow_attribution:
+    +file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}"
+    +incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}"
     +schema: "derived"
     +materialized: table
     manifest:
diff --git a/integration_tests/.scripts/integration_test.sh b/integration_tests/.scripts/integration_test.sh
index 035f66c..8ab9e6b 100755
--- a/integration_tests/.scripts/integration_test.sh
+++ b/integration_tests/.scripts/integration_test.sh
@@ -11,7 +11,7 @@ do
 done
 
 declare -a ATTRIBUTION_MODELS_TO_TEST=("last_touch" "shapley")
-declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake", "redshift")
+declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "snowflake", "redshift", "spark_iceberg")
 
 # set to lower case
 DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')"
diff --git a/integration_tests/ci/profiles.yml b/integration_tests/ci/profiles.yml
index 3171f13..8f33337 100644
--- a/integration_tests/ci/profiles.yml
+++ b/integration_tests/ci/profiles.yml
@@ -67,12 +67,13 @@ integration_tests:
      token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}"
      threads: 1
 
-    spark:
+    spark_iceberg:
      type: spark
-      method: odbc
-      driver: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}"
+      method: thrift
+      host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}"
+      port: 10000
+      user: "{{ env_var('SPARK_USER', 'spark') }}"
      schema: "gh_sp_attrib_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
-      host: "{{ env_var('DATABRICKS_TEST_HOST') }}"
-      token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}"
-      endpoint: "{{ env_var('DATABRICKS_TEST_ENDPOINT') }}"
-      threads: 4
+      connect_retries: 5
+      connect_timeout: 60
+      threads: 1
diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml
index 07fcf0f..6945337 100644
--- a/integration_tests/dbt_project.yml
+++ b/integration_tests/dbt_project.yml
@@ -26,6 +26,7 @@ quoting:
 
 models:
   snowplow_attribution_integration_tests:
+    +materialized: table
    bind: false
    +schema: "snplw_attribution_int_tests"
    source:
diff --git a/integration_tests/models/unit_tests/test_spend_data_actual.sql b/integration_tests/models/unit_tests/test_spend_data_actual.sql
index f05ee9a..cc1097b 100644
--- a/integration_tests/models/unit_tests/test_spend_data_actual.sql
+++ b/integration_tests/models/unit_tests/test_spend_data_actual.sql
@@ -14,25 +14,23 @@ with spend_with_unique_keys as (
 -- we need to dedupe as the join does the filtering, we can't group them upfront
 , campaign_spend as (
 
-  select s.campaign, s.spend
+  select s.campaign, s.spend, row_number() over (partition by s.spend_id order by s.spend_tstamp) as row_num
   from spend_with_unique_keys s
   inner join {{ ref('snowplow_attribution_campaign_attributions') }} c
   on c.campaign = s.campaign
   and s.spend_tstamp < cv_tstamp
   and s.spend_tstamp > {{ snowplow_utils.timestamp_add('day', -90, 'cv_tstamp') }}
   where s.campaign is not null
-  qualify row_number() over (partition by s.spend_id order by s.spend_tstamp) = 1
 
 )
 
 , channel_spend as (
 
-  select s.channel, s.spend
+  select s.channel, s.spend, row_number() over (partition by s.spend_id order by s.spend_tstamp) as row_num
   from spend_with_unique_keys s
   inner join {{ ref('snowplow_attribution_channel_attributions') }} c
   on c.channel = s.channel
   and s.spend_tstamp < cv_tstamp
   and s.spend_tstamp > {{ snowplow_utils.timestamp_add('day', -90, 'cv_tstamp') }}
   where s.channel is not null
-  qualify row_number() over (partition by s.spend_id order by s.spend_tstamp) = 1
 
 )
@@ -41,6 +39,7 @@ with spend_with_unique_keys as (
 
   select campaign as path, sum(spend) as spend
   from campaign_spend
+  where row_num = 1
   group by 1
 
 )
@@ -49,6 +48,7 @@ with spend_with_unique_keys as (
 
   select channel as path, sum(spend) as spend
   from channel_spend
+  where row_num = 1
   group by 1
 
 )
diff --git a/integration_tests/packages.yml b/integration_tests/packages.yml
index ec6faac..c055b30 100644
--- a/integration_tests/packages.yml
+++ b/integration_tests/packages.yml
@@ -1,4 +1,4 @@
 packages:
   - local: ../
-  - package: snowplow/snowplow_unified
-    version: [">=0.4.0", "<0.5.0"]
+  - git: "https://github.com/snowplow/dbt-snowplow-unified.git"
+    revision: "Release/snowplow-unified/0.5.0"
\ No newline at end of file
diff --git a/macros/attribution_overview.sql b/macros/attribution_overview.sql
index d757198..35b997b 100644
--- a/macros/attribution_overview.sql
+++ b/macros/attribution_overview.sql
@@ -28,14 +28,12 @@ with spend_with_unique_keys as (
 , campaign_spend as (
 
   {% if var('snowplow__spend_source') != 'not defined' %}
-    select s.campaign, s.spend
+    select s.campaign, s.spend, row_number() over (partition by s.spend_id order by s.spend_tstamp) as row_num
    from spend_with_unique_keys s
    inner join {{ ref('snowplow_attribution_campaign_attributions') }} c
    on c.campaign = s.campaign
    and s.spend_tstamp < cv_tstamp
    and s.spend_tstamp > {{ snowplow_utils.timestamp_add('day', -90, 'cv_tstamp') }}
-    where s.campaign is not null
-    qualify row_number() over (partition by s.spend_id order by s.spend_tstamp) = 1
-
+    where s.campaign is not null
  {% else %}
    select true
  {% endif %}
@@ -45,14 +43,12 @@ with spend_with_unique_keys as (
 , channel_spend as (
 
   {% if var('snowplow__spend_source') != 'not defined' %}
-    select s.channel, s.spend
+    select s.channel, s.spend, row_number() over (partition by s.spend_id order by s.spend_tstamp) as row_num
    from spend_with_unique_keys s
    inner join {{ ref('snowplow_attribution_channel_attributions') }} c
    on c.channel = s.channel
    and s.spend_tstamp < cv_tstamp
    and s.spend_tstamp > {{ snowplow_utils.timestamp_add('day', -90, 'cv_tstamp') }}
-    where s.channel is not null
-    qualify row_number() over (partition by s.spend_id order by s.spend_tstamp) = 1
-
+    where s.channel is not null
  {% else %}
    select true
  {% endif %}
@@ -65,6 +61,7 @@ with spend_with_unique_keys as (
  {% if var('snowplow__spend_source') != 'not defined' %}
    select campaign, sum(spend) as spend
    from campaign_spend
+    where row_num = 1
    group by 1
 
  {% else %}
@@ -78,6 +75,7 @@ with spend_with_unique_keys as (
  {% if var('snowplow__spend_source') != 'not defined' %}
    select channel, sum(spend) as spend
    from channel_spend
+    where row_num = 1
    group by 1
 
  {% else %}
diff --git a/models/schema.yml b/models/schema.yml
index c599999..2f48e43 100644
--- a/models/schema.yml
+++ b/models/schema.yml
@@ -8,7 +8,7 @@ sources:
      - name: events
  - name: derived
    schema: "{{target.schema ~ '_derived'}}"
-    database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else var('snowplow__atomic_schema', 'atomic') }}"
+    database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else none }}"
    tables:
      - name: snowplow_unified_conversions
        description: An incremental table which contains all relevant fields for unique conversion events.
diff --git a/models/snowplow_attribution_overview.sql b/models/snowplow_attribution_overview.sql
index f3ce0ea..8254d47 100644
--- a/models/snowplow_attribution_overview.sql
+++ b/models/snowplow_attribution_overview.sql
@@ -10,7 +10,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
 
  config(
    enabled=var('snowplow__enable_attribution_overview'),
    sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')),
-    materialized='view'
+    materialized= 'table' if target.type in ['spark','redshift'] else 'view'
  )
 }}
diff --git a/packages.yml b/packages.yml
index 6da1d8c..8f3da37 100644
--- a/packages.yml
+++ b/packages.yml
@@ -1,3 +1,3 @@
 packages:
-  - package: snowplow/snowplow_utils
-    version: [">=0.16.2", "<0.17.0"]
+  - git: "https://github.com/snowplow/dbt-snowplow-utils.git"
+    revision: "release/snowplow-utils/0.17"
\ No newline at end of file