Skip to content

Commit

Permalink
add marker to asset materialization event to load if the type was None
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Dec 14, 2023
1 parent c44d621 commit fe77305
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
8 changes: 5 additions & 3 deletions python_modules/dagster/dagster/_core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,13 +956,12 @@ def step_skipped_event(step_context: IStepContext) -> "DagsterEvent":

@staticmethod
def asset_materialization(
step_context: IStepContext,
materialization: AssetMaterialization,
step_context: IStepContext, materialization: AssetMaterialization, has_value: bool = True
) -> "DagsterEvent":
return DagsterEvent.from_step(
event_type=DagsterEventType.ASSET_MATERIALIZATION,
step_context=step_context,
event_specific_data=StepMaterializationData(materialization),
event_specific_data=StepMaterializationData(materialization, has_value=has_value),
message=(
materialization.description
if materialization.description
Expand Down Expand Up @@ -1488,13 +1487,15 @@ class StepMaterializationData(
[
("materialization", AssetMaterialization),
("asset_lineage", Sequence[AssetLineageInfo]),
("has_value", bool),
],
)
):
def __new__(
cls,
materialization: AssetMaterialization,
asset_lineage: Optional[Sequence[AssetLineageInfo]] = None,
has_value: bool = True,
):
return super(StepMaterializationData, cls).__new__(
cls,
Expand All @@ -1504,6 +1505,7 @@ def __new__(
asset_lineage=check.opt_sequence_param(
asset_lineage, "asset_lineage", of_type=AssetLineageInfo
),
has_value=has_value,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def validate_and_coerce_op_result_to_iterator(
f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to"
" return no results."
)

# `requires_typed_event_stream` is a mode where we require users to return/yield exactly the
# results that will be registered in the instance, without additional fancy inference (like
# wrapping `None` in an `Output`). We therefore skip any return-specific validation for this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ def _store_output(
output=output,
output_def=output_def,
manager_metadata={},
has_value=False,
)
# otherwise invoke the I/O manager
else:
Expand Down Expand Up @@ -810,6 +811,7 @@ def _gen_fn():
output=output,
output_def=output_def,
manager_metadata=manager_metadata,
has_value=True,
)

yield DagsterEvent.handled_output(
Expand All @@ -821,7 +823,7 @@ def _gen_fn():


def _log_asset_materialization_events_for_asset(
step_context, output_context, output, output_def, manager_metadata
step_context, output_context, output, output_def, manager_metadata, has_value
):
asset_key, partitions = _materializing_asset_key_and_partitions_for_output(output_context)
if asset_key:
Expand All @@ -844,7 +846,9 @@ def _log_asset_materialization_events_for_asset(

yield from (
(
DagsterEvent.asset_materialization(step_context, materialization)
DagsterEvent.asset_materialization(
step_context, materialization, has_value=has_value
)
for materialization in _get_output_asset_materializations(
asset_key,
partitions,
Expand Down
24 changes: 24 additions & 0 deletions python_modules/dagster/dagster/_core/execution/plan/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,9 +818,33 @@ def load_input_object(
def _load_input_with_input_manager(
input_manager: "InputManager", context: "InputContext"
) -> Iterator[object]:
from dagster._core.events import StepMaterializationData
from dagster._core.execution.context.system import StepExecutionContext

step_context = cast(StepExecutionContext, context.step_context)

if context.has_asset_key:
# check that the asset has been materialized
instance = (
context.instance
) # if use instance here need to test what happens when you make a context for unit testing
latest_input_materialization = instance.get_latest_materialization_event(
asset_key=context.asset_key
)

if (
latest_input_materialization is not None
and latest_input_materialization.asset_materialization is not None
):
event = latest_input_materialization.get_dagster_event()
if (
isinstance(event.event_specific_data, StepMaterializationData)
and not event.event_specific_data.has_value
):
# I/O manager was not used to store the output of the asset, which means the output was None
yield None
return

with op_execution_error_boundary(
DagsterExecutionLoadInputError,
msg_fn=lambda: f'Error occurred while loading input "{context.name}" of step "{step_context.step.key}":',
Expand Down

0 comments on commit fe77305

Please sign in to comment.