Skip to content

Commit

Permalink
feat(dbt): model dbt tests as asset checks only on their attached dbt…
Browse files Browse the repository at this point in the history
… node
  • Loading branch information
rexledesma committed Sep 26, 2023
1 parent d405c22 commit 228070a
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def get_dbt_multi_asset_args(
]
for test_unique_id in test_unique_ids:
test_resource_props = manifest["nodes"][test_unique_id]
check_spec = default_asset_check_fn(asset_key, test_resource_props)
check_spec = default_asset_check_fn(asset_key, unique_id, test_resource_props)

if check_spec:
check_specs.append(check_spec)
Expand Down
19 changes: 16 additions & 3 deletions python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,24 @@ def is_asset_check_from_dbt_resource_props(dbt_resource_props: Mapping[str, Any]
return dbt_resource_props["meta"].get("dagster", {}).get("asset_check", False)


def is_generic_test_on_attached_node_from_dbt_resource_props(
unique_id: str, dbt_resource_props: Mapping[str, Any]
) -> bool:
attached_node_unique_id = dbt_resource_props.get("attached_node")
is_generic_test = bool(attached_node_unique_id)

return is_generic_test and attached_node_unique_id == unique_id


def default_asset_check_fn(
asset_key: AssetKey, dbt_resource_props: Mapping[str, Any]
asset_key: AssetKey, unique_id: str, dbt_resource_props: Mapping[str, Any]
) -> Optional[AssetCheckSpec]:
is_asset_check = is_asset_check_from_dbt_resource_props(dbt_resource_props)
if not is_asset_check:
is_generic_test_on_attached_node = is_generic_test_on_attached_node_from_dbt_resource_props(
unique_id, dbt_resource_props
)

if not all([is_asset_check, is_generic_test_on_attached_node]):
return None

return AssetCheckSpec(
Expand Down Expand Up @@ -698,7 +711,7 @@ def get_asset_deps(

for test_unique_id in test_unique_ids:
test_resource_props = manifest["nodes"][test_unique_id]
check_spec = default_asset_check_fn(asset_key, test_resource_props)
check_spec = default_asset_check_fn(asset_key, unique_id, test_resource_props)

if check_spec:
check_specs.append(check_spec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,27 +139,40 @@ def to_default_asset_events(
)
elif manifest and node_resource_type == NodeType.Test and is_node_finished:
upstream_unique_ids: List[str] = manifest["parent_map"][unique_id]
test_resource_props = manifest["nodes"][unique_id]
metadata = {"unique_id": unique_id, "status": node_status}

for upstream_unique_id in upstream_unique_ids:
test_resource_props = manifest["nodes"][unique_id]
upstream_resource_props: Dict[str, Any] = manifest["nodes"].get(
upstream_unique_id
) or manifest["sources"].get(upstream_unique_id)
upstream_asset_key = dagster_dbt_translator.get_asset_key(upstream_resource_props)
is_asset_check = is_asset_check_from_dbt_resource_props(test_resource_props)
attached_node_unique_id = test_resource_props.get("attached_node")
is_generic_test = bool(attached_node_unique_id)

if is_asset_check and is_generic_test:
is_test_successful = node_status == TestStatus.Pass
metadata = {"unique_id": unique_id, "status": node_status}
severity = AssetCheckSeverity(test_resource_props["config"]["severity"].upper())

if is_asset_check_from_dbt_resource_props(test_resource_props):
yield AssetCheckResult(
success=is_test_successful,
asset_key=upstream_asset_key,
check_name=event_node_info["node_name"],
metadata=metadata,
severity=severity,
attached_node_resource_props: Dict[str, Any] = manifest["nodes"].get(
attached_node_unique_id
) or manifest["sources"].get(attached_node_unique_id)
attached_node_asset_key = dagster_dbt_translator.get_asset_key(
attached_node_resource_props
)

yield AssetCheckResult(
success=is_test_successful,
asset_key=attached_node_asset_key,
check_name=event_node_info["node_name"],
metadata=metadata,
severity=severity,
)
else:
for upstream_unique_id in upstream_unique_ids:
upstream_resource_props: Dict[str, Any] = manifest["nodes"].get(
upstream_unique_id
) or manifest["sources"].get(upstream_unique_id)
upstream_asset_key = dagster_dbt_translator.get_asset_key(
upstream_resource_props
)
else:

yield AssetObservation(
asset_key=upstream_asset_key,
metadata=metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ def test_dbt_tests_to_events(is_asset_check: bool) -> None:
},
"name": "test.a",
"meta": {"dagster": {"asset_check": is_asset_check}},
"attached_node": "model.a" if is_asset_check else None,
},
},
"sources": {},
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select 1 from {{ ref('customers') }} where false
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def my_dbt_assets_with_checks(): ...
)
assert asset_def.check_specs_by_output_name

# dbt singular tests are not modeled as Dagster asset checks
for check_spec in asset_def.check_specs_by_output_name.values():
assert "assert_singular_test_is_not_asset_check" != check_spec.name


@pytest.mark.parametrize(
"dbt_commands",
Expand Down

0 comments on commit 228070a

Please sign in to comment.