Prepare Ecommerce codebase for Spark (Iceberg) Support #54

Merged: 15 commits, Oct 8, 2024
79 changes: 57 additions & 22 deletions .github/workflows/pr_tests.yml
@@ -2,9 +2,6 @@ name: pr_tests

on:
pull_request:
branches:
- main
- 'release/**'

concurrency: dbt_integration_tests

@@ -42,13 +39,12 @@ env:
SNOWFLAKE_TEST_WAREHOUSE: ${{ secrets.SNOWFLAKE_TEST_WAREHOUSE }}

# Postgres Connection
POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }}
POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }}
POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }}
POSTGRES_TEST_DBNAME: ${{ secrets.POSTGRES_TEST_DBNAME }}
POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }}

# Databricks Connection
DATABRICKS_TEST_HOST: ${{ secrets.DATABRICKS_TEST_HOST }}
DATABRICKS_TEST_HTTP_PATH: ${{ secrets.DATABRICKS_TEST_HTTP_PATH }}
DATABRICKS_TEST_TOKEN: ${{ secrets.DATABRICKS_TEST_TOKEN }}
@@ -63,10 +59,10 @@ jobs:
# Run tests from integration_tests sub dir
working-directory: ./integration_tests
strategy:
fail-fast: false
matrix:
dbt_version: ["1.*"]
warehouse: ["bigquery", "snowflake", "databricks", "postgres"] # TODO: Add RS self-hosted runner

warehouse: ["postgres", "bigquery", "snowflake", "databricks", "spark_iceberg"] # TODO: Add RS self-hosted runner
services:
postgres:
image: postgres:latest
@@ -90,51 +86,90 @@

# Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX.
# SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables
- name: Set SCHEMA_SUFFIX env
run: echo "SCHEMA_SUFFIX=$(echo ${DBT_VERSION%.*} | tr . _)" >> $GITHUB_ENV
env:
DBT_VERSION: ${{ matrix.dbt_version }}
- name: Configure Docker credentials
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }}
password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-west-1
- name: Set warehouse variables
id: set_warehouse
run: |
WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1)
WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2)
echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV
echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV
echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT
echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT
# Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX.
# SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables
- name: Set SCHEMA_SUFFIX env
run: echo "SCHEMA_SUFFIX=$(echo ${DBT_VERSION%.*} | tr . _)" >> $GITHUB_ENV
env:
DBT_VERSION: ${{ matrix.dbt_version }}

- name: Set DEFAULT_TARGET env
run: |
echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV
echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV

- name: Python setup
uses: actions/setup-python@v5
uses: actions/setup-python@v4
with:
python-version: "3.8.x"

- name: Pip cache
uses: actions/cache@v4
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }}
key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}}
restore-keys: |
${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }}
${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}}

# Install latest patch version. Upgrade if cache contains old patch version.
- name: Install dependencies
run: |
pip install --upgrade pip wheel setuptools
pip install -Iv dbt-${{ matrix.warehouse }}==${{ matrix.dbt_version }} --upgrade
pip install wheel setuptools
pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade
dbt deps
if: ${{matrix.warehouse != 'spark'}}
if: ${{env.WAREHOUSE_PLATFORM != 'spark'}}

- name: Install spark dependencies
run: |
pip install --upgrade pip wheel setuptools
pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade
pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade
dbt deps
if: ${{matrix.warehouse == 'spark'}}
if: ${{env.WAREHOUSE_PLATFORM == 'spark'}}

- name: Install Docker Compose
run: |
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose


- name: Build and start Spark cluster
working-directory: .github/workflows/spark_deployment
run: |
docker-compose up -d
echo "Waiting for Spark services to start..."
sleep 90
if: ${{env.WAREHOUSE_PLATFORM == 'spark'}}

- name: "Pre-test: Drop ci schemas"
run: |
dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }}
dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}}

- name: Run tests
run: ./.scripts/integration_test.sh -d ${{ matrix.warehouse }}
run: ./.scripts/integration_test.sh -d ${{matrix.warehouse}}

# post_ci_cleanup sits in utils package
- name: "Post-test: Drop ci schemas"
run: |
dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }}
dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}}
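A note on the matrix split above: the "Set warehouse variables" step derives a platform name and an optional variant from the matrix value, so spark_iceberg installs dbt-spark while the dbt target keeps the full spark_iceberg name. A minimal sketch of the same cut logic, runnable locally with illustrative values:

#!/bin/bash
# Sketch of the warehouse split performed in the workflow step above.
for warehouse in postgres spark_iceberg; do
  platform=$(echo "$warehouse" | cut -d'_' -f1)     # "postgres" or "spark"
  specific=$(echo "$warehouse" | cut -s -d'_' -f2)  # empty when there is no underscore, "iceberg" otherwise
  echo "warehouse=$warehouse platform=$platform specific=$specific"
done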
34 changes: 34 additions & 0 deletions .github/workflows/spark_deployment/Dockerfile
@@ -0,0 +1,34 @@
FROM openjdk:11-jre-slim

# Set environment variables
ENV SPARK_VERSION=3.5.1
ENV HADOOP_VERSION=3.3.4
ENV ICEBERG_VERSION=1.4.2
ENV AWS_SDK_VERSION=1.12.581

# Install necessary tools
RUN apt-get update && apt-get install -y curl wget procps rsync ssh

# Download and install Spark
RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \
rm spark-${SPARK_VERSION}-bin-hadoop3.tgz

# Set Spark environment variables
ENV SPARK_HOME=/spark
ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

# Download necessary JARs
RUN mkdir -p /spark/jars && \
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar

# Create directory for Spark events
RUN mkdir -p /tmp/spark-events

WORKDIR /spark

CMD ["bash"]
20 changes: 20 additions & 0 deletions .github/workflows/spark_deployment/build_and_push.sh
@@ -0,0 +1,20 @@
#!/bin/bash

# Set variables
DOCKER_HUB_ORG="snowplow"
IMAGE_NAME="spark-s3-iceberg"
TAG="latest"

# Build the image
echo "Building Docker image..."
docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG .

# Log in to Docker Hub
echo "Logging in to Docker Hub..."
docker login

# Push the image to Docker Hub
echo "Pushing image to Docker Hub..."
docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG

echo "Image successfully built and pushed to Docker Hub"
66 changes: 66 additions & 0 deletions .github/workflows/spark_deployment/docker-compose.yml
@@ -0,0 +1,66 @@
version: '3'

networks:
spark-network:
driver: bridge

services:
spark-master:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"]
hostname: spark-master
ports:
- '8080:8080'
- '7077:7077'
environment:
- SPARK_LOCAL_IP=spark-master
- SPARK_MASTER_HOST=spark-master
- SPARK_MASTER_PORT=7077
- SPARK_MASTER_OPTS="-Dspark.driver.memory=2g"
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network

spark-worker:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"]
depends_on:
- spark-master
environment:
- SPARK_WORKER_CORES=2
- SPARK_WORKER_MEMORY=4G
- SPARK_EXECUTOR_MEMORY=3G
- SPARK_LOCAL_IP=spark-worker
- SPARK_MASTER=spark://spark-master:7077
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network

thrift-server:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"]
ports:
- '10000:10000'
depends_on:
- spark-master
- spark-worker
environment:
- SPARK_LOCAL_IP=thrift-server
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network
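The workflow above waits a fixed 90 seconds before running dbt against this stack. An alternative sketch, not part of this PR, polls the Thrift port exposed by the compose file instead of sleeping (assumes the server is published on localhost:10000 as configured above):

#!/bin/bash
# Poll the Thrift JDBC port until it accepts connections, for at most ~3 minutes.
for i in $(seq 1 36); do
  if nc -z localhost 10000; then
    echo "Thrift server is up"
    exit 0
  fi
  echo "Waiting for Thrift server... ($i/36)"
  sleep 5
done
echo "Thrift server did not come up in time" >&2
exit 1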
44 changes: 44 additions & 0 deletions .github/workflows/spark_deployment/spark-defaults.conf
@@ -0,0 +1,44 @@
spark.master spark://spark-master:7077

spark.sql.warehouse.dir s3a://dbt-spark-iceberg/github-integration-testing
spark.sql.catalog.glue org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.glue.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog
spark.sql.catalog.glue.warehouse s3a://dbt-spark-iceberg/github-integration-testing
spark.sql.catalog.glue.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.defaultCatalog glue
spark.sql.catalog.glue.database dbt-spark-iceberg

spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key <AWS_ACCESS_KEY_ID>
spark.hadoop.fs.s3a.secret.key <AWS_SECRET_ACCESS_KEY>
spark.hadoop.fs.s3a.endpoint s3.eu-west-1.amazonaws.com
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.region eu-west-1
spark.hadoop.fs.s3a.aws.region eu-west-1

# Enabling AWS SDK V4 signing (required for regions launched after January 2014)
spark.hadoop.com.amazonaws.services.s3.enableV4 true
spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

# Hive Metastore Configuration (using AWS Glue)
spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

# Thrift Server Configuration for better performance in concurrent environments
spark.sql.hive.thriftServer.singleSession false
spark.sql.hive.thriftServer.async true
# spark.sql.hive.thriftServer.maxWorkerThreads 100
# spark.sql.hive.thriftServer.minWorkerThreads 50
# spark.sql.hive.thriftServer.workerQueue.size 2000

# Memory and Performance Tuning
# spark.driver.memory 2g
# spark.executor.memory 3g
# spark.worker.memory 4g
spark.network.timeout 600s
spark.sql.broadcastTimeout 600s
spark.sql.adaptive.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer

# Logging and Debugging
spark.eventLog.enabled true
spark.eventLog.dir /tmp/spark-events
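The fs.s3a.access.key and fs.s3a.secret.key entries above are committed as placeholders, while the compose file only exports AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY into the containers. One way to bridge the two at container start, an assumption rather than something shown in this PR, is a simple in-place substitution before launching Spark:

#!/bin/bash
# Hypothetical entrypoint step: swap the committed placeholders for real
# credentials taken from the container environment.
sed -i \
  -e "s|<AWS_ACCESS_KEY_ID>|${AWS_ACCESS_KEY_ID}|" \
  -e "s|<AWS_SECRET_ACCESS_KEY>|${AWS_SECRET_ACCESS_KEY}|" \
  /spark/conf/spark-defaults.conf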
6 changes: 5 additions & 1 deletion dbt_project.yml
@@ -108,6 +108,8 @@ on-run-end:
models:
snowplow_ecommerce:
+materialized: table
+file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}"
+incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}"
base:
manifest:
+schema: "snowplow_manifest"
@@ -135,7 +137,9 @@ models:
bigquery:
+enabled: "{{ target.type == 'bigquery' | as_bool() }}"
databricks:
+enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}"
+enabled: "{{ target.type in ['databricks'] | as_bool() }}"
spark:
+enabled: "{{ target.type in ['spark'] | as_bool() }}"
default:
+enabled: "{{ target.type in ['redshift', 'postgres'] | as_bool() }}"
snowflake:
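The file_format and incremental_strategy settings added above resolve per target at parse time: Spark targets compile with file_format iceberg and a merge incremental strategy, while every other target keeps delta (only meaningful on Databricks) and its adapter's default strategy. A quick, hedged way to check the resolved configs locally, assuming the CI profiles in integration_tests/ci/profiles.yml:

#!/bin/bash
# Compile the package against two targets and compare the resolved model configs
# (inspect target/manifest.json after each run).
cd integration_tests
dbt compile --target databricks       # models resolve to file_format: delta
dbt compile --target spark_iceberg    # models resolve to file_format: iceberg, incremental_strategy: merge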
4 changes: 2 additions & 2 deletions integration_tests/.scripts/integration_test.sh
@@ -10,7 +10,7 @@ do
esac
done

declare -a SUPPORTED_DATABASES=("bigquery" "postgres" "databricks" "snowflake")
declare -a SUPPORTED_DATABASES=("bigquery" "postgres" "databricks" "snowflake" "spark_iceberg")

# set to lower case
DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')"
@@ -27,7 +27,7 @@ for db in ${DATABASES[@]}; do

eval "dbt seed --full-refresh --target $db" || exit 1;

echo "Snowplow e-commerce integration tests: Execute models (no mobile) - run 0/4"
echo "Snowplow e-commerce integration tests: Execute models (no mobile) - run 0/4"

eval "dbt run --full-refresh --vars '{snowplow__allow_refresh: true, snowplow__backfill_limit_days: 30, snowplow__enable_mobile_events: false}' --target $db" || exit 1;

21 changes: 21 additions & 0 deletions integration_tests/ci/profiles.yml
@@ -56,3 +56,24 @@ integration_tests:
dbname: "{{ env_var('POSTGRES_TEST_DBNAME') }}"
schema: "gh_sp_ecom_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
threads: 4

redshift:
type: redshift
host: "{{ env_var('REDSHIFT_TEST_HOST') }}"
user: "{{ env_var('REDSHIFT_TEST_USER') }}"
pass: "{{ env_var('REDSHIFT_TEST_PASS') }}"
dbname: "{{ env_var('REDSHIFT_TEST_DBNAME') }}"
port: "{{ env_var('REDSHIFT_TEST_PORT') | as_number }}"
schema: "gh_sp_ecom_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
threads: 4

spark_iceberg:
type: spark
method: thrift
host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}"
port: 10000
user: "{{ env_var('SPARK_USER', 'spark') }}"
schema: "{{ env_var('SPARK_SCHEMA', 'default') }}"
connect_retries: 5
connect_timeout: 60
threads: 1
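With this target in place, a local run can mirror CI, assuming the docker-compose stack above is already running, AWS credentials are exported, and commands are issued from the repository root:

#!/bin/bash
# Start the Spark + Thrift stack, then run the integration suite against it.
(cd .github/workflows/spark_deployment && docker-compose up -d)
sleep 90   # same fixed wait the workflow uses
cd integration_tests
./.scripts/integration_test.sh -d spark_iceberg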
5 changes: 4 additions & 1 deletion integration_tests/dbt_project.yml
@@ -27,11 +27,14 @@ models:
snowplow_ecommerce_integration_tests:
bind: false
+schema: "snplw_ecommerce_int_tests"
+materialized: table
source:
bigquery:
+enabled: "{{ target.type == 'bigquery' | as_bool() }}"
databricks:
+enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}"
+enabled: "{{ target.type in ['databricks'] | as_bool() }}"
spark:
+enabled: "{{ target.type in ['spark'] | as_bool() }}"
default:
+enabled: "{{ target.type in ['redshift', 'postgres'] | as_bool() }}"
snowflake: