Implement microbatch incremental strategy (#825)
benc-db authored Oct 15, 2024
1 parent 00dd9f8 commit d0378d2
Showing 7 changed files with 65 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
- Add `include_full_name_in_path` config boolean for external locations. This writes tables to {location_root}/{catalog}/{schema}/{table} ([823](https://github.com/databricks/dbt-databricks/pull/823))
- Add a new `workflow_job` submission method for python, which creates a long-lived Databricks Workflow instead of a one-time run (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
- Allow for additional options to be passed to the Databricks Job API when using other python submission methods. For example, enable email_notifications (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
+- Support microbatch incremental strategy using replace_where ([825](https://github.com/databricks/dbt-databricks/pull/825))

### Under the Hood

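For reference, a model opting into the new strategy looks roughly like the sketch below. Names are hypothetical; `event_time`, `begin`, and `batch_size` are dbt-core's standard microbatch configs rather than anything introduced by this commit:

```sql
-- models/page_views.sql (hypothetical example)
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='event_time',
        begin='2024-01-01',
        batch_size='day',
        file_format='delta'
    )
}}

select id, event_time from {{ ref('events') }}
```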
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/__version__.py
@@ -1 +1 @@
version: str = "1.8.7"
version: str = "1.9.0b1"
21 changes: 14 additions & 7 deletions dbt/adapters/databricks/impl.py
@@ -86,6 +86,7 @@
from dbt_common.utils import executor
from dbt_common.utils.dict import AttrDict
from dbt_common.exceptions import DbtConfigError
+from dbt_common.exceptions import DbtInternalError
from dbt_common.contracts.config.base import BaseConfig

if TYPE_CHECKING:
@@ -650,7 +651,7 @@ def run_sql_for_tests(
conn.transaction_open = False

    def valid_incremental_strategies(self) -> List[str]:
-        return ["append", "merge", "insert_overwrite", "replace_where"]
+        return ["append", "merge", "insert_overwrite", "replace_where", "microbatch"]

    @property
    def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
@@ -699,12 +700,18 @@ def get_persist_doc_columns(
        # an error when we tried to alter the table.
        for column in existing_columns:
            name = column.column
-            if (
-                name in columns
-                and "description" in columns[name]
-                and columns[name]["description"] != (column.comment or "")
-            ):
-                return_columns[name] = columns[name]
+            if name in columns:
+                config_column = columns[name]
+                if isinstance(config_column, dict):
+                    comment = columns[name].get("description", "")
+                elif hasattr(config_column, "description"):
+                    comment = config_column.description
+                else:
+                    raise DbtInternalError(
+                        f"Column {name} in model config is not a dictionary or ColumnInfo object."
+                    )
+                if comment != (column.comment or ""):
+                    return_columns[name] = columns[name]

        return return_columns

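The `get_persist_doc_columns` change above makes the comment lookup accept both plain dicts and `ColumnInfo`-style objects (per the new error message). A minimal sketch of a model that exercises this path, with hypothetical names — the column descriptions themselves would come from a `schema.yml` like the test fixture further down:

```sql
-- hypothetical model; persist_docs is what routes column descriptions
-- from schema.yml through get_persist_doc_columns at the end of a run
{{
    config(
        persist_docs={"relation": true, "columns": true}
    )
}}

select 1 as id, current_timestamp() as event_time
```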
@@ -170,3 +170,18 @@ select {{source_cols_csv}} from {{ source_relation }}
{%- endfor %})
{%- endif -%}
{% endmacro %}
+
+{% macro databricks__get_incremental_microbatch_sql(arg_dict) %}
+  {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
+  {%- set event_time = model.config.event_time -%}
+  {%- set start_time = config.get("__dbt_internal_microbatch_event_time_start") -%}
+  {%- set end_time = config.get("__dbt_internal_microbatch_event_time_end") -%}
+  {%- if start_time -%}
+    {%- do incremental_predicates.append("cast(" ~ event_time ~ " as TIMESTAMP) >= '" ~ start_time ~ "'") -%}
+  {%- endif -%}
+  {%- if end_time -%}
+    {%- do incremental_predicates.append("cast(" ~ event_time ~ " as TIMESTAMP) < '" ~ end_time ~ "'") -%}
+  {%- endif -%}
+  {%- do arg_dict.update({'incremental_predicates': incremental_predicates}) -%}
+  {{ return(get_replace_where_sql(arg_dict)) }}
+{% endmacro %}
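In other words, the macro turns the batch window dbt-core passes in (`__dbt_internal_microbatch_event_time_start`/`_end`) into `replace_where` predicates and delegates the rest. The statement that ultimately runs should have roughly this shape — the table, column, and timestamps below are illustrative, and the exact rendering comes from `get_replace_where_sql`:

```sql
-- illustrative rendering for a single daily batch
insert into analytics.page_views
replace where cast(event_time as TIMESTAMP) >= '2024-10-01 00:00:00+00:00'
          and cast(event_time as TIMESTAMP) < '2024-10-02 00:00:00+00:00'
select id, event_time from ...  -- the model's compiled SQL for this batch
```

Because each batch only replaces rows inside its own window, a failed batch can be retried without touching neighboring batches.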
@@ -35,13 +35,13 @@
    Use the 'merge' or 'replace_where' strategy instead
  {%- endset %}

-  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where'] %}
+  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where', 'microbatch'] %}
    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
  {%-else %}
    {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
      {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
    {% endif %}
-    {% if raw_strategy == 'replace_where' and file_format not in ['delta'] %}
+    {% if raw_strategy in ('replace_where', 'microbatch') and file_format not in ['delta'] %}
      {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
    {% endif %}
  {% endif %}
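One consequence of reusing the `replace_where` check: a microbatch model on a non-delta file format now fails with a compiler error instead of producing bad SQL. A hypothetical config that would trip it:

```sql
-- file_format='parquet' is rejected: microbatch, like replace_where, is delta-only
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        file_format='parquet'
    )
}}
```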
16 changes: 16 additions & 0 deletions tests/functional/adapter/microbatch/fixtures.py
@@ -0,0 +1,16 @@
schema = """version: 2
models:
- name: input_model
- name: microbatch_model
config:
persist_docs:
relation: True
columns: True
description: This is a microbatch model
columns:
- name: id
description: "Id of the model"
- name: event_time
description: "Timestamp of the event"
"""
16 changes: 16 additions & 0 deletions tests/functional/adapter/microbatch/test_microbatch.py
@@ -0,0 +1,16 @@
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+)
+import pytest
+
+from tests.functional.adapter.microbatch import fixtures
+
+
+class TestDatabricksMicrobatch(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def models(self, microbatch_model_sql, input_model_sql):
+        return {
+            "schema.yml": fixtures.schema,
+            "input_model.sql": input_model_sql,
+            "microbatch_model.sql": microbatch_model_sql,
+        }
