Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Microbatch strategy #1179

Merged
merged 11 commits into from
Sep 18, 2024
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240913-215416.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Microbatch incremental strategy
time: 2024-09-13T21:54:16.492885-04:00
custom:
Author: michelleark
Issue: "1182"
2 changes: 1 addition & 1 deletion dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str):
return response

def valid_incremental_strategies(self):
return ["append", "merge", "delete+insert"]
return ["append", "merge", "delete+insert", "microbatch"]

def debug_query(self):
"""Override for DebugTask method"""
Expand Down
32 changes: 32 additions & 0 deletions dbt/include/snowflake/macros/materializations/merge.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,35 @@
{% set dml = default__get_incremental_append_sql(get_incremental_append_sql) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}


{% macro snowflake__get_incremental_microbatch_sql(arg_dict) %}
{%- set target = arg_dict["target_relation"] -%}
{%- set source = arg_dict["temp_relation"] -%}
{%- set dest_columns = arg_dict["dest_columns"] -%}
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}

{#-- Add additional incremental_predicates to filter for batch --#}
{% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
{% endif %}
{% if model.config.__dbt_internal_microbatch_event_time_end -%}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
{% endif %}
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

delete from {{ target }} DBT_INTERNAL_TARGET
using {{ source }}
where (
{% for predicate in incremental_predicates %}
{%- if not loop.first %}and {% endif -%} {{ predicate }}
{% endfor %}
);

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}
24 changes: 24 additions & 0 deletions tests/functional/adapter/test_incremental_microbatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_microbatch import (
BaseMicrobatch,
)


# No requirement for a unique_id for snowflake microbatch!
_microbatch_model_no_unique_id_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }}
select * from {{ ref('input_model') }}
"""


class TestSnowflakeMicrobatch(BaseMicrobatch):
@pytest.fixture(scope="class")
def microbatch_model_sql(self) -> str:
return _microbatch_model_no_unique_id_sql

@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"