Skip to content

Commit

Permalink
feat(dbt): add toggle to emit column dependency metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
rexledesma committed Mar 20, 2024
1 parent 0628a9c commit c05e69a
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 15 deletions.
16 changes: 16 additions & 0 deletions docs/content/integrations/dbt/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,22 @@ snapshots:
- "{{ dagster.log_column_level_metadata() }}"
```
Column dependencies can be omitted from the materialization metadata by disabling the collection of parent relation metadata. To do so, set the `enable_parent_relation_metadata_collection` argument of the `dagster.log_column_level_metadata()` macro to `false`:

```yaml
models:
+post-hook:
- "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=false) }}"
seeds:
+post-hook:
- "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=false) }}"
snapshots:
+post-hook:
- "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=false) }}"
```

---

## Defining dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,34 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
assert table_schema_by_asset_key == expected_table_schema_by_asset_key


@pytest.mark.skipif(
    version.parse(dbt_version) < version.parse("1.6.0"),
    reason="Retrieving the dbt project name from the manifest is only available in `dbt-core>=1.6`",
)
def test_no_lineage(test_metadata_manifest: Dict[str, Any]) -> None:
    """Verify that no column lineage is attached when parent relation collection is off.

    The dbt build is invoked with the
    `dagster_enable_parent_relation_metadata_collection` var set to `False`, and
    every resulting asset materialization is checked for the absence of a truthy
    `dagster/column_lineage` metadata entry.
    """

    @dbt_assets(manifest=test_metadata_manifest)
    def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
        # Disable parent relation metadata collection through a dbt var for this run.
        disabled_collection_vars = json.dumps(
            {"dagster_enable_parent_relation_metadata_collection": False}
        )
        build_invocation = dbt.cli(
            ["build", "--vars", disabled_collection_vars],
            context=context,
        )
        yield from build_invocation.stream()

    dbt_resource = DbtCliResource(project_dir=os.fspath(test_metadata_path))
    result = materialize([my_dbt_assets], resources={"dbt": dbt_resource})

    assert result.success

    # Gather the lineage entry (if any) from each materialization; none may be truthy.
    lineage_entries = [
        event.materialization.metadata.get("dagster/column_lineage")
        for event in result.get_asset_materialization_events()
    ]
    assert not any(lineage_entries)


@pytest.mark.skipif(
version.parse(dbt_version) < version.parse("1.6.0"),
reason="Retrieving the dbt project name from the manifest is only available in `dbt-core>=1.6`",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ require-dbt-version: [">=1.0.0", "<2.0.0"]

models:
+post-hook:
- "{{ dagster.log_column_level_metadata() }}"
- "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=var('dagster_enable_parent_relation_metadata_collection', 'true')) }}"
test_dagster_metadata:
materialized: table
staging:
materialized: view

seeds:
+post-hook:
- "{{ dagster.log_column_level_metadata() }}"
- "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=var('dagster_enable_parent_relation_metadata_collection', 'true')) }}"
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro log_column_level_metadata() %}
{% macro log_column_level_metadata(enable_parent_relation_metadata_collection=true) %}
-- This macro should only be run in the context of a `dagster-dbt` invocation.
{%- set is_dagster_dbt_cli = env_var('DAGSTER_DBT_CLI', '') == 'true' -%}

Expand Down Expand Up @@ -44,19 +44,27 @@
-- }
-- }
-- }
{%- set structured_log = {'relation_name': this.render(), 'columns': column_schema, 'parents': {}} -%}

{%- for parent_relation in parent_relations -%}
{%- set parent_relation_columns = adapter.get_columns_in_relation(parent_relation) -%}
{%- set parent_relation_column_schema = {} -%}
{%- for column in parent_relation_columns -%}
{%- set serializable_column = {column.name: {'data_type': column.data_type}} -%}
{%- set _ = parent_relation_column_schema.update(serializable_column) -%}
{%- endfor -%}
--
-- If `enable_parent_relation_metadata_collection` is set to `false`, the structured log
-- will only contain the current node's column metadata.
{%- set structured_log = {'relation_name': this.render(), 'columns': column_schema} -%}

{%- set structured_parent_relation_metadata = {parent_relation.render(): {'columns': parent_relation_column_schema}} -%}
{%- set _ = structured_log['parents'].update(structured_parent_relation_metadata) -%}
{%- endfor -%}
{%- if enable_parent_relation_metadata_collection -%}
{%- set _ = structured_log.update({'parents': {}}) -%}

{%- for parent_relation in parent_relations -%}
{%- set parent_relation_columns = adapter.get_columns_in_relation(parent_relation) -%}
{%- set parent_relation_column_schema = {} -%}

{%- for column in parent_relation_columns -%}
{%- set serializable_column = {column.name: {'data_type': column.data_type}} -%}
{%- set _ = parent_relation_column_schema.update(serializable_column) -%}
{%- endfor -%}

{%- set structured_parent_relation_metadata = {parent_relation.render(): {'columns': parent_relation_column_schema}} -%}
{%- set _ = structured_log['parents'].update(structured_parent_relation_metadata) -%}
{%- endfor -%}
{%- endif -%}

{%- do log(tojson(structured_log), info=true) -%}
{%- endif -%}
Expand Down

0 comments on commit c05e69a

Please sign in to comment.