From 5d77fcdb50b25b5a209c56e2494fd2143d391805 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Tue, 12 Mar 2024 13:14:46 -0400 Subject: [PATCH] [external-assets] Ensure assets defs are present for all asset checks in AssetGraph (#20435) ## Summary & Motivation There is a bug in the asset graph that I surfaced in an upstack PR, which is that the `AssetsDefinition` for an asset check isn't available from the asset graph if an `AssetsDefinition` has been subsetted to only a check with no assets. This refactors `AssetGraph` to support this niche case, which opens the door to using `AssetGraph` as the basis for `AssetLayer`. ## How I Tested These Changes Existing test suite. The "new" functionality is tested upstack by #20405 --- .../dagster/_core/definitions/asset_graph.py | 71 +++++++++++++++---- .../_core/definitions/base_asset_graph.py | 6 +- .../_core/definitions/remote_asset_graph.py | 4 ++ 3 files changed, 63 insertions(+), 18 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 800ade27b64e2..e21ac351da09d 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -121,15 +121,20 @@ def execution_set_asset_and_check_keys(self) -> AbstractSet[AssetKeyOrCheckKey]: class AssetGraph(BaseAssetGraph[AssetNode]): - _asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition] + # maps asset check keys to the definition where it is computed + _asset_check_compute_defs_by_key: Mapping[ + AssetCheckKey, Union[AssetsDefinition, AssetChecksDefinition] + ] def __init__( self, asset_nodes_by_key: Mapping[AssetKey, AssetNode], - asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition], + asset_check_compute_defs_by_key: Mapping[ + AssetCheckKey, Union[AssetsDefinition, AssetChecksDefinition] + ], ): self._asset_nodes_by_key = asset_nodes_by_key - 
self._asset_checks_defs_by_key = asset_checks_defs_by_key + self._asset_check_compute_defs_by_key = asset_check_compute_defs_by_key self._asset_nodes_by_check_key = { **{ check_key: asset @@ -137,9 +142,9 @@ def __init__( for check_key in asset.check_keys }, **{ - key: asset_nodes_by_key[checks_def.asset_key] - for key, checks_def in asset_checks_defs_by_key.items() - if checks_def.asset_key in asset_nodes_by_key + key: asset_nodes_by_key[acd.asset_key] + for key, acd in asset_check_compute_defs_by_key.items() + if isinstance(acd, AssetChecksDefinition) and acd.asset_key in asset_nodes_by_key }, } @@ -226,13 +231,14 @@ def from_assets( for key in ad.keys } - asset_checks_defs_by_key = { - key: check for check in (asset_checks or []) for key in check.keys + asset_check_compute_defs_by_key = { + **{key: checks_def for checks_def in (asset_checks or []) for key in checks_def.keys}, + **{key: assets_def for assets_def in assets_defs for key in assets_def.check_keys}, } return AssetGraph( asset_nodes_by_key=asset_nodes_by_key, - asset_checks_defs_by_key=asset_checks_defs_by_key, + asset_check_compute_defs_by_key=asset_check_compute_defs_by_key, ) def get_execution_set_asset_and_check_keys( @@ -240,8 +246,10 @@ def get_execution_set_asset_and_check_keys( ) -> AbstractSet[AssetKeyOrCheckKey]: if isinstance(asset_or_check_key, AssetKey): return self.get(asset_or_check_key).execution_set_asset_and_check_keys - # only checks emitted by AssetsDefinition have required keys - elif asset_or_check_key in self._asset_checks_defs_by_key: + # all AssetChecksDefinitions can emit only a subset of keys + elif isinstance( + self._asset_check_compute_defs_by_key[asset_or_check_key], AssetChecksDefinition + ): return {asset_or_check_key} else: asset_node = self._asset_nodes_by_check_key[asset_or_check_key] @@ -252,11 +260,44 @@ def get_execution_set_asset_and_check_keys( @cached_property def assets_defs(self) -> Sequence[AssetsDefinition]: - return list(dict.fromkeys(asset.assets_def for 
asset in self.asset_nodes)) + return list( + { + *(asset.assets_def for asset in self.asset_nodes), + *( + ad + for ad in self._asset_check_compute_defs_by_key.values() + if isinstance(ad, AssetsDefinition) + ), + } + ) - def assets_defs_for_keys(self, keys: Iterable[AssetKey]) -> Sequence[AssetsDefinition]: - return list(dict.fromkeys([self.get(key).assets_def for key in keys])) + def assets_defs_for_keys( + self, keys: Iterable[AssetKeyOrCheckKey] + ) -> Sequence[AssetsDefinition]: + return list( + { + *[self.get(key).assets_def for key in keys if isinstance(key, AssetKey)], + *[ + ad + for k, ad in self._asset_check_compute_defs_by_key.items() + if k in keys and isinstance(ad, AssetsDefinition) + ], + } + ) @property def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]: - return list(dict.fromkeys(self._asset_checks_defs_by_key.values())) + return list( + { + acd + for acd in self._asset_check_compute_defs_by_key.values() + if isinstance(acd, AssetChecksDefinition) + } + ) + + @cached_property + def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: + return { + *(key for ad in self.assets_defs for key in ad.check_keys), + *(key for acd in self.asset_checks_defs for key in acd.keys), + } diff --git a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py index 883da2bfe012a..f8b2e99c0d1d3 100644 --- a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py @@ -242,9 +242,9 @@ def root_executable_asset_keys(self) -> AbstractSet[AssetKey]: self.asset_dep_graph, self.observable_asset_keys | self.materializable_asset_keys ) - @cached_property - def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: - return {key for asset in self.asset_nodes for key in asset.check_keys} + @property + @abstractmethod + def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: ... 
@cached_property def all_partitions_defs(self) -> Sequence[PartitionsDefinition]: diff --git a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py index b28adf9103470..b397f2d188efe 100644 --- a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py @@ -336,6 +336,10 @@ def external_asset_nodes_by_key(self) -> Mapping[AssetKey, "ExternalAssetNode"]: def asset_checks(self) -> Sequence["ExternalAssetCheck"]: return list(dict.fromkeys(self._asset_checks_by_key.values())) + @cached_property + def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: + return {key for asset in self.asset_nodes for key in asset.check_keys} + def asset_keys_for_job(self, job_name: str) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if job_name in node.job_names}