Skip to content

Commit

Permalink
surface check keys
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm authored and rexledesma committed Sep 26, 2023
1 parent 4b2dad6 commit d405c22
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 80 deletions.
66 changes: 28 additions & 38 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,6 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
)
check_key_by_output[node_output_handle] = check_spec.key

for check_key in assets_def.asset_check_keys:
check_names_by_asset_key = check_names_by_asset_key_by_node_handle.setdefault(
node_handle, defaultdict(set)
)
check_names_by_asset_key[check_key.asset_key].add(check_key.name)

dep_asset_keys_by_node_output_handle = defaultdict(set)
for asset_key, node_output_handles in dep_node_output_handles_by_asset_key.items():
for node_output_handle in node_output_handles:
Expand Down Expand Up @@ -551,15 +545,18 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets}

assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = {
# nodes for assets
**{
node_handle: assets_defs_by_key[asset_key]
for asset_key, node_handles in dep_node_handles_by_asset_key.items()
for node_handle in node_handles
},
# nodes for asset checks. Required for AssetsDefs that have selected checks
# but not assets
**{
node_handle: assets_def
for node_handle, assets_def in assets_defs_by_outer_node_handle.items()
if assets_def.asset_check_keys
if assets_def.check_keys
},
}

Expand Down Expand Up @@ -815,41 +812,36 @@ def build_asset_selection_job(
build_source_asset_observation_job,
)

included_assets = []
excluded_assets = list(assets)
included_source_assets = []

if asset_selection is None and asset_check_selection is None:
# no selections, include everything
included_assets = list(assets)
excluded_assets = []
included_source_assets = []
included_checks = list(asset_checks)
included_source_assets = list(source_assets)
included_checks_defs = list(asset_checks)
else:
if asset_selection is not None or asset_check_selection is not None:
selected_asset_keys = asset_selection or set()
selected_asset_check_keys = asset_check_selection

(included_assets, excluded_assets) = _subset_assets_defs(
assets, selected_asset_keys, selected_asset_check_keys
)
included_source_assets = _subset_source_assets(source_assets, selected_asset_keys)
# Filter to assets that match either selected assets or include a selected check.
# E.g. a multi asset can be included even if it's not in asset_selection, if it has a selected check
# defined with check_specs
(included_assets, excluded_assets) = _subset_assets_defs(
assets, asset_selection or set(), asset_check_selection
)
included_source_assets = _subset_source_assets(source_assets, asset_selection or set())

