From fe773050a7053070323726c006d4671aa72d42a2 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 14 Dec 2023 13:33:28 -0500 Subject: [PATCH] add marker to asset materialization event to load if the type was None --- .../dagster/dagster/_core/events/__init__.py | 8 ++++--- .../_core/execution/plan/compute_generator.py | 1 + .../_core/execution/plan/execute_step.py | 8 +++++-- .../dagster/_core/execution/plan/inputs.py | 24 +++++++++++++++++++ 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/python_modules/dagster/dagster/_core/events/__init__.py b/python_modules/dagster/dagster/_core/events/__init__.py index 0eece4a95e0d3..f5eb5b24a46ab 100644 --- a/python_modules/dagster/dagster/_core/events/__init__.py +++ b/python_modules/dagster/dagster/_core/events/__init__.py @@ -956,13 +956,12 @@ def step_skipped_event(step_context: IStepContext) -> "DagsterEvent": @staticmethod def asset_materialization( - step_context: IStepContext, - materialization: AssetMaterialization, + step_context: IStepContext, materialization: AssetMaterialization, has_value: bool = True ) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.ASSET_MATERIALIZATION, step_context=step_context, - event_specific_data=StepMaterializationData(materialization), + event_specific_data=StepMaterializationData(materialization, has_value=has_value), message=( materialization.description if materialization.description @@ -1488,6 +1487,7 @@ class StepMaterializationData( [ ("materialization", AssetMaterialization), ("asset_lineage", Sequence[AssetLineageInfo]), + ("has_value", bool), ], ) ): @@ -1495,6 +1495,7 @@ def __new__( cls, materialization: AssetMaterialization, asset_lineage: Optional[Sequence[AssetLineageInfo]] = None, + has_value: bool = True, ): return super(StepMaterializationData, cls).__new__( cls, @@ -1504,6 +1505,7 @@ def __new__( asset_lineage=check.opt_sequence_param( asset_lineage, "asset_lineage", of_type=AssetLineageInfo ), + has_value=has_value, ) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 375be39a7ea43..2ffdb26ed8f53 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -265,6 +265,7 @@ def validate_and_coerce_op_result_to_iterator( f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to" " return no results." ) + # `requires_typed_event_stream` is a mode where we require users to return/yield exactly the # results that will be registered in the instance, without additional fancy inference (like # wrapping `None` in an `Output`). We therefore skip any return-specific validation for this diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 1c323d67b4165..69c8fb423d19e 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -726,6 +726,7 @@ def _store_output( output=output, output_def=output_def, manager_metadata={}, + has_value=False, ) # otherwise invoke the I/O manager else: @@ -810,6 +811,7 @@ def _gen_fn(): output=output, output_def=output_def, manager_metadata=manager_metadata, + has_value=True, ) yield DagsterEvent.handled_output( @@ -821,7 +823,7 @@ def _gen_fn(): def _log_asset_materialization_events_for_asset( - step_context, output_context, output, output_def, manager_metadata + step_context, output_context, output, output_def, manager_metadata, has_value ): asset_key, partitions = _materializing_asset_key_and_partitions_for_output(output_context) if asset_key: @@ -844,7 +846,9 @@ def _log_asset_materialization_events_for_asset( yield from ( ( - DagsterEvent.asset_materialization(step_context, materialization) + DagsterEvent.asset_materialization( + step_context, materialization, has_value=has_value + ) for materialization in _get_output_asset_materializations( asset_key, partitions, diff --git a/python_modules/dagster/dagster/_core/execution/plan/inputs.py b/python_modules/dagster/dagster/_core/execution/plan/inputs.py index bbea78df5a087..2d7fc25fb59fb 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/inputs.py +++ b/python_modules/dagster/dagster/_core/execution/plan/inputs.py @@ -818,9 +818,33 @@ def load_input_object( def _load_input_with_input_manager( input_manager: "InputManager", context: "InputContext" ) -> Iterator[object]: + from dagster._core.events import StepMaterializationData from dagster._core.execution.context.system import StepExecutionContext step_context = cast(StepExecutionContext, context.step_context) + + if context.has_asset_key: + # check that the asset has been materialized + instance = ( + context.instance + ) # if use instance here need to test what happens when you make a context for unit testing + latest_input_materialization = instance.get_latest_materialization_event( + asset_key=context.asset_key + ) + + if ( + latest_input_materialization is not None + and latest_input_materialization.asset_materialization is not None + ): + event = latest_input_materialization.get_dagster_event() + if ( + isinstance(event.event_specific_data, StepMaterializationData) + and not event.event_specific_data.has_value + ): + # I/O manager was not used to store the output of the asset, which means the output was None + yield None + return + with op_execution_error_boundary( DagsterExecutionLoadInputError, msg_fn=lambda: f'Error occurred while loading input "{context.name}" of step "{step_context.step.key}":',