From 2f5ded71fda53f47cf69d76bd41ba52294455a42 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Wed, 21 Feb 2024 17:17:54 -0500 Subject: [PATCH] [external-assets] Build base asset jobs using AssetGraph [INTERNAL_BRANCH=sean/external-assets-asset-graph-nodes] --- .../dagster/_core/definitions/asset_graph.py | 19 ++++++- .../dagster/_core/definitions/assets_job.py | 55 ++++++------------- .../_core/definitions/internal_asset_graph.py | 3 + .../repository_data_builder.py | 3 +- 4 files changed, 39 insertions(+), 41 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 9e843d79e6678..c2a614289155a 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -281,11 +281,21 @@ def toposorted_asset_keys_by_level(self) -> Sequence[AbstractSet[AssetKey]]: {key for key in level} for level in toposort.toposort(self.asset_dep_graph["upstream"]) ] + @property + @cached_method + def unpartitioned_asset_keys(self) -> AbstractSet[AssetKey]: + return {node.key for node in self.asset_nodes if not node.is_partitioned} + def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if node.group_name == group_name} - @property @cached_method + def asset_keys_for_partitions_def( + self, partitions_def: PartitionsDefinition + ) -> AbstractSet[AssetKey]: + return {node.key for node in self.asset_nodes if node.partitions_def == partitions_def} + + @functools.cached_property def root_materializable_asset_keys(self) -> AbstractSet[AssetKey]: """Materializable asset keys that have no materializable parents.""" from .asset_selection import AssetSelection @@ -305,6 +315,13 @@ def root_executable_asset_keys(self) -> AbstractSet[AssetKey]: def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: return {key for asset in self.asset_nodes for key in asset.check_keys} + @property + @cached_method + def all_partitions_defs(self) -> Sequence[PartitionsDefinition]: + return sorted( + set(node.partitions_def for node in self.asset_nodes if node.partitions_def), key=repr + ) + @property @cached_method def all_group_names(self) -> AbstractSet[str]: diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index 5a9883e307f35..83200047f6d19 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -5,7 +5,6 @@ Any, Dict, Iterable, - List, Mapping, Optional, Sequence, @@ -18,6 +17,7 @@ import dagster._check as check from dagster._core.definitions.hook_definition import HookDefinition +from dagster._core.definitions.internal_asset_graph import InternalAssetGraph from dagster._core.definitions.policy import RetryPolicy from dagster._core.errors import DagsterInvalidDefinitionError from dagster._core.selector.subset_selector import AssetSelectionData @@ -61,58 +61,37 @@ def is_base_asset_job_name(name: str) -> bool: def get_base_asset_jobs( - assets: Sequence[AssetsDefinition], - asset_checks: Sequence[AssetChecksDefinition], + asset_graph: InternalAssetGraph, resource_defs: Optional[Mapping[str, ResourceDefinition]], executor_def: Optional[ExecutorDefinition], ) -> Sequence[JobDefinition]: - executable_assets = [a for a in assets if a.is_executable] - unexecutable_assets = [a for a in assets if not a.is_executable] - - executable_assets_by_partitions_def: Dict[ - Optional[PartitionsDefinition], List[AssetsDefinition] - ] = defaultdict(list) - for asset in executable_assets: - executable_assets_by_partitions_def[asset.partitions_def].append(asset) - # sort to ensure some stability in the ordering - all_partitions_defs = sorted( - [p for p in executable_assets_by_partitions_def.keys() if p], key=repr - ) - - if len(all_partitions_defs) == 0: + if len(asset_graph.all_partitions_defs) == 0: + executable_asset_keys = asset_graph.executable_asset_keys + loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys return [ build_assets_job( name=ASSET_BASE_JOB_PREFIX, - executable_assets=executable_assets, - loadable_assets=unexecutable_assets, - asset_checks=asset_checks, + executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys), + loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys), + asset_checks=asset_graph.asset_checks_defs, executor_def=executor_def, resource_defs=resource_defs, ) ] else: - unpartitioned_executable_assets = executable_assets_by_partitions_def.get(None, []) jobs = [] - - for i, partitions_def in enumerate(all_partitions_defs): - # all base jobs contain all unpartitioned assets - executable_assets_for_job = [ - *executable_assets_by_partitions_def[partitions_def], - *unpartitioned_executable_assets, - ] + for i, partitions_def in enumerate(asset_graph.all_partitions_defs): + executable_asset_keys = asset_graph.executable_asset_keys & { + *asset_graph.asset_keys_for_partitions_def(partitions_def), + *asset_graph.unpartitioned_asset_keys, + } + loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys jobs.append( build_assets_job( f"{ASSET_BASE_JOB_PREFIX}_{i}", - executable_assets=executable_assets_for_job, - loadable_assets=[ - *( - asset - for asset in executable_assets - if asset not in executable_assets_for_job - ), - *unexecutable_assets, - ], - asset_checks=asset_checks, + executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys), + loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys), + asset_checks=asset_graph.asset_checks_defs, resource_defs=resource_defs, executor_def=executor_def, partitions_def=partitions_def, diff --git a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py index 40b1d07011c7f..9d37c758e5e6a 100644 --- a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py @@ -238,6 +238,9 @@ def get_execution_unit_asset_and_check_keys( def assets_defs(self) -> Sequence[AssetsDefinition]: return list(dict.fromkeys(asset.assets_def for asset in self.asset_nodes)) + def assets_defs_for_keys(self, keys: Iterable[AssetKey]) -> Sequence[AssetsDefinition]: + return list(dict.fromkeys([self.get(key).assets_def for key in keys])) + @property def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]: return list(dict.fromkeys(self._asset_checks_defs_by_key.values())) diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index 02a613a4062e5..6f70a6c87031b 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -237,10 +237,9 @@ def build_caching_repository_data_from_list( ) if assets_defs or source_assets or asset_checks_defs: for job_def in get_base_asset_jobs( - assets=asset_graph.assets_defs, + asset_graph=asset_graph, executor_def=default_executor_def, resource_defs=top_level_resources, - asset_checks=asset_checks_defs, ): jobs[job_def.name] = job_def