Skip to content

Commit

Permalink
[external-assets] Implement AssetGraph with AssetNode and RemoteAssetNode (#20114)
Browse files Browse the repository at this point in the history

## Summary & Motivation

Internal companion PR: dagster-io/internal#8537

Initial implementation of asset nodes for the `AssetGraph`.

- `BaseAssetGraph` is now generic over a new `BaseAssetNode` class that
exposes the metadata for an asset.
- The node class for `AssetGraph` is `AssetNode`. It wraps an
`AssetsDefinition`.
- The node class for `RemoteAssetGraph` is `RemoteAssetNode`. It wraps a
list of `ExternalAssetNode` objects (to be renamed upstack) sourced from
one or more code locations.
- Moving to nodes with a common interface allows many property accessor
methods to be deleted from `BaseAssetGraph` and exposed on `BaseAssetNode`
instead. The common interface shared by the two kinds of nodes also allows
other method implementations to be hoisted to the `BaseAssetGraph` base class.
- To reduce noise in this PR, I have not changed callsites (with a few
exceptions), and instead just swapped out the property accessor method
implementations. Callsites are changed in an upstack PR, where e.g.
`asset_graph.get(<key>).auto_materialize_policy` is used.

## How I Tested These Changes

Existing test suite.
  • Loading branch information
smackesey authored and PedramNavid committed Mar 28, 2024
1 parent e4fb18e commit c5764e9
Show file tree
Hide file tree
Showing 5 changed files with 641 additions and 436 deletions.
303 changes: 156 additions & 147 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import cached_property
from typing import AbstractSet, Iterable, Mapping, Optional, Sequence, Union

from dagster._core.definitions.asset_check_spec import AssetCheckKey
Expand All @@ -9,38 +10,132 @@
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.base_asset_graph import AssetKeyOrCheckKey, BaseAssetGraph
from dagster._core.definitions.base_asset_graph import (
AssetKeyOrCheckKey,
BaseAssetGraph,
BaseAssetNode,
)
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.selector.subset_selector import DependencyGraph, generate_asset_dep_graph
from dagster._utils.cached_method import cached_method
from dagster._core.selector.subset_selector import (
generate_asset_dep_graph,
)


class AssetGraph(BaseAssetGraph):
class AssetNode(BaseAssetNode):
def __init__(
self,
assets_defs: Sequence[AssetsDefinition],
asset_checks_defs: Sequence[AssetChecksDefinition],
key: AssetKey,
parent_keys: AbstractSet[AssetKey],
child_keys: AbstractSet[AssetKey],
assets_def: AssetsDefinition,
check_keys: AbstractSet[AssetCheckKey],
):
self._assets_defs = assets_defs
self._assets_defs_by_key = {key: asset for asset in self._assets_defs for key in asset.keys}
self._assets_defs_by_check_key = {
**{check_key: asset for asset in assets_defs for check_key in asset.check_keys},
self.key = key
self.parent_keys = parent_keys
self.child_keys = child_keys
self.assets_def = assets_def
self._check_keys = check_keys

@property
def group_name(self) -> Optional[str]:
return self.assets_def.group_names_by_key.get(self.key)

@property
def is_materializable(self) -> bool:
return self.assets_def.is_materializable

@property
def is_observable(self) -> bool:
return self.assets_def.is_observable

@property
def is_external(self) -> bool:
return self.assets_def.is_external

@property
def is_executable(self) -> bool:
return self.assets_def.is_executable

@property
def is_partitioned(self) -> bool:
return self.assets_def.partitions_def is not None

@property
def partitions_def(self) -> Optional[PartitionsDefinition]:
return self.assets_def.partitions_def

@property
def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]:
return self.assets_def.partition_mappings

@property
def freshness_policy(self) -> Optional[FreshnessPolicy]:
return self.assets_def.freshness_policies_by_key.get(self.key)

@property
def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]:
return self.assets_def.auto_materialize_policies_by_key.get(self.key)

@property
def auto_observe_interval_minutes(self) -> Optional[float]:
return self.assets_def.auto_observe_interval_minutes

@property
def backfill_policy(self) -> Optional[BackfillPolicy]:
return self.assets_def.backfill_policy

@property
def code_version(self) -> Optional[str]:
return self.assets_def.code_versions_by_key.get(self.key)

@property
def check_keys(self) -> AbstractSet[AssetCheckKey]:
return self._check_keys

