From 566fe88a3324e09ec9f724363166480702551bec Mon Sep 17 00:00:00 2001 From: Rex Ledesma Date: Mon, 18 Mar 2024 17:46:02 -0400 Subject: [PATCH] feat(dbt): add toggle to emit column dependency metadata --- docs/content/integrations/dbt/reference.mdx | 16 +++++++++ .../dbt_packages/test_columns_metadata.py | 28 +++++++++++++++ .../test_dagster_metadata/dbt_project.yml | 4 +-- .../macros/log_column_level_metadata.sql | 34 ++++++++++++------- 4 files changed, 67 insertions(+), 15 deletions(-) diff --git a/docs/content/integrations/dbt/reference.mdx b/docs/content/integrations/dbt/reference.mdx index cd7f4bd41b7b2..b6b2ea64b76bc 100644 --- a/docs/content/integrations/dbt/reference.mdx +++ b/docs/content/integrations/dbt/reference.mdx @@ -611,6 +611,22 @@ snapshots: - "{{ dagster.log_column_level_metadata() }}" ``` +Column dependencies can be removed from materialization metadata by disabling the collection of parent relation metadata. This can be done by setting the `enable_parent_relation_metadata_collection` argument to `false` in the `dagster.log_column_level_metadata()` macro: + +```yaml +models: + +post-hook: + - "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=false) }}" + +seeds: + +post-hook: + - "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=false) }}" + +snapshots: + +post-hook: + - "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=false) }}" +``` + --- ## Defining dependencies diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py index 43c26e8e878e0..d5a284fecc1d3 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py @@ -75,6 +75,34 @@ def 
my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): assert table_schema_by_asset_key == expected_table_schema_by_asset_key +@pytest.mark.skipif( + version.parse(dbt_version) < version.parse("1.6.0"), + reason="Retrieving the dbt project name from the manifest is only available in `dbt-core>=1.6`", +) +def test_no_lineage(test_metadata_manifest: Dict[str, Any]) -> None: + @dbt_assets(manifest=test_metadata_manifest) + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli( + [ + "build", + "--vars", + json.dumps({"dagster_enable_parent_relation_metadata_collection": False}), + ], + context=context, + ).stream() + + result = materialize( + [my_dbt_assets], + resources={"dbt": DbtCliResource(project_dir=os.fspath(test_metadata_path))}, + ) + + assert result.success + assert all( + not event.materialization.metadata.get("dagster/column_lineage") + for event in result.get_asset_materialization_events() + ) + + @pytest.mark.skipif( version.parse(dbt_version) < version.parse("1.6.0"), reason="Retrieving the dbt project name from the manifest is only available in `dbt-core>=1.6`", diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml index 53288d76b1284..d10377cd2b2e4 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml @@ -21,7 +21,7 @@ require-dbt-version: [">=1.0.0", "<2.0.0"] models: +post-hook: - - "{{ dagster.log_column_level_metadata() }}" + - "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=var('dagster_enable_parent_relation_metadata_collection', 'true')) }}" test_dagster_metadata: materialized: table staging: @@ -29,4 +29,4 @@ models: 
seeds: +post-hook: - - "{{ dagster.log_column_level_metadata() }}" + - "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=var('dagster_enable_parent_relation_metadata_collection', 'true')) }}" diff --git a/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql b/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql index 5fcdf70e7b6cb..2f2e9630e49bc 100644 --- a/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql +++ b/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql @@ -1,4 +1,4 @@ -{% macro log_column_level_metadata() %} +{% macro log_column_level_metadata(enable_parent_relation_metadata_collection=true) %} -- This macro should only be run in the context of a `dagster-dbt` invocation. {%- set is_dagster_dbt_cli = env_var('DAGSTER_DBT_CLI', '') == 'true' -%} @@ -44,19 +44,27 @@ -- } -- } -- } - {%- set structured_log = {'relation_name': this.render(), 'columns': column_schema, 'parents': {}} -%} - - {%- for parent_relation in parent_relations -%} - {%- set parent_relation_columns = adapter.get_columns_in_relation(parent_relation) -%} - {%- set parent_relation_column_schema = {} -%} - {%- for column in parent_relation_columns -%} - {%- set serializable_column = {column.name: {'data_type': column.data_type}} -%} - {%- set _ = parent_relation_column_schema.update(serializable_column) -%} - {%- endfor -%} + -- + -- If `enable_parent_relation_metadata_collection` is set to `false`, the structured log + -- will only contain the current node's column metadata. 
+ {%- set structured_log = {'relation_name': this.render(), 'columns': column_schema} -%} - {%- set structured_parent_relation_metadata = {parent_relation.render(): {'columns': parent_relation_column_schema}} -%} - {%- set _ = structured_log['parents'].update(structured_parent_relation_metadata) -%} - {%- endfor -%} + {%- if enable_parent_relation_metadata_collection -%} + {%- set _ = structured_log.update({'parents': {}}) -%} + + {%- for parent_relation in parent_relations -%} + {%- set parent_relation_columns = adapter.get_columns_in_relation(parent_relation) -%} + {%- set parent_relation_column_schema = {} -%} + + {%- for column in parent_relation_columns -%} + {%- set serializable_column = {column.name: {'data_type': column.data_type}} -%} + {%- set _ = parent_relation_column_schema.update(serializable_column) -%} + {%- endfor -%} + + {%- set structured_parent_relation_metadata = {parent_relation.render(): {'columns': parent_relation_column_schema}} -%} + {%- set _ = structured_log['parents'].update(structured_parent_relation_metadata) -%} + {%- endfor -%} + {%- endif -%} {%- do log(tojson(structured_log), info=true) -%} {%- endif -%}