From 68ac482543143bbd62c1a078fe03e83fcf8d22e5 Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Wed, 9 Oct 2024 18:18:04 +0300 Subject: [PATCH] Ecommerce for Spark ( Iceberg ) Support RC1 --- .github/workflows/pr_tests.yml | 79 +++++-- .github/workflows/spark_deployment/Dockerfile | 34 +++ .../spark_deployment/build_and_push.sh | 20 ++ .../spark_deployment/docker-compose.yml | 66 ++++++ .../spark_deployment/spark-defaults.conf | 44 ++++ dbt_project.yml | 6 +- .../.scripts/integration_test.sh | 4 +- integration_tests/ci/profiles.yml | 21 ++ integration_tests/dbt_project.yml | 5 +- ...low_ecommerce_cart_interactions_actual.sql | 9 +- ...ecommerce_checkout_interactions_actual.sql | 8 +- ..._ecommerce_product_interactions_actual.sql | 9 +- .../snowplow_ecommerce_sessions_actual.sql | 7 +- ...mmerce_transaction_interactions_actual.sql | 10 +- .../spark/snowplow_ecommerce_events_stg.sql | 73 +++++++ .../get_action_context_fields.sql | 4 +- .../get_cart_context_fields.sql | 6 +- .../get_checkout_context_fields.sql | 24 +-- .../get_page_context_fields.sql | 6 +- .../get_page_view_context_fields.sql | 4 +- .../get_screen_view_context_fields.sql | 4 +- .../get_session_context_fields.sql | 16 +- .../get_transaction_context_fields.sql | 20 +- .../get_user_context_fields.sql | 6 +- ...merce_base_sessions_lifecycle_manifest.sql | 2 +- ...nowplow_ecommerce_base_events_this_run.sql | 3 +- models/base/src_base.yml | 2 +- ...commerce_product_interactions_this_run.sql | 193 ++++++++++++++++++ packages.yml | 4 +- 29 files changed, 595 insertions(+), 94 deletions(-) create mode 100644 .github/workflows/spark_deployment/Dockerfile create mode 100755 .github/workflows/spark_deployment/build_and_push.sh create mode 100644 .github/workflows/spark_deployment/docker-compose.yml create mode 100644 .github/workflows/spark_deployment/spark-defaults.conf create mode 100644 integration_tests/models/source/spark/snowplow_ecommerce_events_stg.sql create mode 100644 
models/products/scratch/spark/snowplow_ecommerce_product_interactions_this_run.sql diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml index 0430d9f..24b2dee 100644 --- a/.github/workflows/pr_tests.yml +++ b/.github/workflows/pr_tests.yml @@ -2,9 +2,6 @@ name: pr_tests on: pull_request: - branches: - - main - - 'release/**' concurrency: dbt_integration_tests @@ -42,13 +39,12 @@ env: SNOWFLAKE_TEST_WAREHOUSE: ${{ secrets.SNOWFLAKE_TEST_WAREHOUSE }} # Postgres Connection - POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }} POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }} POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }} - POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }} POSTGRES_TEST_DBNAME: ${{ secrets.POSTGRES_TEST_DBNAME }} + POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }} + POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }} - # Databricks Connection DATABRICKS_TEST_HOST: ${{ secrets.DATABRICKS_TEST_HOST }} DATABRICKS_TEST_HTTP_PATH: ${{ secrets.DATABRICKS_TEST_HTTP_PATH }} DATABRICKS_TEST_TOKEN: ${{ secrets.DATABRICKS_TEST_TOKEN }} @@ -63,10 +59,10 @@ jobs: # Run tests from integration_tests sub dir working-directory: ./integration_tests strategy: + fail-fast: false matrix: dbt_version: ["1.*"] - warehouse: ["bigquery", "snowflake", "databricks", "postgres"] # TODO: Add RS self-hosted runner - + warehouse: ["postgres", "bigquery", "snowflake", "databricks", "spark_iceberg"] # TODO: Add RS self-hosted runner services: postgres: image: postgres:latest @@ -90,6 +86,32 @@ jobs: # Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX. # SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables + - name: Set SCHEMA_SUFFIX env + run: echo "SCHEMA_SUFFIX=$(echo ${DBT_VERSION%.*} | tr . 
_)" >> $GITHUB_ENV + env: + DBT_VERSION: ${{ matrix.dbt_version }} + - name: Configure Docker credentials + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }} + password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }} + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: eu-west-1 + - name: Set warehouse variables + id: set_warehouse + run: | + WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1) + WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2) + echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV + echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV + echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT + echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT + # Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX. + # SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables - name: Set SCHEMA_SUFFIX env run: echo "SCHEMA_SUFFIX=$(echo ${DBT_VERSION%.*} | tr . 
_)" >> $GITHUB_ENV env: @@ -97,44 +119,57 @@ jobs: - name: Set DEFAULT_TARGET env run: | - echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV + echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV - name: Python setup - uses: actions/setup-python@v5 + uses: actions/setup-python@v4 with: python-version: "3.8.x" - name: Pip cache - uses: actions/cache@v4 + uses: actions/cache@v3 with: path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} restore-keys: | - ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} # Install latest patch version. Upgrade if cache contains old patch version. - name: Install dependencies run: | - pip install --upgrade pip wheel setuptools - pip install -Iv dbt-${{ matrix.warehouse }}==${{ matrix.dbt_version }} --upgrade + pip install wheel setuptools + pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse != 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM != 'spark'}} - name: Install spark dependencies run: | pip install --upgrade pip wheel setuptools - pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade + pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse == 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + + - name: Install Docker Compose + run: | + sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose + sudo chmod +x /usr/local/bin/docker-compose + + - name: Build and start Spark cluster + working-directory: .github/workflows/spark_deployment + run: | + docker-compose up -d + echo "Waiting for Spark services to start..." 
+ sleep 90 + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + - name: "Pre-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} - name: Run tests - run: ./.scripts/integration_test.sh -d ${{ matrix.warehouse }} + run: ./.scripts/integration_test.sh -d ${{matrix.warehouse}} - # post_ci_cleanup sits in utils package - name: "Post-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} diff --git a/.github/workflows/spark_deployment/Dockerfile b/.github/workflows/spark_deployment/Dockerfile new file mode 100644 index 0000000..dab5720 --- /dev/null +++ b/.github/workflows/spark_deployment/Dockerfile @@ -0,0 +1,34 @@ +FROM openjdk:11-jre-slim + +# Set environment variables +ENV SPARK_VERSION=3.5.1 +ENV HADOOP_VERSION=3.3.4 +ENV ICEBERG_VERSION=1.4.2 +ENV AWS_SDK_VERSION=1.12.581 + +# Install necessary tools +RUN apt-get update && apt-get install -y curl wget procps rsync ssh + +# Download and install Spark +RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \ + rm spark-${SPARK_VERSION}-bin-hadoop3.tgz + +# Set Spark environment variables +ENV SPARK_HOME=/spark +ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin + +# Download necessary JARs +RUN mkdir -p /spark/jars && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \ + wget 
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \ + wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar + +# Create directory for Spark events +RUN mkdir -p /tmp/spark-events + +WORKDIR /spark + +CMD ["bash"] \ No newline at end of file diff --git a/.github/workflows/spark_deployment/build_and_push.sh b/.github/workflows/spark_deployment/build_and_push.sh new file mode 100755 index 0000000..1be2b6d --- /dev/null +++ b/.github/workflows/spark_deployment/build_and_push.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Set variables +DOCKER_HUB_ORG="snowplow" +IMAGE_NAME="spark-s3-iceberg" +TAG="latest" + +# Build the image +echo "Building Docker image..." +docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG . + +# Log in to Docker Hub +echo "Logging in to Docker Hub..." +docker login + +# Push the image to Docker Hub +echo "Pushing image to Docker Hub..." 
+docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG + +echo "Image successfully built and pushed to Docker Hub" \ No newline at end of file diff --git a/.github/workflows/spark_deployment/docker-compose.yml b/.github/workflows/spark_deployment/docker-compose.yml new file mode 100644 index 0000000..2e8077b --- /dev/null +++ b/.github/workflows/spark_deployment/docker-compose.yml @@ -0,0 +1,66 @@ +version: '3' + +networks: + spark-network: + driver: bridge + +services: + spark-master: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"] + hostname: spark-master + ports: + - '8080:8080' + - '7077:7077' + environment: + - SPARK_LOCAL_IP=spark-master + - SPARK_MASTER_HOST=spark-master + - SPARK_MASTER_PORT=7077 + - SPARK_MASTER_OPTS="-Dspark.driver.memory=2g" + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + spark-worker: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"] + depends_on: + - spark-master + environment: + - SPARK_WORKER_CORES=2 + - SPARK_WORKER_MEMORY=4G + - SPARK_EXECUTOR_MEMORY=3G + - SPARK_LOCAL_IP=spark-worker + - SPARK_MASTER=spark://spark-master:7077 + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + thrift-server: + image: 
snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"] + ports: + - '10000:10000' + depends_on: + - spark-master + - spark-worker + environment: + - SPARK_LOCAL_IP=thrift-server + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network \ No newline at end of file diff --git a/.github/workflows/spark_deployment/spark-defaults.conf b/.github/workflows/spark_deployment/spark-defaults.conf new file mode 100644 index 0000000..9052a05 --- /dev/null +++ b/.github/workflows/spark_deployment/spark-defaults.conf @@ -0,0 +1,44 @@ +spark.master spark://spark-master:7077 + +spark.sql.warehouse.dir s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.glue.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog +spark.sql.catalog.glue.warehouse s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue.io-impl org.apache.iceberg.aws.s3.S3FileIO +spark.sql.defaultCatalog glue +spark.sql.catalog.glue.database dbt-spark-iceberg + +spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem +spark.hadoop.fs.s3a.access.key +spark.hadoop.fs.s3a.secret.key +spark.hadoop.fs.s3a.endpoint s3.eu-west-1.amazonaws.com +spark.hadoop.fs.s3a.path.style.access true +spark.hadoop.fs.s3a.region 
eu-west-1 +spark.hadoop.fs.s3a.aws.region eu-west-1 + +# Enable AWS Signature Version 4 signing (required for regions launched after January 2014) +spark.hadoop.com.amazonaws.services.s3.enableV4 true +spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + +# Hive Metastore Configuration (using AWS Glue) +spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory + +# Thrift Server Configuration for better performance in concurrent environments +spark.sql.hive.thriftServer.singleSession false +spark.sql.hive.thriftServer.async true +# spark.sql.hive.thriftServer.maxWorkerThreads 100 +# spark.sql.hive.thriftServer.minWorkerThreads 50 +# spark.sql.hive.thriftServer.workerQueue.size 2000 + +# Memory and Performance Tuning +# spark.driver.memory 2g +# spark.executor.memory 3g +# spark.worker.memory 4g +spark.network.timeout 600s +spark.sql.broadcastTimeout 600s +spark.sql.adaptive.enabled true +spark.serializer org.apache.spark.serializer.KryoSerializer + +# Logging and Debugging +spark.eventLog.enabled true +spark.eventLog.dir /tmp/spark-events diff --git a/dbt_project.yml b/dbt_project.yml index fc16e93..a040791 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -108,6 +108,8 @@ on-run-end: models: snowplow_ecommerce: +materialized: table + +file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}" + +incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}" base: manifest: +schema: "snowplow_manifest" @@ -135,7 +137,9 @@ models: bigquery: +enabled: "{{ target.type == 'bigquery' | as_bool() }}" databricks: - +enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}" + +enabled: "{{ target.type in ['databricks'] | as_bool() }}" + spark: + +enabled: "{{ target.type in ['spark'] | as_bool() }}" default: +enabled: "{{ target.type in ['redshift', 'postgres'] | as_bool() }}" snowflake: diff --git 
a/integration_tests/.scripts/integration_test.sh b/integration_tests/.scripts/integration_test.sh index 2a19e7f..dcfefcc 100755 --- a/integration_tests/.scripts/integration_test.sh +++ b/integration_tests/.scripts/integration_test.sh @@ -10,7 +10,7 @@ do esac done -declare -a SUPPORTED_DATABASES=("bigquery" "postgres" "databricks" "snowflake") +declare -a SUPPORTED_DATABASES=("bigquery" "postgres" "databricks" "snowflake" "spark_iceberg") # set to lower case DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')" @@ -27,7 +27,7 @@ for db in ${DATABASES[@]}; do eval "dbt seed --full-refresh --target $db" || exit 1; - echo "Snowplow e-commerce integration tests: Execute models (no mobile) - run 0/4" + echo "Snowplow e-commerce integration tests: Execute models (no mobile) - run 0/4" eval "dbt run --full-refresh --vars '{snowplow__allow_refresh: true, snowplow__backfill_limit_days: 30, snowplow__enable_mobile_events: false}' --target $db" || exit 1; diff --git a/integration_tests/ci/profiles.yml b/integration_tests/ci/profiles.yml index 06d64a0..ead3611 100644 --- a/integration_tests/ci/profiles.yml +++ b/integration_tests/ci/profiles.yml @@ -56,3 +56,24 @@ integration_tests: dbname: "{{ env_var('POSTGRES_TEST_DBNAME') }}" schema: "gh_sp_ecom_dbt_{{ env_var('SCHEMA_SUFFIX') }}" threads: 4 + + redshift: + type: redshift + host: "{{ env_var('REDSHIFT_TEST_HOST') }}" + user: "{{ env_var('REDSHIFT_TEST_USER') }}" + pass: "{{ env_var('REDSHIFT_TEST_PASS') }}" + dbname: "{{ env_var('REDSHIFT_TEST_DBNAME') }}" + port: "{{ env_var('REDSHIFT_TEST_PORT') | as_number }}" + schema: "gh_sp_ecom_dbt_{{ env_var('SCHEMA_SUFFIX') }}" + threads: 4 + + spark_iceberg: + type: spark + method: thrift + host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}" + port: 10000 + user: "{{ env_var('SPARK_USER', 'spark') }}" + schema: "{{ env_var('SPARK_SCHEMA', 'default') }}" + connect_retries: 5 + connect_timeout: 60 + threads: 1 \ No newline at end of file diff --git 
a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 68475fd..8a41eb6 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -27,11 +27,14 @@ models: snowplow_ecommerce_integration_tests: bind: false +schema: "snplw_ecommerce_int_tests" + +materialized: table source: bigquery: +enabled: "{{ target.type == 'bigquery' | as_bool() }}" databricks: - +enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}" + +enabled: "{{ target.type in ['databricks'] | as_bool() }}" + spark: + +enabled: "{{ target.type in ['spark'] | as_bool() }}" default: +enabled: "{{ target.type in ['redshift', 'postgres'] | as_bool() }}" snowflake: diff --git a/integration_tests/models/actual/snowplow_ecommerce_cart_interactions_actual.sql b/integration_tests/models/actual/snowplow_ecommerce_cart_interactions_actual.sql index e6f111b..73e1cc9 100644 --- a/integration_tests/models/actual/snowplow_ecommerce_cart_interactions_actual.sql +++ b/integration_tests/models/actual/snowplow_ecommerce_cart_interactions_actual.sql @@ -5,9 +5,10 @@ and you may not use this file except in compliance with the Snowplow Personal an You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ #} - -select * {% if target.type == 'databricks' %} - except(derived_tstamp_date) +{% if target.type in ('databricks','spark') %} +SELECT {{ dbt_utils.star(from=ref('snowplow_ecommerce_cart_interactions'), except=['derived_tstamp_date'] )}}, +date(derived_tstamp_date) as derived_tstamp_date +{% else %} +SELECT {{ dbt_utils.star(from=ref('snowplow_ecommerce_cart_interactions')) }} {% endif %} - from {{ ref('snowplow_ecommerce_cart_interactions') }} diff --git a/integration_tests/models/actual/snowplow_ecommerce_checkout_interactions_actual.sql b/integration_tests/models/actual/snowplow_ecommerce_checkout_interactions_actual.sql index 0c4b584..e069e0e 100644 --- 
a/integration_tests/models/actual/snowplow_ecommerce_checkout_interactions_actual.sql +++ b/integration_tests/models/actual/snowplow_ecommerce_checkout_interactions_actual.sql @@ -6,8 +6,10 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 #} -select * {% if target.type == 'databricks' %} - except(derived_tstamp_date) +{% if target.type in ('databricks','spark') %} +SELECT {{ dbt_utils.star(from=ref('snowplow_ecommerce_checkout_interactions'), except=['derived_tstamp_date'] )}}, +date(derived_tstamp_date) as derived_tstamp_date +{% else %} +SELECT {{ dbt_utils.star(from=ref('snowplow_ecommerce_checkout_interactions')) }} {% endif %} - from {{ ref('snowplow_ecommerce_checkout_interactions') }} diff --git a/integration_tests/models/actual/snowplow_ecommerce_product_interactions_actual.sql b/integration_tests/models/actual/snowplow_ecommerce_product_interactions_actual.sql index 9758e1b..1a1cc1b 100644 --- a/integration_tests/models/actual/snowplow_ecommerce_product_interactions_actual.sql +++ b/integration_tests/models/actual/snowplow_ecommerce_product_interactions_actual.sql @@ -6,8 +6,11 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 #} -select * {% if target.type == 'databricks' %} - except(derived_tstamp_date) +{% if target.type in ('databricks','spark') %} +SELECT {{ dbt_utils.star(from=ref('snowplow_ecommerce_product_interactions'), except=['derived_tstamp_date'] )}}, +date(derived_tstamp_date) as derived_tstamp_date +{% else %} +SELECT {{ dbt_utils.star(from=ref('snowplow_ecommerce_product_interactions')) }} {% endif %} - from {{ ref('snowplow_ecommerce_product_interactions') }} + diff --git a/integration_tests/models/actual/snowplow_ecommerce_sessions_actual.sql b/integration_tests/models/actual/snowplow_ecommerce_sessions_actual.sql index a66397f..90ea015 100644 --- a/integration_tests/models/actual/snowplow_ecommerce_sessions_actual.sql +++ 
b/integration_tests/models/actual/snowplow_ecommerce_sessions_actual.sql @@ -6,8 +6,9 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 #} -select * {% if target.type == 'databricks' %} - except(start_tstamp_date) +{% if target.type in ('databricks','spark') %} +SELECT {{ dbt_utils.star(from=ref('snowplow_ecommerce_sessions'), except=['start_tstamp_date'] )}} +{% else %} +SELECT {{ dbt_utils.star(from=ref('snowplow_ecommerce_sessions')) }} {% endif %} - from {{ ref('snowplow_ecommerce_sessions') }} diff --git a/integration_tests/models/actual/snowplow_ecommerce_transaction_interactions_actual.sql b/integration_tests/models/actual/snowplow_ecommerce_transaction_interactions_actual.sql index 8478d0b..e42b00d 100644 --- a/integration_tests/models/actual/snowplow_ecommerce_transaction_interactions_actual.sql +++ b/integration_tests/models/actual/snowplow_ecommerce_transaction_interactions_actual.sql @@ -6,8 +6,10 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 #} -select * {% if target.type == 'databricks' %} - except(derived_tstamp_date) +{% if target.type in ('databricks','spark') %} +SELECT {{ dbt_utils.star(from=ref('snowplow_ecommerce_transaction_interactions'), except=['derived_tstamp_date'] )}}, +date(derived_tstamp_date) as derived_tstamp_date +{% else %} +SELECT {{ dbt_utils.star(from=ref('snowplow_ecommerce_transaction_interactions')) }} {% endif %} - -from {{ ref('snowplow_ecommerce_transaction_interactions') }} +from {{ ref('snowplow_ecommerce_transaction_interactions') }} \ No newline at end of file diff --git a/integration_tests/models/source/spark/snowplow_ecommerce_events_stg.sql b/integration_tests/models/source/spark/snowplow_ecommerce_events_stg.sql new file mode 100644 index 0000000..cae7020 --- /dev/null +++ b/integration_tests/models/source/spark/snowplow_ecommerce_events_stg.sql @@ -0,0 +1,73 @@ +{# +Copyright (c) 2022-present Snowplow Analytics Ltd. All rights reserved. 
+This program is licensed to you under the Snowplow Personal and Academic License Version 1.0, +and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0. +You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ +#} + +-- Contexts are given as json string in csv. Parse json +with prep as ( + select + {{ dbt_utils.star(from=ref('snowplow_ecommerce_events'), except=[ + "contexts_com_snowplowanalytics_snowplow_web_page_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_user_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_page_1_0_0", + "unstruct_event_com_snowplowanalytics_snowplow_ecommerce_snowplow_ecommerce_action_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_product_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1_0_0", + "contexts_com_snowplowanalytics_mobile_screen_1_0_0", + "contexts_com_snowplowanalytics_snowplow_client_session_1_0_1" + ]) }} + , + from_json(contexts_com_snowplowanalytics_snowplow_web_page_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_web_page_1, + from_json(contexts_com_snowplowanalytics_snowplow_ecommerce_user_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_ecommerce_user_1, + from_json(contexts_com_snowplowanalytics_snowplow_ecommerce_page_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_ecommerce_page_1, + from_json(unstruct_event_com_snowplowanalytics_snowplow_ecommerce_snowplow_ecommerce_action_1_0_0, 'struct') as unstruct_event_com_snowplowanalytics_snowplow_ecommerce_snowplow_ecommerce_action_1, + from_json(contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1, + 
from_json(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_ecommerce_product_1, + from_json(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1, + from_json(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1, + from_json(contexts_com_snowplowanalytics_mobile_screen_1_0_0, 'array>') as contexts_com_snowplowanalytics_mobile_screen_1, + from_json(contexts_com_snowplowanalytics_snowplow_client_session_1_0_1, 'array>') as contexts_com_snowplowanalytics_snowplow_client_session_1 + + + FROM {{ ref('snowplow_ecommerce_events') }} +) + +select + {{ dbt_utils.star(from=ref('snowplow_ecommerce_events'), except=[ + "contexts_com_snowplowanalytics_snowplow_web_page_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_user_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_page_1_0_0", + "unstruct_event_com_snowplowanalytics_snowplow_ecommerce_snowplow_ecommerce_action_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_product_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1_0_0", + "contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1_0_0", + "contexts_com_snowplowanalytics_mobile_screen_1_0_0", + "contexts_com_snowplowanalytics_snowplow_client_session_1_0_1" + ]) }} + , + contexts_com_snowplowanalytics_snowplow_web_page_1, + contexts_com_snowplowanalytics_snowplow_ecommerce_user_1, + contexts_com_snowplowanalytics_snowplow_ecommerce_page_1, + unstruct_event_com_snowplowanalytics_snowplow_ecommerce_snowplow_ecommerce_action_1, + contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1, + contexts_com_snowplowanalytics_snowplow_ecommerce_product_1, + 
contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1, + contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1, + + array(struct(contexts_com_snowplowanalytics_mobile_screen_1[0].id as id, contexts_com_snowplowanalytics_mobile_screen_1[0].name as name)) as contexts_com_snowplowanalytics_mobile_screen_1, + array(struct( + contexts_com_snowplowanalytics_snowplow_client_session_1[0].sessionId as session_id, + contexts_com_snowplowanalytics_snowplow_client_session_1[0].userId as user_id, + contexts_com_snowplowanalytics_snowplow_client_session_1[0].sessionIndex as session_index, + contexts_com_snowplowanalytics_snowplow_client_session_1[0].firstEventId as first_event_id, + contexts_com_snowplowanalytics_snowplow_client_session_1[0].previousSessionId as previous_session_id, + contexts_com_snowplowanalytics_snowplow_client_session_1[0].eventIndex as event_index, + contexts_com_snowplowanalytics_snowplow_client_session_1[0].storageMechanism as storage_mechanism, + contexts_com_snowplowanalytics_snowplow_client_session_1[0].firstEventTimestamp as first_event_timestamp + )) as contexts_com_snowplowanalytics_snowplow_client_session_1 +from prep diff --git a/macros/field_extractions/get_action_context_fields.sql b/macros/field_extractions/get_action_context_fields.sql index 2a2dd26..8035f55 100644 --- a/macros/field_extractions/get_action_context_fields.sql +++ b/macros/field_extractions/get_action_context_fields.sql @@ -24,8 +24,8 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% endmacro %} {% macro spark__get_action_context_fields() %} - , unstruct_event_com_snowplowanalytics_snowplow_ecommerce_snowplow_ecommerce_action_1.type::string as ecommerce_action_type - , unstruct_event_com_snowplowanalytics_snowplow_ecommerce_snowplow_ecommerce_action_1.name::string as ecommerce_action_name + , CAST(unstruct_event_com_snowplowanalytics_snowplow_ecommerce_snowplow_ecommerce_action_1.type AS string) as ecommerce_action_type + 
, CAST(unstruct_event_com_snowplowanalytics_snowplow_ecommerce_snowplow_ecommerce_action_1.name AS string) as ecommerce_action_name {% endmacro %} {% macro snowflake__get_action_context_fields() %} diff --git a/macros/field_extractions/get_cart_context_fields.sql b/macros/field_extractions/get_cart_context_fields.sql index 9780030..1fb93a2 100644 --- a/macros/field_extractions/get_cart_context_fields.sql +++ b/macros/field_extractions/get_cart_context_fields.sql @@ -43,9 +43,9 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 , cast(NULL as {{ type_string() }}) as cart_currency , cast(NULL as decimal(9,2)) as cart_total_value {% else %} - , contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1[0].cart_id::string as cart_id - , contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1[0].currency::string as cart_currency - , contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1[0].total_value::decimal(9,2) as cart_total_value + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1[0].cart_id AS string) as cart_id + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1[0].currency AS string) as cart_currency + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_cart_1[0].total_value AS decimal(9,2)) as cart_total_value {% endif %} {% endmacro %} diff --git a/macros/field_extractions/get_checkout_context_fields.sql b/macros/field_extractions/get_checkout_context_fields.sql index 8d75490..acf0364 100644 --- a/macros/field_extractions/get_checkout_context_fields.sql +++ b/macros/field_extractions/get_checkout_context_fields.sql @@ -79,18 +79,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 , cast(NULL as {{ type_string() }}) as checkout_shipping_full_address , cast(NULL as {{ type_string() }}) as checkout_shipping_postcode {% else %} - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].step::int as checkout_step_number - , 
contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].account_type::string as checkout_account_type - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].billing_full_address::string as checkout_billing_full_address - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].billing_postcode::string as checkout_billing_postcode - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].coupon_code::string as checkout_coupon_code - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].delivery_method::string as checkout_delivery_method - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].delivery_provider::string as checkout_delivery_provider - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].marketing_opt_in::boolean as checkout_marketing_opt_in - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].payment_method::string as checkout_payment_method - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].proof_of_payment::string as checkout_proof_of_payment - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].shipping_full_address::string as checkout_shipping_full_address - , contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].shipping_postcode::string as checkout_shipping_postcode + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].step AS int) AS checkout_step_number + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].account_type AS string) AS checkout_account_type + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].billing_full_address AS string) AS checkout_billing_full_address + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].billing_postcode AS string) AS checkout_billing_postcode + , 
CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].coupon_code AS string) AS checkout_coupon_code + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].delivery_method AS string) AS checkout_delivery_method + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].delivery_provider AS string) AS checkout_delivery_provider + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].marketing_opt_in AS boolean) AS checkout_marketing_opt_in + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].payment_method AS string) AS checkout_payment_method + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].proof_of_payment AS string) AS checkout_proof_of_payment + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].shipping_full_address AS string) AS checkout_shipping_full_address + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_checkout_step_1[0].shipping_postcode AS string) AS checkout_shipping_postcode {% endif %} {% endmacro %} diff --git a/macros/field_extractions/get_page_context_fields.sql b/macros/field_extractions/get_page_context_fields.sql index 6e723e3..5220353 100644 --- a/macros/field_extractions/get_page_context_fields.sql +++ b/macros/field_extractions/get_page_context_fields.sql @@ -43,9 +43,9 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 , cast(NULL as {{ type_string() }}) as ecommerce_page_language , cast(NULL as {{ type_string() }}) as ecommerce_page_locale {% else %} - , contexts_com_snowplowanalytics_snowplow_ecommerce_page_1[0].type::string as ecommerce_page_type - , contexts_com_snowplowanalytics_snowplow_ecommerce_page_1[0].language::string as ecommerce_page_language - , contexts_com_snowplowanalytics_snowplow_ecommerce_page_1[0].locale::string as ecommerce_page_locale + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_page_1[0].type AS 
string) as ecommerce_page_type + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_page_1[0].language AS string) as ecommerce_page_language + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_page_1[0].locale AS string) as ecommerce_page_locale {% endif %} {% endmacro %} diff --git a/macros/field_extractions/get_page_view_context_fields.sql b/macros/field_extractions/get_page_view_context_fields.sql index eeaabe4..b66718c 100644 --- a/macros/field_extractions/get_page_view_context_fields.sql +++ b/macros/field_extractions/get_page_view_context_fields.sql @@ -50,9 +50,9 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% macro spark__get_page_view_context_fields() %} {% if var('snowplow__enable_mobile_events', false) %} - , coalesce(contexts_com_snowplowanalytics_mobile_screen_1[0].id::string, contexts_com_snowplowanalytics_snowplow_web_page_1[0].id::string) as page_view_id + , coalesce(CAST(contexts_com_snowplowanalytics_mobile_screen_1[0].id AS string), CAST(contexts_com_snowplowanalytics_snowplow_web_page_1[0].id AS string)) as page_view_id {% else %} - , contexts_com_snowplowanalytics_snowplow_web_page_1[0].id::string as page_view_id + , CAST(contexts_com_snowplowanalytics_snowplow_web_page_1[0].id AS string) as page_view_id {% endif %} {% endmacro %} diff --git a/macros/field_extractions/get_screen_view_context_fields.sql b/macros/field_extractions/get_screen_view_context_fields.sql index 42c61b6..495ed95 100644 --- a/macros/field_extractions/get_screen_view_context_fields.sql +++ b/macros/field_extractions/get_screen_view_context_fields.sql @@ -36,8 +36,8 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% macro spark__get_screen_view_context_fields() %} {% if var('snowplow__enable_mobile_events', false) %} - , contexts_com_snowplowanalytics_mobile_screen_1[0].id::string as screen_view_id - , contexts_com_snowplowanalytics_mobile_screen_1[0].name::string as screen_view_name + , 
cast(contexts_com_snowplowanalytics_mobile_screen_1[0].id AS string) as screen_view_id + , cast(contexts_com_snowplowanalytics_mobile_screen_1[0].name AS string) as screen_view_name {% else %} , cast(NULL as {{ type_string() }}) as screen_view_id , cast(NULL as {{ type_string() }}) as screen_view_name diff --git a/macros/field_extractions/get_session_context_fields.sql b/macros/field_extractions/get_session_context_fields.sql index 3be9bab..dcf4a83 100644 --- a/macros/field_extractions/get_session_context_fields.sql +++ b/macros/field_extractions/get_session_context_fields.sql @@ -54,14 +54,14 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% macro spark__get_session_context_fields() %} {% if var('snowplow__enable_mobile_events', false) %} - , contexts_com_snowplowanalytics_snowplow_client_session_1[0].event_index::int as event_index - , contexts_com_snowplowanalytics_snowplow_client_session_1[0].first_event_id::string as first_event_id - , contexts_com_snowplowanalytics_snowplow_client_session_1[0].first_event_timestamp::timestamp as first_event_timestamp - , contexts_com_snowplowanalytics_snowplow_client_session_1[0].previous_session_id::string as previous_session_id - , contexts_com_snowplowanalytics_snowplow_client_session_1[0].session_id::string as session_id - , contexts_com_snowplowanalytics_snowplow_client_session_1[0].session_index::int as session_index - , contexts_com_snowplowanalytics_snowplow_client_session_1[0].storage_mechanism::string as storage_mechanism - , contexts_com_snowplowanalytics_snowplow_client_session_1[0].user_id::string as mobile_user_id + , CAST(contexts_com_snowplowanalytics_snowplow_client_session_1[0].event_index AS int) as event_index + , CAST(contexts_com_snowplowanalytics_snowplow_client_session_1[0].first_event_id AS string) as first_event_id + , CAST(contexts_com_snowplowanalytics_snowplow_client_session_1[0].first_event_timestamp AS timestamp) as first_event_timestamp + , 
CAST(contexts_com_snowplowanalytics_snowplow_client_session_1[0].previous_session_id AS string) as previous_session_id + , CAST(contexts_com_snowplowanalytics_snowplow_client_session_1[0].session_id AS string) as session_id + , CAST(contexts_com_snowplowanalytics_snowplow_client_session_1[0].session_index AS int) as session_index + , CAST(contexts_com_snowplowanalytics_snowplow_client_session_1[0].storage_mechanism AS string) as storage_mechanism + , CAST(contexts_com_snowplowanalytics_snowplow_client_session_1[0].user_id AS string) as mobile_user_id {% else %} , cast(NULL as {{ type_int() }}) as event_index , cast(NULL as {{ type_string() }}) as first_event_id diff --git a/macros/field_extractions/get_transaction_context_fields.sql b/macros/field_extractions/get_transaction_context_fields.sql index 01e6b8a..a7165ef 100644 --- a/macros/field_extractions/get_transaction_context_fields.sql +++ b/macros/field_extractions/get_transaction_context_fields.sql @@ -71,16 +71,16 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 , cast(NULL as decimal(9,2)) as transaction_shipping , cast(NULL as decimal(9,2)) as transaction_tax {% else %} - , contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].transaction_id::string as transaction_id - , contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].currency::string as transaction_currency - , contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].payment_method::string as transaction_payment_method - , contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].revenue::decimal(9,2) as transaction_revenue - , contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].total_quantity::int as transaction_total_quantity - , contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].credit_order::boolean as transaction_credit_order - , contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].discount_amount::decimal(9,2) as 
transaction_discount_amount - , contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].discount_code::string as transaction_discount_code - , contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].shipping::decimal(9,2) as transaction_shipping - , contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].tax::decimal(9,2) as transaction_tax + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].transaction_id AS string) as transaction_id + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].currency AS string) as transaction_currency + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].payment_method AS string) as transaction_payment_method + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].revenue AS decimal(9,2)) as transaction_revenue + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].total_quantity AS int) as transaction_total_quantity + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].credit_order AS boolean) as transaction_credit_order + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].discount_amount AS decimal(9,2)) as transaction_discount_amount + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].discount_code AS string) as transaction_discount_code + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].shipping AS decimal(9,2)) as transaction_shipping + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_transaction_1[0].tax AS decimal(9,2)) as transaction_tax {% endif %} {% endmacro %} diff --git a/macros/field_extractions/get_user_context_fields.sql b/macros/field_extractions/get_user_context_fields.sql index 05f658f..95cedbf 100644 --- a/macros/field_extractions/get_user_context_fields.sql +++ b/macros/field_extractions/get_user_context_fields.sql @@ -43,9 +43,9 @@ You may obtain a copy of the Snowplow 
Personal and Academic License Version 1.0 , cast(NULL as {{ type_string() }}) as ecommerce_user_email , cast(NULL as {{ type_boolean() }}) as ecommerce_user_is_guest {% else %} - , contexts_com_snowplowanalytics_snowplow_ecommerce_user_1[0].id::string as ecommerce_user_id - , contexts_com_snowplowanalytics_snowplow_ecommerce_user_1[0].email::string as ecommerce_user_email - , contexts_com_snowplowanalytics_snowplow_ecommerce_user_1[0].is_guest::boolean as ecommerce_user_is_guest + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_user_1[0].id AS string) as ecommerce_user_id + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_user_1[0].email AS string) as ecommerce_user_email + , CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_user_1[0].is_guest AS boolean) as ecommerce_user_is_guest {% endif %} {% endmacro %} diff --git a/models/base/manifest/snowplow_ecommerce_base_sessions_lifecycle_manifest.sql b/models/base/manifest/snowplow_ecommerce_base_sessions_lifecycle_manifest.sql index 263f98f..2edc9e4 100644 --- a/models/base/manifest/snowplow_ecommerce_base_sessions_lifecycle_manifest.sql +++ b/models/base/manifest/snowplow_ecommerce_base_sessions_lifecycle_manifest.sql @@ -36,7 +36,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 days_late_allowed=var('snowplow__days_late_allowed', 3), max_session_days=var('snowplow__max_session_days', 3), app_ids=var('snowplow__app_id', []), - snowplow_events_database=var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else var('snowplow__atomic_schema', 'atomic'), + snowplow_events_database=var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else none, snowplow_events_schema=var('snowplow__atomic_schema', 'atomic'), 
snowplow_events_table=var('snowplow__events_table', 'events'), event_limits_table='snowplow_ecommerce_base_new_event_limits', diff --git a/models/base/scratch/snowplow_ecommerce_base_events_this_run.sql b/models/base/scratch/snowplow_ecommerce_base_events_this_run.sql index a134020..b000101 100644 --- a/models/base/scratch/snowplow_ecommerce_base_events_this_run.sql +++ b/models/base/scratch/snowplow_ecommerce_base_events_this_run.sql @@ -69,7 +69,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 with base_query as ( {{ base_events_query }} ), prep as ( - {% if target.type in ['postgres', 'redshift']%} + {% if target.type in ['postgres', 'redshift', 'spark']%} select {% for col in base_query_cols | map(attribute='name') | list -%} {% if col.lower() == 'session_identifier' -%} @@ -87,7 +87,6 @@ with base_query as ( {% endfor %} {% else %} select * {% if target.type in ['databricks', 'bigquery'] %}except{% else %}exclude{% endif %}(session_identifier, domain_sessionid, user_identifier, domain_userid) - , session_identifier as domain_sessionid , domain_sessionid as original_domain_sessionid , user_identifier as domain_userid diff --git a/models/base/src_base.yml b/models/base/src_base.yml index 7c5e56e..fecfb85 100644 --- a/models/base/src_base.yml +++ b/models/base/src_base.yml @@ -3,7 +3,7 @@ version: 2 sources: - name: atomic schema: "{{ var('snowplow__atomic_schema', 'atomic') if project_name != 'snowplow_ecommerce_integration_tests' else target.schema~'_snplw_ecommerce_int_tests' }}" - database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else var('snowplow__atomic_schema', 'atomic') }}" + # database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else 
none }}"
     tables:
       - name: events
         identifier: "{{ var('snowplow__events_table', 'events') if project_name != 'snowplow_ecommerce_integration_tests' else 'snowplow_ecommerce_events_stg' }}"