@property
def execution_set_asset_keys(self) -> AbstractSet[AssetKey]:
return (
{self.key}
if len(self.assets_def.keys) <= 1 or self.assets_def.can_subset
else self.assets_def.keys
)

@property
def execution_set_asset_and_check_keys(self) -> AbstractSet[AssetKeyOrCheckKey]:
if self.assets_def.can_subset or (
len(self.assets_def.keys) <= 1 and not len(self.assets_def.check_keys) > 0
):
return {self.key}
else:
return {*self.assets_def.keys, *self.assets_def.check_keys}


class AssetGraph(BaseAssetGraph[AssetNode]):
_asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition]

def __init__(
self,
asset_nodes_by_key: Mapping[AssetKey, AssetNode],
asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition],
):
self._asset_nodes_by_key = asset_nodes_by_key
self._asset_checks_defs_by_key = asset_checks_defs_by_key
self._asset_nodes_by_check_key = {
**{
check_key: self._assets_defs_by_key[check.asset_key]
for check in asset_checks_defs
for check_key in check.keys
if check.asset_key in self._assets_defs_by_key
check_key: asset
for asset in asset_nodes_by_key.values()
for check_key in asset.check_keys
},
**{
key: asset_nodes_by_key[checks_def.asset_key]
for key, checks_def in asset_checks_defs_by_key.items()
if checks_def.asset_key in asset_nodes_by_key
},
}

self._asset_checks_defs = asset_checks_defs
self._asset_checks_defs_by_key = {
key: check for check in asset_checks_defs for key in check.keys
}

