diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index 11da2b0e06371..77a0d2ad5ccc8 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -508,12 +508,6 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: ) check_key_by_output[node_output_handle] = check_spec.key - for check_key in assets_def.asset_check_keys: - check_names_by_asset_key = check_names_by_asset_key_by_node_handle.setdefault( - node_handle, defaultdict(set) - ) - check_names_by_asset_key[check_key.asset_key].add(check_key.name) - dep_asset_keys_by_node_output_handle = defaultdict(set) for asset_key, node_output_handles in dep_node_output_handles_by_asset_key.items(): for node_output_handle in node_output_handles: @@ -551,15 +545,18 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets} assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = { + # nodes for assets **{ node_handle: assets_defs_by_key[asset_key] for asset_key, node_handles in dep_node_handles_by_asset_key.items() for node_handle in node_handles }, + # nodes for asset checks. Required for AssetsDefs that have selected checks + # but not assets **{ node_handle: assets_def for node_handle, assets_def in assets_defs_by_outer_node_handle.items() - if assets_def.asset_check_keys + if assets_def.check_keys }, } @@ -815,41 +812,36 @@ def build_asset_selection_job( build_source_asset_observation_job, ) - included_assets = [] - excluded_assets = list(assets) - included_source_assets = [] - if asset_selection is None and asset_check_selection is None: # no selections, include everything included_assets = list(assets) excluded_assets = [] - included_source_assets = [] - included_checks = list(asset_checks) + included_source_assets = list(source_assets) + included_checks_defs = list(asset_checks) else: - if asset_selection is not None or asset_check_selection is not None: - selected_asset_keys = asset_selection or set() - selected_asset_check_keys = asset_check_selection - - (included_assets, excluded_assets) = _subset_assets_defs( - assets, selected_asset_keys, selected_asset_check_keys - ) - included_source_assets = _subset_source_assets(source_assets, selected_asset_keys) + # Filter to assets that match either selected assets or include a selected check. + # E.g. a multi asset can be included even if it's not in asset_selection, if it has a selected check + # defined with check_specs + (included_assets, excluded_assets) = _subset_assets_defs( + assets, asset_selection or set(), asset_check_selection + ) + included_source_assets = _subset_source_assets(source_assets, asset_selection or set()) - if asset_check_selection is not None: - # NOTE: This filters to a checks def if any of the included specs are in the selection. - # This needs to change to fully subsetting checks in multi assets. - included_checks = [ + if asset_check_selection is None: + # If assets were selected and checks are None, then include all checks on the selected assets. + # Note: once we start explicitly passing in asset checks instead of None from the front end, + # we can remove this logic. + included_checks_defs = [ asset_check for asset_check in asset_checks - if [spec for spec in asset_check.specs if spec.key in asset_check_selection] + if asset_check.asset_key in check.not_none(asset_selection) ] else: - # If assets were selected and checks weren't, then include all checks on the selected assets. - # Note: a future diff needs to add support for selecting assets, and not their checks. - included_checks = [ + # Otherwise, filter to explicitly selected checks defs + included_checks_defs = [ asset_check for asset_check in asset_checks - if asset_check.asset_key in check.not_none(asset_selection) + if [spec for spec in asset_check.specs if spec.key in asset_check_selection] ] if partitions_def: @@ -861,11 +853,11 @@ def build_asset_selection_job( f"{partitions_def}.", ) - if len(included_assets) or len(included_checks) > 0: + if len(included_assets) or len(included_checks_defs) > 0: asset_job = build_assets_job( name=name, assets=included_assets, - asset_checks=included_checks, + asset_checks=included_checks_defs, config=config, source_assets=[*source_assets, *excluded_assets], resource_defs=resource_defs, @@ -911,15 +903,15 @@ def _subset_assets_defs( # if specific checks were selected, only include those if selected_asset_check_keys is not None: - selected_check_subset = selected_asset_check_keys & asset.asset_check_keys + selected_check_subset = selected_asset_check_keys & asset.check_keys # if no checks were selected, filter to checks that target selected assets else: selected_check_subset = { - handle for handle in asset.asset_check_keys if handle.asset_key in selected_subset + handle for handle in asset.check_keys if handle.asset_key in selected_subset } # all assets in this def are selected - if selected_subset == asset.keys and selected_check_subset == asset.asset_check_keys: + if selected_subset == asset.keys and selected_check_subset == asset.check_keys: included_assets.add(asset) # no assets in this def are selected elif len(selected_subset) == 0 and len(selected_check_subset) == 0: @@ -932,9 +924,7 @@ def _subset_assets_defs( excluded_assets.add( asset.subset_for( selected_asset_keys=asset.keys - subset_asset.keys, - selected_asset_check_keys=( - asset.asset_check_keys - subset_asset.asset_check_keys - ), + selected_asset_check_keys=(asset.check_keys - subset_asset.check_keys), ) ) else: diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 631103dde0075..e4cdc3495b4f6 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -13,7 +13,6 @@ Optional, Sequence, Set, - Tuple, Union, cast, ) @@ -272,10 +271,7 @@ def __init__( if selected_asset_check_keys is not None: self._selected_asset_check_keys = selected_asset_check_keys else: - self._selected_asset_check_keys = cast( - AbstractSet[AssetCheckKey], - self._check_specs_by_handle.keys(), - ) + self._selected_asset_check_keys = self._check_specs_by_handle.keys() if self._partitions_def is None: # check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned @@ -872,18 +868,7 @@ def check_specs(self) -> Iterable[AssetCheckSpec]: return self._check_specs_by_output_name.values() @property - def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: - """Returns the selected asset checks associated by this AssetsDefinition. - - Returns: - AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is - identified by the asset key and the name of the check. - """ - return self._selected_asset_check_keys - - @public - @property - def check_keys(self) -> AbstractSet[Tuple[AssetKey, str]]: + def check_keys(self) -> AbstractSet[AssetCheckKey]: """Returns the selected asset checks associated by this AssetsDefinition. Returns: @@ -1127,7 +1112,7 @@ def _subset_graph_backed_asset( def subset_for( self, selected_asset_keys: AbstractSet[AssetKey], - selected_asset_check_keys: AbstractSet[AssetCheckKey], + selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]], ) -> "AssetsDefinition": """Create a subset of this AssetsDefinition that will only materialize the assets and checks in the selected set. @@ -1145,14 +1130,20 @@ def subset_for( # Set of assets within selected_asset_keys which are outputted by this AssetDefinition asset_subselection = selected_asset_keys & self.keys - asset_check_subselection = selected_asset_check_keys & self.check_keys + if selected_asset_check_keys is None: + # filter to checks that target selected asset keys + asset_check_subselection = { + key for key in self.check_keys if key.asset_key in asset_subselection + } + else: + asset_check_subselection = selected_asset_check_keys & self.check_keys # Early escape if all assets in AssetsDefinition are selected if asset_subselection == self.keys and asset_check_subselection == self.check_keys: return self elif isinstance(self.node_def, GraphDefinition): # Node is graph-backed asset check.invariant( - asset_check_subselection is None, + selected_asset_check_keys is None, "Subsetting graph-backed assets with checks is not yet supported", ) diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index cf186f3d2f6a9..d0ce5cacebaf1 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -581,7 +581,7 @@ def _dfs(key, cur_color): ret.append(assets_def) else: for asset_keys in color_mapping.values(): - ret.append(assets_def.subset_for(asset_keys)) + ret.append(assets_def.subset_for(asset_keys, selected_asset_check_keys=None)) return ret diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 0c601e6e1dec9..1dcb2834e521e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -10,14 +10,13 @@ Optional, Sequence, Set, - Tuple, Union, cast, ) import dagster._check as check from dagster._annotations import deprecated, experimental, public -from dagster._core.definitions.asset_check_spec import AssetCheckSpec +from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.data_version import ( @@ -593,7 +592,7 @@ def asset_checks_def(self) -> AssetChecksDefinition: @public @property - def selected_asset_check_keys(self) -> AbstractSet[Tuple[AssetKey, str]]: + def selected_asset_check_keys(self) -> AbstractSet[AssetCheckKey]: if self.has_assets_def: return self.assets_def.check_keys diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index 533e87bf3c808..67741cce0af04 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -111,7 +111,9 @@ def test_subset_for(subset, expected_keys, expected_inputs, expected_outputs): def abc_(context, in1, in2, in3): pass - subbed = abc_.subset_for({AssetKey(key) for key in subset.split(",")}) + subbed = abc_.subset_for( + {AssetKey(key) for key in subset.split(",")}, selected_asset_check_keys=None + ) assert subbed.keys == ( {AssetKey(key) for key in expected_keys.split(",")} if expected_keys else set() @@ -293,7 +295,7 @@ def ma_op(): can_subset=True, ) - subset = ma.subset_for({AssetKey("b")}) + subset = ma.subset_for({AssetKey("b")}, selected_asset_check_keys=None) assert subset.group_names_by_key[AssetKey("b")] == "bar" @@ -349,7 +351,8 @@ def abc_(context, in1, in2, in3): } subbed_1 = replaced_1.subset_for( - {AssetKey(["foo", "bar_in1"]), AssetKey("in3"), AssetKey(["foo", "foo_a"]), AssetKey("b")} + {AssetKey(["foo", "bar_in1"]), AssetKey("in3"), AssetKey(["foo", "foo_a"]), AssetKey("b")}, + selected_asset_check_keys=None, ) assert subbed_1.keys == {AssetKey(["foo", "foo_a"]), AssetKey("b")} @@ -387,7 +390,8 @@ def abc_(context, in1, in2, in3): AssetKey(["again", "foo", "bar_in1"]), AssetKey(["again", "foo", "foo_a"]), AssetKey(["c"]), - } + }, + selected_asset_check_keys=None, ) assert subbed_2.keys == {AssetKey(["again", "foo", "foo_a"])} @@ -398,7 +402,7 @@ def abc_(context, start): pass with pytest.raises(CheckError, match="can_subset=False"): - abc_.subset_for({AssetKey("start"), AssetKey("a")}) + abc_.subset_for({AssetKey("start"), AssetKey("a")}, selected_asset_check_keys=None) def test_fail_for_non_topological_order(): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index f1b6df35879dc..6976860747c09 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -513,8 +513,12 @@ def do_run( elif not selected_keys: assets_in_run.extend(a.to_source_assets()) else: - assets_in_run.append(a.subset_for(asset_keys_set)) - assets_in_run.extend(a.subset_for(a.keys - selected_keys).to_source_assets()) + assets_in_run.append(a.subset_for(asset_keys_set, selected_asset_check_keys=None)) + assets_in_run.extend( + a.subset_for( + a.keys - selected_keys, selected_asset_check_keys=None + ).to_source_assets() + ) materialize_to_memory( instance=instance, partition_key=partition_key, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon_perf.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon_perf.py index de2316f15f662..94a7917831870 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon_perf.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon_perf.py @@ -176,7 +176,9 @@ def create(self: "InstanceSnapshot") -> None: ) selected_keys = selection.resolve(asset_graph) - sampled_multi_asset = multi_asset.subset_for(selected_keys) + sampled_multi_asset = multi_asset.subset_for( + selected_keys, selected_asset_check_keys=None + ) sampled_roots = [ra for ra in roots if ra.key in selected_keys] to_materialize = [*sampled_roots, sampled_multi_asset] else: diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py index db5296433be86..074600b44f7a3 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py @@ -563,8 +563,8 @@ def test_can_subset_no_selection() -> None: def foo(context: AssetExecutionContext): assert context.selected_asset_keys == {AssetKey("asset1"), AssetKey("asset2")} assert context.selected_asset_check_keys == { - (AssetKey("asset1"), "check1"), - (AssetKey("asset2"), "check2"), + AssetCheckKey(AssetKey("asset1"), "check1"), + AssetCheckKey(AssetKey("asset2"), "check2"), } yield Output(value=None, output_name="asset1") @@ -591,7 +591,7 @@ def test_can_subset() -> None: ) def foo(context: AssetExecutionContext): assert context.selected_asset_keys == {AssetKey("asset1")} - assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")} + assert context.selected_asset_check_keys == {AssetCheckKey(AssetKey("asset1"), "check1")} yield Output(value=None, output_name="asset1") yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) @@ -617,7 +617,7 @@ def test_can_subset_result_for_unselected_check() -> None: ) def foo(context: AssetExecutionContext): assert context.selected_asset_keys == {AssetKey("asset1")} - assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")} + assert context.selected_asset_check_keys == {AssetCheckKey(AssetKey("asset1"), "check1")} yield Output(value=None, output_name="asset1") yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) @@ -665,13 +665,9 @@ def test_can_subset_select_only_check() -> None: ) def foo(context: AssetExecutionContext): assert context.selected_asset_keys == set() - assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")} + assert context.selected_asset_check_keys == {AssetCheckKey(AssetKey("asset1"), "check1")} - if context.selected_asset_keys: - yield Output(value=None, output_name="asset1") - - if context.selected_asset_check_keys: - yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) + yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) result = materialize( [foo],