Skip to content

Commit

Permalink
move asset_or_check_key_to_dep_node_handles to assets.py (#22672)
Browse files Browse the repository at this point in the history
## Summary & Motivation

`asset_or_check_key_to_dep_node_handles` is a utility function used in
both assets.py and asset_layer.py. This moves it, and other utility
functions it depends on, from asset_layer.py to assets.py. It doesn't
change any behavior or any code inside the function.

This prepares for an upstack change that removes usages of it from
asset_layer.py.

## How I Tested These Changes
  • Loading branch information
sryza authored and cmpadden committed Jun 24, 2024
1 parent 9d09ef3 commit 0a20096
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 253 deletions.
254 changes: 3 additions & 251 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,5 @@
from collections import defaultdict
from typing import (
TYPE_CHECKING,
AbstractSet,
Dict,
Iterable,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
)
from typing import TYPE_CHECKING, AbstractSet, Dict, Iterable, Mapping, NamedTuple, Optional, Set

import dagster._check as check
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec
Expand All @@ -24,247 +12,9 @@
if TYPE_CHECKING:
from dagster._core.definitions.asset_graph import AssetGraph, AssetNode
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.base_asset_graph import AssetKeyOrCheckKey
from dagster._core.definitions.partition_mapping import PartitionMapping


def _build_graph_dependencies(
graph_def: GraphDefinition,
parent_handle: Optional[NodeHandle],
outputs_by_graph_handle: Dict[NodeHandle, Mapping[str, NodeOutputHandle]],
non_asset_inputs_by_node_handle: Dict[NodeHandle, Sequence[NodeOutputHandle]],
assets_defs_by_node_handle: Mapping[NodeHandle, "AssetsDefinition"],
) -> None:
"""Scans through every node in the graph, making a recursive call when a node is a graph.
Builds two dictionaries:
outputs_by_graph_handle: A mapping of every graph node handle to a dictionary with each out
name as a key and a NodeOutputHandle containing the op output name and op node handle
non_asset_inputs_by_node_handle: A mapping of all node handles to all upstream node handles
that are not assets. Each key is a node output handle.
"""
dep_struct = graph_def.dependency_structure

for sub_node_name, sub_node in graph_def.node_dict.items():
curr_node_handle = NodeHandle(sub_node_name, parent=parent_handle)
if isinstance(sub_node.definition, GraphDefinition):
_build_graph_dependencies(
sub_node.definition,
curr_node_handle,
outputs_by_graph_handle,
non_asset_inputs_by_node_handle,
assets_defs_by_node_handle,
)
outputs_by_graph_handle[curr_node_handle] = {
mapping.graph_output_name: NodeOutputHandle(
NodeHandle(mapping.maps_from.node_name, parent=curr_node_handle),
mapping.maps_from.output_name,
)
for mapping in sub_node.definition.output_mappings
}
non_asset_inputs_by_node_handle[curr_node_handle] = [
NodeOutputHandle(
NodeHandle(node_output.node_name, parent=parent_handle),
node_output.output_def.name,
)
for node_output in dep_struct.all_upstream_outputs_from_node(sub_node_name)
if NodeHandle(node_output.node.name, parent=parent_handle)
not in assets_defs_by_node_handle
]


def _get_dependency_node_output_handles(
non_asset_inputs_by_node_handle: Mapping[NodeHandle, Sequence[NodeOutputHandle]],
outputs_by_graph_handle: Mapping[NodeHandle, Mapping[str, NodeOutputHandle]],
dep_node_output_handles_by_node_output_handle: Dict[
NodeOutputHandle, Sequence[NodeOutputHandle]
],
node_output_handle: NodeOutputHandle,
) -> Sequence[NodeOutputHandle]:
"""Given a node output handle, return all upstream op node output handles. All node output handles
belong in the same graph-backed asset node.
Arguments:
outputs_by_graph_handle: A mapping of every graph node handle to a dictionary with each out
name as a key and a NodeOutputHandle containing the op output name and op node handle
non_asset_inputs_by_node_handle: A mapping of all node handles to all upstream node handles
that are not assets. Each key is a node output handle.
dep_node_output_handles_by_node_output_handle: A mapping of each non-graph node output handle
to all non-graph node output handle dependencies. Used for memoization to avoid scanning
already visited nodes.
curr_node_handle: The current node handle being traversed.
graph_output_name: Name of the node output being traversed. Only used if the current node is a
graph to trace the op that generates this output.
"""
curr_node_handle = node_output_handle.node_handle

if node_output_handle in dep_node_output_handles_by_node_output_handle:
return dep_node_output_handles_by_node_output_handle[node_output_handle]

dependency_node_output_handles: List[
NodeOutputHandle
] = [] # first node in list is node output handle that outputs the asset

if curr_node_handle not in outputs_by_graph_handle:
dependency_node_output_handles.append(node_output_handle)
else: # is graph
dep_node_output_handle = outputs_by_graph_handle[curr_node_handle][
node_output_handle.output_name
]
dependency_node_output_handles.extend(
_get_dependency_node_output_handles(
non_asset_inputs_by_node_handle,
outputs_by_graph_handle,
dep_node_output_handles_by_node_output_handle,
dep_node_output_handle,
)
)
for dep_node_output_handle in non_asset_inputs_by_node_handle[curr_node_handle]:
dependency_node_output_handles.extend(
_get_dependency_node_output_handles(
non_asset_inputs_by_node_handle,
outputs_by_graph_handle,
dep_node_output_handles_by_node_output_handle,
dep_node_output_handle,
)
)

if curr_node_handle not in outputs_by_graph_handle:
dep_node_output_handles_by_node_output_handle[node_output_handle] = (
dependency_node_output_handles
)

return dependency_node_output_handles


def get_dep_node_handles_of_graph_backed_asset(
graph_def: GraphDefinition, assets_def: "AssetsDefinition"
) -> Mapping["AssetKeyOrCheckKey", Set[NodeHandle]]:
"""Given a graph-backed asset with graph_def, return a mapping of asset keys outputted by the graph
to a list of node handles within graph_def that are the dependencies of the asset.
Arguments:
graph_def: The graph definition of the graph-backed asset.
assets_def: The assets definition of the graph-backed asset.
"""
# asset_key_to_dep_node_handles takes in a graph_def that represents the entire job, where each
# node is a top-level asset node. Create a dummy graph that wraps around graph_def and pass
# the dummy graph to asset_key_to_dep_node_handles
dummy_parent_graph = GraphDefinition("dummy_parent_graph", node_defs=[graph_def])
(dep_node_handles_by_asset_key, _) = asset_or_check_key_to_dep_node_handles(
dummy_parent_graph,
{NodeHandle(name=graph_def.name, parent=None): assets_def},
)
return dep_node_handles_by_asset_key


def asset_or_check_key_to_dep_node_handles(
graph_def: GraphDefinition,
assets_defs_by_node_handle: Mapping[NodeHandle, "AssetsDefinition"],
) -> Tuple[
Mapping["AssetKeyOrCheckKey", Set[NodeHandle]],
Mapping["AssetKeyOrCheckKey", Sequence[NodeOutputHandle]],
]:
"""For each asset in assets_defs_by_node_handle, determines all the op handles and output handles
within the asset's node that are upstream dependencies of the asset.
Returns a tuple with two objects:
1. A mapping of each asset or check key to a set of node handles that are upstream dependencies of the asset.
2. A mapping of each asset or check key to a list of node output handles that are upstream dependencies of the asset.
Arguments:
graph_def: The graph definition of the job, where each top level node is an asset.
assets_defs_by_node_handle: A mapping of each node handle to the asset definition for that node.
"""
# A mapping of all node handles to all upstream node handles
# that are not assets. Each key is a node handle with node output handle value
non_asset_inputs_by_node_handle: Dict[NodeHandle, Sequence[NodeOutputHandle]] = {}

# A mapping of every graph node handle to a dictionary with each out
# name as a key and node output handle value
outputs_by_graph_handle: Dict[NodeHandle, Mapping[str, NodeOutputHandle]] = {}
_build_graph_dependencies(
graph_def=graph_def,
parent_handle=None,
outputs_by_graph_handle=outputs_by_graph_handle,
non_asset_inputs_by_node_handle=non_asset_inputs_by_node_handle,
assets_defs_by_node_handle=assets_defs_by_node_handle,
)

dep_nodes_by_asset_or_check_key: Dict["AssetKeyOrCheckKey", List[NodeHandle]] = {}
dep_node_outputs_by_asset_or_check_key: Dict["AssetKeyOrCheckKey", List[NodeOutputHandle]] = {}

for node_handle, assets_defs in assets_defs_by_node_handle.items():
dep_node_output_handles_by_node: Dict[
NodeOutputHandle, Sequence[NodeOutputHandle]
] = {} # memoized map of node output handles to all node output handle dependencies that are from ops
for (
output_name,
asset_or_check_key,
) in assets_defs.asset_and_check_keys_by_output_name.items():
dep_nodes_by_asset_or_check_key[
asset_or_check_key
] = [] # first element in list is node that outputs asset

dep_node_outputs_by_asset_or_check_key[asset_or_check_key] = []

if node_handle not in outputs_by_graph_handle:
dep_nodes_by_asset_or_check_key[asset_or_check_key].extend([node_handle])
else: # is graph
# node output handle for the given asset key
node_output_handle = outputs_by_graph_handle[node_handle][output_name]

dep_node_output_handles = _get_dependency_node_output_handles(
non_asset_inputs_by_node_handle,
outputs_by_graph_handle,
dep_node_output_handles_by_node,
node_output_handle,
)

dep_node_outputs_by_asset_or_check_key[asset_or_check_key].extend(
dep_node_output_handles
)

# handle internal_asset_deps within graph-backed assets
for assets_def in assets_defs_by_node_handle.values():
for asset_key, dep_asset_keys in assets_def.asset_deps.items():
if asset_key not in assets_def.keys:
continue
for dep_asset_key in [key for key in dep_asset_keys if key in assets_def.keys]:
if len(dep_node_outputs_by_asset_or_check_key[asset_key]) == 0:
# This case occurs when the asset is not yielded from a graph-backed asset
continue
node_output_handle = dep_node_outputs_by_asset_or_check_key[asset_key][
0
] # first item in list is the original node output handle that outputs the asset
dep_asset_key_node_output_handles = [
output_handle
for output_handle in dep_node_outputs_by_asset_or_check_key[dep_asset_key]
if output_handle != node_output_handle
]
dep_node_outputs_by_asset_or_check_key[asset_key] = [
node_output
for node_output in dep_node_outputs_by_asset_or_check_key[asset_key]
if node_output not in dep_asset_key_node_output_handles
]

# For graph-backed assets, we've resolved the upstream node output handles dependencies for each
# node output handle in dep_node_outputs_by_asset_or_check_key. We use this to find the upstream
# node handle dependencies.
for key, dep_node_outputs in dep_node_outputs_by_asset_or_check_key.items():
dep_nodes_by_asset_or_check_key[key].extend(
[node_output.node_handle for node_output in dep_node_outputs]
)

dep_node_set_by_asset_or_check_key: Dict["AssetKeyOrCheckKey", Set[NodeHandle]] = {}
for key, dep_node_handles in dep_nodes_by_asset_or_check_key.items():
dep_node_set_by_asset_or_check_key[key] = set(dep_node_handles)
return dep_node_set_by_asset_or_check_key, dep_node_outputs_by_asset_or_check_key


class AssetLayer(NamedTuple):
"""Stores all of the asset-related information for a Dagster job. Maps each
input / output in the underlying graph to the asset it represents (if any), and records the
Expand Down Expand Up @@ -306,6 +56,8 @@ def from_graph_and_assets_node_mapping(
assets_defs_by_outer_node_handle (Mapping[NodeHandle, AssetsDefinition]): A mapping from
a NodeHandle pointing to the node in the graph where the AssetsDefinition ended up.
"""
from .assets import asset_or_check_key_to_dep_node_handles

asset_key_by_input: Dict[NodeInputHandle, AssetKey] = {}
asset_keys_by_node_output_handle: Dict[NodeOutputHandle, AssetKey] = {}
check_key_by_output: Dict[NodeOutputHandle, AssetCheckKey] = {}
Expand Down
Loading

0 comments on commit 0a20096

Please sign in to comment.