
Commit

Merge branch 'main' into 1.9.latest
benc-db committed Oct 31, 2024
2 parents ccaa2f8 + 6cb6eaa commit a4ef073
Showing 5 changed files with 63 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,10 @@
- Allow for additional options to be passed to the Databricks Job API when using other python submission methods. For example, enable email_notifications (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
- Support microbatch incremental strategy using replace_where ([825](https://github.com/databricks/dbt-databricks/pull/825))

### Fixes

- Replace array indexing with 'get' in split_part so as not to raise an exception when indexing beyond bounds ([839](https://github.com/databricks/dbt-databricks/pull/839))

### Under the Hood

- Significant refactoring and increased testing of python_submissions ([830](https://github.com/databricks/dbt-databricks/pull/830))
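The split_part fix above relies on Databricks SQL's `get` function, which returns NULL for an out-of-bounds index, whereas direct array indexing can raise an error (for example under ANSI settings). A minimal sketch of the difference (literal values are illustrative):

```sql
-- get() returns NULL when the index is past the end of the array
select get(split('a,b,c', ','), 5);    -- NULL

-- direct indexing of the same array can raise an out-of-bounds error instead
-- select split('a,b,c', ',')[5];
```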
@@ -72,8 +72,8 @@ select {{source_cols_csv}} from {{ source_relation }}
{% macro databricks__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
{# need dest_columns for merge_exclude_columns, default to use "*" #}

{%- set target_alias = config.get('target_alias', 'tgt') -%}
{%- set source_alias = config.get('source_alias', 'src') -%}
{%- set target_alias = config.get('target_alias', 'DBT_INTERNAL_DEST') -%}
{%- set source_alias = config.get('source_alias', 'DBT_INTERNAL_SOURCE') -%}

{%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
{%- set dest_columns = adapter.get_columns_in_relation(target) -%}
@@ -146,7 +146,7 @@ select {{source_cols_csv}} from {{ source_relation }}
{%- endif %}
{% endmacro %}

{% macro get_merge_update_set(update_columns, on_schema_change, source_columns, source_alias='src') %}
{% macro get_merge_update_set(update_columns, on_schema_change, source_columns, source_alias='DBT_INTERNAL_SOURCE') %}
{%- if update_columns -%}
{%- for column_name in update_columns -%}
{{ column_name }} = {{ source_alias }}.{{ column_name }}{%- if not loop.last %}, {% endif -%}
@@ -160,7 +160,7 @@ select {{source_cols_csv}} from {{ source_relation }}
{%- endif -%}
{% endmacro %}

{% macro get_merge_insert(on_schema_change, source_columns, source_alias='src') %}
{% macro get_merge_insert(on_schema_change, source_columns, source_alias='DBT_INTERNAL_SOURCE') %}
{%- if on_schema_change == 'ignore' -%}
*
{%- else -%}
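The alias change above only affects the default names used in the rendered merge statement; both remain overridable via `target_alias` / `source_alias`. With no overrides, the generated SQL now looks roughly like the following sketch (relation and column names are hypothetical, and the real statement is built from the model's columns and predicates):

```sql
merge into analytics.my_model as DBT_INTERNAL_DEST
using my_model__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
when matched then update set *
when not matched then insert *
```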
43 changes: 43 additions & 0 deletions dbt/include/databricks/macros/utils/split_part.sql
@@ -0,0 +1,43 @@
{% macro databricks__split_part(string_text, delimiter_text, part_number) %}

{% set delimiter_expr %}

-- escape if starts with a special character
case when regexp_extract({{ delimiter_text }}, '([^A-Za-z0-9])(.*)', 1) != '_'
then concat('\\', {{ delimiter_text }})
else {{ delimiter_text }} end

{% endset %}

{% if part_number >= 0 %}

{% set split_part_expr %}

get(split(
{{ string_text }},
{{ delimiter_expr }}
), {{ part_number - 1 if part_number > 0 else part_number }})

{% endset %}

{% else %}

{% set split_part_expr %}

get(split(
{{ string_text }},
{{ delimiter_expr }}
),
length({{ string_text }})
- length(
replace({{ string_text }}, {{ delimiter_text }}, '')
) + 1 + {{ part_number }}
)

{% endset %}

{% endif %}

{{ return(split_part_expr) }}

{% endmacro %}
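dbt's cross-database `split_part` macro dispatches to this implementation. Roughly, a call compiles to a zero-indexed `get` over `split` (the literals below are illustrative, and the macro additionally escapes the delimiter because `split` treats it as a regular expression):

```sql
-- {{ dbt.split_part("'a-b-c'", "'-'", 2) }} compiles to approximately:
select get(split('a-b-c', '-'), 2 - 1);    -- 'b'

-- a negative part_number counts back from the end by first computing the part count:
select get(split('a-b-c', '-'),
           length('a-b-c') - length(replace('a-b-c', '-', '')) + 1 + -1);    -- 'c'
```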
21 changes: 11 additions & 10 deletions docs/databricks-merge.md
@@ -6,26 +6,27 @@ The merge incremental strategy requires:
- Databricks Runtime 5.1 and above for delta file format
- Apache Spark for hudi file format

dbt will run an [atomic `merge` statement](https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html) which looks nearly identical to the default merge behavior on Snowflake and BigQuery.
If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column.
If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to `append` strategy).

Specifying `merge` as the incremental strategy is optional since it's the default strategy used when none is specified.

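A minimal model using this strategy might look like the following sketch (the model, column, and ref names are hypothetical):

```sql
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        unique_key = 'id'
    )
}}

select * from {{ ref('stg_events') }}
```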
From v1.9 onwards, `merge` behavior can be tuned by setting additional parameters.

- Parameters that control the merge steps and tweak the default behaviour:

- `skip_matched_step`: if set to `true`, dbt will completely skip the `matched` clause of the merge statement.
- `skip_not_matched_step`: similarly if `true` the `not matched` clause will be skipped.
- `not_matched_by_source_action`: if set to `delete` the corresponding `when not matched by source ... then delete` clause will be added to the merge statement.
- `merge_with_schema_evolution`: when set to `true`, dbt generates the merge statement with the `WITH SCHEMA EVOLUTION` clause.

- Step conditions, expressed as explicit SQL predicates, allow the corresponding action to execute only when the conditions are met, in addition to matching on the `unique_key`.
  - `matched_condition`: applies to the `when matched` step.
    In order to define such conditions one may use `tgt` and `src` as aliases for the target and source tables respectively, e.g. `tgt.col1 = hash(src.col2, src.col3)`.
    In order to define such conditions one may use `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` as aliases for the target and source tables respectively, e.g. `DBT_INTERNAL_DEST.col1 = hash(DBT_INTERNAL_SOURCE.col2, DBT_INTERNAL_SOURCE.col3)`.
- `not_matched_condition`: applies to `when not matched` step.
- `not_matched_by_source_condition`: applies to `when not matched by source` step.
- `target_alias`, `source_alias`: string values that will be used instead of `tgt` and `src` to distinguish between source and target tables in the merge statement.
- `target_alias`, `source_alias`: string values that will be used instead of `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` to distinguish between source and target tables in the merge statement.

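For instance, a model config exercising several of these parameters might look like the following sketch (the aliases and conditions here are illustrative, chosen to line up with the generated statement shown below):

```sql
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        unique_key = 'id',
        target_alias = 't',
        source_alias = 's',
        merge_with_schema_evolution = true,
        matched_condition = 't.tech_change_ts < s.tech_change_ts',
        not_matched_by_source_action = 'delete',
        not_matched_by_source_condition = 't.tech_change_ts < current_timestamp()'
    )
}}
```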
The example below illustrates how these parameters affect the generated merge statement:

@@ -53,7 +54,7 @@ from
```

```sql
merge
with schema evolution
into
target_table as t
@@ -89,8 +90,8 @@ when not matched
s.attr2,
s.tech_change_ts
)

when not matched by source
and t.tech_change_ts < current_timestamp()
then delete
```
1 change: 1 addition & 0 deletions tests/functional/adapter/incremental/fixtures.py
@@ -378,6 +378,7 @@
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
source_alias='src',
target_alias='t',
matched_condition='src.V > t.V and hash(src.first, src.second) <> hash(t.first, t.second)',
not_matched_condition='src.V > 0',
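With settings like these, the generated merge would carry the configured aliases and conditions roughly as in the sketch below (table names and the remaining clauses are simplified):

```sql
merge into target_table as t
using target_table__dbt_tmp as src
on src.id = t.id
when matched
    and src.V > t.V and hash(src.first, src.second) <> hash(t.first, t.second)
    then update set *
when not matched
    and src.V > 0
    then insert *
```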
