diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 1b8e1cf85a26f..f7e36ffffb489 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -26,12 +26,16 @@ class AssetExecutionType(Enum): + OBSERVATION = "OBSERVATION" UNEXECUTABLE = "UNEXECUTABLE" MATERIALIZATION = "MATERIALIZATION" @staticmethod def is_executable(varietal_str: Optional[str]) -> bool: - return AssetExecutionType.str_to_enum(varietal_str) in {AssetExecutionType.MATERIALIZATION} + return AssetExecutionType.str_to_enum(varietal_str) in { + AssetExecutionType.MATERIALIZATION, + AssetExecutionType.OBSERVATION, + } @staticmethod def str_to_enum(varietal_str: Optional[str]) -> "AssetExecutionType": diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 7d61a5fede293..40a507f544bd2 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -21,6 +21,7 @@ from dagster._annotations import experimental_param, public from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType from dagster._core.definitions.freshness_policy import FreshnessPolicy @@ -905,6 +906,16 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool: self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) ) + def asset_execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType: + from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + AssetExecutionType, + ) + + return AssetExecutionType.str_to_enum( + self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) + ) + def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]: return self._partition_mappings.get(self._keys_by_input_name[input_name]) diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index 1f813fb63a1a4..02d5cde96ce63 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -138,6 +138,12 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets else {} ) + injected_metadata = ( + {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value} + if source_asset.observe_fn is None + else {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value} + ) + kwargs = { "key": source_asset.key, "metadata": { diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 26cfc3a7dba7f..b42de69bfa2fc 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -25,6 +25,7 @@ TypeCheck, ) from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.data_version import ( CODE_VERSION_TAG, DATA_VERSION_IS_USER_PROVIDED_TAG, @@ -779,15 +780,38 @@ def _gen_fn(): asset_key, partitions = _asset_key_and_partitions_for_output(output_context) if asset_key: - for materialization in _get_output_asset_materializations( - asset_key, - partitions, - output, - output_def, - manager_metadata, - step_context, - ): - yield DagsterEvent.asset_materialization(step_context, materialization) + asset_layer = step_context.job_def.asset_layer + execution_type = ( + asset_layer.assets_def_for_asset(asset_key).asset_execution_type_for_asset(asset_key) + if asset_layer.has_assets_def_for_asset(asset_key) + else AssetExecutionType.MATERIALIZATION + ) + + check.invariant( + execution_type != AssetExecutionType.UNEXECUTABLE, + "There should never be unexecutable assets here", + ) + + check.invariant( + execution_type in {AssetExecutionType.MATERIALIZATION, AssetExecutionType.OBSERVATION}, + f"Unexpected asset execution type {execution_type}", + ) + + yield from ( + ( + DagsterEvent.asset_materialization(step_context, materialization) + for materialization in _get_output_asset_materializations( + asset_key, + partitions, + output, + output_def, + manager_metadata, + step_context, + ) + ) + if execution_type == AssetExecutionType.MATERIALIZATION + else () + ) yield DagsterEvent.handled_output( step_context, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index 3a8470691c5d9..9bcb95e5f381c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -211,7 +211,9 @@ def test_observable_source_asset_decorator() -> None: def an_observable_source_asset() -> DataVersion: return DataVersion("foo") - defs = Definitions(assets=[create_external_asset_from_source_asset(an_observable_source_asset)]) + assets_def = create_external_asset_from_source_asset(an_observable_source_asset) + assert assets_def.is_asset_executable(an_observable_source_asset.key) + defs = Definitions(assets=[assets_def]) instance = DagsterInstance.ephemeral() result = defs.get_implicit_global_asset_job_def().execute_in_process(instance=instance) @@ -225,6 +227,4 @@ def an_observable_source_asset() -> DataVersion: assert observation_event.asset_observation_data.asset_observation.data_version == "foo" all_materializations = result.get_asset_materialization_events() - # Note this does not make sense. We need to make framework changes to allow for the omission of - # a materialzation event - assert len(all_materializations) == 1 + assert len(all_materializations) == 0