if asset_check_selection is not None:
# NOTE: This filters to a checks def if any of the included specs are in the selection.
# This needs to change to fully subsetting checks in multi assets.
included_checks = [
if asset_check_selection is None:
# If assets were selected and checks are None, then include all checks on the selected assets.
# Note: once we start explicitly passing in asset checks instead of None from the front end,
# we can remove this logic.
included_checks_defs = [
asset_check
for asset_check in asset_checks
if [spec for spec in asset_check.specs if spec.key in asset_check_selection]
if asset_check.asset_key in check.not_none(asset_selection)
]
else:
# If assets were selected and checks weren't, then include all checks on the selected assets.
# Note: a future diff needs to add support for selecting assets, and not their checks.
included_checks = [
# Otherwise, filter to explicitly selected checks defs
included_checks_defs = [
asset_check
for asset_check in asset_checks
if asset_check.asset_key in check.not_none(asset_selection)
if [spec for spec in asset_check.specs if spec.key in asset_check_selection]
]

if partitions_def:
Expand All @@ -861,11 +853,11 @@ def build_asset_selection_job(
f"{partitions_def}.",
)

if len(included_assets) or len(included_checks) > 0:
if len(included_assets) or len(included_checks_defs) > 0:
asset_job = build_assets_job(
name=name,
assets=included_assets,
asset_checks=included_checks,
asset_checks=included_checks_defs,
config=config,
source_assets=[*source_assets, *excluded_assets],
resource_defs=resource_defs,
Expand Down Expand Up @@ -911,15 +903,15 @@ def _subset_assets_defs(

# if specific checks were selected, only include those
if selected_asset_check_keys is not None:
selected_check_subset = selected_asset_check_keys & asset.asset_check_keys
selected_check_subset = selected_asset_check_keys & asset.check_keys
# if no checks were selected, filter to checks that target selected assets
else:
selected_check_subset = {
handle for handle in asset.asset_check_keys if handle.asset_key in selected_subset
handle for handle in asset.check_keys if handle.asset_key in selected_subset
}

# all assets in this def are selected
if selected_subset == asset.keys and selected_check_subset == asset.asset_check_keys:
if selected_subset == asset.keys and selected_check_subset == asset.check_keys:
included_assets.add(asset)
# no assets in this def are selected
elif len(selected_subset) == 0 and len(selected_check_subset) == 0:
Expand All @@ -932,9 +924,7 @@ def _subset_assets_defs(
excluded_assets.add(
asset.subset_for(
selected_asset_keys=asset.keys - subset_asset.keys,
selected_asset_check_keys=(
asset.asset_check_keys - subset_asset.asset_check_keys
),
selected_asset_check_keys=(asset.check_keys - subset_asset.check_keys),
)
)
else:
Expand Down
31 changes: 11 additions & 20 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
Expand Down Expand Up @@ -272,10 +271,7 @@ def __init__(
if selected_asset_check_keys is not None:
self._selected_asset_check_keys = selected_asset_check_keys
else:
self._selected_asset_check_keys = cast(
AbstractSet[AssetCheckKey],
self._check_specs_by_handle.keys(),
)
self._selected_asset_check_keys = self._check_specs_by_handle.keys()

if self._partitions_def is None:
# check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned
Expand Down Expand Up @@ -872,18 +868,7 @@ def check_specs(self) -> Iterable[AssetCheckSpec]:
return self._check_specs_by_output_name.values()

@property
def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
"""Returns the selected asset checks associated by this AssetsDefinition.
Returns:
AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is
identified by the asset key and the name of the check.
"""
return self._selected_asset_check_keys

@public
@property
def check_keys(self) -> AbstractSet[Tuple[AssetKey, str]]:
def check_keys(self) -> AbstractSet[AssetCheckKey]:
"""Returns the selected asset checks associated by this AssetsDefinition.
Returns:
Expand Down Expand Up @@ -1127,7 +1112,7 @@ def _subset_graph_backed_asset(
def subset_for(
self,
selected_asset_keys: AbstractSet[AssetKey],
selected_asset_check_keys: AbstractSet[AssetCheckKey],
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]],
) -> "AssetsDefinition":
"""Create a subset of this AssetsDefinition that will only materialize the assets and checks
in the selected set.
Expand All @@ -1145,14 +1130,20 @@ def subset_for(

# Set of assets within selected_asset_keys which are outputted by this AssetDefinition
asset_subselection = selected_asset_keys & self.keys
asset_check_subselection = selected_asset_check_keys & self.check_keys
if selected_asset_check_keys is None:
# filter to checks that target selected asset keys
asset_check_subselection = {
key for key in self.check_keys if key.asset_key in asset_subselection
}
else:
asset_check_subselection = selected_asset_check_keys & self.check_keys

# Early escape if all assets in AssetsDefinition are selected
if asset_subselection == self.keys and asset_check_subselection == self.check_keys:
return self
elif isinstance(self.node_def, GraphDefinition): # Node is graph-backed asset
check.invariant(
asset_check_subselection is None,
selected_asset_check_keys is None,
"Subsetting graph-backed assets with checks is not yet supported",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ def _dfs(key, cur_color):
ret.append(assets_def)
else:
for asset_keys in color_mapping.values():
ret.append(assets_def.subset_for(asset_keys))
ret.append(assets_def.subset_for(asset_keys, selected_asset_check_keys=None))

return ret

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)

import dagster._check as check
from dagster._annotations import deprecated, experimental, public
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.data_version import (
Expand Down Expand Up @@ -593,7 +592,7 @@ def asset_checks_def(self) -> AssetChecksDefinition:

@public
@property
def selected_asset_check_keys(self) -> AbstractSet[Tuple[AssetKey, str]]:
def selected_asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
if self.has_assets_def:
return self.assets_def.check_keys

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ def test_subset_for(subset, expected_keys, expected_inputs, expected_outputs):
def abc_(context, in1, in2, in3):
pass

subbed = abc_.subset_for({AssetKey(key) for key in subset.split(",")})
subbed = abc_.subset_for(
{AssetKey(key) for key in subset.split(",")}, selected_asset_check_keys=None
)

assert subbed.keys == (
{AssetKey(key) for key in expected_keys.split(",")} if expected_keys else set()
Expand Down Expand Up @@ -293,7 +295,7 @@ def ma_op():
can_subset=True,
)

subset = ma.subset_for({AssetKey("b")})
subset = ma.subset_for({AssetKey("b")}, selected_asset_check_keys=None)
assert subset.group_names_by_key[AssetKey("b")] == "bar"


Expand Down Expand Up @@ -349,7 +351,8 @@ def abc_(context, in1, in2, in3):
}

subbed_1 = replaced_1.subset_for(
{AssetKey(["foo", "bar_in1"]), AssetKey("in3"), AssetKey(["foo", "foo_a"]), AssetKey("b")}
{AssetKey(["foo", "bar_in1"]), AssetKey("in3"), AssetKey(["foo", "foo_a"]), AssetKey("b")},
selected_asset_check_keys=None,
)
assert subbed_1.keys == {AssetKey(["foo", "foo_a"]), AssetKey("b")}

Expand Down Expand Up @@ -387,7 +390,8 @@ def abc_(context, in1, in2, in3):
AssetKey(["again", "foo", "bar_in1"]),
AssetKey(["again", "foo", "foo_a"]),
AssetKey(["c"]),
}
},
selected_asset_check_keys=None,
)
assert subbed_2.keys == {AssetKey(["again", "foo", "foo_a"])}

Expand All @@ -398,7 +402,7 @@ def abc_(context, start):
pass

with pytest.raises(CheckError, match="can_subset=False"):
abc_.subset_for({AssetKey("start"), AssetKey("a")})
abc_.subset_for({AssetKey("start"), AssetKey("a")}, selected_asset_check_keys=None)


def test_fail_for_non_topological_order():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,12 @@ def do_run(
elif not selected_keys:
assets_in_run.extend(a.to_source_assets())
else:
assets_in_run.append(a.subset_for(asset_keys_set))
assets_in_run.extend(a.subset_for(a.keys - selected_keys).to_source_assets())
assets_in_run.append(a.subset_for(asset_keys_set, selected_asset_check_keys=None))
assets_in_run.extend(
a.subset_for(
a.keys - selected_keys, selected_asset_check_keys=None
).to_source_assets()
)
materialize_to_memory(
instance=instance,
partition_key=partition_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ def create(self: "InstanceSnapshot") -> None:
)

selected_keys = selection.resolve(asset_graph)
sampled_multi_asset = multi_asset.subset_for(selected_keys)
sampled_multi_asset = multi_asset.subset_for(
selected_keys, selected_asset_check_keys=None
)
sampled_roots = [ra for ra in roots if ra.key in selected_keys]
to_materialize = [*sampled_roots, sampled_multi_asset]
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,8 @@ def test_can_subset_no_selection() -> None:
def foo(context: AssetExecutionContext):
assert context.selected_asset_keys == {AssetKey("asset1"), AssetKey("asset2")}
assert context.selected_asset_check_keys == {
(AssetKey("asset1"), "check1"),
(AssetKey("asset2"), "check2"),
AssetCheckKey(AssetKey("asset1"), "check1"),
AssetCheckKey(AssetKey("asset2"), "check2"),
}

yield Output(value=None, output_name="asset1")
Expand All @@ -591,7 +591,7 @@ def test_can_subset() -> None:
)
def foo(context: AssetExecutionContext):
assert context.selected_asset_keys == {AssetKey("asset1")}
assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")}
assert context.selected_asset_check_keys == {AssetCheckKey(AssetKey("asset1"), "check1")}

yield Output(value=None, output_name="asset1")
yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True)
Expand All @@ -617,7 +617,7 @@ def test_can_subset_result_for_unselected_check() -> None:
)
def foo(context: AssetExecutionContext):
assert context.selected_asset_keys == {AssetKey("asset1")}
assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")}
assert context.selected_asset_check_keys == {AssetCheckKey(AssetKey("asset1"), "check1")}

yield Output(value=None, output_name="asset1")
yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True)
Expand Down Expand Up @@ -665,13 +665,9 @@ def test_can_subset_select_only_check() -> None:
)
def foo(context: AssetExecutionContext):
assert context.selected_asset_keys == set()
assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")}
assert context.selected_asset_check_keys == {AssetCheckKey(AssetKey("asset1"), "check1")}

if context.selected_asset_keys:
yield Output(value=None, output_name="asset1")

if context.selected_asset_check_keys:
yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True)
yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True)

result = materialize(
[foo],
Expand Down

0 comments on commit d405c22

Please sign in to comment.