diff --git a/models/products/scratch/spark/snowplow_ecommerce_product_interactions_this_run.sql b/models/products/scratch/spark/snowplow_ecommerce_product_interactions_this_run.sql
new file mode 100644
index 0000000..2a38d4f
--- /dev/null
+++ b/models/products/scratch/spark/snowplow_ecommerce_product_interactions_this_run.sql
@@ -0,0 +1,207 @@
+{#
+Copyright (c) 2022-present Snowplow Analytics Ltd. All rights reserved.
+This program is licensed to you under the Snowplow Personal and Academic License Version 1.0,
+and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0.
+You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/
+#}
+
+{{
+  config(
+    tags=["this_run"]
+  )
+}}
+
+{#
+One row per product attached to each ecommerce event of the current run.
+`prep` explodes the product context array (plus its position) for every event,
+`product_info` casts the product fields and builds the surrogate key, and the
+final select fixes the column order and turns empty sub-category levels into
+NULL.
+#}
+
+with prep as (
+
+  select
+    t.event_id,
+    t.page_view_id,
+    t.app_id,
+
+    -- session fields
+    t.domain_sessionid,
+    t.event_in_session_index,
+
+    -- user fields
+    t.domain_userid,
+    t.network_userid,
+    t.user_id,
+    t.ecommerce_user_id,
+
+    -- timestamp fields
+    t.derived_tstamp,
+    DATE(t.derived_tstamp) as derived_tstamp_date,
+
+    -- ecommerce action fields
+    t.ecommerce_action_type,
+    t.ecommerce_action_name,
+
+    -- ecommerce action booleans
+    t.ecommerce_action_type IN ('product_view', 'list_view') as is_product_view,
+    CASE WHEN t.ecommerce_action_type IN ('product_view', 'list_view') THEN t.ecommerce_action_type END as product_view_type,
+    t.ecommerce_action_type = 'add_to_cart' as is_add_to_cart,
+    t.ecommerce_action_type = 'remove_from_cart' as is_remove_from_cart,
+    CASE WHEN t.ecommerce_action_type = 'list_view' THEN t.ecommerce_action_name END as product_list_name,
+    t.ecommerce_action_type = 'transaction' as is_product_transaction,
+
+    t.ecommerce_user_is_guest,
+    t.ecommerce_user_email,
+    t.transaction_id,
+
+    {#- Plain-column passthroughs must be projected here: product_info reads from prep, not from the events table. #}
+    {%- for identifier in var('snowplow__product_passthroughs', []) %}
+    {%- if identifier is not mapping %}
+    t.{{ identifier }},
+    {%- endif %}
+    {%- endfor %}
+
+    POSEXPLODE(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1) as (index, contexts_com_snowplowanalytics_snowplow_ecommerce_product_1)
+
+  from {{ ref('snowplow_ecommerce_base_events_this_run') }} as t
+
+), product_info as (
+  select
+    {{ dbt_utils.generate_surrogate_key(['event_id', 'contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.id', 'index']) }} as product_event_id,
+    event_id,
+    page_view_id,
+    app_id,
+
+    -- session fields
+    domain_sessionid,
+    event_in_session_index,
+
+    -- user fields
+    domain_userid,
+    network_userid,
+    user_id,
+    ecommerce_user_id,
+
+    -- timestamp fields
+    derived_tstamp,
+    derived_tstamp_date,
+
+    -- ecommerce action fields
+    ecommerce_action_type,
+    ecommerce_action_name,
+
+    -- ecommerce product fields
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.id AS string) as product_id,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.category AS string) as product_category,
+    {%- for i in range(var("snowplow__number_category_levels", 4)) %}
+    split_part(CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.category AS string), '{{ var("snowplow__categories_separator", "/") }}', {{i+1}}) as product_subcategory_{{i+1}},
+    {%- endfor %}
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.currency AS string) as product_currency,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.price AS decimal(9,2)) as product_price,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.brand AS string) as product_brand,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.creative_id AS string) as product_creative_id,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.inventory_status AS string) as product_inventory_status,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.list_price AS float) as product_list_price,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.name AS string) as product_name,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.position AS integer) as product_list_position,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.quantity AS integer) as product_quantity,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.size AS string) as product_size,
+    CAST(contexts_com_snowplowanalytics_snowplow_ecommerce_product_1.variant AS string) as product_variant,
+
+    -- ecommerce action booleans
+    is_product_view,
+    product_view_type,
+    is_add_to_cart,
+    is_remove_from_cart,
+    product_list_name,
+    is_product_transaction,
+
+    ecommerce_user_is_guest,
+    ecommerce_user_email,
+    transaction_id
+
+    {%- if var('snowplow__product_passthroughs', []) -%}
+      {%- set passthrough_names = [] -%}
+      {%- for identifier in var('snowplow__product_passthroughs', []) %}
+      {# Check if it's a simple column or a sql+alias #}
+      {%- if identifier is mapping -%}
+        ,{{identifier['sql']}} as {{identifier['alias']}}
+        {%- do passthrough_names.append(identifier['alias']) -%}
+      {%- else -%}
+        ,t.{{identifier}}
+        {%- do passthrough_names.append(identifier) -%}
+      {%- endif -%}
+      {% endfor -%}
+    {%- endif %}
+
+
+  from prep t
+
+)
+
+select
+  product_event_id,
+  -- event fields
+  event_id,
+  page_view_id,
+  app_id,
+
+  -- session fields
+  domain_sessionid,
+  event_in_session_index,
+
+  -- user fields
+  domain_userid,
+  network_userid,
+  user_id,
+  ecommerce_user_id,
+
+  -- timestamp fields
+  derived_tstamp,
+  derived_tstamp_date,
+
+  -- ecommerce action fields
+  ecommerce_action_type,
+  ecommerce_action_name,
+
+  -- ecommerce product fields
+  product_id,
+  product_category,
+  {%- for i in range(var("snowplow__number_category_levels", 4)) %}
+  CASE WHEN product_subcategory_{{i+1}} = '' THEN NULL ELSE product_subcategory_{{i+1}} END as product_subcategory_{{i+1}}, -- in case the product itself has less than the maximum number of category levels, we stay consistent with BQ/Snowflake
+  {%- endfor %}
+  product_currency,
+  product_price,
+  product_brand,
+  product_creative_id,
+  product_inventory_status,
+  product_list_price,
+  product_name,
+  product_list_position,
+  product_quantity,
+  product_size,
+  product_variant,
+
+  -- ecommerce action booleans
+  is_product_view,
+  product_view_type,
+  is_add_to_cart,
+  is_remove_from_cart,
+  product_list_name,
+  is_product_transaction,
+
+  -- transaction and user fields
+  transaction_id,
+  ecommerce_user_email,
+  ecommerce_user_is_guest
+
+
+  {%- if var('snowplow__product_passthroughs', []) -%}
+    {%- for col in passthrough_names %}
+      , {{col}}
+    {%- endfor -%}
+  {%- endif %}
+
+from product_info
diff --git a/packages.yml b/packages.yml
index 452bda5..8f3da37 100644
--- a/packages.yml
+++ b/packages.yml
@@ -1,3 +1,3 @@
 packages:
-  - package: snowplow/snowplow_utils
-    version: [">=0.16.3", "<0.17.0"]
+  - git: "https://github.com/snowplow/dbt-snowplow-utils.git"
+    revision: "release/snowplow-utils/0.17"
\ No newline at end of file