diff --git a/.changes/unreleased/Features-20240903-161003.yaml b/.changes/unreleased/Features-20240903-161003.yaml new file mode 100644 index 000000000..57a0f14c0 --- /dev/null +++ b/.changes/unreleased/Features-20240903-161003.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Allow configuring snapshot column names +time: 2024-09-03T16:10:03.021221-04:00 +custom: + Author: gshank + Issue: "1096" diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql index a397f84e5..43c4750f6 100644 --- a/dbt/include/spark/macros/materializations/snapshot.sql +++ b/dbt/include/spark/macros/materializations/snapshot.sql @@ -13,6 +13,7 @@ {% macro spark__snapshot_merge_sql(target, source, insert_cols) -%} + {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%} merge into {{ target }} as DBT_INTERNAL_DEST {% if target.is_iceberg %} @@ -21,12 +22,12 @@ {% else %} using {{ source }} as DBT_INTERNAL_SOURCE {% endif %} - on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id + on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }} when matched - and DBT_INTERNAL_DEST.dbt_valid_to is null + and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete') then update - set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }} when not matched and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' @@ -81,13 +82,12 @@ {% materialization snapshot, adapter='spark' %} - {%- set config = model['config'] -%} {%- set target_table = model.get('alias', model.get('name')) -%} {%- set strategy_name = config.get('strategy') -%} {%- set unique_key = config.get('unique_key') %} - {%- set file_format = config.get('file_format', 'parquet') -%} + {%- set file_format = config.get('file_format') or 'parquet' -%} {%- set grant_config = config.get('grants') -%} {% set target_relation_exists, target_relation = get_or_create_relation( @@ -126,7 +126,7 @@ {{ run_hooks(pre_hooks, inside_transaction=True) }} {% set strategy_macro = strategy_dispatch(strategy_name) %} - {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} + {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %} {% if not target_relation_exists %} @@ -135,7 +135,9 @@ {% else %} - {{ adapter.valid_snapshot_target(target_relation) }} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} + + {{ adapter.valid_snapshot_target(target_relation, columns) }} {% set staging_table = spark_build_snapshot_staging_table(strategy, sql, target_relation) %} diff --git a/setup.py b/setup.py index 9e1fa31e3..2de68a6ff 100644 --- a/setup.py +++ b/setup.py @@ -66,7 +66,7 @@ def _get_plugin_version_dict(): install_requires=[ "sqlparams>=3.0.0", "dbt-common>=1.0.4,<2.0", - "dbt-adapters>=1.1.1,<2.0", + "dbt-adapters>=1.7.0,<2.0", # add dbt-core to ensure backwards compatibility of installation, this is not a functional dependency "dbt-core>=1.8.0", ],