Skip to content

Commit

Permalink
[external-assets] Add AssetNode to AssetGraph
Browse files Browse the repository at this point in the history
[INTERNAL_BRANCH=sean/external-assets-asset-graph-nodes-1]
  • Loading branch information
smackesey committed Mar 7, 2024
1 parent 2520d84 commit f60e7cb
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 187 deletions.
56 changes: 30 additions & 26 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import cached_property
from typing import AbstractSet, Iterable, Mapping, Optional, Sequence, Union

from dagster._core.definitions.asset_check_spec import AssetCheckKey
Expand All @@ -9,27 +10,38 @@
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.base_asset_graph import AssetKeyOrCheckKey, BaseAssetGraph, BaseAssetNode
from dagster._core.definitions.base_asset_graph import (
AssetKeyOrCheckKey,
BaseAssetGraph,
BaseAssetNode,
)
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.selector.subset_selector import generate_asset_dep_graph
from dagster._utils.cached_method import cached_method
from dagster._core.selector.subset_selector import (
generate_asset_dep_graph,
)


class AssetNode(BaseAssetNode):
def __init__(
self, key: AssetKey, assets_def: AssetsDefinition, check_keys: AbstractSet[AssetCheckKey]
self,
key: AssetKey,
parent_keys: AbstractSet[AssetKey],
child_keys: AbstractSet[AssetKey],
assets_def: AssetsDefinition,
check_keys: AbstractSet[AssetCheckKey],
):
self.key = key
self.parent_keys = parent_keys
self.child_keys = child_keys
self.assets_def = assets_def
self._check_keys = check_keys

@property
@cached_method
def group_name(self) -> Optional[str]:
return self.assets_def.group_names_by_key.get(self.key)

Expand Down Expand Up @@ -111,7 +123,7 @@ def __init__(
asset_nodes_by_key: Mapping[AssetKey, AssetNode],
asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition],
):
self.asset_nodes_by_key = asset_nodes_by_key
self._asset_nodes_by_key = asset_nodes_by_key
self._asset_checks_defs_by_key = asset_checks_defs_by_key
self._asset_nodes_by_check_key = {
**{
Expand Down Expand Up @@ -187,42 +199,37 @@ def from_assets(
cls,
assets: Iterable[Union[AssetsDefinition, SourceAsset]],
asset_checks: Optional[Sequence[AssetChecksDefinition]] = None,
) -> "InternalAssetGraph":
) -> "AssetGraph":
asset_checks = asset_checks or []
assets_defs = cls.normalize_assets(assets, asset_checks)

# Build the set of AssetNodes. Each node holds key rather than object references to parent
# and child nodes.
dep_graph = generate_asset_dep_graph(assets_defs, [])
asset_nodes_by_key = {
k: AssetNode(
key=k,
key: AssetNode(
key=key,
parent_keys=dep_graph["upstream"][key],
child_keys=dep_graph["downstream"][key],
assets_def=ad,
check_keys={
*ad.check_keys,
*(ck for cd in asset_checks if cd.asset_key == k for ck in cd.keys),
*(ck for cd in asset_checks if cd.asset_key == key for ck in cd.keys),
},
)
for ad in assets_defs
for k in ad.keys
for key in ad.keys
}
dep_graph = generate_asset_dep_graph(assets_defs, [])
for key, node in asset_nodes_by_key.items():
node.set_children(
{asset_nodes_by_key[child_key] for child_key in dep_graph["downstream"][key]}
)
node.set_parents(
{asset_nodes_by_key[parent_key] for parent_key in dep_graph["upstream"][key]}
)

asset_checks_defs_by_key = {
key: check for check in (asset_checks or []) for key in check.keys
}

return InternalAssetGraph(
return AssetGraph(
asset_nodes_by_key=asset_nodes_by_key,
asset_checks_defs_by_key=asset_checks_defs_by_key,
)

##### COMMON ASSET GRAPH INTERFACE

def get_execution_set_asset_and_check_keys(
self, asset_or_check_key: AssetKeyOrCheckKey
) -> AbstractSet[AssetKeyOrCheckKey]:
Expand All @@ -238,13 +245,10 @@ def get_execution_set_asset_and_check_keys(
asset_unit_keys if asset_or_check_key in asset_unit_keys else {asset_or_check_key}
)

##### INTERNAL-SPECIFIC INTERFACE

def get_assets_def(self, asset_key: AssetKey) -> AssetsDefinition:
return self.get(asset_key).assets_def

@property
@cached_method
@cached_property
def assets_defs(self) -> Sequence[AssetsDefinition]:
return list(dict.fromkeys(asset.assets_def for asset in self.asset_nodes))

Expand Down
Loading

0 comments on commit f60e7cb

Please sign in to comment.