From 599d04bd55511e2acf3093ef9d53f49da4d7de65 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Thu, 29 Feb 2024 08:25:47 -0500 Subject: [PATCH] [external-assets] Create external assets defs for undefined assets in repo construction --- .../repository_data_builder.py | 17 ++++++++++++++- .../host_representation/external_data.py | 21 ++----------------- .../asset_defs_tests/test_asset_graph.py | 15 ------------- .../test_external_data.py | 1 + 4 files changed, 19 insertions(+), 35 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index 8ce4a49cb9d55..7a0fbfb2755a7 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -24,6 +24,7 @@ ) from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.asset_graph import AssetGraph +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.assets_job import ( get_base_asset_jobs, is_base_asset_job_name, @@ -32,7 +33,10 @@ AutoMaterializeSensorDefinition, ) from dagster._core.definitions.executor_definition import ExecutorDefinition -from dagster._core.definitions.external_asset import create_external_asset_from_source_asset +from dagster._core.definitions.external_asset import ( + create_external_asset_from_source_asset, + external_asset_from_spec, +) from dagster._core.definitions.graph_definition import GraphDefinition from dagster._core.definitions.internal_asset_graph import InternalAssetGraph from dagster._core.definitions.job_definition import JobDefinition @@ -229,6 +233,7 @@ def build_caching_repository_data_from_list( elif isinstance(definition, SourceAsset): source_assets.append(definition) assets_defs.append(create_external_asset_from_source_asset(definition)) + asset_keys.add(definition.key) elif isinstance(definition, AssetChecksDefinition): asset_checks_defs.append(definition) else: @@ -247,6 +252,16 @@ def build_caching_repository_data_from_list( for ad in assets_defs ] + # Create unexecutable external assets definitions for any referenced keys for which no + # definition was provided. + all_referenced_asset_keys = { + *(key for asset_def in assets_defs for key in asset_def.dependency_keys), + *(checks_def.asset_key for checks_def in asset_checks_defs), + } + for key in all_referenced_asset_keys.difference(asset_keys): + assets_defs.append(external_asset_from_spec(AssetSpec(key=key))) + asset_keys.add(key) + if assets_defs or source_assets or asset_checks_defs: for job_def in get_base_asset_jobs( assets=assets_defs, diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 3c9fa5d01df43..0cd6e59519396 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1651,20 +1651,7 @@ def external_asset_nodes_from_defs( group_name_by_asset_key.update(asset_layer.group_names_by_assets()) - asset_keys_without_definitions = all_upstream_asset_keys.difference(assets_defs_by_key.keys()) - - asset_nodes = [ - ExternalAssetNode( - asset_key=asset_key, - dependencies=list(deps[asset_key].values()), - depended_by=list(dep_by[asset_key].values()), - execution_type=AssetExecutionType.UNEXECUTABLE, - job_names=[], - group_name=group_name_by_asset_key.get(asset_key), - code_version=code_version_by_asset_key.get(asset_key), - ) - for asset_key in asset_keys_without_definitions - ] + asset_nodes: List[ExternalAssetNode] = [] for asset_key, node_tuple_list in node_defs_by_asset_key.items(): node_output_handle, job_def = node_tuple_list[0] @@ -1744,11 +1731,7 @@ def external_asset_nodes_from_defs( # Ensure any external assets that are have no nodes in any job are included in the asset graph for asset in assets_defs_by_key.values(): - for key in [ - key - for key in asset.keys - if (key not in node_defs_by_asset_key) and key not in asset_keys_without_definitions - ]: + for key in [key for key in asset.keys if key not in node_defs_by_asset_key]: asset_nodes.append( ExternalAssetNode( asset_key=key, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index d56994451c91e..460f1f53df23b 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -424,21 +424,6 @@ def op1(): assert asset_graph.get_required_multi_asset_keys(asset_def.key) == set() -def test_get_non_source_roots_missing_source(asset_graph_from_assets: Callable[..., AssetGraph]): - @asset - def foo(): - pass - - @asset(deps=["this_source_is_fake", "source_asset"]) - def bar(foo): - pass - - source_asset = SourceAsset("source_asset") - - asset_graph = asset_graph_from_assets([foo, bar, source_asset]) - assert asset_graph.get_materializable_roots(AssetKey("bar")) == {AssetKey("foo")} - - def test_partitioned_source_asset(asset_graph_from_assets: Callable[..., AssetGraph]): partitions_def = DailyPartitionsDefinition(start_date="2022-01-01") diff --git a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index ccf8dc718059d..c14214a9b09a4 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -857,6 +857,7 @@ def foo(bar): ), key=lambda n: n.asset_key, ) + assert external_asset_nodes == [ ExternalAssetNode( asset_key=AssetKey("bar"),