Skip to content

Commit

Permalink
add MaterializeResult (#15932)
Browse files Browse the repository at this point in the history
The latest evolution of #14931
& #15392 intentionally
aligned with #15928 this PR
adds support for a new "Result" return type from assets that do not deal
with "Outputs" to be able to communicate materialization metadata.

## How I Tested These Changes

added tests.
  • Loading branch information
alangenfeld authored Aug 25, 2023
1 parent bfe788c commit 711d2e5
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 21 deletions.
10 changes: 8 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
)
from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvalidInvocationError
from dagster._core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvariantViolationError,
)
from dagster._utils import IHasInternalInit
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import (
Expand Down Expand Up @@ -848,7 +852,9 @@ def get_output_name_for_asset_key(self, key: AssetKey) -> str:
if key == asset_key:
return output_name

check.failed(f"Asset key {key.to_user_string()} not found in AssetsDefinition")
raise DagsterInvariantViolationError(
f"Asset key {key.to_user_string()} not found in AssetsDefinition"
)

def get_op_def_for_asset_key(self, key: AssetKey) -> OpDefinition:
"""If this is an op-backed asset, returns the op def. If it's a graph-backed asset,
Expand Down
43 changes: 43 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import NamedTuple, Optional

from dagster._annotations import PublicAttr, experimental

from .events import (
AssetKey,
CoercibleToAssetKey,
)
from .metadata import MetadataUserInput


@experimental
class MaterializeResult(
NamedTuple(
"_MaterializeResult",
[
("asset_key", PublicAttr[Optional[AssetKey]]),
("metadata", PublicAttr[Optional[MetadataUserInput]]),
],
)
):
"""An object representing a successful materialization of an asset. These can be returned from
@asset and @multi_asset decorated functions to pass metadata or specify specific assets were
materialized.
Attributes:
asset_key (Optional[AssetKey]): Optional in @asset, required in @multi_asset to discern which asset this refers to.
metadata (Optional[MetadataUserInput]): Metadata to record with the corresponding AssetMaterialization event.
"""

def __new__(
cls,
*, # enforce kwargs
asset_key: Optional[CoercibleToAssetKey] = None,
metadata: Optional[MetadataUserInput] = None,
):
asset_key = AssetKey.from_coercible(asset_key) if asset_key else None

return super().__new__(
cls,
asset_key=asset_key,
metadata=metadata, # check?
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from dagster._core.definitions.asset_layer import AssetLayer
from dagster._core.definitions.op_definition import OpComputeFunction
from dagster._core.definitions.result import MaterializeResult
from dagster._core.errors import DagsterExecutionStepExecutionError, DagsterInvariantViolationError
from dagster._core.events import DagsterEvent
from dagster._core.execution.context.compute import OpExecutionContext
Expand All @@ -50,6 +51,7 @@
DagsterEvent,
AssetCheckEvaluation,
AssetCheckResult,
MaterializeResult,
]


Expand Down Expand Up @@ -102,6 +104,7 @@ def _validate_event(event: Any, step_context: StepExecutionContext) -> OpOutputU
DagsterEvent,
AssetCheckResult,
AssetCheckEvaluation,
MaterializeResult,
),
):
raise DagsterInvariantViolationError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.result import MaterializeResult
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.types.dagster_type import DagsterTypeKind, is_generic_output_annotation
from dagster._utils.warnings import disable_dagster_warnings
Expand Down Expand Up @@ -256,6 +257,8 @@ def validate_and_coerce_op_result_to_iterator(
mapping_key=dynamic_output.mapping_key,
metadata=dynamic_output.metadata,
)
elif isinstance(element, MaterializeResult):
yield element # coerced in to Output in outer iterator
elif isinstance(element, Output):
if annotation != inspect.Parameter.empty and not is_generic_output_annotation(
annotation
Expand Down
56 changes: 43 additions & 13 deletions python_modules/dagster/dagster/_core/execution/plan/execute_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import dagster._check as check
from dagster._core.definitions import (
AssetCheckEvaluation,
AssetCheckResult,
AssetKey,
AssetMaterialization,
AssetObservation,
Expand All @@ -25,6 +24,7 @@
OutputDefinition,
TypeCheck,
)
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.data_version import (
CODE_VERSION_TAG,
Expand All @@ -47,6 +47,7 @@
MultiPartitionKey,
get_tags_from_multi_partition_key,
)
from dagster._core.definitions.result import MaterializeResult
from dagster._core.errors import (
DagsterAssetCheckFailedError,
DagsterExecutionHandleOutputError,
Expand Down Expand Up @@ -78,16 +79,47 @@
from .utils import op_execution_error_boundary


def _asset_check_results_to_outputs_and_evaluations(
step_context: StepExecutionContext, user_event_sequence: Iterator[OpOutputUnion]
def _process_asset_results_to_events(
step_context: StepExecutionContext,
user_event_sequence: Iterator[OpOutputUnion],
) -> Iterator[OpOutputUnion]:
"""We convert each AssetCheckResult to two events:
- An Output, which allows downstream steps to depend on it
- An AssetCheckEvaluation, which combines the check result with information from the context
to create a full picture of the asset check's evaluation.
"""Handle converting MaterializeResult (& AssetCheckResult soon) to their appropriate events.
MaterializeResults get converted to an Output event, which is later use to derive an AssetMaterialization.
AssetCheckResult get converted to two events:
- An Output, which allows downstream steps to depend on it
- An AssetCheckEvaluation, which combines the check result with information from the context
to create a full picture of the asset check's evaluation.
"""
for user_event in user_event_sequence:
if isinstance(user_event, AssetCheckResult):
if isinstance(user_event, MaterializeResult):
assets_def = step_context.job_def.asset_layer.assets_def_for_node(
step_context.node_handle
)
if not assets_def:
raise DagsterInvariantViolationError(
"MaterializeResult is only valid within asset computations, no backing"
" AssetsDefinition found."
)
if user_event.asset_key:
asset_key = user_event.asset_key
else:
if len(assets_def.keys) != 1:
raise DagsterInvariantViolationError(
"MaterializeResult did not include asset_key and it can not be inferred."
f" Specify which asset_key, options are: {assets_def.keys}."
)
asset_key = assets_def.key

output_name = assets_def.get_output_name_for_asset_key(asset_key)
output = Output(
value=None,
output_name=output_name,
metadata=user_event.metadata,
)
yield output
elif isinstance(user_event, AssetCheckResult):
asset_check_evaluation = user_event.to_asset_check_evaluation(step_context)
spec = check.not_none(
step_context.job_def.asset_layer.get_spec_for_asset_check(
Expand Down Expand Up @@ -446,11 +478,9 @@ def core_dagster_event_sequence_for_step(

# It is important for this loop to be indented within the
# timer block above in order for time to be recorded accurately.
for user_event in check.generator(
_step_output_error_checked_user_event_sequence(
step_context,
_asset_check_results_to_outputs_and_evaluations(step_context, user_event_sequence),
)
for user_event in _step_output_error_checked_user_event_sequence(
step_context,
_process_asset_results_to_events(step_context, user_event_sequence),
):
if isinstance(user_event, DagsterEvent):
yield user_event
Expand Down
Loading

0 comments on commit 711d2e5

Please sign in to comment.