diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 58df1665daf06..829af7eca94d3 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -8,18 +8,22 @@ AbstractSet, Callable, Dict, + Generic, Iterable, Iterator, Mapping, NamedTuple, + NoReturn, Optional, Sequence, Set, + TypeVar, Union, cast, ) import toposort +from typing_extensions import Self import dagster._check as check from dagster._core.definitions.asset_subset import ValidAssetSubset @@ -67,57 +71,210 @@ class ParentsPartitionsResult(NamedTuple): required_but_nonexistent_parents_partitions: AbstractSet[AssetKeyPartitionKey] -class AssetGraph(ABC): +class AssetNode(ABC): + key: AssetKey + _children: Optional[AbstractSet[Self]] + _parents: Optional[AbstractSet[Self]] + + # Since both parent and child asset nodes contain references to each other, it is impossible to + # construct a graph of all asset nodes with single-step construction. The nodes must first be + # constructed and then `set_children` and `set_parents` must be called to bind the references. 
+ + @property + def children(self) -> AbstractSet[Self]: + if self._children is None: + self._neighbors_unbound_error("child", "children") + return self._children + + @property + def child_keys(self) -> AbstractSet[AssetKey]: + if self._children is None: + self._neighbors_unbound_error("child", "children") + return {child.key for child in self._children} + + def set_children(self, children: AbstractSet[Self]) -> None: + self._children = children + + @property + def parents(self) -> AbstractSet[Self]: + if self._parents is None: + self._neighbors_unbound_error("parent", "parents") + return self._parents + + @property + def parent_keys(self) -> AbstractSet[AssetKey]: + if self._parents is None: + self._neighbors_unbound_error("parent", "parents") + return {parent.key for parent in self._parents} + + def set_parents(self, parents: AbstractSet[Self]) -> None: + self._parents = parents + + def _neighbors_unbound_error(self, subject: str, plural: str) -> NoReturn: + check.failed( + f"Attempted to access {subject} nodes of {self} before they were set. {subject.title()}" + f" nodes must be bound after construction using `set_{plural}`." + ) + @property @abstractmethod - def asset_dep_graph(self) -> DependencyGraph[AssetKey]: + def group_name(self) -> Optional[str]: ... + @property @abstractmethod - def has_asset(self, asset_key: AssetKey) -> bool: + def is_materializable(self) -> bool: ... @property @abstractmethod - def all_asset_keys(self) -> AbstractSet[AssetKey]: + def is_observable(self) -> bool: ... @property @abstractmethod - def materializable_asset_keys(self) -> AbstractSet[AssetKey]: + def is_external(self) -> bool: ... + @property @abstractmethod - def is_materializable(self, asset_key: AssetKey) -> bool: + def is_executable(self) -> bool: ... @property @abstractmethod - def observable_asset_keys(self) -> AbstractSet[AssetKey]: + def is_partitioned(self) -> bool: ... 
+ @property @abstractmethod - def is_observable(self, asset_key: AssetKey) -> bool: + def partitions_def(self) -> Optional[PartitionsDefinition]: ... @property @abstractmethod - def external_asset_keys(self) -> AbstractSet[AssetKey]: + def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]: ... + def get_partition_mapping(self, parent_asset: Self) -> PartitionMapping: + return infer_partition_mapping( + self.partition_mappings.get(parent_asset.key), + self.partitions_def, + parent_asset.partitions_def, + ) + + @property @abstractmethod - def is_external(self, asset_key: AssetKey) -> bool: + def freshness_policy(self) -> Optional[FreshnessPolicy]: ... @property @abstractmethod - def executable_asset_keys(self) -> AbstractSet[AssetKey]: + def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]: ... + @property + @abstractmethod + def auto_observe_interval_minutes(self) -> Optional[float]: + ... + + @property @abstractmethod - def is_executable(self, asset_key: AssetKey) -> bool: + def backfill_policy(self) -> Optional[BackfillPolicy]: ... + @property + @abstractmethod + def code_version(self) -> Optional[str]: + ... + + @property + @abstractmethod + def check_keys(self) -> AbstractSet[AssetCheckKey]: + ... + + @property + @abstractmethod + def execution_unit_asset_keys(self) -> AbstractSet[AssetKey]: + ... + + @property + @abstractmethod + def execution_unit_asset_and_check_keys(self) -> AbstractSet[Union[AssetKey, AssetCheckKey]]: + ... 
+ + def __str__(self) -> str: + return f"{self.__class__.__name__}<{self.key.to_user_string()}>" + + +T_AssetNode = TypeVar("T_AssetNode", bound=AssetNode) + + +class AssetGraph(ABC, Generic[T_AssetNode]): + asset_nodes_by_key: Mapping[AssetKey, T_AssetNode] + asset_nodes_by_check_key: Mapping[AssetCheckKey, T_AssetNode] + + @property + def asset_nodes(self) -> Iterable[T_AssetNode]: + return self.asset_nodes_by_key.values() + + def has(self, asset_key: AssetKey) -> bool: + return asset_key in self.asset_nodes_by_key + + # To be removed in upstack PR and callsites replaced with `has` + def has_asset(self, asset_key: AssetKey) -> bool: + return self.has(asset_key) + + def get(self, asset_key: AssetKey) -> T_AssetNode: + return self.asset_nodes_by_key[asset_key] + + def get_for_check(self, asset_key: AssetCheckKey) -> T_AssetNode: + return self.asset_nodes_by_check_key[asset_key] + + @property + def asset_dep_graph(self) -> DependencyGraph[AssetKey]: + return { + "upstream": {node.key: {p.key for p in node.parents} for node in self.asset_nodes}, + "downstream": {node.key: {c.key for c in node.children} for node in self.asset_nodes}, + } + + @property + @cached_method + def all_asset_keys(self) -> AbstractSet[AssetKey]: + return {node.key for node in self.asset_nodes} + + @property + @cached_method + def materializable_asset_keys(self) -> AbstractSet[AssetKey]: + return {node.key for node in self.asset_nodes if node.is_materializable} + + def is_materializable(self, key: AssetKey) -> bool: + return self.get(key).is_materializable + + @property + @cached_method + def observable_asset_keys(self) -> AbstractSet[AssetKey]: + return {node.key for node in self.asset_nodes if node.is_observable} + + def is_observable(self, key: AssetKey) -> bool: + return self.get(key).is_observable + + @property + @cached_method + def external_asset_keys(self) -> AbstractSet[AssetKey]: + return {node.key for node in self.asset_nodes if node.is_external} + + def is_external(self, key: 
AssetKey) -> bool: + return self.get(key).is_external + + @property + @cached_method + def executable_asset_keys(self) -> AbstractSet[AssetKey]: + return {node.key for node in self.asset_nodes if node.is_executable} + + def is_executable(self, key: AssetKey) -> bool: + return self.get(key).is_executable + @property @cached_method def toposorted_asset_keys(self) -> Sequence[AssetKey]: @@ -140,18 +297,19 @@ def toposorted_asset_keys_by_level(self) -> Sequence[AbstractSet[AssetKey]]: {key for key in level} for level in toposort.toposort(self.asset_dep_graph["upstream"]) ] - @abstractmethod def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]: - ... + return {node.key for node in self.asset_nodes if node.group_name == group_name} - @functools.cached_property + @property + @cached_method def root_materializable_asset_keys(self) -> AbstractSet[AssetKey]: """Materializable asset keys that have no materializable parents.""" from .asset_selection import AssetSelection return AssetSelection.keys(*self.materializable_asset_keys).roots().resolve(self) - @functools.cached_property + @property + @cached_method def root_executable_asset_keys(self) -> AbstractSet[AssetKey]: """Executable asset keys that have no executable parents.""" return fetch_sources( @@ -159,27 +317,23 @@ def root_executable_asset_keys(self) -> AbstractSet[AssetKey]: ) @property - @abstractmethod + @cached_method + def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: + return {key for asset in self.asset_nodes for key in asset.check_keys} + + @property + @cached_method def all_group_names(self) -> AbstractSet[str]: - ... + return {a.group_name for a in self.asset_nodes if a.group_name is not None} - @abstractmethod def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]: - ... - - @abstractmethod - def get_partition_mappings(self, asset_key: AssetKey) -> Mapping[AssetKey, PartitionMapping]: - ... 
+ # Performing an existence check temporarily until we change callsites + return self.get(asset_key).partitions_def if self.has(asset_key) else None def get_partition_mapping( self, asset_key: AssetKey, in_asset_key: AssetKey ) -> PartitionMapping: - partition_mappings = self.get_partition_mappings(asset_key) - return infer_partition_mapping( - partition_mappings.get(in_asset_key), - self.get_partitions_def(asset_key), - self.get_partitions_def(in_asset_key), - ) + return self.get(asset_key).get_partition_mapping(self.get(in_asset_key)) def get_partitions_in_range( self, @@ -199,38 +353,32 @@ def get_partitions_in_range( def is_partitioned(self, asset_key: AssetKey) -> bool: return self.get_partitions_def(asset_key) is not None - @abstractmethod def get_group_name(self, asset_key: AssetKey) -> Optional[str]: - ... + return self.get(asset_key).group_name - @abstractmethod def get_freshness_policy(self, asset_key: AssetKey) -> Optional[FreshnessPolicy]: - ... + return self.get(asset_key).freshness_policy - @abstractmethod def get_auto_materialize_policy(self, asset_key: AssetKey) -> Optional[AutoMaterializePolicy]: - ... + return self.get(asset_key).auto_materialize_policy - @abstractmethod def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]: - ... + return self.get(asset_key).auto_observe_interval_minutes - @abstractmethod def get_backfill_policy(self, asset_key: AssetKey) -> Optional[BackfillPolicy]: - ... + return self.get(asset_key).backfill_policy - @abstractmethod def get_code_version(self, asset_key: AssetKey) -> Optional[str]: - ... 
+ return self.get(asset_key).code_version def have_same_partitioning(self, asset_key1: AssetKey, asset_key2: AssetKey) -> bool: """Returns whether the given assets have the same partitions definition.""" - return self.get_partitions_def(asset_key1) == self.get_partitions_def(asset_key2) + return self.get(asset_key1).partitions_def == self.get(asset_key2).partitions_def def have_same_or_no_partitioning(self, asset_keys: Iterable[AssetKey]) -> bool: partitions_defs = [] for asset_key in asset_keys: - partitions_def = self.get_partitions_def(asset_key) + partitions_def = self.get(asset_key).partitions_def if partitions_def: partitions_defs.append(partitions_def) @@ -240,25 +388,25 @@ def have_same_or_no_partitioning(self, asset_keys: Iterable[AssetKey]) -> bool: def get_children(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: """Returns all assets that depend on the given asset.""" - return self.asset_dep_graph["downstream"][asset_key] + return self.get(asset_key).child_keys def get_parents(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: """Returns all first-order dependencies of an asset.""" - return self.asset_dep_graph["upstream"].get(asset_key) or set() + return self.get(asset_key).parent_keys def get_ancestors( self, asset_key: AssetKey, include_self: bool = False ) -> AbstractSet[AssetKey]: """Returns all nth-order dependencies of an asset.""" ancestors = set() - next_parents = self.get_parents(asset_key) - {asset_key} # remove self-dependencies + next_parents = self.get(asset_key).parent_keys - {asset_key} # remove self-dependencies while next_parents: pending_next_parents = set() for node_key in next_parents: if node_key in ancestors: continue ancestors.add(node_key) - pending_next_parents.update(self.get_parents(node_key)) + pending_next_parents.update(self.get(node_key).parent_keys) next_parents = pending_next_parents @@ -342,18 +490,18 @@ def get_children_partitions( partition of that asset. 
""" result: Set[AssetKeyPartitionKey] = set() - for child_asset_key in self.get_children(asset_key): - if self.is_partitioned(child_asset_key): + for child in self.get(asset_key).children: + if child.is_partitioned: for child_partition_key in self.get_child_partition_keys_of_parent( dynamic_partitions_store, partition_key, asset_key, - child_asset_key, + child.key, current_time, ): - result.add(AssetKeyPartitionKey(child_asset_key, child_partition_key)) + result.add(AssetKeyPartitionKey(child.key, child_partition_key)) else: - result.add(AssetKeyPartitionKey(child_asset_key)) + result.add(AssetKeyPartitionKey(child.key)) return result def get_child_partition_keys_of_parent( @@ -420,7 +568,7 @@ def get_parents_partitions( valid_parent_partitions: Set[AssetKeyPartitionKey] = set() required_but_nonexistent_parent_partitions: Set[AssetKeyPartitionKey] = set() for parent_asset_key in self.get_parents(asset_key): - if self.has_asset(parent_asset_key) and self.is_partitioned(parent_asset_key): + if self.has(parent_asset_key) and self.is_partitioned(parent_asset_key): mapped_partitions_result = self.get_parent_partition_keys_for_child( partition_key, parent_asset_key, @@ -497,10 +645,10 @@ def get_parent_partition_keys_for_child( def has_materializable_parents(self, asset_key: AssetKey) -> bool: """Determines if an asset has any parents which are materializable.""" - if self.is_external(asset_key): + if self.get(asset_key).is_external: return False return any( - self.has_asset(parent_key) and self.is_materializable(parent_key) + self.has(parent_key) and self.get(parent_key).is_materializable for parent_key in self.get_parents(asset_key) - {asset_key} ) @@ -513,8 +661,8 @@ def get_materializable_roots(self, asset_key: AssetKey) -> AbstractSet[AssetKey] return { key for key in self.upstream_key_iterator(asset_key) - if self.has_asset(key) - and self.is_materializable(key) + if self.has(key) + and self.get(key).is_materializable and not self.has_materializable_parents(key) } 
@@ -530,12 +678,11 @@ def upstream_key_iterator(self, asset_key: AssetKey) -> Iterator[AssetKey]: queue.append(parent_key) visited.add(parent_key) - @abstractmethod def get_execution_unit_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: """For a given asset_key, return the set of asset keys that must be materialized at the same time. """ - ... + return self.get(asset_key).execution_unit_asset_keys @abstractmethod def get_execution_unit_asset_and_check_keys( @@ -550,6 +697,7 @@ def get_execution_unit_asset_and_check_keys( def get_downstream_freshness_policies( self, *, asset_key: AssetKey ) -> AbstractSet[FreshnessPolicy]: + asset = self.get(asset_key) downstream_policies = set().union( *( self.get_downstream_freshness_policies(asset_key=child_key) @@ -557,9 +705,8 @@ def get_downstream_freshness_policies( if child_key != asset_key ) ) - current_policy = self.get_freshness_policy(asset_key) - if self.get_partitions_def(asset_key) is None and current_policy is not None: - downstream_policies.add(current_policy) + if asset.partitions_def is None and asset.freshness_policy is not None: + downstream_policies.add(asset.freshness_policy) return downstream_policies diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py index 899fcff62c049..bb90ee7544b01 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py @@ -1,3 +1,4 @@ +import itertools import warnings from collections import defaultdict from typing import ( @@ -10,6 +11,7 @@ Mapping, Optional, Sequence, + Set, Tuple, ) @@ -20,11 +22,10 @@ from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.host_representation.external import ExternalRepository from dagster._core.host_representation.handle import RepositoryHandle -from 
dagster._core.selector.subset_selector import DependencyGraph from dagster._core.workspace.workspace import IWorkspace from dagster._utils.cached_method import cached_method -from .asset_graph import AssetGraph, AssetKeyOrCheckKey +from .asset_graph import AssetGraph, AssetKeyOrCheckKey, AssetNode from .backfill_policy import BackfillPolicy from .events import AssetKey from .freshness_policy import FreshnessPolicy @@ -38,15 +39,168 @@ ) -class ExternalAssetGraph(AssetGraph): +class GlobalAssetNode(AssetNode): def __init__( self, - asset_nodes_by_key: Mapping[AssetKey, "ExternalAssetNode"], + key: AssetKey, + repo_node_pairs: Sequence[Tuple[RepositoryHandle, "ExternalAssetNode"]], + check_keys: AbstractSet[AssetCheckKey], + execution_unit_keys: AbstractSet[AssetKeyOrCheckKey], + ): + self.key = key + self._repo_node_pairs = repo_node_pairs + self._external_asset_nodes = [node for _, node in repo_node_pairs] + self._check_keys = check_keys + self._execution_unit_keys = execution_unit_keys + + ##### COMMON ASSET NODE INTERFACE + + @property + @cached_method + def group_name(self) -> Optional[str]: + return self._priority_node.group_name + + @property + @cached_method + def is_materializable(self) -> bool: + return any(node.is_materializable for node in self._external_asset_nodes) + + @property + @cached_method + def is_observable(self) -> bool: + return any(node.is_observable for node in self._external_asset_nodes) + + @property + @cached_method + def is_external(self) -> bool: + return all(node.is_external for node in self._external_asset_nodes) + + @property + @cached_method + def is_executable(self) -> bool: + return any(node.is_executable for node in self._external_asset_nodes) + + @property + def is_partitioned(self) -> bool: + return self._priority_node.partitions_def_data is not None + + @property + @cached_method + def partitions_def(self) -> Optional[PartitionsDefinition]: + external_def = self._priority_node.partitions_def_data + return 
external_def.get_partitions_definition() if external_def else None + + @property + def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]: + if self.is_materializable: + return { + dep.upstream_asset_key: dep.partition_mapping + for dep in self._materializable_node.dependencies + if dep.partition_mapping is not None + } + else: + return {} + + @property + def freshness_policy(self) -> Optional[FreshnessPolicy]: + # It is currently not possible to access the freshness policy for an observation definition + # if a materialization definition also exists. This needs to be fixed. + return self._priority_node.freshness_policy + + @property + def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]: + return self._materializable_node.auto_materialize_policy if self.is_materializable else None + + @property + def auto_observe_interval_minutes(self) -> Optional[float]: + return self._observable_node.auto_observe_interval_minutes if self.is_observable else None + + @property + def backfill_policy(self) -> Optional[BackfillPolicy]: + return self._materializable_node.backfill_policy if self.is_materializable else None + + @property + def code_version(self) -> Optional[str]: + # It is currently not possible to access the code version for an observation definition if a + # materialization definition also exists. This needs to be fixed. 
+ return self._priority_node.code_version + + @property + def check_keys(self) -> AbstractSet[AssetCheckKey]: + return self._check_keys + + @property + def execution_unit_asset_keys(self) -> AbstractSet[AssetKey]: + return {k for k in self.execution_unit_asset_and_check_keys if isinstance(k, AssetKey)} + + @property + def execution_unit_asset_and_check_keys(self) -> AbstractSet[AssetKeyOrCheckKey]: + return self._execution_unit_keys + + ##### GLOBAL-SPECIFIC INTERFACE + + @property + def job_names(self) -> Sequence[str]: + # It is currently not possible to access the job names for an observation definition if a + # materialization definition also exists. This needs to be fixed. + return self._priority_node.job_names if self.is_executable else [] + + @property + def priority_repository_handle(self) -> RepositoryHandle: + # This property supports existing behavior but it should be phased out, because it relies on + # materialization nodes shadowing observation nodes that would otherwise be exposed. + return next( + itertools.chain( + (repo for repo, node in self._repo_node_pairs if node.is_materializable), + (repo for repo, node in self._repo_node_pairs if node.is_observable), + (repo for repo, node in self._repo_node_pairs), + ) + ) + + @property + def repository_handles(self) -> Sequence[RepositoryHandle]: + return [repo_handle for repo_handle, _ in self._repo_node_pairs] + + ##### HELPERS + + @property + @cached_method + def _priority_node(self) -> "ExternalAssetNode": + # Return a materialization node if it exists, otherwise return an observable node if it + # exists, otherwise return any node. This exists to preserve implicit behavior, where the + # materialization node was previously preferred over the observable node. This is a + # temporary measure until we can appropriately scope the accessors that could apply to + # either a materialization or observation node. 
+ return next( + itertools.chain( + (node for node in self._external_asset_nodes if node.is_materializable), + (node for node in self._external_asset_nodes if node.is_observable), + (node for node in self._external_asset_nodes), + ) + ) + + @property + @cached_method + def _materializable_node(self) -> "ExternalAssetNode": + return next(node for node in self._external_asset_nodes if node.is_materializable) + + @property + @cached_method + def _observable_node(self) -> "ExternalAssetNode": + return next((node for node in self._external_asset_nodes if node.is_observable)) + + +class ExternalAssetGraph(AssetGraph[GlobalAssetNode]): + def __init__( + self, + asset_nodes_by_key: Mapping[AssetKey, GlobalAssetNode], asset_checks_by_key: Mapping[AssetCheckKey, "ExternalAssetCheck"], + asset_check_execution_units_by_key: Mapping[AssetCheckKey, AbstractSet[AssetKeyOrCheckKey]], repo_handles_by_key: Mapping[AssetKey, RepositoryHandle], ): - self._asset_nodes_by_key = asset_nodes_by_key + self.asset_nodes_by_key = asset_nodes_by_key self._asset_checks_by_key = asset_checks_by_key + self._asset_check_execution_units_by_key = asset_check_execution_units_by_key self._repo_handles_by_key = repo_handles_by_key @classmethod @@ -95,177 +249,94 @@ def from_repository_handles_and_external_asset_nodes( repo_handle_external_asset_nodes: Sequence[Tuple[RepositoryHandle, "ExternalAssetNode"]], external_asset_checks: Sequence["ExternalAssetCheck"], ) -> "ExternalAssetGraph": + _warn_on_duplicate_nodes(repo_handle_external_asset_nodes) + repo_handles_by_key = { node.asset_key: repo_handle for repo_handle, node in repo_handle_external_asset_nodes if node.is_executable } - # Split the nodes into materializable, observable, and unexecutable nodes. Observable and - # unexecutable `ExternalAssetNode` represent both source and external assets-- the - # "External" in "ExternalAssetNode" is unrelated to the "external" in "external asset", this - # is just an unfortunate naming collision. 
`ExternalAssetNode` will be renamed eventually. - materializable_node_pairs: List[Tuple[RepositoryHandle, "ExternalAssetNode"]] = [] - observable_node_pairs: List[Tuple[RepositoryHandle, "ExternalAssetNode"]] = [] - unexecutable_node_pairs: List[Tuple[RepositoryHandle, "ExternalAssetNode"]] = [] + # Build an index of execution units by key. An execution unit is a set of assets and checks + # that must be executed together. ExternalAssetNodes and ExternalAssetChecks already have an + # optional execution_unit_id set. A null execution_unit_id indicates that the node or check + # can be executed independently. + execution_units_by_key = _build_execution_unit_index( + (node for _, node in repo_handle_external_asset_nodes), + external_asset_checks, + ) + + # Index all (RepositoryHandle, ExternalAssetNode) pairs by their asset key, then use this to + # build the set of GlobalAssetNodes (indexed by key). Each GlobalAssetNode wraps the set of + # pairs for an asset key. + repo_node_pairs_by_key: Dict[ + AssetKey, List[Tuple[RepositoryHandle, "ExternalAssetNode"]] + ] = defaultdict(list) for repo_handle, node in repo_handle_external_asset_nodes: - if node.is_source and node.is_observable: - observable_node_pairs.append((repo_handle, node)) - elif node.is_source: - unexecutable_node_pairs.append((repo_handle, node)) - else: - materializable_node_pairs.append((repo_handle, node)) - - asset_nodes_by_key = {} - - _warn_on_duplicate_nodes(materializable_node_pairs, AssetExecutionType.MATERIALIZATION) - _warn_on_duplicate_nodes(observable_node_pairs, AssetExecutionType.OBSERVATION) - - # It is possible for multiple nodes to exist that share the same key. This is invalid if - # more than one node is materializable or if more than one node is observable. It is valid - # if there is at most one materializable node and at most one observable node, with all - # other nodes unexecutable. The asset graph will receive only a single `ExternalAssetNode` - # representing the asset. 
This will always be the materializable node if one exists; then - # the observable node if it exists; then finally the first-encountered unexecutable node. - for repo_handle, node in materializable_node_pairs: - asset_nodes_by_key[node.asset_key] = node - - for repo_handle, node in observable_node_pairs: - if node.asset_key in asset_nodes_by_key: - current_node = asset_nodes_by_key[node.asset_key] - asset_nodes_by_key[node.asset_key] = current_node._replace(is_observable=True) - else: - asset_nodes_by_key[node.asset_key] = node - - for repo_handle, node in unexecutable_node_pairs: - if node.asset_key in asset_nodes_by_key: - continue - asset_nodes_by_key[node.asset_key] = node + repo_node_pairs_by_key[node.asset_key].append((repo_handle, node)) + + global_nodes_by_key = { + key: GlobalAssetNode( + key=key, + repo_node_pairs=repo_node_pairs, + check_keys={check.key for check in external_asset_checks if check.asset_key == key}, + execution_unit_keys=execution_units_by_key[key], + ) + for key, repo_node_pairs in repo_node_pairs_by_key.items() + } + # Build the dependency graph of asset keys, then use it to set the children and parents of + # the above-constructed GlobalAssetNodes. + all_keys = {node.asset_key for _, node in repo_handle_external_asset_nodes} + upstream = {key: set() for key in all_keys} + downstream = {key: set() for key in all_keys} + for _, node in repo_handle_external_asset_nodes: + for dep in node.dependencies: + upstream[node.asset_key].add(dep.upstream_asset_key) + downstream[dep.upstream_asset_key].add(node.asset_key) + for key, node in global_nodes_by_key.items(): + node.set_children({global_nodes_by_key[k] for k in downstream[key]}) + node.set_parents({global_nodes_by_key[k] for k in upstream[key]}) + + # Build the set of ExternalAssetChecks, indexed by key. Also the index of execution units for + # each asset check key. 
asset_checks_by_key: Dict[AssetCheckKey, "ExternalAssetCheck"] = {} for asset_check in external_asset_checks: asset_checks_by_key[asset_check.key] = asset_check + asset_check_execution_units_by_key = { + k: v for k, v in execution_units_by_key.items() if isinstance(k, AssetCheckKey) + } return cls( - asset_nodes_by_key, + global_nodes_by_key, asset_checks_by_key, + asset_check_execution_units_by_key, repo_handles_by_key=repo_handles_by_key, ) - @property - def asset_nodes(self) -> Sequence["ExternalAssetNode"]: - return list(self._asset_nodes_by_key.values()) - - def get_asset_node(self, asset_key: AssetKey) -> "ExternalAssetNode": - return self._asset_nodes_by_key[asset_key] - - def has_asset(self, asset_key: AssetKey) -> bool: - return asset_key in self._asset_nodes_by_key - - @property - def asset_checks(self) -> Sequence["ExternalAssetCheck"]: - return list(self._asset_checks_by_key.values()) + ##### COMMON ASSET GRAPH INTERFACE - def get_asset_check(self, asset_check_key: AssetCheckKey) -> "ExternalAssetCheck": - return self._asset_checks_by_key[asset_check_key] - - @property - @cached_method - def asset_dep_graph(self) -> DependencyGraph[AssetKey]: - upstream = {node.asset_key: set() for node in self.asset_nodes} - downstream = {node.asset_key: set() for node in self.asset_nodes} - for node in self.asset_nodes: - for dep in node.dependencies: - upstream[node.asset_key].add(dep.upstream_asset_key) - downstream[dep.upstream_asset_key].add(node.asset_key) - return {"upstream": upstream, "downstream": downstream} - - @property - @cached_method - def all_asset_keys(self) -> AbstractSet[AssetKey]: - return {node.asset_key for node in self.asset_nodes} - - @property - @cached_method - def materializable_asset_keys(self) -> AbstractSet[AssetKey]: - return { - node.asset_key - for node in self.asset_nodes - if node.execution_type == AssetExecutionType.MATERIALIZATION - } - - def is_materializable(self, asset_key: AssetKey) -> bool: - return 
self.get_asset_node(asset_key).execution_type == AssetExecutionType.MATERIALIZATION - - @property - @cached_method - def observable_asset_keys(self) -> AbstractSet[AssetKey]: - return { - node.asset_key - for node in self.asset_nodes - # check the separate `is_observable` field because `execution_type` will be - # `MATERIALIZATION` if there exists a materializable version of the asset - if node.is_observable - } + def get_execution_unit_asset_and_check_keys( + self, asset_or_check_key: AssetKeyOrCheckKey + ) -> AbstractSet[AssetKeyOrCheckKey]: + if isinstance(asset_or_check_key, AssetKey): + return self.get(asset_or_check_key).execution_unit_asset_and_check_keys + else: # AssetCheckKey + return self._asset_check_execution_units_by_key[asset_or_check_key] - def is_observable(self, asset_key: AssetKey) -> bool: - return self.get_asset_node(asset_key).is_observable + ##### GLOBAL-SPECIFIC METHODS @property - @cached_method - def external_asset_keys(self) -> AbstractSet[AssetKey]: - return { - node.asset_key - for node in self.asset_nodes - if node.execution_type != AssetExecutionType.MATERIALIZATION - } - - def is_external(self, asset_key: AssetKey) -> bool: - return self.get_asset_node(asset_key).execution_type != AssetExecutionType.MATERIALIZATION + def external_asset_nodes_by_key(self) -> Mapping[AssetKey, "ExternalAssetNode"]: + # This exists to support existing callsites but it should be removed ASAP. 
+ return {k: node._priority_node for k, node in self.asset_nodes_by_key.items()} # noqa: SLF001 @property - @cached_method - def executable_asset_keys(self) -> AbstractSet[AssetKey]: - return {node.asset_key for node in self.asset_nodes if node.is_executable} - - def is_executable(self, asset_key: AssetKey) -> bool: - return self.get_asset_node(asset_key).execution_type != AssetExecutionType.UNEXECUTABLE - - def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]: - return {node.asset_key for node in self.asset_nodes if node.group_name == group_name} + def asset_checks(self) -> Sequence["ExternalAssetCheck"]: + return list(dict.fromkeys(self._asset_checks_by_key.values())) def asset_keys_for_job(self, job_name: str) -> AbstractSet[AssetKey]: - return {node.asset_key for node in self.asset_nodes if job_name in node.job_names} - - def get_execution_unit_asset_and_check_keys( - self, asset_or_check_key: AssetKeyOrCheckKey - ) -> AbstractSet[AssetKeyOrCheckKey]: - if isinstance(asset_or_check_key, AssetKey): - execution_unit_id = self.get_asset_node(asset_or_check_key).execution_unit_id - else: # AssetCheckKey - execution_unit_id = self.get_asset_check(asset_or_check_key).execution_unit_id - if execution_unit_id is None: - return {asset_or_check_key} - else: - return { - *( - node.asset_key - for node in self.asset_nodes - if node.execution_unit_id == execution_unit_id - ), - *( - node.key - for node in self.asset_checks - if node.execution_unit_id == execution_unit_id - ), - } - - def get_execution_unit_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: - return { - key - for key in self.get_execution_unit_asset_and_check_keys(asset_key) - if isinstance(key, AssetKey) - } + return {node.key for node in self.asset_nodes if job_name in node.job_names} @property @cached_method @@ -274,50 +345,17 @@ def all_job_names(self) -> AbstractSet[str]: @property @cached_method - def all_group_names(self) -> AbstractSet[str]: - return {node.group_name 
for node in self.asset_nodes if node.group_name} - - def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]: - external_def = self.get_asset_node(asset_key).partitions_def_data - return external_def.get_partitions_definition() if external_def else None - - def get_partition_mappings( - self, asset_key: AssetKey - ) -> Optional[Mapping[AssetKey, PartitionMapping]]: - return { - dep.upstream_asset_key: dep.partition_mapping - for dep in self.get_asset_node(asset_key).dependencies - if dep.partition_mapping is not None - } - - def get_group_name(self, asset_key: AssetKey) -> Optional[str]: - return self.get_asset_node(asset_key).group_name - - def get_freshness_policy(self, asset_key: AssetKey) -> Optional[FreshnessPolicy]: - return self.get_asset_node(asset_key).freshness_policy - - def get_auto_materialize_policy(self, asset_key: AssetKey) -> Optional[AutoMaterializePolicy]: - return self.get_asset_node(asset_key).auto_materialize_policy - - def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]: - return self.get_asset_node(asset_key).auto_observe_interval_minutes - - def get_backfill_policy(self, asset_key: AssetKey) -> Optional[BackfillPolicy]: - return self.get_asset_node(asset_key).backfill_policy - - def get_code_version(self, asset_key: AssetKey) -> Optional[str]: - return self.get_asset_node(asset_key).code_version - - @property - def repository_handles_by_key(self) -> Mapping[AssetKey, RepositoryHandle]: - return self._repo_handles_by_key + def repository_handles_by_key(self) -> Mapping[AssetKey, Sequence[RepositoryHandle]]: + return {k: node.priority_repository_handle for k, node in self.asset_nodes_by_key.items()} def get_repository_handle(self, asset_key: AssetKey) -> RepositoryHandle: - return self._repo_handles_by_key[asset_key] + return self.get(asset_key).priority_repository_handle def get_materialization_job_names(self, asset_key: AssetKey) -> Sequence[str]: """Returns the names of jobs that 
materialize this asset.""" - return self.get_asset_node(asset_key).job_names + # This is a poorly named method because it will expose observation job names for assets with + # a defined observation but no materialization. + return self.get(asset_key).job_names def get_materialization_asset_keys_for_job(self, job_name: str) -> Sequence[AssetKey]: """Returns asset keys that are targeted for materialization in the given job.""" @@ -337,7 +375,7 @@ def get_implicit_job_name_for_assets( Note: all asset_keys should be in the same repository. """ - if all(self.is_observable(asset_key) for asset_key in asset_keys): + if all(self.get(asset_key).is_observable for asset_key in asset_keys): if external_repo is None: check.failed( "external_repo must be passed in when getting job names for observable assets" @@ -397,6 +435,32 @@ def split_asset_keys_by_repository( def _warn_on_duplicate_nodes( + repo_handle_external_asset_nodes: Sequence[Tuple[RepositoryHandle, "ExternalAssetNode"]], +) -> None: + # Split the nodes into materializable, observable, and unexecutable nodes. Observable and + # unexecutable `ExternalAssetNode` represent both source and external assets-- the + # "External" in "ExternalAssetNode" is unrelated to the "external" in "external asset", this + # is just an unfortunate naming collision. `ExternalAssetNode` will be renamed eventually. + materializable_node_pairs: List[Tuple[RepositoryHandle, "ExternalAssetNode"]] = [] + observable_node_pairs: List[Tuple[RepositoryHandle, "ExternalAssetNode"]] = [] + unexecutable_node_pairs: List[Tuple[RepositoryHandle, "ExternalAssetNode"]] = [] + for repo_handle, node in repo_handle_external_asset_nodes: + if node.is_source and node.is_observable: + observable_node_pairs.append((repo_handle, node)) + elif node.is_source: + unexecutable_node_pairs.append((repo_handle, node)) + else: + materializable_node_pairs.append((repo_handle, node)) + + # It is possible for multiple nodes to exist that share the same key. 
This is invalid if + # more than one node is materializable or if more than one node is observable. It is valid + # if there is at most one materializable node and at most one observable node, with all + # other nodes unexecutable. + _warn_on_duplicates_within_subset(materializable_node_pairs, AssetExecutionType.MATERIALIZATION) + _warn_on_duplicates_within_subset(observable_node_pairs, AssetExecutionType.OBSERVATION) + + +def _warn_on_duplicates_within_subset( node_pairs: Sequence[Tuple[RepositoryHandle, "ExternalAssetNode"]], execution_type: AssetExecutionType, ) -> None: @@ -415,3 +479,27 @@ def _warn_on_duplicate_nodes( f"Found {execution_type.value} nodes for some asset keys in multiple code locations." f" Only one {execution_type.value} node is allowed per asset key. Duplicates:\n {duplicate_str}" ) + + +def _build_execution_unit_index( + external_asset_nodes: Iterable["ExternalAssetNode"], + external_asset_checks: Iterable["ExternalAssetCheck"], +) -> Mapping[AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]]: + from dagster._core.host_representation.external_data import ExternalAssetNode + + all_items = [*external_asset_nodes, *external_asset_checks] + + execution_units_by_id: Dict[str, Set[AssetKeyOrCheckKey]] = defaultdict(set) + for item in all_items: + id = item.execution_unit_id + key = item.asset_key if isinstance(item, ExternalAssetNode) else item.key + if id is not None: + execution_units_by_id[id].add(key) + + execution_units_by_key: Dict[AssetKeyOrCheckKey, Set[AssetKeyOrCheckKey]] = {} + for item in all_items: + id = item.execution_unit_id + key = item.asset_key if isinstance(item, ExternalAssetNode) else item.key + execution_units_by_key[key] = execution_units_by_id[id] if id is not None else {key} + + return execution_units_by_key diff --git a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py index 0962394fafe0c..1d705989b88f6 100644 --- 
a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py @@ -2,7 +2,7 @@ from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.asset_checks import AssetChecksDefinition -from dagster._core.definitions.asset_graph import AssetGraph, AssetKeyOrCheckKey +from dagster._core.definitions.asset_graph import AssetGraph, AssetKeyOrCheckKey, AssetNode from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy @@ -13,31 +13,114 @@ from dagster._core.definitions.partition_mapping import PartitionMapping from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies from dagster._core.definitions.source_asset import SourceAsset -from dagster._core.selector.subset_selector import DependencyGraph, generate_asset_dep_graph +from dagster._core.selector.subset_selector import generate_asset_dep_graph from dagster._utils.cached_method import cached_method -class InternalAssetGraph(AssetGraph): +class LocalAssetNode(AssetNode): + def __init__( + self, key: AssetKey, assets_def: AssetsDefinition, check_keys: AbstractSet[AssetCheckKey] + ): + self.key = key + self.assets_def = assets_def + self._check_keys = check_keys + + @property + @cached_method + def group_name(self) -> Optional[str]: + return self.assets_def.group_names_by_key.get(self.key) + + @property + def is_materializable(self) -> bool: + return self.assets_def.is_materializable + + @property + def is_observable(self) -> bool: + return self.assets_def.is_observable + + @property + def is_external(self) -> bool: + return self.assets_def.is_external + + @property + def is_executable(self) -> bool: + return self.assets_def.is_executable + + @property + def is_partitioned(self) -> bool: + return 
self.assets_def.partitions_def is not None + + @property + def partitions_def(self) -> Optional[PartitionsDefinition]: + return self.assets_def.partitions_def + + @property + def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]: + return self.assets_def.partition_mappings + + @property + def freshness_policy(self) -> Optional[FreshnessPolicy]: + return self.assets_def.freshness_policies_by_key.get(self.key) + + @property + def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]: + return self.assets_def.auto_materialize_policies_by_key.get(self.key) + + @property + def auto_observe_interval_minutes(self) -> Optional[float]: + return self.assets_def.auto_observe_interval_minutes + + @property + def backfill_policy(self) -> Optional[BackfillPolicy]: + return self.assets_def.backfill_policy + + @property + def code_version(self) -> Optional[str]: + return self.assets_def.code_versions_by_key.get(self.key) + + @property + def check_keys(self) -> AbstractSet[AssetCheckKey]: + return self._check_keys + + @property + def execution_unit_asset_keys(self) -> AbstractSet[AssetKey]: + return ( + {self.key} + if len(self.assets_def.keys) <= 1 or self.assets_def.can_subset + else self.assets_def.keys + ) + + @property + def execution_unit_asset_and_check_keys(self) -> AbstractSet[AssetKeyOrCheckKey]: + if self.assets_def.can_subset or ( + len(self.assets_def.keys) <= 1 and not len(self.assets_def.check_keys) > 0 + ): + return {self.key} + else: + return {*self.assets_def.keys, *self.assets_def.check_keys} + + +class InternalAssetGraph(AssetGraph[LocalAssetNode]): + _asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition] + def __init__( self, - assets_defs: Sequence[AssetsDefinition], - asset_checks_defs: Sequence[AssetChecksDefinition], + asset_nodes_by_key: Mapping[AssetKey, LocalAssetNode], + asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition], ): - self._assets_defs = assets_defs - self._assets_defs_by_key = 
{key: asset for asset in self._assets_defs for key in asset.keys} - self._assets_defs_by_check_key = { - **{check_key: asset for asset in assets_defs for check_key in asset.check_keys}, + self.asset_nodes_by_key = asset_nodes_by_key + self._asset_checks_defs_by_key = asset_checks_defs_by_key + self._asset_nodes_by_check_key = { **{ - check_key: self._assets_defs_by_key[check.asset_key] - for check in asset_checks_defs - for check_key in check.keys - if check.asset_key in self._assets_defs_by_key + check_key: asset + for asset in asset_nodes_by_key.values() + for check_key in asset.check_keys + }, + **{ + key: asset_nodes_by_key[checks_def.asset_key] + for key, checks_def in asset_checks_defs_by_key.items() + if checks_def.asset_key in asset_nodes_by_key }, - } - - self._asset_checks_defs = asset_checks_defs - self._asset_checks_defs_by_key = { - key: check for check in asset_checks_defs for key in check.keys } @staticmethod @@ -95,147 +178,69 @@ def normalize_assets( @classmethod def from_assets( cls, - all_assets: Iterable[Union[AssetsDefinition, SourceAsset]], + assets: Iterable[Union[AssetsDefinition, SourceAsset]], asset_checks: Optional[Sequence[AssetChecksDefinition]] = None, ) -> "InternalAssetGraph": - assets_defs = cls.normalize_assets(all_assets) - return InternalAssetGraph( - assets_defs=assets_defs, - asset_checks_defs=asset_checks or [], - ) - - @property - def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: - return { - *(key for check in self._asset_checks_defs for key in check.keys), - *(key for asset in self._assets_defs for key in asset.check_keys), + asset_checks = asset_checks or [] + assets_defs = cls.normalize_assets(assets, asset_checks) + + asset_nodes_by_key = { + k: LocalAssetNode( + key=k, + assets_def=ad, + check_keys={ + *ad.check_keys, + *(ck for cd in asset_checks if cd.asset_key == k for ck in cd.keys), + }, + ) + for ad in assets_defs + for k in ad.keys } + dep_graph = generate_asset_dep_graph(assets_defs, []) + for key, node 
in asset_nodes_by_key.items(): + node.set_children( + {asset_nodes_by_key[child_key] for child_key in dep_graph["downstream"][key]} + ) + node.set_parents( + {asset_nodes_by_key[parent_key] for parent_key in dep_graph["upstream"][key]} + ) - @property - def assets_defs(self) -> Sequence[AssetsDefinition]: - return self._assets_defs - - def get_assets_def(self, asset_key: AssetKey) -> AssetsDefinition: - return self._assets_defs_by_key[asset_key] - - def has_asset(self, asset_key: AssetKey) -> bool: - return asset_key in self._assets_defs_by_key - - def get_assets_def_for_check( - self, asset_check_key: AssetCheckKey - ) -> Optional[AssetsDefinition]: - return self._assets_defs_by_check_key.get(asset_check_key) - - @property - def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]: - return self._asset_checks_defs - - def get_asset_checks_def(self, asset_check_key: AssetCheckKey) -> AssetChecksDefinition: - return self._asset_checks_defs_by_key[asset_check_key] - - def has_asset_check(self, asset_check_key: AssetCheckKey) -> bool: - return asset_check_key in self._asset_checks_defs_by_key - - @property - @cached_method - def asset_dep_graph(self) -> DependencyGraph[AssetKey]: - return generate_asset_dep_graph(self._assets_defs, []) - - @property - @cached_method - def all_asset_keys(self) -> AbstractSet[AssetKey]: - return {key for ad in self._assets_defs for key in ad.keys} - - @property - @cached_method - def materializable_asset_keys(self) -> AbstractSet[AssetKey]: - return {key for ad in self._assets_defs if ad.is_materializable for key in ad.keys} - - def is_materializable(self, asset_key: AssetKey) -> bool: - return self.get_assets_def(asset_key).is_materializable - - @property - @cached_method - def observable_asset_keys(self) -> AbstractSet[AssetKey]: - return {key for ad in self._assets_defs if ad.is_observable for key in ad.keys} - - def is_observable(self, asset_key: AssetKey) -> bool: - return self.get_assets_def(asset_key).is_observable - - 
@property - @cached_method - def external_asset_keys(self) -> AbstractSet[AssetKey]: - return {key for ad in self._assets_defs if ad.is_external for key in ad.keys} - - def is_external(self, asset_key: AssetKey) -> bool: - return self.get_assets_def(asset_key).is_external - - @property - @cached_method - def executable_asset_keys(self) -> AbstractSet[AssetKey]: - return {key for ad in self._assets_defs if ad.is_executable for key in ad.keys} - - def is_executable(self, asset_key: AssetKey) -> bool: - return self.get_assets_def(asset_key).is_executable - - def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]: - return { - key - for ad in self._assets_defs - for key in ad.keys - if ad.group_names_by_key[key] == group_name + asset_checks_defs_by_key = { + key: check for check in (asset_checks or []) for key in check.keys } - def get_execution_unit_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: - asset = self.get_assets_def(asset_key) - return {asset_key} if len(asset.keys) <= 1 or asset.can_subset else asset.keys + return InternalAssetGraph( + asset_nodes_by_key=asset_nodes_by_key, + asset_checks_defs_by_key=asset_checks_defs_by_key, + ) + + ##### COMMON ASSET GRAPH INTERFACE def get_execution_unit_asset_and_check_keys( self, asset_or_check_key: AssetKeyOrCheckKey ) -> AbstractSet[AssetKeyOrCheckKey]: if isinstance(asset_or_check_key, AssetKey): - asset = self.get_assets_def(asset_or_check_key) - else: # AssetCheckKey - # only checks emitted by AssetsDefinition have required keys - if self.has_asset_check(asset_or_check_key): - return {asset_or_check_key} - else: - asset = self.get_assets_def_for_check(asset_or_check_key) - if asset is None or asset_or_check_key not in asset.check_keys: - return {asset_or_check_key} - has_checks = len(asset.check_keys) > 0 - if asset.can_subset or len(asset.keys) <= 1 and not has_checks: + return self.get(asset_or_check_key).execution_unit_asset_and_check_keys + # only checks emitted by 
AssetsDefinition have required keys + elif asset_or_check_key in self._asset_checks_defs_by_key: return {asset_or_check_key} else: - return {*asset.keys, *asset.check_keys} - - @property - @cached_method - def all_group_names(self) -> AbstractSet[str]: - return { - group_name for ad in self._assets_defs for group_name in ad.group_names_by_key.values() - } - - def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]: - return self.get_assets_def(asset_key).partitions_def - - def get_partition_mappings(self, asset_key: AssetKey) -> Mapping[AssetKey, PartitionMapping]: - return self.get_assets_def(asset_key).partition_mappings - - def get_group_name(self, asset_key: AssetKey) -> Optional[str]: - return self.get_assets_def(asset_key).group_names_by_key.get(asset_key) - - def get_freshness_policy(self, asset_key: AssetKey) -> Optional[FreshnessPolicy]: - return self.get_assets_def(asset_key).freshness_policies_by_key.get(asset_key) + asset_node = self._asset_nodes_by_check_key[asset_or_check_key] + asset_unit_keys = asset_node.execution_unit_asset_and_check_keys + return ( + asset_unit_keys if asset_or_check_key in asset_unit_keys else {asset_or_check_key} + ) - def get_auto_materialize_policy(self, asset_key: AssetKey) -> Optional[AutoMaterializePolicy]: - return self.get_assets_def(asset_key).auto_materialize_policies_by_key.get(asset_key) + ##### INTERNAL-SPECIFIC INTERFACE - def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]: - return self.get_assets_def(asset_key).auto_observe_interval_minutes + def get_assets_def(self, asset_key: AssetKey) -> AssetsDefinition: + return self.get(asset_key).assets_def - def get_backfill_policy(self, asset_key: AssetKey) -> Optional[BackfillPolicy]: - return self.get_assets_def(asset_key).backfill_policy + @property + @cached_method + def assets_defs(self) -> Sequence[AssetsDefinition]: + return list(dict.fromkeys(asset.assets_def for asset in self.asset_nodes)) - def 
get_code_version(self, asset_key: AssetKey) -> Optional[str]: - return self.get_assets_def(asset_key).code_versions_by_key.get(asset_key) + @property + def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]: + return list(dict.fromkeys(self._asset_checks_defs_by_key.values())) diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 5976722eef432..d04530901bd5c 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1336,6 +1336,14 @@ def __new__( execution_type=check.inst_param(execution_type, "execution_type", AssetExecutionType), ) + @property + def is_materializable(self) -> bool: + return self.execution_type == AssetExecutionType.MATERIALIZATION + + @property + def is_external(self) -> bool: + return self.execution_type != AssetExecutionType.MATERIALIZATION + @property def is_executable(self) -> bool: return self.execution_type != AssetExecutionType.UNEXECUTABLE