Skip to content

Commit

Permalink
feat(dbt): implement subsetting to execute dbt tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rexledesma committed Sep 27, 2023
1 parent 5497d82 commit 1d7128c
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 4 deletions.
10 changes: 9 additions & 1 deletion python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import hashlib
import itertools
import textwrap
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -299,7 +300,14 @@ def get_manifest_and_translator_from_dbt_assets(
check.invariant(len(dbt_assets) == 1, "Exactly one dbt AssetsDefinition is required")
dbt_assets_def = dbt_assets[0]
metadata_by_key = dbt_assets_def.metadata_by_key or {}
first_asset_key = next(iter(dbt_assets_def.keys))
first_asset_key = next(
iter(
itertools.chain(
dbt_assets_def.keys,
[check_spec.asset_key for check_spec in dbt_assets_def.check_specs],
)
)
)
first_metadata = metadata_by_key.get(first_asset_key, {})
manifest_wrapper: Optional["DbtManifestWrapper"] = first_metadata.get(MANIFEST_METADATA_KEY)
if manifest_wrapper is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,19 @@ def my_dbt_op(dbt: DbtCliResource):
manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets(
[assets_def]
)

dbt_resource_props_by_test_name = get_dbt_resource_props_by_test_name(manifest)
has_asset_check = any(
dbt_resource_props.get("meta", {}).get("dagster", {}).get("asset_check", False)
for dbt_resource_props in dbt_resource_props_by_test_name.values()
)

# When dbt is enabled with asset checks, we turn off any indirection with dbt selection.
# This way, the Dagster context completely determines what is executed in a dbt
# invocation with a subsetted selection.
if has_asset_check:
env["DBT_INDIRECT_SELECTION"] = "empty"

selection_args = get_subset_selection_for_context(
context=context,
manifest=manifest,
Expand Down Expand Up @@ -835,6 +848,7 @@ def get_subset_selection_for_context(
default_dbt_selection += ["--exclude", exclude]

dbt_resource_props_by_output_name = get_dbt_resource_props_by_output_name(manifest)
dbt_resource_props_by_test_name = get_dbt_resource_props_by_test_name(manifest)

# TODO: this should be a property on the context if this is a permanent indicator for
# determining whether the current execution context is performing a subsetted execution.
Expand All @@ -858,6 +872,15 @@ def get_subset_selection_for_context(

selected_dbt_resources.append(fqn_selector)

for _, check_name in context.selected_asset_check_keys:
test_resource_props = dbt_resource_props_by_test_name[check_name]

# Explicitly select a dbt resource by its fully qualified name (FQN).
# https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method
fqn_selector = f"fqn:{'.'.join(test_resource_props['fqn'])}"

selected_dbt_resources.append(fqn_selector)

# Take the union of all the selected resources.
# https://docs.getdbt.com/reference/node-selection/set-operators#unions
union_selected_dbt_resources = ["--select"] + [" ".join(selected_dbt_resources)]
Expand All @@ -880,3 +903,13 @@ def get_dbt_resource_props_by_output_name(
for node in node_info_by_dbt_unique_id.values()
if node["resource_type"] in ASSET_RESOURCE_TYPES
}


def get_dbt_resource_props_by_test_name(
    manifest: Mapping[str, Any]
) -> Mapping[str, Mapping[str, Any]]:
    """Index the test nodes of a dbt manifest by their ``name`` property.

    Args:
        manifest: A parsed dbt manifest. Only ``manifest["nodes"]`` is read; it
            must map node unique ids to node property dictionaries that each
            carry a ``"name"`` entry.

    Returns:
        A mapping from test name to that test node's properties. If two test
        nodes share a name, the one that appears later in ``manifest["nodes"]``
        wins.

    Raises:
        KeyError: If the manifest has no ``"nodes"`` entry, or a selected test
            node has no ``"name"`` entry.
    """
    return {
        dbt_resource_props["name"]: dbt_resource_props
        for unique_id, dbt_resource_props in manifest["nodes"].items()
        # Anchor the prefix on the segment delimiter so that only ids whose
        # leading segment is exactly `test` are selected, not ids that merely
        # begin with those four characters.
        if unique_id.startswith("test.")
    }
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import json
import os
from pathlib import Path
from typing import List
from typing import List, Optional

import pytest
from dagster import AssetCheckResult, AssetExecutionContext, AssetKey, materialize
from dagster import AssetCheckResult, AssetExecutionContext, AssetKey, AssetSelection, materialize
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.asset_defs import load_assets_from_dbt_manifest
Expand Down Expand Up @@ -74,7 +74,25 @@ def my_dbt_assets_with_checks(): ...
],
],
)
def test_asset_check_execution(dbt_commands: List[List[str]]) -> None:
@pytest.mark.parametrize(
"selection",
[
None,
AssetSelection.keys(AssetKey(["customers"])),
AssetSelection.keys(AssetKey(["customers"])).without_checks(),
AssetSelection.keys(AssetKey(["customers"]))
- AssetSelection.keys(AssetKey(["customers"])).without_checks(),
],
ids=[
"no selection",
"select customers with checks",
"select customers without checks",
"select only checks for customers",
],
)
def test_asset_check_execution(
dbt_commands: List[List[str]], selection: Optional[AssetSelection]
) -> None:
dbt = DbtCliResource(project_dir=os.fspath(test_asset_checks_dbt_project_dir))

@dbt_assets(manifest=manifest_asset_checks_json)
Expand All @@ -87,6 +105,7 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
resources={
"dbt": dbt,
},
selection=selection,
)

assert result.success
Expand Down

0 comments on commit 1d7128c

Please sign in to comment.