From fec8b30985a2f7007c8ff04866623d7a37238236 Mon Sep 17 00:00:00 2001 From: Ben Cassell <98852248+benc-db@users.noreply.github.com> Date: Wed, 30 Oct 2024 15:44:04 -0700 Subject: [PATCH 1/2] Update split part to use safe operation (#839) --- CHANGELOG.md | 4 ++ .../databricks/macros/utils/split_part.sql | 43 +++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 dbt/include/databricks/macros/utils/split_part.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bac9a54d..c76f96739 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,10 @@ - Allow for additional options to be passed to the Databricks Job API when using other python submission methods. For example, enable email_notifications (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762)) - Support microbatch incremental strategy using replace_where ([825](https://github.com/databricks/dbt-databricks/pull/825)) +### Fixes + +- Replace array indexing with 'get' in split_part so as not to raise exception when indexing beyond bounds ([839](https://github.com/databricks/dbt-databricks/pull/839)) + ### Under the Hood - Significant refactoring and increased testing of python_submissions ([830](https://github.com/databricks/dbt-databricks/pull/830)) diff --git a/dbt/include/databricks/macros/utils/split_part.sql b/dbt/include/databricks/macros/utils/split_part.sql new file mode 100644 index 000000000..92b0cecd0 --- /dev/null +++ b/dbt/include/databricks/macros/utils/split_part.sql @@ -0,0 +1,43 @@ +{% macro databricks__split_part(string_text, delimiter_text, part_number) %} + + {% set delimiter_expr %} + + -- escape if starts with a special character + case when regexp_extract({{ delimiter_text }}, '([^A-Za-z0-9])(.*)', 1) != '_' + then concat('\\', {{ delimiter_text }}) + else {{ delimiter_text }} end + + {% endset %} + + {% if part_number >= 0 %} + + {% set split_part_expr %} + + get(split( + {{ string_text }}, + {{ delimiter_expr }} + ), {{ part_number - 1 if part_number > 0 else part_number }}) + + {% endset %} + + {% else %} + + {% set split_part_expr %} + + get(split( + {{ string_text }}, + {{ delimiter_expr }} + ), + length({{ string_text }}) + - length( + replace({{ string_text }}, {{ delimiter_text }}, '') + ) + 1 + {{ part_number }} + ) + + {% endset %} + + {% endif %} + + {{ return(split_part_expr) }} + +{% endmacro %} From 6cb6eaa648cb00abb9e13b4c5a8d09120389e0d2 Mon Sep 17 00:00:00 2001 From: Ben Cassell <98852248+benc-db@users.noreply.github.com> Date: Thu, 31 Oct 2024 09:29:47 -0700 Subject: [PATCH 2/2] Change merge alias defaults (#840) --- .../incremental/strategies.sql | 8 +++---- docs/databricks-merge.md | 21 ++++++++++--------- .../adapter/incremental/fixtures.py | 1 + 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/dbt/include/databricks/macros/materializations/incremental/strategies.sql b/dbt/include/databricks/macros/materializations/incremental/strategies.sql index 1a03ee9fd..57db5496a 100644 --- a/dbt/include/databricks/macros/materializations/incremental/strategies.sql +++ b/dbt/include/databricks/macros/materializations/incremental/strategies.sql @@ -72,8 +72,8 @@ select {{source_cols_csv}} from {{ source_relation }} {% macro databricks__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %} {# need dest_columns for merge_exclude_columns, default to use "*" #} - {%- set target_alias = config.get('target_alias', 'tgt') -%} - {%- set source_alias = config.get('source_alias', 'src') -%} + {%- set target_alias = config.get('target_alias', 'DBT_INTERNAL_DEST') -%} + {%- set source_alias = config.get('source_alias', 'DBT_INTERNAL_SOURCE') -%} {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%} {%- set dest_columns = adapter.get_columns_in_relation(target) -%} @@ -146,7 +146,7 @@ select {{source_cols_csv}} from {{ source_relation }} {%- endif %} {% endmacro %} -{% macro get_merge_update_set(update_columns, on_schema_change, source_columns, source_alias='src') %} +{% macro get_merge_update_set(update_columns, on_schema_change, source_columns, source_alias='DBT_INTERNAL_SOURCE') %} {%- if update_columns -%} {%- for column_name in update_columns -%} {{ column_name }} = {{ source_alias }}.{{ column_name }}{%- if not loop.last %}, {% endif -%} @@ -160,7 +160,7 @@ select {{source_cols_csv}} from {{ source_relation }} {%- endif -%} {% endmacro %} -{% macro get_merge_insert(on_schema_change, source_columns, source_alias='src') %} +{% macro get_merge_insert(on_schema_change, source_columns, source_alias='DBT_INTERNAL_SOURCE') %} {%- if on_schema_change == 'ignore' -%} * {%- else -%} diff --git a/docs/databricks-merge.md b/docs/databricks-merge.md index 15c7b66b3..caa003367 100644 --- a/docs/databricks-merge.md +++ b/docs/databricks-merge.md @@ -6,8 +6,8 @@ The merge incremental strategy requires: - Databricks Runtime 5.1 and above for delta file format - Apache Spark for hudi file format -dbt will run an [atomic `merge` statement](https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html) which looks nearly identical to the default merge behavior on Snowflake and BigQuery. -If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. +dbt will run an [atomic `merge` statement](https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html) which looks nearly identical to the default merge behavior on Snowflake and BigQuery. +If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to `append` strategy). Specifying `merge` as the incremental strategy is optional since it's the default strategy used when none is specified. @@ -15,17 +15,18 @@ Specifying `merge` as the incremental strategy is optional since it's the defaul From v.1.9 onwards `merge` behavior can be tuned by setting the additional parameters. - Merge steps control parameters that tweak the default behaviour: + - `skip_matched_step`: if set to `true`, dbt will completely skip the `matched` clause of the merge statement. - `skip_not_matched_step`: similarly if `true` the `not matched` clause will be skipped. - `not_matched_by_source_action`: if set to `delete` the corresponding `when not matched by source ... then delete` clause will be added to the merge statement. - - `merge_with_schema_evolution`: when set to `true` dbt generates the merge statement with `WITH SCHEMA EVOLUTION` clause. + - `merge_with_schema_evolution`: when set to `true` dbt generates the merge statement with `WITH SCHEMA EVOLUTION` clause. -- Step conditions that are expressed with an explicit SQL predicates allow to execute corresponding action only in case the conditions are met in addition to matching by the `unique_key`. - - `matched_condition`: applies to `when matched` step. - In order to define such conditions one may use `tgt` and `src` as aliases for the target and source tables respectively, e.g. `tgt.col1 = hash(src.col2, src.col3)`. +- Step conditions that are expressed with an explicit SQL predicates allow to execute corresponding action only in case the conditions are met in addition to matching by the `unique_key`. + - `matched_condition`: applies to `when matched` step. + In order to define such conditions one may use `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` as aliases for the target and source tables respectively, e.g. `DBT_INTERNAL_DEST.col1 = hash(DBT_INTERNAL_SOURCE.col2, DBT_INTERNAL_SOURCE.col3)`. - `not_matched_condition`: applies to `when not matched` step. - `not_matched_by_source_condition`: applies to `when not matched by source` step. - - `target_alias`, `source_alias`: string values that will be used instead of `tgt` and `src` to distinguish between source and target tables in the merge statement. + - `target_alias`, `source_alias`: string values that will be used instead of `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` to distinguish between source and target tables in the merge statement. Example below illustrates how these parameters affect the merge statement generation: @@ -53,7 +54,7 @@ from ``` ```sql -merge +merge with schema evolution into target_table as t @@ -89,8 +90,8 @@ when not matched s.attr2, s.tech_change_ts ) - + when not matched by source and t.tech_change_ts < current_timestamp() then delete -``` \ No newline at end of file +``` diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py index edf70d285..9d0f29133 100644 --- a/tests/functional/adapter/incremental/fixtures.py +++ b/tests/functional/adapter/incremental/fixtures.py @@ -378,6 +378,7 @@ materialized = 'incremental', unique_key = 'id', incremental_strategy='merge', + source_alias='src', target_alias='t', matched_condition='src.V > t.V and hash(src.first, src.second) <> hash(t.first, t.second)', not_matched_condition='src.V > 0',