diff --git a/dbt/include/databricks/macros/materializations/incremental/strategies.sql b/dbt/include/databricks/macros/materializations/incremental/strategies.sql index 1a03ee9f..57db5496 100644 --- a/dbt/include/databricks/macros/materializations/incremental/strategies.sql +++ b/dbt/include/databricks/macros/materializations/incremental/strategies.sql @@ -72,8 +72,8 @@ select {{source_cols_csv}} from {{ source_relation }} {% macro databricks__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %} {# need dest_columns for merge_exclude_columns, default to use "*" #} - {%- set target_alias = config.get('target_alias', 'tgt') -%} - {%- set source_alias = config.get('source_alias', 'src') -%} + {%- set target_alias = config.get('target_alias', 'DBT_INTERNAL_DEST') -%} + {%- set source_alias = config.get('source_alias', 'DBT_INTERNAL_SOURCE') -%} {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%} {%- set dest_columns = adapter.get_columns_in_relation(target) -%} @@ -146,7 +146,7 @@ select {{source_cols_csv}} from {{ source_relation }} {%- endif %} {% endmacro %} -{% macro get_merge_update_set(update_columns, on_schema_change, source_columns, source_alias='src') %} +{% macro get_merge_update_set(update_columns, on_schema_change, source_columns, source_alias='DBT_INTERNAL_SOURCE') %} {%- if update_columns -%} {%- for column_name in update_columns -%} {{ column_name }} = {{ source_alias }}.{{ column_name }}{%- if not loop.last %}, {% endif -%} @@ -160,7 +160,7 @@ select {{source_cols_csv}} from {{ source_relation }} {%- endif -%} {% endmacro %} -{% macro get_merge_insert(on_schema_change, source_columns, source_alias='src') %} +{% macro get_merge_insert(on_schema_change, source_columns, source_alias='DBT_INTERNAL_SOURCE') %} {%- if on_schema_change == 'ignore' -%} * {%- else -%} diff --git a/docs/databricks-merge.md b/docs/databricks-merge.md index 15c7b66b..caa00336 100644 --- 
a/docs/databricks-merge.md +++ b/docs/databricks-merge.md @@ -6,8 +6,8 @@ The merge incremental strategy requires: - Databricks Runtime 5.1 and above for delta file format - Apache Spark for hudi file format -dbt will run an [atomic `merge` statement](https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html) which looks nearly identical to the default merge behavior on Snowflake and BigQuery. -If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. +dbt will run an [atomic `merge` statement](https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html) which looks nearly identical to the default merge behavior on Snowflake and BigQuery. +If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to `append` strategy). Specifying `merge` as the incremental strategy is optional since it's the default strategy used when none is specified. @@ -15,17 +15,18 @@ Specifying `merge` as the incremental strategy is optional since it's the defaul From v.1.9 onwards `merge` behavior can be tuned by setting the additional parameters. - Merge steps control parameters that tweak the default behaviour: + - `skip_matched_step`: if set to `true`, dbt will completely skip the `matched` clause of the merge statement. - `skip_not_matched_step`: similarly if `true` the `not matched` clause will be skipped. - `not_matched_by_source_action`: if set to `delete` the corresponding `when not matched by source ... then delete` clause will be added to the merge statement. - - `merge_with_schema_evolution`: when set to `true` dbt generates the merge statement with `WITH SCHEMA EVOLUTION` clause. 
+ - `merge_with_schema_evolution`: when set to `true` dbt generates the merge statement with `WITH SCHEMA EVOLUTION` clause. -- Step conditions that are expressed with an explicit SQL predicates allow to execute corresponding action only in case the conditions are met in addition to matching by the `unique_key`. - - `matched_condition`: applies to `when matched` step. - In order to define such conditions one may use `tgt` and `src` as aliases for the target and source tables respectively, e.g. `tgt.col1 = hash(src.col2, src.col3)`. +- Step conditions, expressed as explicit SQL predicates, allow the corresponding action to be executed only when the conditions are met in addition to matching on the `unique_key`. + - `matched_condition`: applies to `when matched` step. + In order to define such conditions one may use `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` as aliases for the target and source tables respectively, e.g. `DBT_INTERNAL_DEST.col1 = hash(DBT_INTERNAL_SOURCE.col2, DBT_INTERNAL_SOURCE.col3)`. - `not_matched_condition`: applies to `when not matched` step. - `not_matched_by_source_condition`: applies to `when not matched by source` step. - - `target_alias`, `source_alias`: string values that will be used instead of `tgt` and `src` to distinguish between source and target tables in the merge statement. + - `target_alias`, `source_alias`: string values that will be used instead of `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` to distinguish between source and target tables in the merge statement. 
Example below illustrates how these parameters affect the merge statement generation: @@ -53,7 +54,7 @@ from ``` ```sql -merge +merge with schema evolution into target_table as t @@ -89,8 +90,8 @@ when not matched s.attr2, s.tech_change_ts ) - + when not matched by source and t.tech_change_ts < current_timestamp() then delete -``` \ No newline at end of file +``` diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py index edf70d28..9d0f2913 100644 --- a/tests/functional/adapter/incremental/fixtures.py +++ b/tests/functional/adapter/incremental/fixtures.py @@ -378,6 +378,7 @@ materialized = 'incremental', unique_key = 'id', incremental_strategy='merge', + source_alias='src', target_alias='t', matched_condition='src.V > t.V and hash(src.first, src.second) <> hash(t.first, t.second)', not_matched_condition='src.V > 0',