diff --git a/python_modules/dagster/dagster/_core/definitions/definitions_class.py b/python_modules/dagster/dagster/_core/definitions/definitions_class.py index 38557b2ad35a7..75e74e219d0aa 100644 --- a/python_modules/dagster/dagster/_core/definitions/definitions_class.py +++ b/python_modules/dagster/dagster/_core/definitions/definitions_class.py @@ -22,6 +22,7 @@ from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey from dagster._core.definitions.executor_definition import ExecutorDefinition from dagster._core.definitions.logger_definition import LoggerDefinition +from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.build_resources import wrap_resources_for_execution from dagster._core.execution.with_resources import with_resources from dagster._core.executor.base import Executor @@ -540,6 +541,14 @@ def get_implicit_job_def_for_assets( ) -> Optional[JobDefinition]: return self.get_repository_def().get_implicit_job_def_for_assets(asset_keys) + def get_assets_def(self, key: CoercibleToAssetKey) -> AssetsDefinition: + asset_key = AssetKey.from_coercible(key) + for assets_def in self.get_asset_graph().assets: + if asset_key in assets_def.keys: + return assets_def + + raise DagsterInvariantViolationError(f"Could not find asset {asset_key}") + @cached_method def get_repository_def(self) -> RepositoryDefinition: """Definitions is implemented by wrapping RepositoryDefinition. Get that underlying object diff --git a/python_modules/dagster/dagster/_core/definitions/observable_asset.py b/python_modules/dagster/dagster/_core/definitions/observable_asset.py index e65026d0e0eae..664c3d6beb5ef 100644 --- a/python_modules/dagster/dagster/_core/definitions/observable_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/observable_asset.py @@ -6,7 +6,8 @@ AssetExecutionType, AssetSpec, ) -from dagster._core.definitions.decorators.asset_decorator import multi_asset +from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset +from dagster._core.definitions.source_asset import SourceAsset from dagster._core.errors import DagsterInvariantViolationError @@ -50,3 +51,36 @@ def an_asset() -> None: ) return an_asset + + +def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset): + check.invariant( + source_asset.observe_fn is None, + "Observable source assets not supported yet: observe_fn should be None", + ) + check.invariant( + source_asset.auto_observe_interval_minutes is None, + "Observable source assets not supported yet: auto_observe_interval_minutes should be None", + ) + + kwargs = { + "key": source_asset.key, + "metadata": { + **source_asset.metadata, + **{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}, + }, + "group_name": source_asset.group_name, + "description": source_asset.description, + "partitions_def": source_asset.partitions_def, + } + + if source_asset.io_manager_def: + kwargs["io_manager_def"] = source_asset.io_manager_def + elif source_asset.io_manager_key: + kwargs["io_manager_key"] = source_asset.io_manager_key + + @asset(**kwargs) + def shim_asset() -> None: + raise NotImplementedError(f"Asset {source_asset.key} is not executable") + + return shim_asset diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py index 7f4fa11ac566f..027ef0b43f02d 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py @@ -1,14 +1,26 @@ +from typing import AbstractSet, Iterable + import pytest from dagster import ( + AssetExecutionContext, AssetKey, AssetsDefinition, AutoMaterializePolicy, + DagsterInstance, + Definitions, + IOManager, + JobDefinition, + SourceAsset, _check as check, asset, ) from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.freshness_policy import FreshnessPolicy -from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def +from dagster._core.definitions.observable_asset import ( + create_unexecutable_observable_assets_def, + create_unexecutable_observable_assets_def_from_source_asset, +) +from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition def test_observable_asset_basic_creation() -> None: @@ -72,3 +84,111 @@ def test_observable_asset_creation_with_deps() -> None: assert assets_def.asset_deps[expected_key] == { AssetKey(["observable_asset_two"]), } + + +def test_how_source_assets_are_backwards_compatible() -> None: + class DummyIOManager(IOManager): + def handle_output(self, context, obj) -> None: + pass + + def load_input(self, context) -> str: + return "hardcoded" + + source_asset = SourceAsset(key="source_asset", io_manager_def=DummyIOManager()) + + @asset + def an_asset(source_asset: str) -> str: + return "hardcoded" + "-computed" + + defs_with_source = Definitions(assets=[source_asset, an_asset]) + + instance = DagsterInstance.ephemeral() + + result_one = defs_with_source.get_implicit_global_asset_job_def().execute_in_process( + instance=instance + ) + + assert result_one.success + assert result_one.output_for_node("an_asset") == "hardcoded-computed" + + defs_with_shim = Definitions( + assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset] + ) + + assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition) + + result_two = defs_with_shim.get_implicit_global_asset_job_def().execute_in_process( + instance=instance, + # currently we have to explicitly select the asset to exclude the source from execution + asset_selection=[AssetKey("an_asset")], + ) + + assert result_two.success + assert result_two.output_for_node("an_asset") == "hardcoded-computed" + + +def get_job_for_assets(defs: Definitions, *coercibles_or_defs) -> JobDefinition: + job_def = defs.get_implicit_job_def_for_assets(set_from_coercibles_or_defs(coercibles_or_defs)) + assert job_def, "Expected to find a job def" + return job_def + + +def set_from_coercibles_or_defs(coercibles_or_defs: Iterable) -> AbstractSet["AssetKey"]: + return set( + [ + AssetKey.from_coercible_or_definition(coercible_or_def) + for coercible_or_def in coercibles_or_defs + ] + ) + + +def test_how_partitioned_source_assets_are_backwards_compatible() -> None: + class DummyIOManager(IOManager): + def handle_output(self, context, obj) -> None: + pass + + def load_input(self, context) -> str: + return "hardcoded" + + partitions_def = DailyPartitionsDefinition(start_date="2021-01-01") + source_asset = SourceAsset( + key="source_asset", io_manager_def=DummyIOManager(), partitions_def=partitions_def + ) + + @asset(partitions_def=partitions_def) + def an_asset(context: AssetExecutionContext, source_asset: str) -> str: + return source_asset + "-computed-" + context.partition_key + + assert an_asset.partitions_def is partitions_def + assert source_asset.partitions_def is partitions_def + + defs_with_source = Definitions(assets=[source_asset, an_asset]) + + instance = DagsterInstance.ephemeral() + + job_def = get_job_for_assets(defs_with_source, an_asset) + + assert job_def + result_one = job_def.execute_in_process(instance=instance, partition_key="2021-01-02") + + assert result_one.success + assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02" + + shimmed_source_asset = create_unexecutable_observable_assets_def_from_source_asset(source_asset) + defs_with_shim = Definitions( + assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset] + ) + + assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition) + + job_def_two = get_job_for_assets(defs_with_shim, an_asset, shimmed_source_asset) + + result_two = job_def_two.execute_in_process( + instance=instance, + # currently we have to explicitly select the asset to exclude the source from execution + asset_selection=[AssetKey("an_asset")], + partition_key="2021-01-03", + ) + + assert result_two.success + assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03"