-
Notifications
You must be signed in to change notification settings - Fork 121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement microbatch incremental strategy #825
Changes from all commits
6812460
fb62fac
4834834
23d7283
bd4be24
8993064
28e3cfd
62b47db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
version: str = "1.8.7" | ||
version: str = "1.9.0b1" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -170,3 +170,18 @@ select {{source_cols_csv}} from {{ source_relation }} | |
{%- endfor %}) | ||
{%- endif -%} | ||
{% endmacro %} | ||
|
||
{% macro databricks__get_incremental_microbatch_sql(arg_dict) %} | ||
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%} | ||
{%- set event_time = model.config.event_time -%} | ||
{%- set start_time = config.get("__dbt_internal_microbatch_event_time_start") -%} | ||
{%- set end_time = config.get("__dbt_internal_microbatch_event_time_end") -%} | ||
{%- if start_time -%} | ||
{%- do incremental_predicates.append("cast(" ~ event_time ~ " as TIMESTAMP) >= '" ~ start_time ~ "'") -%} | ||
{%- endif -%} | ||
{%- if end_time -%} | ||
{%- do incremental_predicates.append("cast(" ~ event_time ~ " as TIMESTAMP) < '" ~ end_time ~ "'") -%} | ||
{%- endif -%} | ||
{%- do arg_dict.update({'incremental_predicates': incremental_predicates}) -%} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Preps for replace_where strategy by adding
as an incremental predicate. |
||
{{ return(get_replace_where_sql(arg_dict)) }} | ||
{% endmacro %} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
schema = """version: 2 | ||
models: | ||
- name: input_model | ||
|
||
- name: microbatch_model | ||
config: | ||
persist_docs: | ||
relation: True | ||
columns: True | ||
description: This is a microbatch model | ||
columns: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added my own schema with column comments to supplement the included tests, since hitting comments originally broke my implementation despite passing the included functional tests. |
||
- name: id | ||
description: "Id of the model" | ||
- name: event_time | ||
description: "Timestamp of the event" | ||
""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from dbt.tests.adapter.incremental.test_incremental_microbatch import ( | ||
BaseMicrobatch, | ||
) | ||
import pytest | ||
|
||
from tests.functional.adapter.microbatch import fixtures | ||
|
||
|
||
class TestDatabricksMicrobatch(BaseMicrobatch): | ||
@pytest.fixture(scope="class") | ||
def models(self, microbatch_model_sql, input_model_sql): | ||
return { | ||
"schema.yml": fixtures.schema, | ||
"input_model.sql": input_model_sql, | ||
"microbatch_model.sql": microbatch_model_sql, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure exactly what introduces this uncertainty, but I've experimentally observed that sometimes config_column is a dict and sometimes its a ColumnInfo, and these types have different access methods for getting description.