diff --git a/python_modules/dagster/dagster/_core/definitions/definitions_class.py b/python_modules/dagster/dagster/_core/definitions/definitions_class.py index 38557b2ad35a7..75e74e219d0aa 100644 --- a/python_modules/dagster/dagster/_core/definitions/definitions_class.py +++ b/python_modules/dagster/dagster/_core/definitions/definitions_class.py @@ -22,6 +22,7 @@ from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey from dagster._core.definitions.executor_definition import ExecutorDefinition from dagster._core.definitions.logger_definition import LoggerDefinition +from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.build_resources import wrap_resources_for_execution from dagster._core.execution.with_resources import with_resources from dagster._core.executor.base import Executor @@ -540,6 +541,14 @@ def get_implicit_job_def_for_assets( ) -> Optional[JobDefinition]: return self.get_repository_def().get_implicit_job_def_for_assets(asset_keys) + def get_assets_def(self, key: CoercibleToAssetKey) -> AssetsDefinition: + asset_key = AssetKey.from_coercible(key) + for assets_def in self.get_asset_graph().assets: + if asset_key in assets_def.keys: + return assets_def + + raise DagsterInvariantViolationError(f"Could not find asset {asset_key}") + @cached_method def get_repository_def(self) -> RepositoryDefinition: """Definitions is implemented by wrapping RepositoryDefinition. Get that underlying object diff --git a/python_modules/dagster/dagster/_core/definitions/observable_asset.py b/python_modules/dagster/dagster/_core/definitions/observable_asset.py index e65026d0e0eae..c2e4434a6533b 100644 --- a/python_modules/dagster/dagster/_core/definitions/observable_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/observable_asset.py @@ -6,7 +6,8 @@ AssetExecutionType, AssetSpec, ) -from dagster._core.definitions.decorators.asset_decorator import multi_asset +from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset +from dagster._core.definitions.source_asset import SourceAsset from dagster._core.errors import DagsterInvariantViolationError @@ -50,3 +51,39 @@ def an_asset() -> None: ) return an_asset + + +def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset): + check.invariant( + source_asset.observe_fn is None, + "Observable source assets not supported yet: observe_fn should be None", + ) + check.invariant( + source_asset.partitions_def is None, + "Observable source assets not supported yet: partitions_def should be None", + ) + check.invariant( + source_asset.auto_observe_interval_minutes is None, + "Observable source assets not supported yet: auto_observe_interval_minutes should be None", + ) + + kwargs = { + "key": source_asset.key, + "metadata": { + **source_asset.metadata, + **{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}, + }, + "group_name": source_asset.group_name, + "description": source_asset.description, + } + + if source_asset.io_manager_def: + kwargs["io_manager_def"] = source_asset.io_manager_def + elif source_asset.io_manager_key: + kwargs["io_manager_key"] = source_asset.io_manager_key + + @asset(**kwargs) + def shim_asset() -> None: + raise NotImplementedError(f"Asset {source_asset.key} is not executable") + + return shim_asset diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py index 7f4fa11ac566f..1d1a098f0be35 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py @@ -3,12 +3,19 @@ AssetKey, AssetsDefinition, AutoMaterializePolicy, + DagsterInstance, + Definitions, + IOManager, + SourceAsset, _check as check, asset, ) from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.freshness_policy import FreshnessPolicy -from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def +from dagster._core.definitions.observable_asset import ( + create_unexecutable_observable_assets_def, + create_unexecutable_observable_assets_def_from_source_asset, +) def test_observable_asset_basic_creation() -> None: @@ -72,3 +79,44 @@ def test_observable_asset_creation_with_deps() -> None: assert assets_def.asset_deps[expected_key] == { AssetKey(["observable_asset_two"]), } + + +def test_how_source_assets_are_backwards_compatible() -> None: + class DummyIOManager(IOManager): + def handle_output(self, context, obj) -> None: + pass + + def load_input(self, context) -> str: + return "hardcoded" + + source_asset = SourceAsset(key="source_asset", io_manager_def=DummyIOManager()) + + @asset + def an_asset(source_asset: str) -> str: + return "hardcoded" + "-computed" + + defs_with_source = Definitions(assets=[source_asset, an_asset]) + + instance = DagsterInstance.ephemeral() + + result_one = defs_with_source.get_implicit_global_asset_job_def().execute_in_process( + instance=instance + ) + + assert result_one.success + assert result_one.output_for_node("an_asset") == "hardcoded-computed" + + defs_with_shim = Definitions( + assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset] + ) + + assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition) + + result_two = defs_with_shim.get_implicit_global_asset_job_def().execute_in_process( + instance=instance, + # currently we have to explicitly select the asset to exclude the source from execution + asset_selection=[AssetKey("an_asset")], + ) + + assert result_two.success + assert result_two.output_for_node("an_asset") == "hardcoded-computed"