From f0065f161eae81a9f4e782882ecdd62febb18451 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Wed, 28 Feb 2024 11:04:30 -0500 Subject: [PATCH] [external-assets] Tweaks to AssetGraph interface --- .../_core/definitions/asset_daemon_context.py | 3 +-- .../dagster/_core/definitions/asset_graph.py | 22 ++++++++++++------- .../_core/definitions/internal_asset_graph.py | 10 --------- .../unresolved_asset_job_definition.py | 8 +++++-- .../dagster/_core/execution/asset_backfill.py | 10 +-------- 5 files changed, 22 insertions(+), 31 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 0b9b5b1920769..d404200bbaa47 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -1,5 +1,4 @@ import datetime -import itertools import logging import os import time @@ -230,7 +229,7 @@ def get_asset_condition_evaluations( num_checked_assets = 0 num_auto_materialize_asset_keys = len(self.auto_materialize_asset_keys) - for asset_key in itertools.chain(*self.asset_graph.toposort_asset_keys()): + for asset_key in self.asset_graph.toposorted_asset_keys: # an asset may have already been visited if it was part of a non-subsettable multi-asset if asset_key not in self.auto_materialize_asset_keys: continue diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 480d8c58efed3..31d453c76902c 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -118,6 +118,18 @@ def executable_asset_keys(self) -> AbstractSet[AssetKey]: def is_executable(self, asset_key: AssetKey) -> bool: ... + @property + @cached_method + def toposorted_asset_keys(self) -> Sequence[AssetKey]: + """Return topologically sorted asset keys in graph. Keys with the same topological level are + sorted alphabetically to provide stability. + """ + return [ + key + for keys_in_level in toposort.toposort(self.asset_dep_graph["upstream"]) + for key in sorted(keys_in_level) + ] + @abstractmethod def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]: ... @@ -519,12 +531,6 @@ def get_required_asset_and_check_keys( ) -> AbstractSet[AssetKeyOrCheckKey]: ... - @cached_method - def toposort_asset_keys(self) -> Sequence[AbstractSet[AssetKey]]: - return [ - {key for key in level} for level in toposort.toposort(self.asset_dep_graph["upstream"]) - ] - @cached_method def get_downstream_freshness_policies( self, *, asset_key: AssetKey @@ -751,10 +757,10 @@ def __init__( self._asset_graph = asset_graph self._include_required_multi_assets = include_required_multi_assets - toposorted_asset_keys = asset_graph.toposort_asset_keys() + asset_keys_by_level = toposort.toposort(asset_graph.asset_dep_graph["upstream"]) self._toposort_level_by_asset_key = { asset_key: i - for i, asset_keys in enumerate(toposorted_asset_keys) + for i, asset_keys in enumerate(asset_keys_by_level) for asset_key in asset_keys } self._heap = [self._queue_item(asset_partition) for asset_partition in items] diff --git a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py index 916df96f7d56b..07e1c82303ad5 100644 --- a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py @@ -86,16 +86,6 @@ def get_asset_checks_def(self, asset_check_key: AssetCheckKey) -> AssetChecksDef def has_asset_check(self, asset_check_key: AssetCheckKey) -> bool: return asset_check_key in self._asset_checks_defs_by_key - def includes_materializable_and_external_assets( - self, asset_keys: AbstractSet[AssetKey] - ) -> bool: - """Returns true if the given asset keys contains at least one materializable asset and - at least one external asset. - """ - selected_external_assets = self.external_asset_keys & asset_keys - selected_materializable_assets = self.materializable_asset_keys & asset_keys - return len(selected_external_assets) > 0 and len(selected_materializable_assets) > 0 - @property @cached_method def asset_dep_graph(self) -> DependencyGraph[AssetKey]: diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py index 049ea262d69f1..adcc5e05c08d7 100644 --- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py @@ -183,9 +183,13 @@ def resolve( """Resolve this UnresolvedAssetJobDefinition into a JobDefinition.""" assets = asset_graph.assets_defs selected_asset_keys = self.selection.resolve(asset_graph) - if asset_graph.includes_materializable_and_external_assets(selected_asset_keys): + + if ( + len(selected_asset_keys & asset_graph.materializable_asset_keys) > 0 + and len(selected_asset_keys & asset_graph.external_asset_keys) > 0 + ): raise DagsterInvalidDefinitionError( - f"Asset selection for job '{self.name}' specified both regular assets and source " + f"Asset selection for job '{self.name}' specified both regular assets and external " "assets. This is not currently supported. Selections must be all regular " "assets or all source assets.", ) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index e7f928e8ad22e..fa4ab789a8adc 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -306,15 +306,7 @@ def get_targeted_asset_keys_topological_order( Orders keys in the same topological level alphabetically. """ - toposorted_keys = asset_graph.toposort_asset_keys() - - targeted_toposorted_keys = [] - for level_keys in toposorted_keys: - for key in sorted(level_keys): - if key in self.target_subset.asset_keys: - targeted_toposorted_keys.append(key) - - return targeted_toposorted_keys + return [k for k in asset_graph.toposorted_asset_keys if k in self.target_subset.asset_keys] def get_backfill_status_per_asset_key( self, asset_graph: AssetGraph