From d64839ee13ac27bd3d5b81fba2a172de892c7dab Mon Sep 17 00:00:00 2001 From: Rex Ledesma Date: Wed, 20 Sep 2023 16:11:46 -0400 Subject: [PATCH] feat(asset-checks): allow asset check subsetting --- .../dagster_test/toys/asset_checks.py | 3 +- .../dagster/_core/definitions/asset_layer.py | 67 +++++++-- .../dagster/_core/definitions/assets.py | 64 +++++++- .../definitions/decorators/asset_decorator.py | 2 + .../_core/execution/context/compute.py | 41 +++++ .../test_asset_decorator_with_check_specs.py | 140 ++++++++++++++++++ 6 files changed, 294 insertions(+), 23 deletions(-) diff --git a/python_modules/dagster-test/dagster_test/toys/asset_checks.py b/python_modules/dagster-test/dagster_test/toys/asset_checks.py index e258f12b3e90c..054212885192b 100644 --- a/python_modules/dagster-test/dagster_test/toys/asset_checks.py +++ b/python_modules/dagster-test/dagster_test/toys/asset_checks.py @@ -413,7 +413,8 @@ def downstream_asset(): just_checks_job = define_asset_job( - name="just_checks_job", selection=AssetSelection.all_asset_checks() + name="just_checks_job", + selection=AssetSelection.checks_for_assets(checked_asset), ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index 0e1fedc447218..3efb6818f76d5 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -508,6 +508,12 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: ) check_key_by_output[node_output_handle] = check_spec.key + for check_handle in assets_def.asset_check_handles: + check_names_by_asset_key = check_names_by_asset_key_by_node_handle.setdefault( + node_handle, defaultdict(set) + ) + check_names_by_asset_key[check_handle.asset_key].add(check_handle.name) + dep_asset_keys_by_node_output_handle = defaultdict(set) for asset_key, node_output_handles in dep_node_output_handles_by_asset_key.items(): for node_output_handle in node_output_handles: @@ -545,9 +551,16 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets} assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = { - node_handle: assets_defs_by_key[asset_key] - for asset_key, node_handles in dep_node_handles_by_asset_key.items() - for node_handle in node_handles + **{ + node_handle: assets_defs_by_key[asset_key] + for asset_key, node_handles in dep_node_handles_by_asset_key.items() + for node_handle in node_handles + }, + **{ + node_handle: assets_def + for node_handle, assets_def in assets_defs_by_outer_node_handle.items() + if assets_def.asset_check_handles + }, } return AssetLayer( @@ -802,6 +815,10 @@ def build_asset_selection_job( build_source_asset_observation_job, ) + included_assets = [] + excluded_assets = list(assets) + included_source_assets = [] + if asset_selection is None and asset_check_selection is None: # no selections, include everything included_assets = list(assets) @@ -809,14 +826,14 @@ def build_asset_selection_job( included_source_assets = [] included_checks = list(asset_checks) else: - if asset_selection is not None: - (included_assets, excluded_assets) = _subset_assets_defs(assets, asset_selection) - included_source_assets = _subset_source_assets(source_assets, asset_selection) - else: - # if checks were specified, then exclude all assets - included_assets = [] - excluded_assets = list(assets) - included_source_assets = [] + if asset_selection is not None or asset_check_selection is not None: + selected_asset_keys = asset_selection or set() + selected_asset_check_handles = asset_check_selection + + (included_assets, excluded_assets) = _subset_assets_defs( + assets, selected_asset_keys, selected_asset_check_handles + ) + included_source_assets = _subset_source_assets(source_assets, selected_asset_keys) if asset_check_selection is not None: # NOTE: This filters to a checks def if any of the included specs are in the selection. @@ -880,6 +897,7 @@ def build_asset_selection_job( def _subset_assets_defs( assets: Iterable["AssetsDefinition"], selected_asset_keys: AbstractSet[AssetKey], + selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]], ) -> Tuple[Sequence["AssetsDefinition"], Sequence["AssetsDefinition"],]: """Given a list of asset key selection queries, generate a set of AssetsDefinition objects representing the included/excluded definitions. @@ -890,18 +908,37 @@ def _subset_assets_defs( for asset in set(assets): # intersection selected_subset = selected_asset_keys & asset.keys + + # if specific checks were selected, only include those + if selected_asset_check_handles is not None: + selected_check_subset = selected_asset_check_handles & asset.asset_check_handles + # if no checks were selected, filter to checks that target selected assets + else: + selected_check_subset = { + handle + for handle in asset.asset_check_handles + if handle.asset_key in selected_subset + } + # all assets in this def are selected - if selected_subset == asset.keys: + if selected_subset == asset.keys and selected_check_subset == asset.asset_check_handles: included_assets.add(asset) # no assets in this def are selected - elif len(selected_subset) == 0: + elif len(selected_subset) == 0 and len(selected_check_subset) == 0: excluded_assets.add(asset) elif asset.can_subset: # subset of the asset that we want - subset_asset = asset.subset_for(selected_asset_keys) + subset_asset = asset.subset_for(selected_asset_keys, selected_check_subset) included_assets.add(subset_asset) # subset of the asset that we don't want - excluded_assets.add(asset.subset_for(asset.keys - subset_asset.keys)) + excluded_assets.add( + asset.subset_for( + selected_asset_keys=asset.keys - subset_asset.keys, + selected_asset_check_handles=( + asset.asset_check_handles - subset_asset.asset_check_handles + ), + ) + ) else: raise DagsterInvalidSubsetError( f"When building job, the AssetsDefinition '{asset.node_def.name}' " diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 390b47bf7a8fa..001c6fd6253ea 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -13,6 +13,7 @@ Optional, Sequence, Set, + Tuple, Union, cast, ) @@ -89,6 +90,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit): _backfill_policy: Optional[BackfillPolicy] _code_versions_by_key: Mapping[AssetKey, Optional[str]] _descriptions_by_key: Mapping[AssetKey, str] + _selected_asset_check_handles: AbstractSet[AssetCheckHandle] def __init__( self, @@ -109,6 +111,7 @@ def __init__( backfill_policy: Optional[BackfillPolicy] = None, descriptions_by_key: Optional[Mapping[AssetKey, str]] = None, check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None, + selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]] = None, # if adding new fields, make sure to handle them in the with_attributes, from_graph, and # get_attributes_dict methods ): @@ -254,18 +257,25 @@ def __init__( backfill_policy, "backfill_policy", BackfillPolicy ) - if selected_asset_keys is None: + if selected_asset_check_handles is None: self._check_specs_by_output_name = check_specs_by_output_name or {} else: self._check_specs_by_output_name = { output_name: check_spec for output_name, check_spec in (check_specs_by_output_name or {}).items() - if check_spec.asset_key in selected_asset_keys + if check_spec.handle in selected_asset_check_handles } self._check_specs_by_handle = { spec.key: spec for spec in self._check_specs_by_output_name.values() } + if selected_asset_check_handles is not None: + self._selected_asset_check_handles = selected_asset_check_handles + else: + self._selected_asset_check_handles = cast( + AbstractSet[AssetCheckHandle], + self._check_specs_by_handle.keys(), + ) if self._partitions_def is None: # check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned @@ -305,6 +315,7 @@ def dagster_internal_init( backfill_policy: Optional[BackfillPolicy], descriptions_by_key: Optional[Mapping[AssetKey, str]], check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]], + selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]], ) -> "AssetsDefinition": return AssetsDefinition( keys_by_input_name=keys_by_input_name, @@ -323,6 +334,7 @@ def dagster_internal_init( backfill_policy=backfill_policy, descriptions_by_key=descriptions_by_key, check_specs_by_output_name=check_specs_by_output_name, + selected_asset_check_handles=selected_asset_check_handles, ) def __call__(self, *args: object, **kwargs: object) -> object: @@ -675,6 +687,7 @@ def _from_node( can_subset=can_subset, selected_asset_keys=None, # node has no subselection info check_specs_by_output_name=check_specs_by_output_name, + selected_asset_check_handles=None, ) @public @@ -858,6 +871,27 @@ def check_specs(self) -> Iterable[AssetCheckSpec]: """ return self._check_specs_by_output_name.values() + @property + def asset_check_handles(self) -> AbstractSet[AssetCheckHandle]: + """Returns the selected asset checks associated by this AssetsDefinition. + + Returns: + AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is + identified by the asset key and the name of the check. + """ + return self._selected_asset_check_handles + + @public + @property + def check_handles(self) -> AbstractSet[Tuple[AssetKey, str]]: + """Returns the selected asset checks associated by this AssetsDefinition. + + Returns: + AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is + identified by the asset key and the name of the check. + """ + return self._selected_asset_check_handles + def is_asset_executable(self, asset_key: AssetKey) -> bool: """Returns True if the asset key is materializable by this AssetsDefinition. @@ -1090,12 +1124,17 @@ def _subset_graph_backed_asset( return get_graph_subset(self.node_def, op_selection) - def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefinition": - """Create a subset of this AssetsDefinition that will only materialize the assets in the - selected set. + def subset_for( + self, + selected_asset_keys: AbstractSet[AssetKey], + selected_asset_check_handles: AbstractSet[AssetCheckHandle], + ) -> "AssetsDefinition": + """Create a subset of this AssetsDefinition that will only materialize the assets and checks + in the selected set. Args: selected_asset_keys (AbstractSet[AssetKey]): The total set of asset keys + selected_asset_check_handles (AbstractSet[AssetCheckHandle]): The selected asset checks """ from dagster._core.definitions.graph_definition import GraphDefinition @@ -1106,10 +1145,17 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin # Set of assets within selected_asset_keys which are outputted by this AssetDefinition asset_subselection = selected_asset_keys & self.keys + asset_check_subselection = selected_asset_check_handles & self.check_handles + # Early escape if all assets in AssetsDefinition are selected - if asset_subselection == self.keys: + if asset_subselection == self.keys and asset_check_subselection == self.check_handles: return self elif isinstance(self.node_def, GraphDefinition): # Node is graph-backed asset + check.invariant( + asset_check_subselection is None, + "Subsetting graph-backed assets with checks is not yet supported", + ) + subsetted_node = self._subset_graph_backed_asset( asset_subselection, ) @@ -1155,7 +1201,10 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes)) else: # multi_asset subsetting - replaced_attributes = dict(selected_asset_keys=asset_subselection) + replaced_attributes = { + "selected_asset_keys": asset_subselection, + "selected_asset_check_handles": asset_check_subselection, + } return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes)) @public @@ -1280,6 +1329,7 @@ def get_attributes_dict(self) -> Dict[str, Any]: backfill_policy=self._backfill_policy, descriptions_by_key=self._descriptions_by_key, check_specs_by_output_name=self._check_specs_by_output_name, + selected_asset_check_handles=self._selected_asset_check_handles, ) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 613dfc586025a..513788cbd676a 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -476,6 +476,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition: metadata_by_key={out_asset_key: self.metadata} if self.metadata else None, descriptions_by_key=None, # not supported for now check_specs_by_output_name=check_specs_by_output_name, + selected_asset_check_handles=None, # no subselection in decorator ) @@ -848,6 +849,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: descriptions_by_key=None, # not supported for now metadata_by_key=metadata_by_key, check_specs_by_output_name=check_specs_by_output_name, + selected_asset_check_handles=None, # no subselection in decorator ) return inner diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 663eaf4bf3b3f..26dc194ac1770 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -10,6 +10,7 @@ Optional, Sequence, Set, + Tuple, Union, cast, ) @@ -17,6 +18,7 @@ import dagster._check as check from dagster._annotations import deprecated, experimental, public from dagster._core.definitions.asset_check_spec import AssetCheckSpec +from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.data_version import ( DataProvenance, @@ -561,6 +563,45 @@ def selected_asset_keys(self) -> AbstractSet[AssetKey]: return set() return self.assets_def.keys + @public + @property + def has_asset_checks_def(self) -> bool: + """Return a boolean indicating the presence of a backing AssetChecksDefinition + for the current execution. + + Returns: + bool: True if there is a backing AssetChecksDefinition for the current execution, otherwise False. + """ + return self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle) is not None + + @public + @property + def asset_checks_def(self) -> AssetChecksDefinition: + """The backing AssetChecksDefinition for what is currently executing, errors if not + available. + + Returns: + AssetChecksDefinition. + """ + asset_checks_def = self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle) + if asset_checks_def is None: + raise DagsterInvalidPropertyError( + f"Op '{self.op.name}' does not have an asset checks definition." + ) + + return asset_checks_def + + @public + @property + def selected_asset_check_handles(self) -> AbstractSet[Tuple[AssetKey, str]]: + if self.has_assets_def: + return self.assets_def.check_handles + + if self.has_asset_checks_def: + check.failed("Subset selection is not yet supported within an AssetChecksDefinition") + + return set() + @public @property def selected_output_names(self) -> AbstractSet[str]: diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py index d8545bc103f50..ca4cead44bb0e 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py @@ -20,6 +20,9 @@ multi_asset, op, ) +from dagster._core.definitions.asset_check_spec import AssetCheckHandle +from dagster._core.definitions.asset_selection import AssetChecksForHandles, AssetSelection +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvariantViolationError, @@ -546,3 +549,140 @@ def asset1(): assert check_eval.asset_key == AssetKey("asset_one") assert check_eval.check_name == "check1" assert check_eval.metadata == {"foo": MetadataValue.text("bar")} + + +def test_can_subset_no_selection() -> None: + @multi_asset( + can_subset=True, + specs=[AssetSpec("asset1"), AssetSpec("asset2")], + check_specs=[ + AssetCheckSpec("check1", asset="asset1"), + AssetCheckSpec("check2", asset="asset2"), + ], + ) + def foo(context: AssetExecutionContext): + assert context.selected_asset_keys == {AssetKey("asset1"), AssetKey("asset2")} + assert context.selected_asset_check_handles == { + (AssetKey("asset1"), "check1"), + (AssetKey("asset2"), "check2"), + } + + yield Output(value=None, output_name="asset1") + yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) + + result = materialize([foo]) + + assert len(result.get_asset_materialization_events()) == 1 + + [check_eval] = result.get_asset_check_evaluations() + + assert check_eval.asset_key == AssetKey("asset1") + assert check_eval.check_name == "check1" + + +def test_can_subset() -> None: + @multi_asset( + can_subset=True, + specs=[AssetSpec("asset1"), AssetSpec("asset2")], + check_specs=[ + AssetCheckSpec("check1", asset="asset1"), + AssetCheckSpec("check2", asset="asset2"), + ], + ) + def foo(context: AssetExecutionContext): + assert context.selected_asset_keys == {AssetKey("asset1")} + assert context.selected_asset_check_handles == {(AssetKey("asset1"), "check1")} + + yield Output(value=None, output_name="asset1") + yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) + + result = materialize([foo], selection=["asset1"]) + + assert len(result.get_asset_materialization_events()) == 1 + + [check_eval] = result.get_asset_check_evaluations() + + assert check_eval.asset_key == AssetKey("asset1") + assert check_eval.check_name == "check1" + + +def test_can_subset_result_for_unselected_check() -> None: + @multi_asset( + can_subset=True, + specs=[AssetSpec("asset1"), AssetSpec("asset2")], + check_specs=[ + AssetCheckSpec("check1", asset="asset1"), + AssetCheckSpec("check2", asset="asset2"), + ], + ) + def foo(context: AssetExecutionContext): + assert context.selected_asset_keys == {AssetKey("asset1")} + assert context.selected_asset_check_handles == {(AssetKey("asset1"), "check1")} + + yield Output(value=None, output_name="asset1") + yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) + yield AssetCheckResult(asset_key="asset2", check_name="check2", success=True) + + with pytest.raises(DagsterInvariantViolationError): + materialize([foo], selection=["asset1"]) + + +def test_can_subset_select_only_asset() -> None: + @multi_asset( + can_subset=True, + specs=[AssetSpec("asset1"), AssetSpec("asset2")], + check_specs=[ + AssetCheckSpec("check1", asset="asset1"), + AssetCheckSpec("check2", asset="asset2"), + ], + ) + def foo(context: AssetExecutionContext): + assert context.selected_asset_keys == {AssetKey("asset1")} + assert context.selected_asset_check_handles == set() + + yield Output(value=None, output_name="asset1") + + result = materialize( + [foo], + selection=AssetSelection.keys(AssetKey("asset1")) - AssetSelection.checks_for_assets(foo), + ) + + assert len(result.get_asset_materialization_events()) == 1 + + check_evals = result.get_asset_check_evaluations() + + assert len(check_evals) == 0 + + +def test_can_subset_select_only_check() -> None: + @multi_asset( + can_subset=True, + specs=[AssetSpec("asset1"), AssetSpec("asset2")], + check_specs=[ + AssetCheckSpec("check1", asset="asset1"), + AssetCheckSpec("check2", asset="asset2"), + ], + ) + def foo(context: AssetExecutionContext): + assert context.selected_asset_keys == set() + assert context.selected_asset_check_handles == {(AssetKey("asset1"), "check1")} + + if context.selected_asset_keys: + yield Output(value=None, output_name="asset1") + + if context.selected_asset_check_handles: + yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) + + result = materialize( + [foo], + selection=AssetChecksForHandles( + [AssetCheckHandle(asset_key=AssetKey("asset1"), name="check1")] + ), + ) + + assert len(result.get_asset_materialization_events()) == 0 + + [check_eval] = result.get_asset_check_evaluations() + + assert check_eval.asset_key == AssetKey("asset1") + assert check_eval.check_name == "check1"