diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py index 6025e98f6d6c6..3794285cbacfe 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py @@ -303,7 +303,7 @@ def get_manifest_and_translator_from_dbt_assets( check.invariant(len(dbt_assets) == 1, "Exactly one dbt AssetsDefinition is required") dbt_assets_def = dbt_assets[0] metadata_by_key = dbt_assets_def.metadata_by_key or {} - first_asset_key = next(iter(dbt_assets_def.keys)) + first_asset_key = next(iter(dbt_assets_def.metadata_by_key.keys())) first_metadata = metadata_by_key.get(first_asset_key, {}) manifest_wrapper: Optional["DbtManifestWrapper"] = first_metadata.get(MANIFEST_METADATA_KEY) if manifest_wrapper is None: diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py index 133254655bc78..851b807932d50 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py @@ -34,6 +34,7 @@ from dagster._core.execution.context.compute import OpExecutionContext from dbt.contracts.results import NodeStatus, TestStatus from dbt.node_types import NodeType +from dbt.version import __version__ as dbt_version from packaging import version from pydantic import Field, root_validator, validator from typing_extensions import Literal @@ -581,8 +582,6 @@ def validate_profiles_dir(cls, profiles_dir: str) -> str: @root_validator(pre=True) def validate_dbt_version(cls, values: Dict[str, Any]) -> Dict[str, Any]: """Validate that the dbt version is supported.""" - from dbt.version import __version__ as dbt_version - if version.parse(dbt_version) < version.parse("1.4.0"): raise ValueError( "To use `dagster_dbt.DbtCliResource`, you must use `dbt-core>=1.4.0`. Currently," @@ -780,6 +779,16 @@ def my_dbt_op(dbt: DbtCliResource): manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets( [assets_def] ) + + # When dbt is enabled with asset checks, we turn off any indirection with dbt selection. + # This way, the Dagster context completely determines what is executed in a dbt + # invocation with a subsetted selection. + if ( + version.parse(dbt_version) >= version.parse("1.5.0") + and dagster_dbt_translator.settings.enable_asset_checks + ): + env["DBT_INDIRECT_SELECTION"] = "empty" + selection_args = get_subset_selection_for_context( context=context, manifest=manifest, @@ -844,6 +853,7 @@ def get_subset_selection_for_context( default_dbt_selection += ["--exclude", exclude] dbt_resource_props_by_output_name = get_dbt_resource_props_by_output_name(manifest) + dbt_resource_props_by_test_name = get_dbt_resource_props_by_test_name(manifest) # TODO: this should be a property on the context if this is a permanent indicator for # determining whether the current execution context is performing a subsetted execution. @@ -867,6 +877,15 @@ def get_subset_selection_for_context( selected_dbt_resources.append(fqn_selector) + for _, check_name in context.selected_asset_check_keys: + test_resource_props = dbt_resource_props_by_test_name[check_name] + + # Explicitly select a dbt resource by its fully qualified name (FQN). + # https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method + fqn_selector = f"fqn:{'.'.join(test_resource_props['fqn'])}" + + selected_dbt_resources.append(fqn_selector) + # Take the union of all the selected resources. # https://docs.getdbt.com/reference/node-selection/set-operators#unions union_selected_dbt_resources = ["--select"] + [" ".join(selected_dbt_resources)] @@ -889,3 +908,13 @@ def get_dbt_resource_props_by_output_name( for node in node_info_by_dbt_unique_id.values() if node["resource_type"] in ASSET_RESOURCE_TYPES } + + +def get_dbt_resource_props_by_test_name( + manifest: Mapping[str, Any] +) -> Mapping[str, Mapping[str, Any]]: + return { + dbt_resource_props["name"]: dbt_resource_props + for unique_id, dbt_resource_props in manifest["nodes"].items() + if unique_id.startswith("test") + } diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_checks.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_checks.py index b47e195a92500..77f6ec1d08250 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_checks.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_checks.py @@ -1,18 +1,22 @@ import json import os from pathlib import Path -from typing import List +from typing import List, Optional import pytest -from dagster import AssetCheckResult, AssetExecutionContext, AssetKey, materialize +from dagster import AssetCheckResult, AssetExecutionContext, AssetKey, AssetSelection, materialize from dagster._core.definitions.asset_check_spec import AssetCheckSeverity from dagster_dbt.asset_decorator import dbt_assets from dagster_dbt.asset_defs import load_assets_from_dbt_manifest from dagster_dbt.core.resources_v2 import DbtCliResource from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator, DagsterDbtTranslatorSettings +from dbt.version import __version__ as dbt_version +from packaging import version pytest.importorskip("dbt.version", minversion="1.4") +is_dbt_1_4 = version.parse("1.4.0") <= version.parse(dbt_version) < version.parse("1.5.0") + test_asset_checks_dbt_project_dir = ( Path(__file__).joinpath("..", "dbt_projects", "test_dagster_asset_checks").resolve() @@ -110,7 +114,37 @@ def __init__(self, test_arg: str, *args, **kwargs): ], ], ) -def test_asset_check_execution(dbt_commands: List[List[str]]) -> None: +@pytest.mark.parametrize( + "selection", + [ + None, + pytest.param( + AssetSelection.keys(AssetKey(["customers"])), + marks=pytest.mark.xfail( + is_dbt_1_4, + reason="DBT_INDIRECT_SELECTION=empty is not supported in dbt 1.4", + ), + ), + pytest.param( + AssetSelection.keys(AssetKey(["customers"])).without_checks(), + marks=pytest.mark.xfail( + is_dbt_1_4, + reason="DBT_INDIRECT_SELECTION=empty is not supported in dbt 1.4", + ), + ), + AssetSelection.keys(AssetKey(["customers"])) + - AssetSelection.keys(AssetKey(["customers"])).without_checks(), + ], + ids=[ + "no selection", + "select customers with checks", + "select customers without checks", + "select only checks for customers", + ], +) +def test_asset_check_execution( + dbt_commands: List[List[str]], selection: Optional[AssetSelection] +) -> None: dbt = DbtCliResource(project_dir=os.fspath(test_asset_checks_dbt_project_dir)) @dbt_assets(manifest=manifest, dagster_dbt_translator=dagster_dbt_translator_with_checks) @@ -123,6 +157,7 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): resources={ "dbt": dbt, }, + selection=selection, ) assert result.success