@staticmethod
Expand Down Expand Up @@ -102,147 +197,61 @@ def normalize_assets(
@classmethod
def from_assets(
cls,
all_assets: Iterable[Union[AssetsDefinition, SourceAsset]],
assets: Iterable[Union[AssetsDefinition, SourceAsset]],
asset_checks: Optional[Sequence[AssetChecksDefinition]] = None,
) -> "AssetGraph":
assets_defs = cls.normalize_assets(all_assets)
return AssetGraph(
assets_defs=assets_defs,
asset_checks_defs=asset_checks or [],
)

@property
def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
return {
*(key for check in self._asset_checks_defs for key in check.keys),
*(key for asset in self._assets_defs for key in asset.check_keys),
asset_checks = asset_checks or []
assets_defs = cls.normalize_assets(assets, asset_checks)

# Build the set of AssetNodes. Each node holds keys, rather than object references, for its
# parent and child nodes.
dep_graph = generate_asset_dep_graph(assets_defs, [])
asset_nodes_by_key = {
key: AssetNode(
key=key,
parent_keys=dep_graph["upstream"][key],
child_keys=dep_graph["downstream"][key],
assets_def=ad,
check_keys={
*ad.check_keys,
*(ck for cd in asset_checks if cd.asset_key == key for ck in cd.keys),
},
)
for ad in assets_defs
for key in ad.keys
}

@property
def assets_defs(self) -> Sequence[AssetsDefinition]:
return self._assets_defs

def get_assets_def(self, asset_key: AssetKey) -> AssetsDefinition:
return self._assets_defs_by_key[asset_key]

def has_asset(self, asset_key: AssetKey) -> bool:
return asset_key in self._assets_defs_by_key

def get_assets_def_for_check(
self, asset_check_key: AssetCheckKey
) -> Optional[AssetsDefinition]:
return self._assets_defs_by_check_key.get(asset_check_key)

@property
def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]:
return self._asset_checks_defs

def get_asset_checks_def(self, asset_check_key: AssetCheckKey) -> AssetChecksDefinition:
return self._asset_checks_defs_by_key[asset_check_key]

def has_asset_check(self, asset_check_key: AssetCheckKey) -> bool:
return asset_check_key in self._asset_checks_defs_by_key

@property
@cached_method
def asset_dep_graph(self) -> DependencyGraph[AssetKey]:
return generate_asset_dep_graph(self._assets_defs, [])

@property
@cached_method
def all_asset_keys(self) -> AbstractSet[AssetKey]:
return {key for ad in self._assets_defs for key in ad.keys}

@property
@cached_method
def materializable_asset_keys(self) -> AbstractSet[AssetKey]:
return {key for ad in self._assets_defs if ad.is_materializable for key in ad.keys}

def is_materializable(self, asset_key: AssetKey) -> bool:
return self.get_assets_def(asset_key).is_materializable

@property
@cached_method
def observable_asset_keys(self) -> AbstractSet[AssetKey]:
return {key for ad in self._assets_defs if ad.is_observable for key in ad.keys}

def is_observable(self, asset_key: AssetKey) -> bool:
return self.get_assets_def(asset_key).is_observable

@property
@cached_method
def external_asset_keys(self) -> AbstractSet[AssetKey]:
return {key for ad in self._assets_defs if ad.is_external for key in ad.keys}

def is_external(self, asset_key: AssetKey) -> bool:
return self.get_assets_def(asset_key).is_external

@property
@cached_method
def executable_asset_keys(self) -> AbstractSet[AssetKey]:
return {key for ad in self._assets_defs if ad.is_executable for key in ad.keys}

def is_executable(self, asset_key: AssetKey) -> bool:
return self.get_assets_def(asset_key).is_executable

def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
return {
key
for ad in self._assets_defs
for key in ad.keys
if ad.group_names_by_key[key] == group_name
asset_checks_defs_by_key = {
key: check for check in (asset_checks or []) for key in check.keys
}

def get_execution_set_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
asset = self.get_assets_def(asset_key)
return {asset_key} if len(asset.keys) <= 1 or asset.can_subset else asset.keys
return AssetGraph(
asset_nodes_by_key=asset_nodes_by_key,
asset_checks_defs_by_key=asset_checks_defs_by_key,
)

def get_execution_set_asset_and_check_keys(
self, asset_or_check_key: AssetKeyOrCheckKey
) -> AbstractSet[AssetKeyOrCheckKey]:
if isinstance(asset_or_check_key, AssetKey):
asset = self.get_assets_def(asset_or_check_key)
else: # AssetCheckKey
# only checks emitted by AssetsDefinition have required keys
if self.has_asset_check(asset_or_check_key):
return {asset_or_check_key}
else:
asset = self.get_assets_def_for_check(asset_or_check_key)
if asset is None or asset_or_check_key not in asset.check_keys:
return {asset_or_check_key}
has_checks = len(asset.check_keys) > 0
if asset.can_subset or len(asset.keys) <= 1 and not has_checks:
return self.get(asset_or_check_key).execution_set_asset_and_check_keys
# only checks emitted by AssetsDefinition have required keys
elif asset_or_check_key in self._asset_checks_defs_by_key:
return {asset_or_check_key}
else:
return {*asset.keys, *asset.check_keys}

@property
@cached_method
def all_group_names(self) -> AbstractSet[str]:
return {
group_name for ad in self._assets_defs for group_name in ad.group_names_by_key.values()
}

def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]:
return self.get_assets_def(asset_key).partitions_def

def get_partition_mappings(self, asset_key: AssetKey) -> Mapping[AssetKey, PartitionMapping]:
return self.get_assets_def(asset_key).partition_mappings

def get_group_name(self, asset_key: AssetKey) -> Optional[str]:
return self.get_assets_def(asset_key).group_names_by_key.get(asset_key)

def get_freshness_policy(self, asset_key: AssetKey) -> Optional[FreshnessPolicy]:
return self.get_assets_def(asset_key).freshness_policies_by_key.get(asset_key)

def get_auto_materialize_policy(self, asset_key: AssetKey) -> Optional[AutoMaterializePolicy]:
return self.get_assets_def(asset_key).auto_materialize_policies_by_key.get(asset_key)
asset_node = self._asset_nodes_by_check_key[asset_or_check_key]
asset_unit_keys = asset_node.execution_set_asset_and_check_keys
return (
asset_unit_keys if asset_or_check_key in asset_unit_keys else {asset_or_check_key}
)

def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]:
return self.get_assets_def(asset_key).auto_observe_interval_minutes
def get_assets_def(self, asset_key: AssetKey) -> AssetsDefinition:
return self.get(asset_key).assets_def

def get_backfill_policy(self, asset_key: AssetKey) -> Optional[BackfillPolicy]:
return self.get_assets_def(asset_key).backfill_policy
@cached_property
def assets_defs(self) -> Sequence[AssetsDefinition]:
return list(dict.fromkeys(asset.assets_def for asset in self.asset_nodes))

def get_code_version(self, asset_key: AssetKey) -> Optional[str]:
return self.get_assets_def(asset_key).code_versions_by_key.get(asset_key)
@property
def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]:
return list(dict.fromkeys(self._asset_checks_defs_by_key.values()))
Loading

0 comments on commit c5764e9

Please sign in to comment.