Skip to content

Commit

Permalink
feat(dbt): implement subsetting to execute dbt tests (#16801)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Add the ability to select individual dbt tests when subsetting the
execution of a dbt project.

Now that users can explicitly select for both tests and models, we
should disable dbt's indirect selection:
https://docs.getdbt.com/reference/node-selection/test-selection-examples?indirect-selection-mode=empty#indirect-selection.

dbt's indirect selection basically controls whether tests are
automatically run when a dbt model is built. Previously, in the case for
`dbt build`, all tests associated with a dbt model would be run if the
model is materialized. Now, we only run the tests that were explicitly
selected and are available in the asset definition's context.

## How I Tested These Changes
local, pytest
  • Loading branch information
rexledesma authored and yuhan committed Oct 3, 2023
1 parent acd3bf4 commit 22e0e1e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def get_manifest_and_translator_from_dbt_assets(
check.invariant(len(dbt_assets) == 1, "Exactly one dbt AssetsDefinition is required")
dbt_assets_def = dbt_assets[0]
metadata_by_key = dbt_assets_def.metadata_by_key or {}
first_asset_key = next(iter(dbt_assets_def.keys))
first_asset_key = next(iter(dbt_assets_def.metadata_by_key.keys()))
first_metadata = metadata_by_key.get(first_asset_key, {})
manifest_wrapper: Optional["DbtManifestWrapper"] = first_metadata.get(MANIFEST_METADATA_KEY)
if manifest_wrapper is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from dagster._core.execution.context.compute import OpExecutionContext
from dbt.contracts.results import NodeStatus, TestStatus
from dbt.node_types import NodeType
from dbt.version import __version__ as dbt_version
from packaging import version
from pydantic import Field, root_validator, validator
from typing_extensions import Literal
Expand Down Expand Up @@ -581,8 +582,6 @@ def validate_profiles_dir(cls, profiles_dir: str) -> str:
@root_validator(pre=True)
def validate_dbt_version(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validate that the dbt version is supported."""
from dbt.version import __version__ as dbt_version

if version.parse(dbt_version) < version.parse("1.4.0"):
raise ValueError(
"To use `dagster_dbt.DbtCliResource`, you must use `dbt-core>=1.4.0`. Currently,"
Expand Down Expand Up @@ -780,6 +779,16 @@ def my_dbt_op(dbt: DbtCliResource):
manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets(
[assets_def]
)

# When dbt is enabled with asset checks, we turn off any indirection with dbt selection.
# This way, the Dagster context completely determines what is executed in a dbt
# invocation with a subsetted selection.
if (
version.parse(dbt_version) >= version.parse("1.5.0")
and dagster_dbt_translator.settings.enable_asset_checks
):
env["DBT_INDIRECT_SELECTION"] = "empty"

selection_args = get_subset_selection_for_context(
context=context,
manifest=manifest,
Expand Down Expand Up @@ -844,6 +853,7 @@ def get_subset_selection_for_context(
default_dbt_selection += ["--exclude", exclude]

dbt_resource_props_by_output_name = get_dbt_resource_props_by_output_name(manifest)
dbt_resource_props_by_test_name = get_dbt_resource_props_by_test_name(manifest)

# TODO: this should be a property on the context if this is a permanent indicator for
# determining whether the current execution context is performing a subsetted execution.
Expand All @@ -867,6 +877,15 @@ def get_subset_selection_for_context(

selected_dbt_resources.append(fqn_selector)

for _, check_name in context.selected_asset_check_keys:
test_resource_props = dbt_resource_props_by_test_name[check_name]

# Explicitly select a dbt resource by its fully qualified name (FQN).
# https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method
fqn_selector = f"fqn:{'.'.join(test_resource_props['fqn'])}"

selected_dbt_resources.append(fqn_selector)

# Take the union of all the selected resources.
# https://docs.getdbt.com/reference/node-selection/set-operators#unions
union_selected_dbt_resources = ["--select"] + [" ".join(selected_dbt_resources)]
Expand All @@ -889,3 +908,13 @@ def get_dbt_resource_props_by_output_name(
for node in node_info_by_dbt_unique_id.values()
if node["resource_type"] in ASSET_RESOURCE_TYPES
}


def get_dbt_resource_props_by_test_name(
manifest: Mapping[str, Any]
) -> Mapping[str, Mapping[str, Any]]:
return {
dbt_resource_props["name"]: dbt_resource_props
for unique_id, dbt_resource_props in manifest["nodes"].items()
if unique_id.startswith("test")
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import json
import os
from pathlib import Path
from typing import List
from typing import List, Optional

import pytest
from dagster import AssetCheckResult, AssetExecutionContext, AssetKey, materialize
from dagster import AssetCheckResult, AssetExecutionContext, AssetKey, AssetSelection, materialize
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.asset_defs import load_assets_from_dbt_manifest
from dagster_dbt.core.resources_v2 import DbtCliResource
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator, DagsterDbtTranslatorSettings
from dbt.version import __version__ as dbt_version
from packaging import version

pytest.importorskip("dbt.version", minversion="1.4")

is_dbt_1_4 = version.parse("1.4.0") <= version.parse(dbt_version) < version.parse("1.5.0")


test_asset_checks_dbt_project_dir = (
Path(__file__).joinpath("..", "dbt_projects", "test_dagster_asset_checks").resolve()
Expand Down Expand Up @@ -110,7 +114,37 @@ def __init__(self, test_arg: str, *args, **kwargs):
],
],
)
def test_asset_check_execution(dbt_commands: List[List[str]]) -> None:
@pytest.mark.parametrize(
"selection",
[
None,
pytest.param(
AssetSelection.keys(AssetKey(["customers"])),
marks=pytest.mark.xfail(
is_dbt_1_4,
reason="DBT_INDIRECT_SELECTION=empty is not supported in dbt 1.4",
),
),
pytest.param(
AssetSelection.keys(AssetKey(["customers"])).without_checks(),
marks=pytest.mark.xfail(
is_dbt_1_4,
reason="DBT_INDIRECT_SELECTION=empty is not supported in dbt 1.4",
),
),
AssetSelection.keys(AssetKey(["customers"]))
- AssetSelection.keys(AssetKey(["customers"])).without_checks(),
],
ids=[
"no selection",
"select customers with checks",
"select customers without checks",
"select only checks for customers",
],
)
def test_asset_check_execution(
dbt_commands: List[List[str]], selection: Optional[AssetSelection]
) -> None:
dbt = DbtCliResource(project_dir=os.fspath(test_asset_checks_dbt_project_dir))

@dbt_assets(manifest=manifest, dagster_dbt_translator=dagster_dbt_translator_with_checks)
Expand All @@ -123,6 +157,7 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
resources={
"dbt": dbt,
},
selection=selection,
)

assert result.success
Expand Down

0 comments on commit 22e0e1e

Please sign in to comment.