diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py index f8aab64c3d7a6..0516cc83d2304 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py @@ -759,6 +759,14 @@ def my_dbt_op(dbt: DbtCliResource): # See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles#advanced-customizing-a-profile-directory # for more information. **({"DBT_PROFILES_DIR": self.profiles_dir} if self.profiles_dir else {}), + # When dbt is enabled with asset checks, we turn off any indirection with dbt selection. + # This way, the Dagster context completely determines what is executed in a dbt + # invocation with a subsetted selection. + **( + {"DBT_INDIRECT_SELECTION": "empty"} + if context and context.selected_asset_check_handles + else {} + ), } assets_def: Optional[AssetsDefinition] = None @@ -835,6 +843,7 @@ def get_subset_selection_for_context( default_dbt_selection += ["--exclude", exclude] dbt_resource_props_by_output_name = get_dbt_resource_props_by_output_name(manifest) + dbt_resource_props_by_test_name = get_dbt_resource_props_by_test_name(manifest) # TODO: this should be a property on the context if this is a permanent indicator for # determining whether the current execution context is performing a subsetted execution. @@ -858,6 +867,15 @@ def get_subset_selection_for_context( selected_dbt_resources.append(fqn_selector) + for _, check_name in context.selected_asset_check_handles: + test_resource_props = dbt_resource_props_by_test_name[check_name] + + # Explicitly select a dbt resource by its fully qualified name (FQN). + # https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method + fqn_selector = f"fqn:{'.'.join(test_resource_props['fqn'])}" + + selected_dbt_resources.append(fqn_selector) + # Take the union of all the selected resources. # https://docs.getdbt.com/reference/node-selection/set-operators#unions union_selected_dbt_resources = ["--select"] + [" ".join(selected_dbt_resources)] @@ -880,3 +898,13 @@ def get_dbt_resource_props_by_output_name( for node in node_info_by_dbt_unique_id.values() if node["resource_type"] in ASSET_RESOURCE_TYPES } + + +def get_dbt_resource_props_by_test_name( + manifest: Mapping[str, Any] +) -> Mapping[str, Mapping[str, Any]]: + return { + dbt_resource_props["name"]: dbt_resource_props + for unique_id, dbt_resource_props in manifest["nodes"].items() + if unique_id.startswith("test") + }