Skip to content

Commit

Permalink
[external-assets] Ensure assets defs are present for all asset checks…
Browse files Browse the repository at this point in the history
… in AssetGraph (#20435)

## Summary & Motivation

There is a bug in the asset graph, surfaced by an upstack PR: the
`AssetsDefinition` that computes an asset check isn't available from the
asset graph when that `AssetsDefinition` has been subsetted down to only
a check, with no assets. This PR refactors `AssetGraph` to support this
niche case, which opens the door to using `AssetGraph` as the basis for
`AssetLayer`.

## How I Tested These Changes

Covered by the existing test suite. The "new" functionality is tested upstack in #20405.
  • Loading branch information
smackesey authored Mar 12, 2024
1 parent 41d1965 commit 5d77fcd
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 18 deletions.
71 changes: 56 additions & 15 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,25 +121,30 @@ def execution_set_asset_and_check_keys(self) -> AbstractSet[AssetKeyOrCheckKey]:


class AssetGraph(BaseAssetGraph[AssetNode]):
_asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition]
# maps asset check keys to the definition where it is computed
_asset_check_compute_defs_by_key: Mapping[
AssetCheckKey, Union[AssetsDefinition, AssetChecksDefinition]
]

def __init__(
self,
asset_nodes_by_key: Mapping[AssetKey, AssetNode],
asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition],
asset_check_compute_defs_by_key: Mapping[
AssetCheckKey, Union[AssetsDefinition, AssetChecksDefinition]
],
):
self._asset_nodes_by_key = asset_nodes_by_key
self._asset_checks_defs_by_key = asset_checks_defs_by_key
self._asset_check_compute_defs_by_key = asset_check_compute_defs_by_key
self._asset_nodes_by_check_key = {
**{
check_key: asset
for asset in asset_nodes_by_key.values()
for check_key in asset.check_keys
},
**{
key: asset_nodes_by_key[checks_def.asset_key]
for key, checks_def in asset_checks_defs_by_key.items()
if checks_def.asset_key in asset_nodes_by_key
key: asset_nodes_by_key[acd.asset_key]
for key, acd in asset_check_compute_defs_by_key.items()
if isinstance(acd, AssetChecksDefinition) and acd.asset_key in asset_nodes_by_key
},
}

Expand Down Expand Up @@ -226,22 +231,25 @@ def from_assets(
for key in ad.keys
}

