diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 8e3ffd6710fd0..3ab938439afe0 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -32,6 +32,20 @@ # `AutoMaterializeRule`. SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES = "dagster/auto_observe_interval_minutes" +# SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET lives on the metadata of external assets that are +# created for undefined but referenced assets during asset graph normalization. For example, in the +# below definitions, `foo` is referenced by upstream `bar` but has no corresponding definition: +# +# +# @asset(deps=["foo"]) +# def bar(context: AssetExecutionContext): +# ... +# +# defs=Definitions(assets=[bar]) +# +# During normalization we create a "stub" definition for `foo` and attach this metadata to it. +SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET = "dagster/auto_created_stub_asset" + @whitelist_for_serdes class AssetExecutionType(Enum): diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 28488c16d5098..4a598e859cede 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -23,6 +23,7 @@ from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset from dagster._core.definitions.asset_spec import ( SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET, SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES, AssetExecutionType, ) @@ -938,6 +939,15 @@ def auto_observe_interval_minutes(self) -> Optional[float]: ) return value + # Applies to AssetsDefinition that were auto-created because some asset referenced a key as a + # dependency, but no definition was provided for that key. + @property + def is_auto_created_stub(self) -> bool: + return ( + self._get_external_asset_metadata_value(SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET) + is not None + ) + def _get_external_asset_metadata_value(self, metadata_key: str) -> object: first_key = next(iter(self.keys), None) return self.metadata_by_key.get(first_key, {}).get(metadata_key) if first_key else None diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index 8ce4a49cb9d55..c84834780bd33 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -24,6 +24,10 @@ ) from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.asset_graph import AssetGraph +from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET, + AssetSpec, +) from dagster._core.definitions.assets_job import ( get_base_asset_jobs, is_base_asset_job_name, @@ -32,7 +36,10 @@ AutoMaterializeSensorDefinition, ) from dagster._core.definitions.executor_definition import ExecutorDefinition -from dagster._core.definitions.external_asset import create_external_asset_from_source_asset +from dagster._core.definitions.external_asset import ( + create_external_asset_from_source_asset, + external_asset_from_spec, +) from dagster._core.definitions.graph_definition import GraphDefinition from dagster._core.definitions.internal_asset_graph import InternalAssetGraph from dagster._core.definitions.job_definition import JobDefinition @@ -229,6 +236,7 @@ def build_caching_repository_data_from_list( elif isinstance(definition, SourceAsset): source_assets.append(definition) assets_defs.append(create_external_asset_from_source_asset(definition)) + asset_keys.add(definition.key) elif isinstance(definition, AssetChecksDefinition): asset_checks_defs.append(definition) else: @@ -247,6 +255,20 @@ def build_caching_repository_data_from_list( for ad in assets_defs ] + # Create unexecutable external assets definitions for any referenced keys for which no + # definition was provided. + all_referenced_asset_keys = { + *(key for asset_def in assets_defs for key in asset_def.dependency_keys), + *(checks_def.asset_key for checks_def in asset_checks_defs), + } + for key in all_referenced_asset_keys.difference(asset_keys): + assets_defs.append( + external_asset_from_spec( + AssetSpec(key=key, metadata={SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET: True}) + ) + ) + asset_keys.add(key) + if assets_defs or source_assets or asset_checks_defs: for job_def in get_base_asset_jobs( assets=assets_defs, diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index ca7801c707db1..1482d5545b228 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1651,20 +1651,7 @@ def external_asset_nodes_from_defs( group_name_by_asset_key.update(asset_layer.group_names_by_assets()) - asset_keys_without_definitions = all_upstream_asset_keys.difference(assets_defs_by_key.keys()) - - asset_nodes = [ - ExternalAssetNode( - asset_key=asset_key, - dependencies=list(deps[asset_key].values()), - depended_by=list(dep_by[asset_key].values()), - execution_type=AssetExecutionType.UNEXECUTABLE, - job_names=[], - group_name=group_name_by_asset_key.get(asset_key), - code_version=code_version_by_asset_key.get(asset_key), - ) - for asset_key in asset_keys_without_definitions - ] + asset_nodes: List[ExternalAssetNode] = [] for asset_key, node_tuple_list in node_defs_by_asset_key.items(): node_output_handle, job_def = node_tuple_list[0] @@ -1744,11 +1731,15 @@ def external_asset_nodes_from_defs( # Ensure any external assets that are have no nodes in any job are included in the asset graph for asset in assets_defs_by_key.values(): - for key in [ - key - for key in asset.keys - if (key not in node_defs_by_asset_key) and key not in asset_keys_without_definitions - ]: + for key in [key for key in asset.keys if key not in node_defs_by_asset_key]: + # This is in place to preserve an implicit behavior in the Dagster UI where stub + # dependencies were rendered as if they weren't part of the default asset group. + group_name = ( + None + if asset.is_auto_created_stub + else group_name_by_asset_key.get(key, DEFAULT_GROUP_NAME) + ) + asset_nodes.append( ExternalAssetNode( asset_key=key, @@ -1758,7 +1749,7 @@ def external_asset_nodes_from_defs( job_names=[], op_description=asset.descriptions_by_key.get(key), metadata=asset.metadata_by_key.get(key), - group_name=asset.group_names_by_key.get(key), + group_name=group_name, is_source=True, is_observable=asset.is_observable, auto_observe_interval_minutes=asset.auto_observe_interval_minutes, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index e9deab2624a22..d3b520cb5a430 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -405,21 +405,6 @@ def op1(): ... assert asset_graph.get_required_multi_asset_keys(asset_def.key) == set() -def test_get_non_source_roots_missing_source(asset_graph_from_assets: Callable[..., AssetGraph]): - @asset - def foo(): - pass - - @asset(deps=["this_source_is_fake", "source_asset"]) - def bar(foo): - pass - - source_asset = SourceAsset("source_asset") - - asset_graph = asset_graph_from_assets([foo, bar, source_asset]) - assert asset_graph.get_materializable_roots(AssetKey("bar")) == {AssetKey("foo")} - - def test_partitioned_source_asset(asset_graph_from_assets: Callable[..., AssetGraph]): partitions_def = DailyPartitionsDefinition(start_date="2022-01-01") diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index 97a42485cb070..3b56f0a6710d3 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -84,19 +84,16 @@ def my_sling_assets(sling: SlingResource): def test_with_custom_name(replication_config: SlingReplicationParam): @sling_assets(replication_config=replication_config) - def my_sling_assets(): - ... + def my_sling_assets(): ... assert my_sling_assets.op.name == "my_sling_assets" @sling_assets(replication_config=replication_config) - def my_other_assets(): - ... + def my_other_assets(): ... assert my_other_assets.op.name == "my_other_assets" @sling_assets(replication_config=replication_config, name="custom_name") - def my_third_sling_assets(): - ... + def my_third_sling_assets(): ... assert my_third_sling_assets.op.name == "custom_name"