From 3047d000def082686dabf95a5a2f119bf32c5f8c Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 9 Sep 2024 22:57:26 -0400 Subject: [PATCH 01/10] first pass: add incremental_predicates --- dbt/include/snowflake/macros/materializations/merge.sql | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dbt/include/snowflake/macros/materializations/merge.sql b/dbt/include/snowflake/macros/materializations/merge.sql index e93b29155..66bba0bde 100644 --- a/dbt/include/snowflake/macros/materializations/merge.sql +++ b/dbt/include/snowflake/macros/materializations/merge.sql @@ -48,3 +48,11 @@ {% set dml = default__get_incremental_append_sql(get_incremental_append_sql) %} {% do return(snowflake_dml_explicit_transaction(dml)) %} {% endmacro %} + + +{% macro snowflake__get_incremental_microbatch_sql(target, source, unique_key, dest_columns, incremental_predicates) %} + {% do predicates.append(model.config.event_time ~ " >= " ~ model.config.event_time_start) %} + {% do predicates.append(model.config.event_time ~ " < " ~ model.config.event_time_end) %} + {% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %} + {% do return(snowflake_dml_explicit_transaction(dml)) %} +{% endmacro %} From 7aa79d80d149931188fcf5eab31932a034922c26 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 9 Sep 2024 22:57:32 -0400 Subject: [PATCH 02/10] first pass: add incremental_predicates --- dbt/include/snowflake/macros/materializations/merge.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/merge.sql b/dbt/include/snowflake/macros/materializations/merge.sql index 66bba0bde..e6c9e207d 100644 --- a/dbt/include/snowflake/macros/materializations/merge.sql +++ b/dbt/include/snowflake/macros/materializations/merge.sql @@ -51,8 +51,8 @@ {% macro snowflake__get_incremental_microbatch_sql(target, source, unique_key, dest_columns, incremental_predicates) %} - {% do predicates.append(model.config.event_time ~ " >= " ~ model.config.event_time_start) %} - {% do predicates.append(model.config.event_time ~ " < " ~ model.config.event_time_end) %} + {% do incremental_predicates.append(model.config.event_time ~ " >= " ~ model.config.event_time_start) %} + {% do incremental_predicates.append(model.config.event_time ~ " < " ~ model.config.event_time_end) %} {% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %} {% do return(snowflake_dml_explicit_transaction(dml)) %} {% endmacro %} From b79bd8b71e3d22d5213d58efafc612fe71adff39 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 11 Sep 2024 17:11:24 -0400 Subject: [PATCH 03/10] safely add incremental_predicates + testing --- dbt/adapters/snowflake/impl.py | 2 +- .../macros/materializations/merge.sql | 20 ++++++++++++++----- dev-requirements.txt | 2 +- .../adapter/test_incremental_microbatch.py | 13 ++++++++++++ 4 files changed, 30 insertions(+), 7 deletions(-) create mode 100644 tests/functional/adapter/test_incremental_microbatch.py diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 6854b199d..92b2c3078 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -385,7 +385,7 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str): return response def valid_incremental_strategies(self): - return ["append", "merge", "delete+insert"] + return ["append", "merge", "delete+insert", "microbatch"] def debug_query(self): """Override for DebugTask method""" diff --git a/dbt/include/snowflake/macros/materializations/merge.sql b/dbt/include/snowflake/macros/materializations/merge.sql index e6c9e207d..f4ebd26b6 100644 --- a/dbt/include/snowflake/macros/materializations/merge.sql +++ b/dbt/include/snowflake/macros/materializations/merge.sql @@ -50,9 +50,19 @@ {% endmacro %} -{% macro snowflake__get_incremental_microbatch_sql(target, source, unique_key, dest_columns, incremental_predicates) %} - {% do incremental_predicates.append(model.config.event_time ~ " >= " ~ model.config.event_time_start) %} - {% do incremental_predicates.append(model.config.event_time ~ " < " ~ model.config.event_time_end) %} - {% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %} - {% do return(snowflake_dml_explicit_transaction(dml)) %} +{% macro snowflake__get_incremental_microbatch_sql(arg_dict) %} + {% set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') %} + {#-- Add additional incremental_predicates if it is safe to do so --#} + {% if model.config.event_time -%} + {% if model.config.event_time_start -%} + {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " >= " ~ model.config.event_time_start) %} + {% endif %} + {% if model.config.event_time_start -%} + {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " < " ~ model.config.event_time_end) %} + {% endif %} + {% endif %} + {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %} + + {% set dml = default__get_incremental_delete_insert_sql(arg_dict) %} + {% do return(dml) %} {% endmacro %} diff --git a/dev-requirements.txt b/dev-requirements.txt index f3d120eec..d7543eee7 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,5 +1,5 @@ # install latest changes in dbt-core -git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git@event-time-ref-filtering#egg=dbt-core&subdirectory=core git+https://github.com/dbt-labs/dbt-adapters.git git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git diff --git a/tests/functional/adapter/test_incremental_microbatch.py b/tests/functional/adapter/test_incremental_microbatch.py new file mode 100644 index 000000000..26046f12f --- /dev/null +++ b/tests/functional/adapter/test_incremental_microbatch.py @@ -0,0 +1,13 @@ +import pytest +from dbt.tests.adapter.incremental.test_incremental_microbatch import ( + BaseMicrobatch, +) + + +class TestSnowflakeMicrobatch(BaseMicrobatch): + @pytest.fixture(scope="class") + def insert_two_rows_sql(self, project) -> str: + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')" From fbed66beadf73b080e90c46a0bc033b8464d90f9 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 12 Sep 2024 12:16:37 -0400 Subject: [PATCH 04/10] rename to __dbt_internal_microbatch_event_time_start --- dbt/include/snowflake/macros/materializations/merge.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/merge.sql b/dbt/include/snowflake/macros/materializations/merge.sql index f4ebd26b6..2acf95bad 100644 --- a/dbt/include/snowflake/macros/materializations/merge.sql +++ b/dbt/include/snowflake/macros/materializations/merge.sql @@ -55,10 +55,10 @@ {#-- Add additional incremental_predicates if it is safe to do so --#} {% if model.config.event_time -%} {% if model.config.event_time_start -%} - {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " >= " ~ model.config.event_time_start) %} + {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " >= " ~ model.__dbt_internal_microbatch_event_time_start) %} {% endif %} {% if model.config.event_time_start -%} - {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " < " ~ model.config.event_time_end) %} + {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " < " ~ model.__dbt_internal_microbatch_event_time_start) %} {% endif %} {% endif %} {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %} From 7e3b3335e3b0b367a217e453f2f0e59658ae3a28 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 13 Sep 2024 18:47:46 -0400 Subject: [PATCH 05/10] update dev-requirements with patch time method --- dev-requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index d7543eee7..0ae3f286e 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core -git+https://github.com/dbt-labs/dbt-core.git@event-time-ref-filtering#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git@patch-microbatch-event-time#egg=dbt-core&subdirectory=core git+https://github.com/dbt-labs/dbt-adapters.git -git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter +git+https://github.com/dbt-labs/dbt-adapters.git@use-patch-microbatch-time-method#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git # dev From c47eff472cc0724a4f28ba2bce46e6ebf4c29d63 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 13 Sep 2024 21:54:20 -0400 Subject: [PATCH 06/10] changelog entry --- .changes/unreleased/Features-20240913-215416.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240913-215416.yaml diff --git a/.changes/unreleased/Features-20240913-215416.yaml b/.changes/unreleased/Features-20240913-215416.yaml new file mode 100644 index 000000000..b2a6e556e --- /dev/null +++ b/.changes/unreleased/Features-20240913-215416.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Microbatch incremental strategy +time: 2024-09-13T21:54:16.492885-04:00 +custom: + Author: michelleark + Issue: "1182" From d4a2b66a489aec7c92f507d4a3b86e94df26803a Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 13 Sep 2024 22:05:43 -0400 Subject: [PATCH 07/10] tidy up snowflake__get_incremental_microbatch_sql --- .../snowflake/macros/materializations/merge.sql | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/merge.sql b/dbt/include/snowflake/macros/materializations/merge.sql index 2acf95bad..9d082878a 100644 --- a/dbt/include/snowflake/macros/materializations/merge.sql +++ b/dbt/include/snowflake/macros/materializations/merge.sql @@ -53,12 +53,12 @@ {% macro snowflake__get_incremental_microbatch_sql(arg_dict) %} {% set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') %} {#-- Add additional incremental_predicates if it is safe to do so --#} - {% if model.config.event_time -%} - {% if model.config.event_time_start -%} - {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " >= " ~ model.__dbt_internal_microbatch_event_time_start) %} + {% if config.get("event_time") -%} + {% if config.get("__dbt_internal_microbatch_event_time_start") -%} + {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ config.event_time ~ " >= " ~ config.__dbt_internal_microbatch_event_time_start) %} {% endif %} - {% if model.config.event_time_start -%} - {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " < " ~ model.__dbt_internal_microbatch_event_time_start) %} + {% if model.config.__dbt_internal_microbatch_event_time_end -%} + {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " < " ~ model.__dbt_internal_microbatch_event_time_end %} {% endif %} {% endif %} {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %} From 4b09a09b3c004cea31cc20903768209a3dc64e2c Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 13 Sep 2024 23:05:18 -0400 Subject: [PATCH 08/10] remove requirement for unique_id --- .../macros/materializations/merge.sql | 36 +++++++++++++------ dev-requirements.txt | 2 +- .../adapter/test_incremental_microbatch.py | 11 ++++++ 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/merge.sql b/dbt/include/snowflake/macros/materializations/merge.sql index 9d082878a..57c58afdd 100644 --- a/dbt/include/snowflake/macros/materializations/merge.sql +++ b/dbt/include/snowflake/macros/materializations/merge.sql @@ -51,18 +51,32 @@ {% macro snowflake__get_incremental_microbatch_sql(arg_dict) %} - {% set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') %} - {#-- Add additional incremental_predicates if it is safe to do so --#} - {% if config.get("event_time") -%} - {% if config.get("__dbt_internal_microbatch_event_time_start") -%} - {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ config.event_time ~ " >= " ~ config.__dbt_internal_microbatch_event_time_start) %} - {% endif %} - {% if model.config.__dbt_internal_microbatch_event_time_end -%} - {% do incremental_predicates.append("DBT_INTERNAL_DEST" ~ "." ~ model.config.event_time ~ " < " ~ model.__dbt_internal_microbatch_event_time_end %} - {% endif %} + {%- set target = arg_dict["target_relation"] -%} + {%- set source = arg_dict["temp_relation"] -%} + {%- set dest_columns = arg_dict["dest_columns"] -%} + {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%} + + {#-- Add additional incremental_predicates to filter for batch --#} + {% if model.config.get("__dbt_internal_microbatch_event_time_start") -%} + {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %} + {% endif %} + {% if model.config.__dbt_internal_microbatch_event_time_end -%} + {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %} {% endif %} {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %} - {% set dml = default__get_incremental_delete_insert_sql(arg_dict) %} - {% do return(dml) %} + delete from {{ target }} DBT_INTERNAL_TARGET + using {{ source }} + where ( + {% for predicate in incremental_predicates %} + {%- if not loop.first %}and {% endif -%} {{ predicate }} + {% endfor %} + ); + + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + insert into {{ target }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ source }} + ) {% endmacro %} diff --git a/dev-requirements.txt b/dev-requirements.txt index 0ae3f286e..4d7460996 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,5 +1,5 @@ # install latest changes in dbt-core -git+https://github.com/dbt-labs/dbt-core.git@patch-microbatch-event-time#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git@microbatch-chunked-backfill#egg=dbt-core&subdirectory=core git+https://github.com/dbt-labs/dbt-adapters.git git+https://github.com/dbt-labs/dbt-adapters.git@use-patch-microbatch-time-method#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git diff --git a/tests/functional/adapter/test_incremental_microbatch.py b/tests/functional/adapter/test_incremental_microbatch.py index 26046f12f..bbb57f96c 100644 --- a/tests/functional/adapter/test_incremental_microbatch.py +++ b/tests/functional/adapter/test_incremental_microbatch.py @@ -4,7 +4,18 @@ ) +# No requirement for a unique_id for snowflake microbatch! +_microbatch_model_no_unique_id_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }} +select * from {{ ref('input_model') }} +""" + + class TestSnowflakeMicrobatch(BaseMicrobatch): + @pytest.fixture(scope="class") + def microbatch_model_sql(self) -> str: + return _microbatch_model_no_unique_id_sql + @pytest.fixture(scope="class") def insert_two_rows_sql(self, project) -> str: test_schema_relation = project.adapter.Relation.create( From ab720f66428e0eed782d31f8c2b6de8eca715e1d Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 16 Sep 2024 16:30:24 -0500 Subject: [PATCH 09/10] Revert to using `main` branch of `dbt-tests-adapter` in dev-requirements.txt --- dev-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 4d7460996..0d4514634 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core git+https://github.com/dbt-labs/dbt-core.git@microbatch-chunked-backfill#egg=dbt-core&subdirectory=core git+https://github.com/dbt-labs/dbt-adapters.git -git+https://github.com/dbt-labs/dbt-adapters.git@use-patch-microbatch-time-method#subdirectory=dbt-tests-adapter +git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git # dev From 46ff0dc951539a717f26dae1590e8003e8d5efec Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 18 Sep 2024 17:51:00 +0100 Subject: [PATCH 10/10] restore dev-requirements.txt --- dev-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 0d4514634..f3d120eec 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,5 +1,5 @@ # install latest changes in dbt-core -git+https://github.com/dbt-labs/dbt-core.git@microbatch-chunked-backfill#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core git+https://github.com/dbt-labs/dbt-adapters.git git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git