diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index ffb20f5539b34..5789ac8e0e051 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -1,17 +1,5 @@ from collections import defaultdict -from typing import ( - TYPE_CHECKING, - AbstractSet, - Dict, - Iterable, - List, - Mapping, - NamedTuple, - Optional, - Sequence, - Set, - Tuple, -) +from typing import TYPE_CHECKING, AbstractSet, Dict, Iterable, Mapping, NamedTuple, Optional, Set import dagster._check as check from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec @@ -24,247 +12,9 @@ if TYPE_CHECKING: from dagster._core.definitions.asset_graph import AssetGraph, AssetNode from dagster._core.definitions.assets import AssetsDefinition - from dagster._core.definitions.base_asset_graph import AssetKeyOrCheckKey from dagster._core.definitions.partition_mapping import PartitionMapping -def _build_graph_dependencies( - graph_def: GraphDefinition, - parent_handle: Optional[NodeHandle], - outputs_by_graph_handle: Dict[NodeHandle, Mapping[str, NodeOutputHandle]], - non_asset_inputs_by_node_handle: Dict[NodeHandle, Sequence[NodeOutputHandle]], - assets_defs_by_node_handle: Mapping[NodeHandle, "AssetsDefinition"], -) -> None: - """Scans through every node in the graph, making a recursive call when a node is a graph. - - Builds two dictionaries: - - outputs_by_graph_handle: A mapping of every graph node handle to a dictionary with each out - name as a key and a NodeOutputHandle containing the op output name and op node handle - - non_asset_inputs_by_node_handle: A mapping of all node handles to all upstream node handles - that are not assets. Each key is a node output handle. - """ - dep_struct = graph_def.dependency_structure - - for sub_node_name, sub_node in graph_def.node_dict.items(): - curr_node_handle = NodeHandle(sub_node_name, parent=parent_handle) - if isinstance(sub_node.definition, GraphDefinition): - _build_graph_dependencies( - sub_node.definition, - curr_node_handle, - outputs_by_graph_handle, - non_asset_inputs_by_node_handle, - assets_defs_by_node_handle, - ) - outputs_by_graph_handle[curr_node_handle] = { - mapping.graph_output_name: NodeOutputHandle( - NodeHandle(mapping.maps_from.node_name, parent=curr_node_handle), - mapping.maps_from.output_name, - ) - for mapping in sub_node.definition.output_mappings - } - non_asset_inputs_by_node_handle[curr_node_handle] = [ - NodeOutputHandle( - NodeHandle(node_output.node_name, parent=parent_handle), - node_output.output_def.name, - ) - for node_output in dep_struct.all_upstream_outputs_from_node(sub_node_name) - if NodeHandle(node_output.node.name, parent=parent_handle) - not in assets_defs_by_node_handle - ] - - -def _get_dependency_node_output_handles( - non_asset_inputs_by_node_handle: Mapping[NodeHandle, Sequence[NodeOutputHandle]], - outputs_by_graph_handle: Mapping[NodeHandle, Mapping[str, NodeOutputHandle]], - dep_node_output_handles_by_node_output_handle: Dict[ - NodeOutputHandle, Sequence[NodeOutputHandle] - ], - node_output_handle: NodeOutputHandle, -) -> Sequence[NodeOutputHandle]: - """Given a node output handle, return all upstream op node output handles. All node output handles - belong in the same graph-backed asset node. - - Arguments: - outputs_by_graph_handle: A mapping of every graph node handle to a dictionary with each out - name as a key and a NodeOutputHandle containing the op output name and op node handle - non_asset_inputs_by_node_handle: A mapping of all node handles to all upstream node handles - that are not assets. Each key is a node output handle. - dep_node_output_handles_by_node_output_handle: A mapping of each non-graph node output handle - to all non-graph node output handle dependencies. Used for memoization to avoid scanning - already visited nodes. - curr_node_handle: The current node handle being traversed. - graph_output_name: Name of the node output being traversed. Only used if the current node is a - graph to trace the op that generates this output. - """ - curr_node_handle = node_output_handle.node_handle - - if node_output_handle in dep_node_output_handles_by_node_output_handle: - return dep_node_output_handles_by_node_output_handle[node_output_handle] - - dependency_node_output_handles: List[ - NodeOutputHandle - ] = [] # first node in list is node output handle that outputs the asset - - if curr_node_handle not in outputs_by_graph_handle: - dependency_node_output_handles.append(node_output_handle) - else: # is graph - dep_node_output_handle = outputs_by_graph_handle[curr_node_handle][ - node_output_handle.output_name - ] - dependency_node_output_handles.extend( - _get_dependency_node_output_handles( - non_asset_inputs_by_node_handle, - outputs_by_graph_handle, - dep_node_output_handles_by_node_output_handle, - dep_node_output_handle, - ) - ) - for dep_node_output_handle in non_asset_inputs_by_node_handle[curr_node_handle]: - dependency_node_output_handles.extend( - _get_dependency_node_output_handles( - non_asset_inputs_by_node_handle, - outputs_by_graph_handle, - dep_node_output_handles_by_node_output_handle, - dep_node_output_handle, - ) - ) - - if curr_node_handle not in outputs_by_graph_handle: - dep_node_output_handles_by_node_output_handle[node_output_handle] = ( - dependency_node_output_handles - ) - - return dependency_node_output_handles - - -def get_dep_node_handles_of_graph_backed_asset( - graph_def: GraphDefinition, assets_def: "AssetsDefinition" -) -> Mapping["AssetKeyOrCheckKey", Set[NodeHandle]]: - """Given a graph-backed asset with graph_def, return a mapping of asset keys outputted by the graph - to a list of node handles within graph_def that are the dependencies of the asset. - - Arguments: - graph_def: The graph definition of the graph-backed asset. - assets_def: The assets definition of the graph-backed asset. - - """ - # asset_key_to_dep_node_handles takes in a graph_def that represents the entire job, where each - # node is a top-level asset node. Create a dummy graph that wraps around graph_def and pass - # the dummy graph to asset_key_to_dep_node_handles - dummy_parent_graph = GraphDefinition("dummy_parent_graph", node_defs=[graph_def]) - (dep_node_handles_by_asset_key, _) = asset_or_check_key_to_dep_node_handles( - dummy_parent_graph, - {NodeHandle(name=graph_def.name, parent=None): assets_def}, - ) - return dep_node_handles_by_asset_key - - -def asset_or_check_key_to_dep_node_handles( - graph_def: GraphDefinition, - assets_defs_by_node_handle: Mapping[NodeHandle, "AssetsDefinition"], -) -> Tuple[ - Mapping["AssetKeyOrCheckKey", Set[NodeHandle]], - Mapping["AssetKeyOrCheckKey", Sequence[NodeOutputHandle]], -]: - """For each asset in assets_defs_by_node_handle, determines all the op handles and output handles - within the asset's node that are upstream dependencies of the asset. - - Returns a tuple with two objects: - 1. A mapping of each asset or check key to a set of node handles that are upstream dependencies of the asset. - 2. A mapping of each asset or check key to a list of node output handles that are upstream dependencies of the asset. - - Arguments: - graph_def: The graph definition of the job, where each top level node is an asset. - assets_defs_by_node_handle: A mapping of each node handle to the asset definition for that node. - """ - # A mapping of all node handles to all upstream node handles - # that are not assets. Each key is a node handle with node output handle value - non_asset_inputs_by_node_handle: Dict[NodeHandle, Sequence[NodeOutputHandle]] = {} - - # A mapping of every graph node handle to a dictionary with each out - # name as a key and node output handle value - outputs_by_graph_handle: Dict[NodeHandle, Mapping[str, NodeOutputHandle]] = {} - _build_graph_dependencies( - graph_def=graph_def, - parent_handle=None, - outputs_by_graph_handle=outputs_by_graph_handle, - non_asset_inputs_by_node_handle=non_asset_inputs_by_node_handle, - assets_defs_by_node_handle=assets_defs_by_node_handle, - ) - - dep_nodes_by_asset_or_check_key: Dict["AssetKeyOrCheckKey", List[NodeHandle]] = {} - dep_node_outputs_by_asset_or_check_key: Dict["AssetKeyOrCheckKey", List[NodeOutputHandle]] = {} - - for node_handle, assets_defs in assets_defs_by_node_handle.items(): - dep_node_output_handles_by_node: Dict[ - NodeOutputHandle, Sequence[NodeOutputHandle] - ] = {} # memoized map of node output handles to all node output handle dependencies that are from ops - for ( - output_name, - asset_or_check_key, - ) in assets_defs.asset_and_check_keys_by_output_name.items(): - dep_nodes_by_asset_or_check_key[ - asset_or_check_key - ] = [] # first element in list is node that outputs asset - - dep_node_outputs_by_asset_or_check_key[asset_or_check_key] = [] - - if node_handle not in outputs_by_graph_handle: - dep_nodes_by_asset_or_check_key[asset_or_check_key].extend([node_handle]) - else: # is graph - # node output handle for the given asset key - node_output_handle = outputs_by_graph_handle[node_handle][output_name] - - dep_node_output_handles = _get_dependency_node_output_handles( - non_asset_inputs_by_node_handle, - outputs_by_graph_handle, - dep_node_output_handles_by_node, - node_output_handle, - ) - - dep_node_outputs_by_asset_or_check_key[asset_or_check_key].extend( - dep_node_output_handles - ) - - # handle internal_asset_deps within graph-backed assets - for assets_def in assets_defs_by_node_handle.values(): - for asset_key, dep_asset_keys in assets_def.asset_deps.items(): - if asset_key not in assets_def.keys: - continue - for dep_asset_key in [key for key in dep_asset_keys if key in assets_def.keys]: - if len(dep_node_outputs_by_asset_or_check_key[asset_key]) == 0: - # This case occurs when the asset is not yielded from a graph-backed asset - continue - node_output_handle = dep_node_outputs_by_asset_or_check_key[asset_key][ - 0 - ] # first item in list is the original node output handle that outputs the asset - dep_asset_key_node_output_handles = [ - output_handle - for output_handle in dep_node_outputs_by_asset_or_check_key[dep_asset_key] - if output_handle != node_output_handle - ] - dep_node_outputs_by_asset_or_check_key[asset_key] = [ - node_output - for node_output in dep_node_outputs_by_asset_or_check_key[asset_key] - if node_output not in dep_asset_key_node_output_handles - ] - - # For graph-backed assets, we've resolved the upstream node output handles dependencies for each - # node output handle in dep_node_outputs_by_asset_or_check_key. We use this to find the upstream - # node handle dependencies. - for key, dep_node_outputs in dep_node_outputs_by_asset_or_check_key.items(): - dep_nodes_by_asset_or_check_key[key].extend( - [node_output.node_handle for node_output in dep_node_outputs] - ) - - dep_node_set_by_asset_or_check_key: Dict["AssetKeyOrCheckKey", Set[NodeHandle]] = {} - for key, dep_node_handles in dep_nodes_by_asset_or_check_key.items(): - dep_node_set_by_asset_or_check_key[key] = set(dep_node_handles) - return dep_node_set_by_asset_or_check_key, dep_node_outputs_by_asset_or_check_key - - class AssetLayer(NamedTuple): """Stores all of the asset-related information for a Dagster job. Maps each input / output in the underlying graph to the asset it represents (if any), and records the @@ -306,6 +56,8 @@ def from_graph_and_assets_node_mapping( assets_defs_by_outer_node_handle (Mapping[NodeHandle, AssetsDefinition]): A mapping from a NodeHandle pointing to the node in the graph where the AssetsDefinition ended up. """ + from .assets import asset_or_check_key_to_dep_node_handles + asset_key_by_input: Dict[NodeInputHandle, AssetKey] = {} asset_keys_by_node_output_handle: Dict[NodeOutputHandle, AssetKey] = {} check_key_by_output: Dict[NodeOutputHandle, AssetCheckKey] = {} diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 2c5dcc2488a6d..553a91e959214 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -25,7 +25,6 @@ from dagster._annotations import experimental_param, public from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec from dagster._core.definitions.asset_dep import AssetDep -from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset from dagster._core.definitions.asset_spec import ( SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET, @@ -67,7 +66,7 @@ from dagster._utils.warnings import ExperimentalWarning, disable_dagster_warnings from .asset_spec import SYSTEM_METADATA_KEY_IO_MANAGER_KEY, AssetSpec -from .dependency import NodeHandle +from .dependency import NodeHandle, NodeOutputHandle from .events import AssetKey, CoercibleToAssetKey, CoercibleToAssetKeyPrefix from .node_definition import NodeDefinition from .op_definition import OpDefinition @@ -1917,3 +1916,244 @@ def unique_id_from_asset_and_check_keys( return non_secure_md5_hash_str( json.dumps(sorted_asset_key_strs + sorted_check_key_strs).encode("utf-8") )[:8] + + +def _build_graph_dependencies( + graph_def: "GraphDefinition", + parent_handle: Optional[NodeHandle], + outputs_by_graph_handle: Dict[NodeHandle, Mapping[str, NodeOutputHandle]], + non_asset_inputs_by_node_handle: Dict[NodeHandle, Sequence[NodeOutputHandle]], + assets_defs_by_node_handle: Mapping[NodeHandle, "AssetsDefinition"], +) -> None: + """Scans through every node in the graph, making a recursive call when a node is a graph. + + Builds two dictionaries: + + outputs_by_graph_handle: A mapping of every graph node handle to a dictionary with each out + name as a key and a NodeOutputHandle containing the op output name and op node handle + + non_asset_inputs_by_node_handle: A mapping of all node handles to all upstream node handles + that are not assets. Each key is a node output handle. + """ + from .graph_definition import GraphDefinition + + dep_struct = graph_def.dependency_structure + + for sub_node_name, sub_node in graph_def.node_dict.items(): + curr_node_handle = NodeHandle(sub_node_name, parent=parent_handle) + if isinstance(sub_node.definition, GraphDefinition): + _build_graph_dependencies( + sub_node.definition, + curr_node_handle, + outputs_by_graph_handle, + non_asset_inputs_by_node_handle, + assets_defs_by_node_handle, + ) + outputs_by_graph_handle[curr_node_handle] = { + mapping.graph_output_name: NodeOutputHandle( + NodeHandle(mapping.maps_from.node_name, parent=curr_node_handle), + mapping.maps_from.output_name, + ) + for mapping in sub_node.definition.output_mappings + } + non_asset_inputs_by_node_handle[curr_node_handle] = [ + NodeOutputHandle( + NodeHandle(node_output.node_name, parent=parent_handle), + node_output.output_def.name, + ) + for node_output in dep_struct.all_upstream_outputs_from_node(sub_node_name) + if NodeHandle(node_output.node.name, parent=parent_handle) + not in assets_defs_by_node_handle + ] + + +def _get_dependency_node_output_handles( + non_asset_inputs_by_node_handle: Mapping[NodeHandle, Sequence[NodeOutputHandle]], + outputs_by_graph_handle: Mapping[NodeHandle, Mapping[str, NodeOutputHandle]], + dep_node_output_handles_by_node_output_handle: Dict[ + NodeOutputHandle, Sequence[NodeOutputHandle] + ], + node_output_handle: NodeOutputHandle, +) -> Sequence[NodeOutputHandle]: + """Given a node output handle, return all upstream op node output handles. All node output handles + belong in the same graph-backed asset node. + + Arguments: + outputs_by_graph_handle: A mapping of every graph node handle to a dictionary with each out + name as a key and a NodeOutputHandle containing the op output name and op node handle + non_asset_inputs_by_node_handle: A mapping of all node handles to all upstream node handles + that are not assets. Each key is a node output handle. + dep_node_output_handles_by_node_output_handle: A mapping of each non-graph node output handle + to all non-graph node output handle dependencies. Used for memoization to avoid scanning + already visited nodes. + curr_node_handle: The current node handle being traversed. + graph_output_name: Name of the node output being traversed. Only used if the current node is a + graph to trace the op that generates this output. + """ + curr_node_handle = node_output_handle.node_handle + + if node_output_handle in dep_node_output_handles_by_node_output_handle: + return dep_node_output_handles_by_node_output_handle[node_output_handle] + + dependency_node_output_handles: List[ + NodeOutputHandle + ] = [] # first node in list is node output handle that outputs the asset + + if curr_node_handle not in outputs_by_graph_handle: + dependency_node_output_handles.append(node_output_handle) + else: # is graph + dep_node_output_handle = outputs_by_graph_handle[curr_node_handle][ + node_output_handle.output_name + ] + dependency_node_output_handles.extend( + _get_dependency_node_output_handles( + non_asset_inputs_by_node_handle, + outputs_by_graph_handle, + dep_node_output_handles_by_node_output_handle, + dep_node_output_handle, + ) + ) + for dep_node_output_handle in non_asset_inputs_by_node_handle[curr_node_handle]: + dependency_node_output_handles.extend( + _get_dependency_node_output_handles( + non_asset_inputs_by_node_handle, + outputs_by_graph_handle, + dep_node_output_handles_by_node_output_handle, + dep_node_output_handle, + ) + ) + + if curr_node_handle not in outputs_by_graph_handle: + dep_node_output_handles_by_node_output_handle[node_output_handle] = ( + dependency_node_output_handles + ) + + return dependency_node_output_handles + + +def get_dep_node_handles_of_graph_backed_asset( + graph_def: "GraphDefinition", assets_def: "AssetsDefinition" +) -> Mapping["AssetKeyOrCheckKey", Set[NodeHandle]]: + """Given a graph-backed asset with graph_def, return a mapping of asset keys outputted by the graph + to a list of node handles within graph_def that are the dependencies of the asset. + + Arguments: + graph_def: The graph definition of the graph-backed asset. + assets_def: The assets definition of the graph-backed asset. + + """ + # asset_key_to_dep_node_handles takes in a graph_def that represents the entire job, where each + # node is a top-level asset node. Create a dummy graph that wraps around graph_def and pass + # the dummy graph to asset_key_to_dep_node_handles + from .graph_definition import GraphDefinition + + dummy_parent_graph = GraphDefinition("dummy_parent_graph", node_defs=[graph_def]) + (dep_node_handles_by_asset_key, _) = asset_or_check_key_to_dep_node_handles( + dummy_parent_graph, + {NodeHandle(name=graph_def.name, parent=None): assets_def}, + ) + return dep_node_handles_by_asset_key + + +def asset_or_check_key_to_dep_node_handles( + graph_def: "GraphDefinition", + assets_defs_by_node_handle: Mapping[NodeHandle, "AssetsDefinition"], +) -> Tuple[ + Mapping["AssetKeyOrCheckKey", Set[NodeHandle]], + Mapping["AssetKeyOrCheckKey", Sequence[NodeOutputHandle]], +]: + """For each asset in assets_defs_by_node_handle, determines all the op handles and output handles + within the asset's node that are upstream dependencies of the asset. + + Returns a tuple with two objects: + 1. A mapping of each asset or check key to a set of node handles that are upstream dependencies of the asset. + 2. A mapping of each asset or check key to a list of node output handles that are upstream dependencies of the asset. + + Arguments: + graph_def: The graph definition of the job, where each top level node is an asset. + assets_defs_by_node_handle: A mapping of each node handle to the asset definition for that node. + """ + # A mapping of all node handles to all upstream node handles + # that are not assets. Each key is a node handle with node output handle value + non_asset_inputs_by_node_handle: Dict[NodeHandle, Sequence[NodeOutputHandle]] = {} + + # A mapping of every graph node handle to a dictionary with each out + # name as a key and node output handle value + outputs_by_graph_handle: Dict[NodeHandle, Mapping[str, NodeOutputHandle]] = {} + _build_graph_dependencies( + graph_def=graph_def, + parent_handle=None, + outputs_by_graph_handle=outputs_by_graph_handle, + non_asset_inputs_by_node_handle=non_asset_inputs_by_node_handle, + assets_defs_by_node_handle=assets_defs_by_node_handle, + ) + + dep_nodes_by_asset_or_check_key: Dict["AssetKeyOrCheckKey", List[NodeHandle]] = {} + dep_node_outputs_by_asset_or_check_key: Dict["AssetKeyOrCheckKey", List[NodeOutputHandle]] = {} + + for node_handle, assets_defs in assets_defs_by_node_handle.items(): + dep_node_output_handles_by_node: Dict[ + NodeOutputHandle, Sequence[NodeOutputHandle] + ] = {} # memoized map of node output handles to all node output handle dependencies that are from ops + for ( + output_name, + asset_or_check_key, + ) in assets_defs.asset_and_check_keys_by_output_name.items(): + dep_nodes_by_asset_or_check_key[ + asset_or_check_key + ] = [] # first element in list is node that outputs asset + + dep_node_outputs_by_asset_or_check_key[asset_or_check_key] = [] + + if node_handle not in outputs_by_graph_handle: + dep_nodes_by_asset_or_check_key[asset_or_check_key].extend([node_handle]) + else: # is graph + # node output handle for the given asset key + node_output_handle = outputs_by_graph_handle[node_handle][output_name] + + dep_node_output_handles = _get_dependency_node_output_handles( + non_asset_inputs_by_node_handle, + outputs_by_graph_handle, + dep_node_output_handles_by_node, + node_output_handle, + ) + + dep_node_outputs_by_asset_or_check_key[asset_or_check_key].extend( + dep_node_output_handles + ) + + # handle internal_asset_deps within graph-backed assets + for assets_def in assets_defs_by_node_handle.values(): + for asset_key, dep_asset_keys in assets_def.asset_deps.items(): + if asset_key not in assets_def.keys: + continue + for dep_asset_key in [key for key in dep_asset_keys if key in assets_def.keys]: + if len(dep_node_outputs_by_asset_or_check_key[asset_key]) == 0: + # This case occurs when the asset is not yielded from a graph-backed asset + continue + node_output_handle = dep_node_outputs_by_asset_or_check_key[asset_key][ + 0 + ] # first item in list is the original node output handle that outputs the asset + dep_asset_key_node_output_handles = [ + output_handle + for output_handle in dep_node_outputs_by_asset_or_check_key[dep_asset_key] + if output_handle != node_output_handle + ] + dep_node_outputs_by_asset_or_check_key[asset_key] = [ + node_output + for node_output in dep_node_outputs_by_asset_or_check_key[asset_key] + if node_output not in dep_asset_key_node_output_handles + ] + + # For graph-backed assets, we've resolved the upstream node output handles dependencies for each + # node output handle in dep_node_outputs_by_asset_or_check_key. We use this to find the upstream + # node handle dependencies. + for key, dep_node_outputs in dep_node_outputs_by_asset_or_check_key.items(): + dep_nodes_by_asset_or_check_key[key].extend( + [node_output.node_handle for node_output in dep_node_outputs] + ) + + dep_node_set_by_asset_or_check_key: Dict["AssetKeyOrCheckKey", Set[NodeHandle]] = {} + for key, dep_node_handles in dep_nodes_by_asset_or_check_key.items(): + dep_node_set_by_asset_or_check_key[key] = set(dep_node_handles) + return dep_node_set_by_asset_or_check_key, dep_node_outputs_by_asset_or_check_key