From 94b2b57b85c82ab983d4e03a4f9f5856d84949f9 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Fri, 30 Aug 2024 18:58:02 +0300 Subject: [PATCH 1/3] Add support for Spark iceberg --- .github/workflows/pr_tests.yml | 58 ++- .github/workflows/spark_deployment/Dockerfile | 34 ++ .../spark_deployment/build_and_push.sh | 20 + .../spark_deployment/docker-compose.yml | 66 ++++ .../spark_deployment/spark-defaults.conf | 44 +++ .../.scripts/integration_tests.sh | 2 +- integration_tests/ci/profiles.yml | 17 +- integration_tests/dbt_project.yml | 17 +- .../snowplow_base_events_this_run_actual.sql | 13 +- ...ase_sessions_lifecycle_manifest_actual.sql | 16 +- .../base/source/spark/snowplow_events_stg.sql | 346 ++++++++++++++++++ .../default_strategy/test_incremental.sql | 18 +- .../test_incremental_w_lookback_disabled.sql | 5 +- .../test_snowplow_delete_from_manifest.sql | 5 +- .../base_create_snowplow_events_this_run.sql | 68 ++++ macros/utils/cross_db/get_field.sql | 8 +- macros/utils/cross_db/get_string_agg.sql | 30 ++ macros/utils/cross_db/timestamp_functions.sql | 41 ++- macros/utils/get_schemas_by_pattern.sql | 20 +- macros/utils/get_value_by_target_type.sql | 6 +- macros/utils/post_ci_cleanup.sql | 85 ++++- .../utils/snowplow_delete_from_manifest.sql | 19 +- 22 files changed, 874 insertions(+), 64 deletions(-) create mode 100644 .github/workflows/spark_deployment/Dockerfile create mode 100755 .github/workflows/spark_deployment/build_and_push.sh create mode 100644 .github/workflows/spark_deployment/docker-compose.yml create mode 100644 .github/workflows/spark_deployment/spark-defaults.conf create mode 100644 integration_tests/models/base/source/spark/snowplow_events_stg.sql diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml index a63cd3db..69122d12 100644 --- a/.github/workflows/pr_tests.yml +++ b/.github/workflows/pr_tests.yml @@ -59,9 +59,10 @@ jobs: # Run tests from integration_tests sub dir working-directory: ./integration_tests strategy: + fail-fast: false matrix: dbt_version: ["1.*"] - warehouse: ["postgres", "bigquery", "snowflake", "databricks", "redshift"] # TODO: Add RS self-hosted runner + warehouse: ["postgres", "bigquery", "snowflake", "databricks", "redshift", "spark_iceberg"] # TODO: Add RS self-hosted runner services: postgres: image: postgres:latest @@ -82,7 +83,26 @@ jobs: steps: - name: Check out uses: actions/checkout@v3 - + - name: Configure Docker credentials + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }} + password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }} + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: eu-west-1 + - name: Set warehouse variables + id: set_warehouse + run: | + WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1) + WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2) + echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV + echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV + echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT + echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT # Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX. 
# SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables - name: Set SCHEMA_SUFFIX env @@ -92,7 +112,7 @@ jobs: - name: Set DEFAULT_TARGET env run: | - echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV + echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV - name: Python setup uses: actions/setup-python@v4 @@ -103,32 +123,46 @@ jobs: uses: actions/cache@v3 with: path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} restore-keys: | - ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} # Install latest patch version. Upgrade if cache contains old patch version. - name: Install dependencies run: | pip install wheel setuptools - pip install -Iv dbt-${{ matrix.warehouse }}==${{ matrix.dbt_version }} --upgrade + pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse != 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM != 'spark'}} - name: Install spark dependencies run: | pip install --upgrade pip wheel setuptools - pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade + pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse == 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + + - name: Install Docker Compose + run: | + sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose + sudo chmod +x /usr/local/bin/docker-compose + + - name: Build and start Spark cluster + working-directory: .github/workflows/spark_deployment + run: | + docker-compose up -d + echo "Waiting for Spark services to start..." 
+ sleep 90 + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + - name: "Pre-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} - name: Run tests - run: ./.scripts/integration_tests.sh -d ${{ matrix.warehouse }} + run: ./.scripts/integration_tests.sh -d ${{matrix.warehouse}} - name: "Post-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} diff --git a/.github/workflows/spark_deployment/Dockerfile b/.github/workflows/spark_deployment/Dockerfile new file mode 100644 index 00000000..dab57200 --- /dev/null +++ b/.github/workflows/spark_deployment/Dockerfile @@ -0,0 +1,34 @@ +FROM openjdk:11-jre-slim + +# Set environment variables +ENV SPARK_VERSION=3.5.1 +ENV HADOOP_VERSION=3.3.4 +ENV ICEBERG_VERSION=1.4.2 +ENV AWS_SDK_VERSION=1.12.581 + +# Install necessary tools +RUN apt-get update && apt-get install -y curl wget procps rsync ssh + +# Download and install Spark +RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \ + rm spark-${SPARK_VERSION}-bin-hadoop3.tgz + +# Set Spark environment variables +ENV SPARK_HOME=/spark +ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin + +# Download necessary JARs +RUN mkdir -p /spark/jars && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \ + wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \ + wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar + +# Create directory for Spark events +RUN mkdir -p /tmp/spark-events + +WORKDIR /spark + +CMD ["bash"] \ No newline at end of file diff --git a/.github/workflows/spark_deployment/build_and_push.sh b/.github/workflows/spark_deployment/build_and_push.sh new file mode 100755 index 00000000..1be2b6d2 --- /dev/null +++ b/.github/workflows/spark_deployment/build_and_push.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Set variables +DOCKER_HUB_ORG="snowplow" +IMAGE_NAME="spark-s3-iceberg" +TAG="latest" + +# Build the image +echo "Building Docker image..." +docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG . + +# Log in to Docker Hub +echo "Logging in to Docker Hub..." +docker login + +# Push the image to Docker Hub +echo "Pushing image to Docker Hub..." 
+docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG + +echo "Image successfully built and pushed to Docker Hub" \ No newline at end of file diff --git a/.github/workflows/spark_deployment/docker-compose.yml b/.github/workflows/spark_deployment/docker-compose.yml new file mode 100644 index 00000000..2e8077ba --- /dev/null +++ b/.github/workflows/spark_deployment/docker-compose.yml @@ -0,0 +1,66 @@ +version: '3' + +networks: + spark-network: + driver: bridge + +services: + spark-master: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"] + hostname: spark-master + ports: + - '8080:8080' + - '7077:7077' + environment: + - SPARK_LOCAL_IP=spark-master + - SPARK_MASTER_HOST=spark-master + - SPARK_MASTER_PORT=7077 + - SPARK_MASTER_OPTS="-Dspark.driver.memory=2g" + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + spark-worker: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"] + depends_on: + - spark-master + environment: + - SPARK_WORKER_CORES=2 + - SPARK_WORKER_MEMORY=4G + - SPARK_EXECUTOR_MEMORY=3G + - SPARK_LOCAL_IP=spark-worker + - SPARK_MASTER=spark://spark-master:7077 + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + thrift-server: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"] + ports: + - '10000:10000' + depends_on: + - spark-master + - spark-worker + environment: + - SPARK_LOCAL_IP=thrift-server + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network \ No newline at end of file diff --git a/.github/workflows/spark_deployment/spark-defaults.conf b/.github/workflows/spark_deployment/spark-defaults.conf new file mode 100644 index 00000000..9052a056 --- /dev/null +++ b/.github/workflows/spark_deployment/spark-defaults.conf @@ -0,0 +1,44 @@ +spark.master spark://spark-master:7077 + +spark.sql.warehouse.dir s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.glue.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog +spark.sql.catalog.glue.warehouse 
s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue.io-impl org.apache.iceberg.aws.s3.S3FileIO +spark.sql.defaultCatalog glue +spark.sql.catalog.glue.database dbt-spark-iceberg + +spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem +spark.hadoop.fs.s3a.access.key +spark.hadoop.fs.s3a.secret.key +spark.hadoop.fs.s3a.endpoint s3.eu-west-1.amazonaws.com +spark.hadoop.fs.s3a.path.style.access true +spark.hadoop.fs.s3a.region eu-west-1 +spark.hadoop.fs.s3a.aws.region eu-west-1 + +# Enabling AWS SDK V4 signing (required for regions launched after January 2014) +spark.hadoop.com.amazonaws.services.s3.enableV4 true +spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + +# Hive Metastore Configuration (using AWS Glue) +spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory + +# Thrift Server Configuration for better performance in concurrent environments +spark.sql.hive.thriftServer.singleSession false +spark.sql.hive.thriftServer.async true +# spark.sql.hive.thriftServer.maxWorkerThreads 100 +# spark.sql.hive.thriftServer.minWorkerThreads 50 +# spark.sql.hive.thriftServer.workerQueue.size 2000 + +# Memory and Performance Tuning +# spark.driver.memory 2g +# spark.executor.memory 3g +# spark.worker.memory 4g +spark.network.timeout 600s +spark.sql.broadcastTimeout 600s +spark.sql.adaptive.enabled true +spark.serializer org.apache.spark.serializer.KryoSerializer + +# Logging and Debugging +spark.eventLog.enabled true +spark.eventLog.dir /tmp/spark-events diff --git a/integration_tests/.scripts/integration_tests.sh b/integration_tests/.scripts/integration_tests.sh index e7b70de5..e622f167 100755 --- a/integration_tests/.scripts/integration_tests.sh +++ b/integration_tests/.scripts/integration_tests.sh @@ -10,7 +10,7 @@ do esac done -declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "postgres" "redshift" "snowflake") +declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "postgres" "redshift" "snowflake", "spark_iceberg") # set to lower case DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')" diff --git a/integration_tests/ci/profiles.yml b/integration_tests/ci/profiles.yml index d75a3270..7cd4a4d9 100644 --- a/integration_tests/ci/profiles.yml +++ b/integration_tests/ci/profiles.yml @@ -47,7 +47,6 @@ integration_tests: token_uri: "{{ env_var('BIGQUERY_SERVICE_TOKEN_URI') }}" auth_provider_x509_cert_url: "{{ env_var('BIGQUERY_SERVICE_AUTH_PROVIDER_X509_CERT_URL') }}" client_x509_cert_url: "{{ env_var('BIGQUERY_SERVICE_CLIENT_X509_CERT_URL') }}" - snowflake: type: snowflake account: "{{ env_var('SNOWFLAKE_TEST_ACCOUNT') }}" @@ -58,7 +57,6 @@ integration_tests: warehouse: "{{ env_var('SNOWFLAKE_TEST_WAREHOUSE') }}" schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}" threads: 4 - databricks: type: databricks schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}" @@ -66,12 +64,13 @@ integration_tests: http_path: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}" token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}" threads: 4 - - spark: + spark_iceberg: type: spark + method: thrift + host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}" + port: 10000 + user: "{{ env_var('SPARK_USER', 'spark') }}" schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}" - host: "{{ env_var('DATABRICKS_TEST_HOST') }}" - http_path: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}" - token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}" - endpoint: "{{ 
env_var('DATABRICKS_TEST_ENDPOINT') }}" - threads: 4 + connect_retries: 5 + connect_timeout: 60 + threads: 1 \ No newline at end of file diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 821e8dca..7b57f43d 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -41,6 +41,7 @@ vars: snowplow__session_identifiers: [{"schema": "atomic", "field" : "domain_sessionid"}] snowplow__bigquery_session_identifiers: [{"schema": "contexts_com_snowplowanalytics_session_identifier_2_0_0", "field": "session_identifier"}, {"schema": "contexts_com_snowplowanalytics_session_identifier_1_0_0", "field" : "session_id"}] snowplow__databricks_session_identifiers: [{"schema": "contexts_com_snowplowanalytics_session_identifier_2", "field": "session_identifier"}, {"schema": "contexts_com_snowplowanalytics_session_identifier_1", "field" : "session_id"}] + snowplow__spark_session_identifiers: [{"schema": "contexts_com_snowplowanalytics_session_identifier_2", "field": "session_identifier"}, {"schema": "contexts_com_snowplowanalytics_session_identifier_1", "field" : "session_id"}] snowplow__postgres_session_identifiers: [{"schema": "contexts_com_snowplowanalytics_session_identifier_2_0_0", "field": "session_identifier", "prefix": "si_t", "alias": "sito"}, {"schema": "contexts_com_snowplowanalytics_session_identifier_1_0_0", "field" : "session_id", "prefix" : "si_o", "alias": "sido"}] snowplow__snowflake_session_identifiers: [{"schema": "contexts_com_snowplowanalytics_session_identifier_2", "field": "sessionIdentifier"}, {"schema": "contexts_com_snowplowanalytics_session_identifier_1", "field" : "sessionId"}] snowplow__custom_session_sql: 'CAST(DATE(collector_tstamp) as {{ dbt.type_string() }})' @@ -48,6 +49,7 @@ vars: snowplow__user_identifiers: [{"schema": "atomic", "field" : "domain_userid"}] snowplow__bigquery_user_identifiers: [{"schema": "contexts_com_snowplowanalytics_user_identifier_2_0_0", "field" : "user_id"}, {"schema": "contexts_com_snowplowanalytics_user_identifier_1_0_0", "field" : "user_id"}] snowplow__databricks_user_identifiers: [{"schema": "contexts_com_snowplowanalytics_user_identifier_2", "field" : "user_id"}, {"schema": "contexts_com_snowplowanalytics_user_identifier_1", "field" : "user_id"}] + snowplow__spark_user_identifiers: [{"schema": "contexts_com_snowplowanalytics_user_identifier_2", "field" : "user_id"}, {"schema": "contexts_com_snowplowanalytics_user_identifier_1", "field" : "user_id"}] snowplow__postgres_user_identifiers: [{"schema": "contexts_com_snowplowanalytics_user_identifier_2_0_0", "field" : "user_id", "prefix" : "ui_t", "alias": "uidt"}, {"schema": "contexts_com_snowplowanalytics_user_identifier_1_0_0", "field" : "user_id", "prefix": "ui_o", "alias": "uido"}] snowplow__snowflake_user_identifiers: [{"schema": "contexts_com_snowplowanalytics_user_identifier_2", "field" : "userId"}, {"schema": "contexts_com_snowplowanalytics_user_identifier_1", "field" : "userId"}] snowplow__quarantined_sessions: 'snowplow_base_quarantined_sessions_actual' @@ -57,6 +59,7 @@ vars: snowplow__custom_entities_or_sdes: [{"schema" : "contexts_com_snowplowanalytics_custom_entity_1_0_0", "prefix": "custom", "single_entity": true}] snowplow__bigquery_custom_sql: 'cast(contexts_com_snowplowanalytics_custom_entity_1_0_0[safe_offset(0)].contents as STRING) as custom_contents' snowplow__databricks_custom_sql: 'contexts_com_snowplowanalytics_custom_entity_1[0].contents as custom_contents' + snowplow__spark_custom_sql: 
'contexts_com_snowplowanalytics_custom_entity_1[0].contents as custom_contents' snowplow__snowflake_custom_sql: 'contexts_com_snowplowanalytics_custom_entity_1[0].contents::TEXT as custom_contents' snowplow__derived_tstamp_partitioned: true snowplow__days_late_allowed: 3 @@ -73,10 +76,13 @@ vars: models: snowplow_utils_integration_tests: +schema: "snplw_utils_int_tests" + +incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}" + +file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}" materializations: snowflake_delete_insert: enabled: "{{ target.type == 'snowflake' | as_bool() }}" utils: + +materialized: "{{ 'table' if target.type in ['spark'] else 'view' }}" bigquery: enabled: "{{ target.type == 'bigquery' | as_bool() }}" cross_db: @@ -93,6 +99,11 @@ models: data_get_string_agg_grp: +materialized: table + incremental_hooks: + +materialized: "{{ 'table' if target.type in ['spark'] else 'view' }}" + + + base: +bind: false +materialized: table @@ -100,11 +111,13 @@ models: bigquery: +enabled: "{{ target.type == 'bigquery' | as_bool() }}" databricks: - +enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}" + +enabled: "{{ target.type == 'databricks' | as_bool() }}" default: +enabled: "{{ target.type in ['redshift', 'postgres'] | as_bool() }}" snowflake: +enabled: "{{ target.type == 'snowflake' | as_bool() }}" + spark: + +enabled: "{{ target.type == 'spark' | as_bool() }}" tests: @@ -180,10 +193,12 @@ seeds: data_incremental_expected: +column_types: id: integer + id2: integer start_tstamp: timestamp data_incremental_w_lookback_disabled_expected: +column_types: id: integer + id2: integer start_tstamp: timestamp utils: diff --git a/integration_tests/models/base/actual/snowplow_base_events_this_run_actual.sql b/integration_tests/models/base/actual/snowplow_base_events_this_run_actual.sql index 9af989fe..afbc1154 100644 --- a/integration_tests/models/base/actual/snowplow_base_events_this_run_actual.sql +++ b/integration_tests/models/base/actual/snowplow_base_events_this_run_actual.sql @@ -20,9 +20,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% set snowplow_session_sql = '' %} {% if var('snowplow__custom_test', false) %} - {% set snowplow_session_identifiers = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_session_identifiers"), snowflake_val=var("snowplow__snowflake_session_identifiers"), databricks_val=var("snowplow__databricks_session_identifiers"), postgres_val=var("snowplow__postgres_session_identifiers"), redshift_val=var("snowplow__postgres_session_identifiers"))%} + {% set snowplow_session_identifiers = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_session_identifiers"), + snowflake_val=var("snowplow__snowflake_session_identifiers"), + databricks_val=var("snowplow__databricks_session_identifiers"), + spark_val=var("snowplow__spark_session_identifiers"), + postgres_val=var("snowplow__postgres_session_identifiers"), + redshift_val=var("snowplow__postgres_session_identifiers"))%} {% set snowplow_entities_or_sdes = var("snowplow__custom_entities_or_sdes") %} - {% set snowplow_custom_sql = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_custom_sql"), snowflake_val=var("snowplow__snowflake_custom_sql"), databricks_val=var("snowplow__databricks_custom_sql"))%} + {% set snowplow_custom_sql = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_custom_sql"), + 
snowflake_val=var("snowplow__snowflake_custom_sql"), + databricks_val=var("snowplow__databricks_custom_sql"), + spark_val=var("snowplow__spark_custom_sql") + )%} {% elif var('snowplow__session_test', false) %} {% set snowplow_session_sql = var("snowplow__custom_session_sql") %} {% endif %} diff --git a/integration_tests/models/base/actual/snowplow_base_sessions_lifecycle_manifest_actual.sql b/integration_tests/models/base/actual/snowplow_base_sessions_lifecycle_manifest_actual.sql index 9fe05b2a..ed2a4250 100644 --- a/integration_tests/models/base/actual/snowplow_base_sessions_lifecycle_manifest_actual.sql +++ b/integration_tests/models/base/actual/snowplow_base_sessions_lifecycle_manifest_actual.sql @@ -15,8 +15,20 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% set snowplow_session_sql = '' %} {% if var('snowplow__custom_test', false) %} - {% set snowplow_session_identifiers = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_session_identifiers"), snowflake_val=var("snowplow__snowflake_session_identifiers"), databricks_val=var("snowplow__databricks_session_identifiers"), postgres_val=var("snowplow__postgres_session_identifiers"), redshift_val=var("snowplow__postgres_session_identifiers"))%} - {% set snowplow_user_identifiers = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_user_identifiers"), snowflake_val=var("snowplow__snowflake_user_identifiers"), databricks_val=var("snowplow__databricks_user_identifiers"), postgres_val=var("snowplow__postgres_user_identifiers"), redshift_val=var("snowplow__postgres_user_identifiers"))%} + {% set snowplow_session_identifiers = snowplow_utils.get_value_by_target_type( + bigquery_val=var("snowplow__bigquery_session_identifiers"), + snowflake_val=var("snowplow__snowflake_session_identifiers"), + databricks_val=var("snowplow__databricks_session_identifiers"), + spark_val=var("snowplow__spark_session_identifiers"), + postgres_val=var("snowplow__postgres_session_identifiers"), + redshift_val=var("snowplow__postgres_session_identifiers"))%} + {% set snowplow_user_identifiers = snowplow_utils.get_value_by_target_type( + bigquery_val=var("snowplow__bigquery_user_identifiers"), + snowflake_val=var("snowplow__snowflake_user_identifiers"), + databricks_val=var("snowplow__databricks_user_identifiers"), + spark_val=var("snowplow__spark_user_identifiers"), + postgres_val=var("snowplow__postgres_user_identifiers"), + redshift_val=var("snowplow__postgres_user_identifiers"))%} {% elif var('snowplow__session_test', false) %} {% set snowplow_session_sql = var("snowplow__custom_session_sql") %} {% endif %} diff --git a/integration_tests/models/base/source/spark/snowplow_events_stg.sql b/integration_tests/models/base/source/spark/snowplow_events_stg.sql new file mode 100644 index 00000000..bb5d1839 --- /dev/null +++ b/integration_tests/models/base/source/spark/snowplow_events_stg.sql @@ -0,0 +1,346 @@ +{# +Copyright (c) 2021-present Snowplow Analytics Ltd. All rights reserved. +This program is licensed to you under the Snowplow Personal and Academic License Version 1.0, +and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0. +You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ +#} +{{ + config( + tags=['base_macro'] + ) +}} + +-- page view context is given as json string in csv. 
Parse json +with prep as ( + select + app_id, + platform, + etl_tstamp, + collector_tstamp, + dvce_created_tstamp, + event, + event_id, + txn_id, + name_tracker, + v_tracker, + v_collector, + v_etl, + user_id, + user_ipaddress, + user_fingerprint, + domain_userid, + domain_sessionidx, + network_userid, + geo_country, + geo_region, + geo_city, + geo_zipcode, + geo_latitude, + geo_longitude, + geo_region_name, + ip_isp, + ip_organization, + ip_domain, + ip_netspeed, + page_url, + page_title, + page_referrer, + page_urlscheme, + page_urlhost, + page_urlport, + page_urlpath, + page_urlquery, + page_urlfragment, + refr_urlscheme, + refr_urlhost, + refr_urlport, + refr_urlpath, + refr_urlquery, + refr_urlfragment, + refr_medium, + refr_source, + refr_term, + mkt_medium, + mkt_source, + mkt_term, + mkt_content, + mkt_campaign, + se_category, + se_action, + se_label, + se_property, + se_value, + tr_orderid, + tr_affiliation, + tr_total, + tr_tax, + tr_shipping, + tr_city, + tr_state, + tr_country, + ti_orderid, + ti_sku, + ti_name, + ti_category, + ti_price, + ti_quantity, + pp_xoffset_min, + pp_xoffset_max, + pp_yoffset_min, + pp_yoffset_max, + useragent, + br_name, + br_family, + br_version, + br_type, + br_renderengine, + br_lang, + br_features_pdf, + br_features_flash, + br_features_java, + br_features_director, + br_features_quicktime, + br_features_realplayer, + br_features_windowsmedia, + br_features_gears, + br_features_silverlight, + br_cookies, + br_colordepth, + br_viewwidth, + br_viewheight, + os_name, + os_family, + os_manufacturer, + os_timezone, + dvce_type, + dvce_ismobile, + dvce_screenwidth, + dvce_screenheight, + doc_charset, + doc_width, + doc_height, + tr_currency, + tr_total_base, + tr_tax_base, + tr_shipping_base, + ti_currency, + ti_price_base, + base_currency, + geo_timezone, + mkt_clickid, + mkt_network, + etl_tags, + dvce_sent_tstamp, + refr_domain_userid, + refr_dvce_tstamp, + domain_sessionid, + derived_tstamp, + event_vendor, + event_name, + event_format, + event_version, + event_fingerprint, + true_tstamp, + load_tstamp, + from_json(contexts_com_snowplowanalytics_snowplow_web_page_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_web_page_1, + from_json(unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1_0_0, 'array, consent_url:string, consent_version:string, domains_applied:array, event_type:string, gdpr_applies:string>>') as unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1, + from_json(unstruct_event_com_snowplowanalytics_snowplow_cmp_visible_1_0_0, 'array>') as unstruct_event_com_snowplowanalytics_snowplow_cmp_visible_1, + from_json(contexts_com_iab_snowplow_spiders_and_robots_1_0_0, 'array>') as contexts_com_iab_snowplow_spiders_and_robots_1, + from_json(contexts_nl_basjes_yauaa_context_1_0_0, 'array>') as contexts_nl_basjes_yauaa_context_1, + from_json(contexts_com_snowplowanalytics_user_identifier_1_0_0, 'array>') as contexts_com_snowplowanalytics_user_identifier_1, + from_json(contexts_com_snowplowanalytics_user_identifier_2_0_0, 'array>') as contexts_com_snowplowanalytics_user_identifier_2, + from_json(contexts_com_snowplowanalytics_session_identifier_1_0_0, 'array>') as contexts_com_snowplowanalytics_session_identifier_1, + from_json(contexts_com_snowplowanalytics_session_identifier_2_0_0, 'array>') as contexts_com_snowplowanalytics_session_identifier_2, + from_json(contexts_com_snowplowanalytics_custom_entity_1_0_0, 'array>') as contexts_com_snowplowanalytics_custom_entity_1 + from + {{ ref('snowplow_events') }} 
+) + +select + app_id, + platform, + etl_tstamp, + collector_tstamp, + dvce_created_tstamp, + event, + event_id, + txn_id, + name_tracker, + v_tracker, + v_collector, + v_etl, + user_id, + user_ipaddress, + user_fingerprint, + domain_userid, + domain_sessionidx, + network_userid, + geo_country, + geo_region, + geo_city, + geo_zipcode, + geo_latitude, + geo_longitude, + geo_region_name, + ip_isp, + ip_organization, + ip_domain, + ip_netspeed, + page_url, + page_title, + page_referrer, + page_urlscheme, + page_urlhost, + page_urlport, + page_urlpath, + page_urlquery, + page_urlfragment, + refr_urlscheme, + refr_urlhost, + refr_urlport, + refr_urlpath, + refr_urlquery, + refr_urlfragment, + refr_medium, + refr_source, + refr_term, + mkt_medium, + mkt_source, + mkt_term, + mkt_content, + mkt_campaign, + se_category, + se_action, + se_label, + se_property, + se_value, + tr_orderid, + tr_affiliation, + tr_total, + tr_tax, + tr_shipping, + tr_city, + tr_state, + tr_country, + ti_orderid, + ti_sku, + ti_name, + ti_category, + ti_price, + ti_quantity, + pp_xoffset_min, + pp_xoffset_max, + pp_yoffset_min, + pp_yoffset_max, + useragent, + br_name, + br_family, + br_version, + br_type, + br_renderengine, + br_lang, + br_features_pdf, + br_features_flash, + br_features_java, + br_features_director, + br_features_quicktime, + br_features_realplayer, + br_features_windowsmedia, + br_features_gears, + br_features_silverlight, + br_cookies, + br_colordepth, + br_viewwidth, + br_viewheight, + os_name, + os_family, + os_manufacturer, + os_timezone, + dvce_type, + dvce_ismobile, + dvce_screenwidth, + dvce_screenheight, + doc_charset, + doc_width, + doc_height, + tr_currency, + tr_total_base, + tr_tax_base, + tr_shipping_base, + ti_currency, + ti_price_base, + base_currency, + geo_timezone, + mkt_clickid, + mkt_network, + etl_tags, + dvce_sent_tstamp, + refr_domain_userid, + refr_dvce_tstamp, + domain_sessionid, + derived_tstamp, + event_vendor, + event_name, + event_format, + event_version, + event_fingerprint, + true_tstamp, + load_tstamp, + contexts_com_snowplowanalytics_snowplow_web_page_1, + struct( + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].basis_for_processing AS basis_for_processing, + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].consent_scopes AS consent_scopes, + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].consent_url AS consent_url, + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].consent_version AS consent_version, + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].domains_applied AS domains_applied, + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].event_type AS event_type, + CAST(unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].gdpr_applies AS BOOLEAN) AS gdpr_applies + ) AS unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1, + struct(CAST(unstruct_event_com_snowplowanalytics_snowplow_cmp_visible_1[0].elapsed_time AS FLOAT) as elapsed_time) as unstruct_event_com_snowplowanalytics_snowplow_cmp_visible_1, + array(struct(contexts_com_iab_snowplow_spiders_and_robots_1[0].category as category, + contexts_com_iab_snowplow_spiders_and_robots_1[0].primaryImpact as primary_impact, + contexts_com_iab_snowplow_spiders_and_robots_1[0].reason as reason, + contexts_com_iab_snowplow_spiders_and_robots_1[0].spiderOrRobot as spider_or_robot)) as contexts_com_iab_snowplow_spiders_and_robots_1, + 
array(struct(contexts_nl_basjes_yauaa_context_1[0].agentClass as agent_class, + contexts_nl_basjes_yauaa_context_1[0].agentInformationEmail as agent_information_email, + contexts_nl_basjes_yauaa_context_1[0].agentName as agent_name, + contexts_nl_basjes_yauaa_context_1[0].agentNameVersion as agent_name_version, + contexts_nl_basjes_yauaa_context_1[0].agentNameVersionMajor as agent_name_version_major, + contexts_nl_basjes_yauaa_context_1[0].agentVersion as agent_version, + contexts_nl_basjes_yauaa_context_1[0].agentVersionMajor as agent_version_major, + contexts_nl_basjes_yauaa_context_1[0].deviceBrand as device_brand, + contexts_nl_basjes_yauaa_context_1[0].deviceClass as device_class, + contexts_nl_basjes_yauaa_context_1[0].deviceCpu as device_cpu, + contexts_nl_basjes_yauaa_context_1[0].deviceCpuBits as device_cpu_bits, + contexts_nl_basjes_yauaa_context_1[0].deviceName as device_name, + contexts_nl_basjes_yauaa_context_1[0].deviceVersion as device_version, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineClass as layout_engine_class, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineName as layout_engine_name, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineNameVersion as layout_engine_name_version, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineNameVersionMajor as layout_engine_name_version_major, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineVersion as layout_engine_version, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineVersionMajor as layout_engine_version_major, + contexts_nl_basjes_yauaa_context_1[0].networkType as network_type, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemClass as operating_system_class, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemName as operating_system_name, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemNameVersion as operating_system_name_version, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemNameVersionMajor as operating_system_name_version_major, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemVersion as operating_system_version, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemVersionBuild as operating_system_version_build, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemVersionMajor as operating_system_version_major, + contexts_nl_basjes_yauaa_context_1[0].webviewAppName as webview_app_name, + contexts_nl_basjes_yauaa_context_1[0].webviewAppNameVersionMajor as webview_app_name_version_major, + contexts_nl_basjes_yauaa_context_1[0].webviewAppVersion as webview_app_version, + contexts_nl_basjes_yauaa_context_1[0].webviewAppVersionMajor as webview_app_version_major)) as contexts_nl_basjes_yauaa_context_1, + array(struct(contexts_com_snowplowanalytics_user_identifier_1[0].user_id as user_id)) as contexts_com_snowplowanalytics_user_identifier_1, + array(struct(contexts_com_snowplowanalytics_user_identifier_2[0].user_id as user_id)) as contexts_com_snowplowanalytics_user_identifier_2, + array(struct(contexts_com_snowplowanalytics_session_identifier_1[0].session_id as session_id)) as contexts_com_snowplowanalytics_session_identifier_1, + array(struct(contexts_com_snowplowanalytics_session_identifier_2[0].session_identifier as session_identifier)) as contexts_com_snowplowanalytics_session_identifier_2, + + {% if var("snowplow__custom_test", false) %} + array(struct(contexts_com_snowplowanalytics_custom_entity_1[0].contents as contents)) AS contexts_com_snowplowanalytics_custom_entity_1 + {% else %} + cast(null as array>) as contexts_com_snowplowanalytics_custom_entity_1 + {% endif 
%} +from + prep \ No newline at end of file diff --git a/integration_tests/models/materializations/default_strategy/test_incremental.sql b/integration_tests/models/materializations/default_strategy/test_incremental.sql index 9b08b3dc..a3372fa1 100644 --- a/integration_tests/models/materializations/default_strategy/test_incremental.sql +++ b/integration_tests/models/materializations/default_strategy/test_incremental.sql @@ -9,22 +9,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 partition_by: BQ only. Key used to limit table scan TODO: Add tests that change the granularity of the partition #} -{{ - config( +{{ config( materialized='incremental', - unique_key=['id', 'id2'], + unique_key=['id','id2'], upsert_date_key='start_tstamp', - partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "start_tstamp", - "data_type": "timestamp" - }), tags=["requires_script"], - snowplow_optimize=true - ) -}} + snowplow_optimize=true, +) }} + with data as ( - select * from {{ ref('data_incremental') }} + select * + from {{ ref('data_incremental') }} {% if target.type == 'snowflake' %} -- data set intentionally contains dupes. -- Snowflake merge will error if dupes occur. Removing for test diff --git a/integration_tests/models/materializations/default_strategy/test_incremental_w_lookback_disabled.sql b/integration_tests/models/materializations/default_strategy/test_incremental_w_lookback_disabled.sql index c64a50b1..cb52a815 100644 --- a/integration_tests/models/materializations/default_strategy/test_incremental_w_lookback_disabled.sql +++ b/integration_tests/models/materializations/default_strategy/test_incremental_w_lookback_disabled.sql @@ -24,9 +24,10 @@ incremental materialization with lookback disabled. ) }} -with data as ( + +WITH data as ( select * from {{ ref('data_incremental') }} - {% if target.type == 'snowflake' %} + {% if target.type in ['snowflake'] %} -- data set intentionally contains dupes. -- Snowflake merge will error if dupes occur. Removing for test where not (run = 1 and id = 2 and start_tstamp = '2021-03-03 00:00:00') diff --git a/integration_tests/models/utils/test_snowplow_delete_from_manifest.sql b/integration_tests/models/utils/test_snowplow_delete_from_manifest.sql index a1f4d568..e56f02f8 100644 --- a/integration_tests/models/utils/test_snowplow_delete_from_manifest.sql +++ b/integration_tests/models/utils/test_snowplow_delete_from_manifest.sql @@ -14,6 +14,5 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 -- Note: Test covers functionality however when running the macro on-run-start hook, transaction behaviour changes. -- Wrapped delete statement in transation so it commits. BQ wouldnt just support 'commit;' without opening trans. Snowflake behaviour untested. 
-select * - -from {{ ref('data_snowplow_delete_from_manifest_staging') }} +SELECT * +FROM {{ ref('data_snowplow_delete_from_manifest_staging') }} diff --git a/macros/base/base_create_snowplow_events_this_run.sql b/macros/base/base_create_snowplow_events_this_run.sql index 73a51485..6e945abf 100644 --- a/macros/base/base_create_snowplow_events_this_run.sql +++ b/macros/base/base_create_snowplow_events_this_run.sql @@ -255,3 +255,71 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {{ return(events_this_run_query) }} {% endmacro %} + + +{% macro spark__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql, allow_null_dvce_tstamps) %} + {%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table), + 'start_tstamp', + 'end_tstamp') %} + {% set sessions_this_run = ref(sessions_this_run_table) %} + {% set snowplow_events = api.Relation.create(database=snowplow_events_database, schema=snowplow_events_schema, identifier=snowplow_events_table) %} + + {% set events_this_run_query %} + with identified_events AS ( + select + {% if session_sql %} + {{ session_sql }} as session_identifier, + {% else -%} + COALESCE( + {% for identifier in session_identifiers %} + {%- if identifier['schema']|lower != 'atomic' -%} + {{ snowplow_utils.get_field(identifier['schema'], identifier['field'], 'e', dbt.type_string(), 0, snowplow_events) }} + {%- else -%} + e.{{identifier['field']}} + {%- endif -%} + , + {%- endfor -%} + NULL + ) as session_identifier, + {%- endif %} + e.*, + row_number() over (partition by event_id order by {{ session_timestamp }}, dvce_created_tstamp) as event_id_dedupe_index + + from {{ snowplow_events }} e + + ) + + select + a.* + ,b.user_identifier -- take user_identifier from manifest. This ensures only 1 domain_userid per session. 
+ {% if custom_sql %} + , {{ custom_sql }} + {% endif %} + + from identified_events as a + inner join {{ sessions_this_run }} as b + on a.session_identifier = b.session_identifier + + where a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }} + {% if allow_null_dvce_tstamps %} + and coalesce(a.dvce_sent_tstamp, a.collector_tstamp) <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'coalesce(a.dvce_created_tstamp, a.collector_tstamp)') }} + {% else %} + and a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'a.dvce_created_tstamp') }} + {% endif %} + and a.{{ session_timestamp }} >= {{ lower_limit }} + and a.{{ session_timestamp }} <= {{ upper_limit }} + and a.{{ session_timestamp }} >= b.start_tstamp -- deal with late loading events + + {% if derived_tstamp_partitioned and target.type == 'bigquery' | as_bool() %} + and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }} + and a.derived_tstamp <= {{ upper_limit }} + {% endif %} + + and {{ snowplow_utils.app_id_filter(app_ids) }} + + and a.event_id_dedupe_index = 1 + {% endset %} + + {{ return(events_this_run_query) }} + +{% endmacro %} \ No newline at end of file diff --git a/macros/utils/cross_db/get_field.sql b/macros/utils/cross_db/get_field.sql index 3a2dc8df..94d6d487 100644 --- a/macros/utils/cross_db/get_field.sql +++ b/macros/utils/cross_db/get_field.sql @@ -29,7 +29,13 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% if '*' in column_name %} {% do exceptions.raise_compiler_error('Wildcard schema versions are only supported for Bigquery, they are not supported for ' ~ target.type) %} {% else %} - {%- if table_alias -%}{{table_alias}}.{%- endif -%}{{column_name}}{%- if array_index is not none -%}[{{array_index}}]{%- endif -%}.{{field_name}}{%- if type -%}::{{type}}{%- endif -%} + {%- if type is none -%} + {%- if table_alias -%}{{table_alias}}.{%- endif -%}{{column_name}}{%- if array_index is not none -%}[{{array_index}}]{%- endif -%}.{{field_name}} + {%- else -%} + CAST( + {%- if table_alias -%}{{table_alias}}.{%- endif -%}{{column_name}}{%- if array_index is not none -%}[{{array_index}}]{%- endif -%}.{{field_name}} AS {{type}} + ) + {%- endif -%} {% endif %} {% endmacro %} diff --git a/macros/utils/cross_db/get_string_agg.sql b/macros/utils/cross_db/get_string_agg.sql index 90384af6..6d3ec71c 100644 --- a/macros/utils/cross_db/get_string_agg.sql +++ b/macros/utils/cross_db/get_string_agg.sql @@ -93,6 +93,36 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% endmacro %} {% macro spark__get_string_agg(base_column, column_prefix, separator=',', order_by_column=base_column, sort_numeric=false, order_by_column_prefix=column_prefix, is_distinct=false, order_desc = false) %} + /* Explaining inside out: + 1. Create a group array which is made of sub-arrays of the base_column and the sort column + 2. Sort these sub-arrays based on a lambda function that compares on the second element (the sort column, casted if needed) + 3. Use transform to select just the first element of the array + 4. Optionally use array_distinct + 5. 
Join the array into a string + */ + array_join( + {% if is_distinct %} array_distinct( {% endif %} + transform( + array_sort( + FILTER(collect_list( + ARRAY(cast({{column_prefix}}.{{base_column}} as string), cast({{order_by_column_prefix}}.{{order_by_column}} as string))), x -> x[0] is not null), (left, right) -> + + {%- if sort_numeric -%} + CASE WHEN cast(left[1] as decimal(38, 9)) {% if order_desc %} > {% else %} < {% endif %} cast(right[1] as decimal(38, 9)) THEN -1 + WHEN cast(left[1] as decimal(38, 9)) {% if order_desc %} < {% else %} > {% endif %} cast(right[1] as decimal(38, 9)) THEN 1 ELSE 0 END + + {% else %} + CASE WHEN left[1] {% if order_desc %} > {% else %} < {% endif %} right[1] THEN -1 + WHEN left[1] {% if order_desc %} < {% else %} > {% endif %} right[1] THEN 1 ELSE 0 END + + {% endif %} + ), x -> x[0]) + {% if is_distinct %} ) {% endif %}, + '{{separator}}') +{% endmacro %} + + +{% macro databricks__get_string_agg(base_column, column_prefix, separator=',', order_by_column=base_column, sort_numeric=false, order_by_column_prefix=column_prefix, is_distinct=false, order_desc = false) %} /* Explaining inside out: 1. Create a group array which is made of sub-arrays of the base_column and the sort column 2. Sort these sub-arrays based on a lamdba function that compares on the second element (the sort column, casted if needed) diff --git a/macros/utils/cross_db/timestamp_functions.sql b/macros/utils/cross_db/timestamp_functions.sql index e4ef4f03..feb0bdb0 100644 --- a/macros/utils/cross_db/timestamp_functions.sql +++ b/macros/utils/cross_db/timestamp_functions.sql @@ -14,16 +14,36 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {{ return(adapter.dispatch('timestamp_diff', 'snowplow_utils')(first_tstamp, second_tstamp, datepart)) }} {% endmacro %} - {% macro default__timestamp_diff(first_tstamp, second_tstamp, datepart) %} {{ return(datediff(first_tstamp, second_tstamp, datepart)) }} {% endmacro %} - {% macro bigquery__timestamp_diff(first_tstamp, second_tstamp, datepart) %} timestamp_diff({{second_tstamp}}, {{first_tstamp}}, {{datepart}}) {% endmacro %} +{% macro databricks__timestamp_diff(first_tstamp, second_tstamp, datepart) %} + {{ return(datediff(first_tstamp, second_tstamp, datepart)) }} +{% endmacro %} + +{% macro spark__timestamp_diff(first_tstamp, second_tstamp, datepart) %} + {% if datepart|lower == 'week' %} + cast((unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp))) / (3600 * 24 * 7) as bigint) + {% elif datepart|lower == 'day' %} + cast((unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp))) / (3600 * 24) as bigint) + {% elif datepart|lower == 'hour' %} + cast((unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp))) / 3600 as bigint) + {% elif datepart|lower == 'minute' %} + cast((unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp))) / 60 as bigint) + {% elif datepart|lower == 'second' %} + cast(unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp)) as bigint) + {% elif datepart|lower == 'millisecond' %} + cast((unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp))) * 1000 as bigint) + {% else %} + {{ exceptions.raise_compiler_error("Unsupported datepart for Spark: " ~ datepart) }} + {% endif %} +{% endmacro %} + {% 
macro timestamp_add(datepart, interval, tstamp) %} {{ return(adapter.dispatch('timestamp_add', 'snowplow_utils')(datepart, interval, tstamp)) }} @@ -44,6 +64,23 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 timestampadd({{datepart}}, {{interval}}, {{tstamp}}) {% endmacro %} +{% macro spark__timestamp_add(datepart, interval, tstamp) %} + {% if datepart|lower == 'week' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + (cast({{interval}} as bigint) * cast(3600 as bigint) * cast(24 as bigint) * cast(7 as bigint) * cast(1000 as bigint)) as bigint)) + {% elif datepart|lower == 'day' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + (cast({{interval}} as bigint) * cast(3600 as bigint) * cast(24 as bigint) * cast(1000 as bigint)) as bigint)) + {% elif datepart|lower == 'hour' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + (cast({{interval}} as bigint) * cast(3600 as bigint) * cast(1000 as bigint)) as bigint)) + {% elif datepart|lower == 'minute' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + (cast({{interval}} as bigint) * cast(60 as bigint) * cast(1000 as bigint)) as bigint)) + {% elif datepart|lower == 'second' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + cast({{interval}} as bigint) * cast(1000 as bigint) as bigint)) + {% elif datepart|lower == 'millisecond' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + cast({{interval}} as bigint) as bigint)) + {% else %} + {{ exceptions.raise_compiler_error("Unsupported datepart for Spark: " ~ datepart) }} + {% endif %} +{% endmacro %} {% macro cast_to_tstamp(tstamp_literal) -%} {% if tstamp_literal is none or tstamp_literal|lower in ['null',''] %} diff --git a/macros/utils/get_schemas_by_pattern.sql b/macros/utils/get_schemas_by_pattern.sql index 17ffb78d..2becf0ec 100644 --- a/macros/utils/get_schemas_by_pattern.sql +++ b/macros/utils/get_schemas_by_pattern.sql @@ -4,7 +4,7 @@ This program is licensed to you under the Snowplow Personal and Academic License and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0. You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ #} -{% macro get_schemas_by_pattern(schema_pattern) %} +{% macro get_schemas_by_pattern(schema_pattern=target.schema) %} {{ return(adapter.dispatch('get_schemas_by_pattern', 'snowplow_utils') (schema_pattern)) }} {% endmacro %} @@ -19,16 +19,20 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% endmacro %} {% macro spark__get_schemas_by_pattern(schema_pattern) %} - {# databricks/spark uses a regex on SHOW SCHEMAS and doesn't have an information schema in hive_metastore #} - {%- set schema_pattern= dbt.replace(schema_pattern, "%", "*") -%} + {# + Databricks/Spark uses a regex on SHOW SCHEMAS and doesn't have an information schema in hive_metastore. + Replace '%' with '*' for Spark's pattern matching. 
+ #} + {%- set adjusted_schema_pattern = schema_pattern | replace("%", "*") -%} - {# Get all schemas with the target.schema prefix #} - {%- set get_schemas_sql -%} - SHOW SCHEMAS LIKE '{{schema_pattern}}'; - {%- endset -%} + {# Construct the SHOW SCHEMAS LIKE query #} + {%- set get_schemas_sql = "SHOW SCHEMAS LIKE '" ~ adjusted_schema_pattern ~ "'" -%} + {# Execute the query and fetch results #} {% set results = run_query(get_schemas_sql) %} - {% set schemas = results|map(attribute='databaseName')|unique|list %} + + {# Extract schema names from the results #} + {% set schemas = results.columns[0].values() | unique | list %} {{ return(schemas) }} diff --git a/macros/utils/get_value_by_target_type.sql b/macros/utils/get_value_by_target_type.sql index 4c301315..6d89b6ae 100644 --- a/macros/utils/get_value_by_target_type.sql +++ b/macros/utils/get_value_by_target_type.sql @@ -4,7 +4,7 @@ This program is licensed to you under the Snowplow Personal and Academic License and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0. You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ #} -{%- macro get_value_by_target_type(bigquery_val=none, snowflake_val=none, redshift_val=none, postgres_val=none, databricks_val=none) -%} +{%- macro get_value_by_target_type(bigquery_val=none, snowflake_val=none, redshift_val=none, postgres_val=none, databricks_val=none, spark_val=none) -%} {% if target.type == 'bigquery' %} {{ return(bigquery_val) }} @@ -14,8 +14,10 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {{ return(redshift_val) }} {% elif target.type == 'postgres' %} {{ return(postgres_val) }} - {% elif target.type in ['databricks', 'spark'] %} + {% elif target.type in ['databricks'] %} {{ return(databricks_val) }} + {% elif target.type in ['spark'] %} + {{ return(spark_val) }} {% else %} {{ exceptions.raise_compiler_error("Snowplow: Unexpected target type "~target.type) }} {% endif %} diff --git a/macros/utils/post_ci_cleanup.sql b/macros/utils/post_ci_cleanup.sql index db80cfc8..21a4af71 100644 --- a/macros/utils/post_ci_cleanup.sql +++ b/macros/utils/post_ci_cleanup.sql @@ -7,13 +7,56 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {# Destructive macro. Use with care! 
#} {% macro post_ci_cleanup(schema_pattern=target.schema) %} + {{ return(adapter.dispatch('post_ci_cleanup', 'snowplow_utils')(schema_pattern)) }} +{% endmacro %} + + +{% macro default__post_ci_cleanup(schema_pattern=target.schema) %} + + {# Get all schemas with the target.schema prefix #} + {% set schemas = snowplow_utils.get_schemas_by_pattern(schema_pattern~'%') %} + + {% if schemas|length %} + + {%- if target.type in ['databricks'] -%} + {# Generate sql to drop all identified schemas #} + {% for schema in schemas -%} + {%- set drop_schema_sql -%} + DROP SCHEMA IF EXISTS {{schema}} CASCADE; + {%- endset -%} + + {% do run_query(drop_schema_sql) %} + + {% endfor %} + + {%- else -%} + {# Generate sql to drop all identified schemas #} + {% set drop_schema_sql -%} + + {% for schema in schemas -%} + DROP SCHEMA IF EXISTS {{schema}} CASCADE; + {% endfor %} + + {%- endset %} + + {# Drop schemas #} + {% do run_query(drop_schema_sql) %} + + {%- endif -%} + + {% endif %} + +{% endmacro %} + + +{% macro databricks__post_ci_cleanup(schema_pattern=target.schema) %} {# Get all schemas with the target.schema prefix #} {% set schemas = snowplow_utils.get_schemas_by_pattern(schema_pattern~'%') %} {% if schemas|length %} - {%- if target.type in ['databricks', 'spark'] -%} + {%- if target.type in ['databricks'] -%} {# Generate sql to drop all identified schemas #} {% for schema in schemas -%} {%- set drop_schema_sql -%} @@ -42,3 +85,43 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% endif %} {% endmacro %} + +{# + Spark-specific implementation for post CI cleanup. +#} + +{% macro spark__post_ci_cleanup(schema_pattern=target.schema) %} + {# Retrieve all schemas matching the pattern #} + {% set schemas = snowplow_utils.get_schemas_by_pattern(schema_pattern ~ "%") %} + + {% if schemas | length > 0 %} + {% for schema in schemas %} + {{ log("Processing schema: " ~ schema, info=True) }} + + {# Step 1: List all tables in the current schema #} + {% set tables_query = "SHOW TABLES IN " ~ schema %} + {% set tables_result = run_query(tables_query) %} + + {# Initialize an empty list for tables #} + {% set table_list = [] %} + + {% if tables_result and tables_result.rows %} + {% for row in tables_result.rows %} + {% set table = row[1] %} + {% do table_list.append(table) %} + {% endfor %} + + {# Step 2: Drop each table individually #} + {% for table in table_list %} + {% set drop_table_sql = "DROP TABLE IF EXISTS " ~ schema ~ "." 
~ table ~ ";" %} + {% do adapter.execute(drop_table_sql) %} + {% endfor %} + {% else %} + {% endif %} + + {# For spark we shouldn't delete the schema as this has the role of the database #} + {% endfor %} + {% else %} + {{ log("No schemas found matching pattern: " ~ schema_pattern, info=True) }} + {% endif %} +{% endmacro %} diff --git a/macros/utils/snowplow_delete_from_manifest.sql b/macros/utils/snowplow_delete_from_manifest.sql index a8f34da7..63b8d60d 100644 --- a/macros/utils/snowplow_delete_from_manifest.sql +++ b/macros/utils/snowplow_delete_from_manifest.sql @@ -45,17 +45,22 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {%- endif -%} {% set delete_statement %} - {%- if target.type in ['databricks', 'spark'] -%} - delete from {{ incremental_manifest_table }} where model in ({{ snowplow_utils.print_list(matched_models) }}); + {%- if target.type in ['databricks'] -%} + DELETE FROM {{ incremental_manifest_table }} + WHERE model IN ({{ snowplow_utils.print_list(matched_models) }}); + {%- elif target.type in ['spark'] -%} + DELETE FROM {{ incremental_manifest_table }} + WHERE model IN ({{ snowplow_utils.print_list(matched_models) }}); {%- else -%} -- We don't need transaction but Redshift needs commit statement while BQ does not. By using transaction we cover both. - begin; - delete from {{ incremental_manifest_table }} where model in ({{ snowplow_utils.print_list(matched_models) }}); - commit; + BEGIN; + DELETE FROM {{ incremental_manifest_table }} + WHERE model IN ({{ snowplow_utils.print_list(matched_models) }}); + COMMIT; {%- endif -%} - {% endset %} - {%- do run_query(delete_statement) -%} + {% endset %} + {%- do adapter.execute(delete_statement) -%} {%- if matched_models|length -%} {% do snowplow_utils.log_message("Snowplow: Deleted models "+snowplow_utils.print_list(matched_models)+" from the manifest") %} From 6cea9aee85034e91ba82bfef37f97afc37db106b Mon Sep 17 00:00:00 2001 From: Oscar Date: Tue, 8 Oct 2024 16:41:06 +0100 Subject: [PATCH 2/3] Changed return_limits_from_model macro default limits ------------------ Co-authored-by: Agnes Kiss --- .../models/utils/cross_db/cross_db.yml | 2 -- macros/utils/return_limits_from_model.sql | 13 +++++++++---- macros/utils/schema.yml | 3 +++ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/integration_tests/models/utils/cross_db/cross_db.yml b/integration_tests/models/utils/cross_db/cross_db.yml index baf03371..b94a5275 100644 --- a/integration_tests/models/utils/cross_db/cross_db.yml +++ b/integration_tests/models/utils/cross_db/cross_db.yml @@ -17,8 +17,6 @@ models: tests: - dbt_utils.equality: compare_model: ref('expected_get_field_bq') - config: - +enabled: "{{ target.type in ['bigquery'] | as_bool() }}" - name: test_indexed_unnest tests: - dbt_utils.equality: diff --git a/macros/utils/return_limits_from_model.sql b/macros/utils/return_limits_from_model.sql index 09e06bf0..7de61939 100644 --- a/macros/utils/return_limits_from_model.sql +++ b/macros/utils/return_limits_from_model.sql @@ -4,7 +4,7 @@ This program is licensed to you under the Snowplow Personal and Academic License and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0. 
You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ #} -{% macro return_limits_from_model(model, lower_limit_col, upper_limit_col) -%} +{% macro return_limits_from_model(model, lower_limit_col, upper_limit_col, lower_output=False) -%} {# In case of not execute just return empty strings to avoid hitting database #} {% if not execute %} @@ -35,9 +35,14 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {{ snowplow_utils.log_message("Snowplow Warning: *************") }} {% do exceptions.warn("Snowplow Warning: No data in "~this~" for date range from variables, please modify your run variables to include data if this is not expected.") %} {{ snowplow_utils.log_message("Snowplow Warning: *************") }} - {# This allows for bigquery to still run the same way the other warehouses do, but also ensures no data is processed #} - {% set lower_limit = snowplow_utils.cast_to_tstamp('9999-01-01 00:00:00') %} - {% set upper_limit = snowplow_utils.cast_to_tstamp('9999-01-02 00:00:00') %} + {% if lower_output %} + {% set lower_limit = snowplow_utils.cast_to_tstamp('0000-01-01 00:00:00') %} + {% set upper_limit = snowplow_utils.cast_to_tstamp('0000-01-02 00:00:00') %} + {%- else -%} + {# Default behaviour for incrementalization. This allows for bigquery to still run the same way the other warehouses do, but also ensures no data is processed #} + {% set lower_limit = snowplow_utils.cast_to_tstamp('9999-01-01 00:00:00') %} + {% set upper_limit = snowplow_utils.cast_to_tstamp('9999-01-02 00:00:00') %} + {% endif %} {%- else -%} {% set lower_limit = snowplow_utils.cast_to_tstamp(results.columns[0].values()[0]) %} {% set upper_limit = snowplow_utils.cast_to_tstamp(results.columns[1].values()[0]) %} diff --git a/macros/utils/schema.yml b/macros/utils/schema.yml index b28a7604..80c17a1e 100644 --- a/macros/utils/schema.yml +++ b/macros/utils/schema.yml @@ -92,6 +92,9 @@ macros: - name: upper_limit_col type: string description: The column to take the `max` of to get the upper limit + - name: lower_output + type: boolean + description: This changes the output to a low set date if true and high set date if false, in the event the function cannot find value but needs to set a value - name: set_query_tag description: '{{ doc("macro_set_query_tag") }}' arguments: From fe28dfd56385aad5f3d6a63fea34ede814030b76 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Fri, 11 Oct 2024 15:22:45 +0300 Subject: [PATCH 3/3] Prepare for release --- CHANGELOG | 16 ++++++++++++++++ dbt_project.yml | 2 +- integration_tests/dbt_project.yml | 2 +- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 985911aa..37836af7 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,19 @@ +snowplow-utils 0.17.0 (2024-10-14) +--------------------------------------- +## Summary +This release focuses on expanding support for Apache Spark with the Iceberg file format and enhancing integration tests. + +## Features +- Add support for Apache Spark with Iceberg file format + +## Under the hood +- Modify integration tests to support Spark +- Changed return_limits_from_model macro default limits + + +## Upgrading +Update the snowplow-utils version in your `packages.yml` file. 
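+
+A minimal `packages.yml` sketch for this upgrade (the dbt Hub coordinates and the version range below are illustrative assumptions, not part of this patch):
+
+  packages:
+    - package: snowplow/snowplow_utils
+      version: [">=0.17.0", "<0.18.0"]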
 snowplow-utils 0.16.8 (2024-07-29)
 ---------------------------------------
 ## Summary
diff --git a/dbt_project.yml b/dbt_project.yml
index 7bd2a14b..6e7b8dca 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -1,5 +1,5 @@
 name: 'snowplow_utils'
-version: '0.16.8'
+version: '0.17.0'
 config-version: 2

 require-dbt-version: [">=1.4.0", "<2.0.0"]
diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml
index 7b57f43d..36126fdb 100644
--- a/integration_tests/dbt_project.yml
+++ b/integration_tests/dbt_project.yml
@@ -1,5 +1,5 @@
 name: 'snowplow_utils_integration_tests'
-version: '0.16.8'
+version: '0.17.0'
 config-version: 2

 profile: 'integration_tests'
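
For reference, a hedged usage sketch of the `lower_output` flag that [PATCH 2/3] adds to `return_limits_from_model`; the model and column names below are illustrative placeholders rather than values taken from the patch:

{# Illustrative only: when the model contains no rows, lower_output=true makes the
   fallback window an early 0000-01-01 date rather than the far-future 9999-01-01 default #}
{%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(
        ref('my_sessions_this_run'),
        'start_tstamp',
        'end_tstamp',
        lower_output=true) -%}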