asset_checks_defs_by_key = {
key: check for check in (asset_checks or []) for key in check.keys
asset_check_compute_defs_by_key = {
**{key: checks_def for checks_def in (asset_checks or []) for key in checks_def.keys},
**{key: assets_def for assets_def in assets_defs for key in assets_def.check_keys},
}

return AssetGraph(
asset_nodes_by_key=asset_nodes_by_key,
asset_checks_defs_by_key=asset_checks_defs_by_key,
asset_check_compute_defs_by_key=asset_check_compute_defs_by_key,
)

def get_execution_set_asset_and_check_keys(
self, asset_or_check_key: AssetKeyOrCheckKey
) -> AbstractSet[AssetKeyOrCheckKey]:
if isinstance(asset_or_check_key, AssetKey):
return self.get(asset_or_check_key).execution_set_asset_and_check_keys
# only checks emitted by AssetsDefinition have required keys
elif asset_or_check_key in self._asset_checks_defs_by_key:
# an AssetChecksDefinition can emit only a subset of its keys
elif isinstance(
self._asset_check_compute_defs_by_key[asset_or_check_key], AssetChecksDefinition
):
return {asset_or_check_key}
else:
asset_node = self._asset_nodes_by_check_key[asset_or_check_key]
Expand All @@ -252,11 +260,44 @@ def get_execution_set_asset_and_check_keys(

# Cached list of the distinct AssetsDefinition objects known to this graph.
# NOTE(review): this span is a flattened diff hunk -- the first `return` looks like the
# removed pre-change line and the `return list({...})` after it looks like its replacement.
@cached_property
def assets_defs(self) -> Sequence[AssetsDefinition]:
return list(dict.fromkeys(asset.assets_def for asset in self.asset_nodes))
# Collects definitions from two places: the assets_def of every asset node, plus any
# AssetsDefinition that appears as a value in the check-key -> compute-definition mapping.
# NOTE(review): a set literal does not guarantee insertion order, unlike the
# dict.fromkeys form above -- confirm no caller relies on the ordering of this list.
return list(
{
*(asset.assets_def for asset in self.asset_nodes),
*(
ad
for ad in self._asset_check_compute_defs_by_key.values()
if isinstance(ad, AssetsDefinition)
),
}
)

def assets_defs_for_keys(self, keys: Iterable[AssetKey]) -> Sequence[AssetsDefinition]:
return list(dict.fromkeys([self.get(key).assets_def for key in keys]))
def assets_defs_for_keys(
    self, keys: Iterable[AssetKeyOrCheckKey]
) -> Sequence[AssetsDefinition]:
    """Return the distinct `AssetsDefinition`s that compute the given asset or check keys.

    Args:
        keys: Asset keys and/or asset check keys. Any iterable is accepted, including
            a one-shot iterator.

    Returns:
        The matching definitions without duplicates, in first-seen order: definitions
        looked up from the asset keys first, then definitions found for the check keys.
        A check key whose compute definition is not an `AssetsDefinition` contributes
        nothing.
    """
    # Materialize once: `keys` is iterated for the asset-key lookup and then used for
    # membership tests. With a generator the second use would see an exhausted
    # iterator and silently miss every check key. A dict gives O(1) membership while
    # keeping the caller's order.
    requested = dict.fromkeys(keys)
    from_asset_keys = [
        self.get(key).assets_def for key in requested if isinstance(key, AssetKey)
    ]
    from_check_keys = [
        compute_def
        for check_key, compute_def in self._asset_check_compute_defs_by_key.items()
        if check_key in requested and isinstance(compute_def, AssetsDefinition)
    ]
    # dict.fromkeys de-duplicates while preserving insertion order, unlike a set.
    return list(dict.fromkeys([*from_asset_keys, *from_check_keys]))

# List of the distinct AssetChecksDefinition objects known to this graph.
# NOTE(review): this span is a flattened diff hunk -- the first `return` looks like the
# removed pre-change line and the `return list({...})` after it looks like its replacement.
@property
def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]:
return list(dict.fromkeys(self._asset_checks_defs_by_key.values()))
# Keeps only the AssetChecksDefinition values of the check-key -> compute-definition
# mapping; values of any other type are filtered out by the isinstance test.
# NOTE(review): the set comprehension does not guarantee insertion order, unlike the
# dict.fromkeys form above -- confirm no caller relies on the ordering of this list.
return list(
{
acd
for acd in self._asset_check_compute_defs_by_key.values()
if isinstance(acd, AssetChecksDefinition)
}
)

# Cached set of every asset check key in the graph: the union of the check_keys of
# each entry in self.assets_defs and the keys of each entry in self.asset_checks_defs.
@cached_property
def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
return {
# check keys carried by AssetsDefinitions
*(key for ad in self.assets_defs for key in ad.check_keys),
# check keys carried by standalone AssetChecksDefinitions
*(key for acd in self.asset_checks_defs for key in acd.keys),
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ def root_executable_asset_keys(self) -> AbstractSet[AssetKey]:
self.asset_dep_graph, self.observable_asset_keys | self.materializable_asset_keys
)

# NOTE(review): this span is a flattened diff hunk -- the @cached_property version looks
# like the removed implementation and the abstract @property after it like its replacement.
# Concrete form: gathers the check_keys of every asset node into one set.
@cached_property
def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
return {key for asset in self.asset_nodes for key in asset.check_keys}
# Abstract form: declares the property with no body, so each concrete graph class
# has to supply its own way of computing the set of asset check keys.
@property
@abstractmethod
def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: ...

@cached_property
def all_partitions_defs(self) -> Sequence[PartitionsDefinition]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ def external_asset_nodes_by_key(self) -> Mapping[AssetKey, "ExternalAssetNode"]:
# Returns the values of self._asset_checks_by_key as a list with duplicates removed;
# dict.fromkeys keeps the first occurrence of each value in iteration order.
def asset_checks(self) -> Sequence["ExternalAssetCheck"]:
return list(dict.fromkeys(self._asset_checks_by_key.values()))

# Cached set of asset check keys, built by collecting the check_keys of every node
# in self.asset_nodes.
@cached_property
def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
return {key for asset in self.asset_nodes for key in asset.check_keys}

# Returns the keys of all asset nodes whose job_names contains the given job_name.
# The result is an empty set when no node lists that job.
def asset_keys_for_job(self, job_name: str) -> AbstractSet[AssetKey]:
return {node.key for node in self.asset_nodes if job_name in node.job_names}

Expand Down

0 comments on commit 5d77fcd

Please sign in to comment.