diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index 6ea665a950cb3..9ba023875364b 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -24,6 +24,7 @@ ) from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.asset_graph import AssetGraph +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.assets_job import ( get_base_asset_jobs, is_base_asset_job_name, @@ -32,7 +33,10 @@ AutoMaterializeSensorDefinition, ) from dagster._core.definitions.executor_definition import ExecutorDefinition -from dagster._core.definitions.external_asset import create_external_asset_from_source_asset +from dagster._core.definitions.external_asset import ( + create_external_asset_from_source_asset, + external_asset_from_spec, +) from dagster._core.definitions.graph_definition import GraphDefinition from dagster._core.definitions.internal_asset_graph import InternalAssetGraph from dagster._core.definitions.job_definition import JobDefinition @@ -228,11 +232,22 @@ def build_caching_repository_data_from_list( elif isinstance(definition, SourceAsset): source_assets.append(definition) assets_defs.append(create_external_asset_from_source_asset(definition)) + asset_keys.add(definition.key) elif isinstance(definition, AssetChecksDefinition): asset_checks_defs.append(definition) else: check.failed(f"Unexpected repository entry {definition}") + # Create unexecutable external assets definitions for any referenced keys for which no + # definition was provided. + all_referenced_asset_keys = { + *(key for asset_def in assets_defs for key in asset_def.dependency_keys), + *(checks_def.asset_key for checks_def in asset_checks_defs), + } + for key in all_referenced_asset_keys.difference(asset_keys): + assets_defs.append(external_asset_from_spec(AssetSpec(key=key))) + asset_keys.add(key) + if assets_defs or source_assets or asset_checks_defs: for job_def in get_base_asset_jobs( assets=assets_defs, diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 7496158652ed3..b7189f8395573 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1642,20 +1642,7 @@ def external_asset_nodes_from_defs( group_name_by_asset_key.update(asset_layer.group_names_by_assets()) - asset_keys_without_definitions = all_upstream_asset_keys.difference(assets_defs_by_key.keys()) - - asset_nodes = [ - ExternalAssetNode( - asset_key=asset_key, - dependencies=list(deps[asset_key].values()), - depended_by=list(dep_by[asset_key].values()), - execution_type=AssetExecutionType.UNEXECUTABLE, - job_names=[], - group_name=group_name_by_asset_key.get(asset_key), - code_version=code_version_by_asset_key.get(asset_key), - ) - for asset_key in asset_keys_without_definitions - ] + asset_nodes: List[ExternalAssetNode] = [] for asset_key, node_tuple_list in node_defs_by_asset_key.items(): node_output_handle, job_def = node_tuple_list[0] @@ -1735,11 +1722,7 @@ def external_asset_nodes_from_defs( # Ensure any external assets that are have no nodes in any job are included in the asset graph for asset in assets_defs_by_key.values(): - for key in [ - key - for key in asset.keys - if (key not in node_defs_by_asset_key) and key not in asset_keys_without_definitions - ]: + for key in [key for key in asset.keys if key not in node_defs_by_asset_key]: asset_nodes.append( ExternalAssetNode( asset_key=key, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index e9bf80d073763..c68a2b9d20b31 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -424,21 +424,6 @@ def op1(): assert asset_graph.get_execution_unit_asset_keys(asset_def.key) == {asset_def.key} -def test_get_non_source_roots_missing_source(asset_graph_from_assets: Callable[..., AssetGraph]): - @asset - def foo(): - pass - - @asset(deps=["this_source_is_fake", "source_asset"]) - def bar(foo): - pass - - source_asset = SourceAsset("source_asset") - - asset_graph = asset_graph_from_assets([foo, bar, source_asset]) - assert asset_graph.get_materializable_roots(AssetKey("bar")) == {AssetKey("foo")} - - def test_partitioned_source_asset(asset_graph_from_assets: Callable[..., AssetGraph]): partitions_def = DailyPartitionsDefinition(start_date="2022-01-01") diff --git a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index a6c815ad67a9f..e673ad9e45369 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -857,6 +857,7 @@ def foo(bar): ), key=lambda n: n.asset_key, ) + assert external_asset_nodes == [ ExternalAssetNode( asset_key=AssetKey("bar"),