From 8ef4522cdc205fc9328133cb4a2eacab47199f3a Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Tue, 19 Sep 2023 15:01:44 -0500 Subject: [PATCH] [exploration] unexecutable assets not backed by jobs --- .../dagster_graphql/schema/asset_graph.py | 19 +++--- .../dagster/_core/definitions/assets.py | 6 ++ .../dagster/_core/definitions/assets_job.py | 28 ++++---- .../_core/definitions/observable_asset.py | 2 +- .../repository_data_builder.py | 2 +- .../host_representation/external_data.py | 32 +++++++++ .../asset_defs_tests/test_assets_job.py | 4 +- .../test_observable_assets.py | 65 +++++++++++++++++++ 8 files changed, 134 insertions(+), 24 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index b85889416b23c..0de76b790d07f 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -346,8 +346,9 @@ def stale_status_loader(self) -> StaleStatusLoader: def get_external_job(self) -> ExternalJob: if self._external_job is None: check.invariant( - len(self._external_asset_node.job_names) >= 1, - "Asset must be part of at least one job", + len(self._external_asset_node.job_names) >= 1 + or not self._external_asset_node.is_executable, + "Executable asset must be part of at least one job", ) self._external_job = self._external_repository.get_full_external_job( self._external_asset_node.job_names[0] @@ -455,8 +456,8 @@ def get_required_resource_keys_rec( def is_graph_backed_asset(self) -> bool: return self.graphName is not None - def is_source_asset(self) -> bool: - return self._external_asset_node.is_source + def is_source_or_unexecutable_asset(self) -> bool: + return self._external_asset_node.is_source or not self._external_asset_node.is_executable def resolve_hasMaterializePermission( self, @@ -581,7 +582,7 @@ def resolve_assetObservations( ] def resolve_configField(self, _graphene_info: ResolveInfo) -> Optional[GrapheneConfigTypeField]: - if self.is_source_asset(): + if self.is_source_or_unexecutable_asset(): return None external_pipeline = self.get_external_job() node_def_snap = self.get_node_definition_snap() @@ -814,7 +815,7 @@ def resolve_jobs(self, _graphene_info: ResolveInfo) -> Sequence[GraphenePipeline ] def resolve_isSource(self, _graphene_info: ResolveInfo) -> bool: - return self.is_source_asset() + return self._external_asset_node.is_source def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool: return self._external_asset_node.partitions_def_data is not None @@ -978,7 +979,7 @@ def resolve_metadata_entries( def resolve_op( self, _graphene_info: ResolveInfo ) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]: - if self.is_source_asset(): + if self.is_source_or_unexecutable_asset(): return None external_pipeline = self.get_external_job() node_def_snap = self.get_node_definition_snap() @@ -1057,7 +1058,7 @@ def resolve_repository(self, graphene_info: ResolveInfo) -> "GrapheneRepository" def resolve_required_resources( self, _graphene_info: ResolveInfo ) -> Sequence[GrapheneResourceRequirement]: - if self.is_source_asset(): + if self.is_source_or_unexecutable_asset(): return [] node_def_snap = self.get_node_definition_snap() all_unique_keys = self.get_required_resource_keys(node_def_snap) @@ -1070,7 +1071,7 @@ def resolve_type( "GrapheneListDagsterType", "GrapheneNullableDagsterType", "GrapheneRegularDagsterType" ] ]: - if self.is_source_asset(): + if self.is_source_or_unexecutable_asset(): return None external_pipeline = self.get_external_job() output_name = self.external_asset_node.output_name diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index a0bc0d0d4f0f3..e07c251e2563b 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -876,6 +876,12 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool: self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) ) + def is_executable(self): + for key in self.keys: + if not self.is_asset_executable(key): + return False + return True + def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]: return self._partition_mappings.get(self._keys_by_input_name[input_name]) diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index cf186f3d2f6a9..002be9ed7044e 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -58,29 +58,35 @@ def is_base_asset_job_name(name: str) -> bool: def get_base_asset_jobs( - assets: Sequence[AssetsDefinition], + assets_defs: Sequence[AssetsDefinition], source_assets: Sequence[SourceAsset], asset_checks: Sequence[AssetChecksDefinition], resource_defs: Optional[Mapping[str, ResourceDefinition]], executor_def: Optional[ExecutorDefinition], ) -> Sequence[JobDefinition]: - assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = ( + # bucket the passed in AssetsDefinitions that are executable by partition. + # un-executable assets are intentionally omitted. + exe_assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = ( defaultdict(list) ) - for assets_def in assets: - assets_by_partitions_def[assets_def.partitions_def].append(assets_def) + for assets_def in assets_defs: + if assets_def.is_executable(): + exe_assets_by_partitions_def[assets_def.partitions_def].append(assets_def) # We need to create "empty" jobs for each partitions def that is used by an observable but no # materializable asset. They are empty because we don't assign the source asset to the `assets`, # but rather the `source_assets` argument of `build_assets_job`. for observable in [sa for sa in source_assets if sa.is_observable]: - if observable.partitions_def not in assets_by_partitions_def: - assets_by_partitions_def[observable.partitions_def] = [] - if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}: + if observable.partitions_def not in exe_assets_by_partitions_def: + exe_assets_by_partitions_def[observable.partitions_def] = [] + + if len(exe_assets_by_partitions_def.keys()) == 0 or exe_assets_by_partitions_def.keys() == { + None + }: return [ build_assets_job( name=ASSET_BASE_JOB_PREFIX, - assets=assets, + assets=exe_assets_by_partitions_def.get(None, []), asset_checks=asset_checks, source_assets=source_assets, executor_def=executor_def, @@ -88,9 +94,9 @@ def get_base_asset_jobs( ) ] else: - unpartitioned_assets = assets_by_partitions_def.get(None, []) + unpartitioned_assets = exe_assets_by_partitions_def.get(None, []) partitioned_assets_by_partitions_def = { - k: v for k, v in assets_by_partitions_def.items() if k is not None + k: v for k, v in exe_assets_by_partitions_def.items() if k is not None } jobs = [] @@ -102,7 +108,7 @@ def get_base_asset_jobs( build_assets_job( f"{ASSET_BASE_JOB_PREFIX}_{i}", assets=[*assets_with_partitions, *unpartitioned_assets], - source_assets=[*source_assets, *assets], + source_assets=[*source_assets, *assets_defs], asset_checks=asset_checks, resource_defs=resource_defs, executor_def=executor_def, diff --git a/python_modules/dagster/dagster/_core/definitions/observable_asset.py b/python_modules/dagster/dagster/_core/definitions/observable_asset.py index e65026d0e0eae..cec5969b628ba 100644 --- a/python_modules/dagster/dagster/_core/definitions/observable_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/observable_asset.py @@ -44,7 +44,7 @@ def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]): ) @multi_asset(specs=new_specs) - def an_asset() -> None: + def an_asset(): raise DagsterInvariantViolationError( f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}" ) diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index cca69953a8fb6..bab5c5d01ae17 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -193,7 +193,7 @@ def build_caching_repository_data_from_list( if assets_defs or source_assets or asset_checks_defs: for job_def in get_base_asset_jobs( - assets=assets_defs, + assets_defs=assets_defs, source_assets=source_assets, executor_def=default_executor_def, resource_defs=top_level_resources, diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index edea50f20c70e..798ca60d94f70 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -10,6 +10,7 @@ from enum import Enum from typing import ( TYPE_CHECKING, + AbstractSet, Any, Dict, Iterable, @@ -51,6 +52,7 @@ SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, AssetExecutionType, ) +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.assets_job import is_base_asset_job_name from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy @@ -1341,6 +1343,11 @@ def external_repository_data_from_def( asset_graph = external_asset_graph_from_defs( jobs, source_assets_by_key=repository_def.source_assets_by_key, + unexecutable_assets={ + assets_def + for assets_def in repository_def.assets_defs_by_key.values() + if not assets_def.is_executable() + }, ) nested_resource_map = _get_nested_resources_map( @@ -1462,6 +1469,7 @@ def external_asset_checks_from_defs( def external_asset_graph_from_defs( job_defs: Sequence[JobDefinition], source_assets_by_key: Mapping[AssetKey, SourceAsset], + unexecutable_assets: Optional[AbstractSet[AssetsDefinition]] = None, # drop default value ) -> Sequence[ExternalAssetNode]: node_defs_by_asset_key: Dict[AssetKey, List[Tuple[NodeOutputHandle, JobDefinition]]] = ( defaultdict(list) @@ -1659,6 +1667,30 @@ def external_asset_graph_from_defs( ) ) + # resolve deps unexec assets since they are not encoded in jobs + for unexec_def in unexecutable_assets or []: + for key in unexec_def.keys: + for depends_on in unexec_def.asset_deps[key]: + deps[key][depends_on] = ExternalAssetDependency(depends_on) + dep_by[depends_on][key] = ExternalAssetDependedBy(key) + + # build nodes for unexec assets + for unexec_def in unexecutable_assets or []: + for key in unexec_def.keys: + asset_nodes.append( + ExternalAssetNode( + asset_key=key, + dependencies=list(deps[key].values()), + depended_by=list(dep_by[key].values()), + metadata=unexec_def.metadata_by_key[key], + group_name=unexec_def.group_names_by_key[key], + is_source=False, + is_observable=False, + # could use a special kind tag for more UI punch + # compute_kind=... + ) + ) + return asset_nodes diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 191d82c108420..8901fe59bf70a 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -1959,7 +1959,7 @@ def hourly_asset(): ... def unpartitioned_asset(): ... jobs = get_base_asset_jobs( - assets=[ + assets_defs=[ daily_asset, daily_asset2, daily_asset_different_start_date, @@ -2004,7 +2004,7 @@ def asset_b(): ... def asset_x(asset_b: B): ... jobs = get_base_asset_jobs( - assets=[ + assets_defs=[ asset_x, ], source_assets=[ diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py index 7f4fa11ac566f..57f2390d0a743 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py @@ -3,12 +3,20 @@ AssetKey, AssetsDefinition, AutoMaterializePolicy, + Definitions, _check as check, asset, ) from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def +from dagster._core.definitions.repository_definition.repository_definition import ( + RepositoryDefinition, +) +from dagster._core.errors import DagsterInvalidSubsetError +from dagster._core.host_representation.external_data import ( + external_repository_data_from_def, +) def test_observable_asset_basic_creation() -> None: @@ -72,3 +80,60 @@ def test_observable_asset_creation_with_deps() -> None: assert assets_def.asset_deps[expected_key] == { AssetKey(["observable_asset_two"]), } + + +def test_non_executable_asset_excluded_from_job() -> None: + upstream_asset = create_unexecutable_observable_assets_def( + specs=[AssetSpec("upstream_asset")], + ) + + @asset(deps=[upstream_asset]) + def downstream_asset() -> None: ... + + defs = Definitions(assets=[upstream_asset, downstream_asset]) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + + # ensure that explict selection fails + with pytest.raises( + DagsterInvalidSubsetError, + match=( + r'Assets provided in asset_selection argument \["upstream_asset"\] do not exist in' + r" parent asset group or job" + ), + ): + defs.get_implicit_global_asset_job_def().execute_in_process( + asset_selection=[AssetKey("upstream_asset")] + ) + + +def test_external_rep(): + table_a = AssetSpec("table_A") + table_b = AssetSpec("table_B", deps=[table_a]) + table_c = AssetSpec("table_C", deps=[table_a]) + table_d = AssetSpec("table_D", deps=[table_b, table_c]) + + those_assets = create_unexecutable_observable_assets_def( + specs=[table_a, table_b, table_c, table_d] + ) + + defs = Definitions(assets=[those_assets]) + repo = defs.get_inner_repository_for_loading_process() + assert isinstance(repo, RepositoryDefinition) + external_repo = external_repository_data_from_def(repo) + + assert len(external_repo.external_asset_graph_data) == 4 + + nodes_by_key = {node.asset_key: node for node in external_repo.external_asset_graph_data} + + assert len(nodes_by_key[table_a.key].depended_by) == 2 + assert len(nodes_by_key[table_a.key].dependencies) == 0 + + assert len(nodes_by_key[table_b.key].depended_by) == 1 + assert len(nodes_by_key[table_b.key].dependencies) == 1 + + assert len(nodes_by_key[table_c.key].depended_by) == 1 + assert len(nodes_by_key[table_c.key].dependencies) == 1 + + assert len(nodes_by_key[table_d.key].depended_by) == 0 + assert len(nodes_by_key[table_d.key].dependencies) == 2