Skip to content

Commit

Permalink
add MaterializeResult
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Aug 18, 2023
1 parent 47735b7 commit 94edbff
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 10 deletions.
10 changes: 8 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
from dagster._core.definitions.partition_mapping import MultiPartitionMapping
from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvalidInvocationError
from dagster._core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvariantViolationError,
)
from dagster._utils import IHasInternalInit
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import (
Expand Down Expand Up @@ -805,7 +809,9 @@ def get_output_name_for_asset_key(self, key: AssetKey) -> str:
if key == asset_key:
return output_name

check.failed(f"Asset key {key.to_user_string()} not found in AssetsDefinition")
raise DagsterInvariantViolationError(
f"Asset key {key.to_user_string()} not found in AssetsDefinition"
)

def get_op_def_for_asset_key(self, key: AssetKey) -> OpDefinition:
"""If this is an op-backed asset, returns the op def. If it's a graph-backed asset,
Expand Down
31 changes: 31 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import NamedTuple, Optional

from .events import (
AssetKey,
CoercibleToAssetKey,
)
from .metadata import MetadataUserInput


class MaterializeResult(
    NamedTuple(
        "_MaterializeResult",
        [
            ("asset_key", Optional[AssetKey]),
            ("metadata", Optional[MetadataUserInput]),
        ],
    )
):
    """Value an asset computation can return or yield to report a materialization.

    Both fields are optional and must be passed by keyword.

    Args:
        asset_key: Anything coercible to an ``AssetKey``. A falsy value
            (including ``None``) is stored as ``None``.
        metadata: User-supplied metadata; stored exactly as given.
    """

    def __new__(
        cls,
        *,  # keyword-only: no positional construction
        asset_key: Optional[CoercibleToAssetKey] = None,
        metadata: Optional[MetadataUserInput] = None,
    ):
        # Normalize whatever key form the caller used into an AssetKey.
        coerced_key = None
        if asset_key:
            coerced_key = AssetKey.from_coercible(asset_key)

        # NOTE: metadata is passed through without validation (open question
        # in the original: should it be checked here?).
        return super().__new__(cls, asset_key=coerced_key, metadata=metadata)
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from dagster._core.definitions.asset_layer import AssetLayer
from dagster._core.definitions.op_definition import OpComputeFunction
from dagster._core.definitions.result import MaterializeResult
from dagster._core.errors import DagsterExecutionStepExecutionError, DagsterInvariantViolationError
from dagster._core.events import DagsterEvent
from dagster._core.execution.context.compute import OpExecutionContext
Expand Down Expand Up @@ -94,6 +95,7 @@ def _validate_event(event: Any, step_context: StepExecutionContext) -> OpOutputU
ExpectationResult,
AssetObservation,
DagsterEvent,
MaterializeResult,
),
):
raise DagsterInvariantViolationError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.result import MaterializeResult
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.types.dagster_type import DagsterTypeKind, is_generic_output_annotation
from dagster._utils.warnings import disable_dagster_warnings
Expand Down Expand Up @@ -253,6 +254,25 @@ def validate_and_coerce_op_result_to_iterator(
mapping_key=dynamic_output.mapping_key,
metadata=dynamic_output.metadata,
)
elif isinstance(element, MaterializeResult):
yield element # coerced in to output in outer iterator
# # for now enforce ordering of tuple return results
# if element.asset_key and element.asset_key != context.asset_key_for_output(
# output_def.name
# ):
# raise DagsterInvariantViolationError(
# f"MaterializeResult returned with asset key {element.asset_key} does not"
# " match expected value"
# f" f{context.asset_key_for_output(output_def.name)}"
# )

# # leverage existing output -> asset materialization behavior
# yield Output(
# output_name=output_def.name,
# value=None,
# metadata=element.metadata,
# )

elif isinstance(element, Output):
if annotation != inspect.Parameter.empty and not is_generic_output_annotation(
annotation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
MultiPartitionKey,
get_tags_from_multi_partition_key,
)
from dagster._core.definitions.result import MaterializeResult
from dagster._core.errors import (
DagsterExecutionHandleOutputError,
DagsterInvariantViolationError,
Expand Down Expand Up @@ -74,6 +75,42 @@
from .utils import op_execution_error_boundary


def _process_asset_results_to_events(
    step_context: StepExecutionContext,
    user_event_sequence: Iterator[OpOutputUnion],
) -> Iterator[OpOutputUnion]:
    """Translate MaterializeResult objects in the user event stream into Outputs.

    Every ``MaterializeResult`` is replaced by an ``Output`` with ``value=None``
    that carries the result's metadata and targets the output mapped to the
    result's asset key. All other events are yielded unchanged, in order.

    Args:
        step_context: Context of the executing step; used to look up the
            AssetsDefinition backing the current node.
        user_event_sequence: Events produced by the user's compute function.

    Raises:
        DagsterInvariantViolationError: If a ``MaterializeResult`` is seen in a
            node with no backing AssetsDefinition, or if it omits ``asset_key``
            and the AssetsDefinition does not have exactly one key.
    """
    for event in user_event_sequence:
        # Anything that is not a MaterializeResult passes straight through.
        if not isinstance(event, MaterializeResult):
            yield event
            continue

        assets_def = step_context.job_def.asset_layer.assets_def_for_node(
            step_context.node_handle
        )
        if not assets_def:
            raise DagsterInvariantViolationError(
                "MaterializeResult is only valid within asset computations, no backing"
                " AssetsDefinition found."
            )

        target_key = event.asset_key
        if not target_key:
            # The key can only be inferred when exactly one asset is defined.
            if len(assets_def.keys) != 1:
                raise DagsterInvariantViolationError(
                    "MaterializeResult did not include asset_key and it can not be inferred."
                    f" Specify which asset_key, options are: {assets_def.keys}."
                )
            target_key = assets_def.key

        # Emit a value-less Output so downstream output handling applies to it.
        resolved_output_name = assets_def.get_output_name_for_asset_key(target_key)
        yield Output(
            value=None,
            output_name=resolved_output_name,
            metadata=event.metadata,
        )


def _step_output_error_checked_user_event_sequence(
step_context: StepExecutionContext, user_event_sequence: Iterator[OpOutputUnion]
) -> Iterator[OpOutputUnion]:
Expand Down Expand Up @@ -403,8 +440,9 @@ def core_dagster_event_sequence_for_step(

# It is important for this loop to be indented within the
# timer block above in order for time to be recorded accurately.
for user_event in check.generator(
_step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
for user_event in _step_output_error_checked_user_event_sequence(
step_context,
_process_asset_results_to_events(step_context, user_event_sequence),
):
if isinstance(user_event, DagsterEvent):
yield user_event
Expand Down
191 changes: 185 additions & 6 deletions python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@
from dagster._core.definitions import AssetIn, SourceAsset, asset, multi_asset
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.events import AssetMaterialization
from dagster._core.definitions.result import MaterializeResult
from dagster._core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvalidPropertyError,
DagsterInvariantViolationError,
DagsterStepOutputNotFoundError,
)
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.instance import DagsterInstance
Expand Down Expand Up @@ -1544,13 +1547,8 @@ def test_asset_key_with_prefix():


def _exec_asset(asset_def):
    """Materialize ``asset_def`` in process and return its materialization events.

    Asserts that the run succeeded, then returns the asset materializations
    recorded for the node that backs ``asset_def``.
    """
    # One execution only: the stale define_asset_job(...).resolve(...) run was
    # discarded as soon as ``result`` was reassigned from materialize().
    result = materialize([asset_def])
    assert result.success
    return result.asset_materializations_for_node(asset_def.node_def.name)


Expand Down Expand Up @@ -1624,3 +1622,184 @@ def untyped():
match="has multiple outputs, but only one output was returned",
):
untyped()


def test_return_materialization():
    """Compare the ways a single asset can attach metadata to its materialization.

    Covers ``add_output_metadata``, logging an ``AssetMaterialization`` event,
    returning a ``MaterializeResult``, and returning a ``MaterializeResult``
    whose asset key does not belong to the asset.
    """

    #
    # status quo - use add_output_metadata
    #
    @asset
    def add(context: AssetExecutionContext):
        context.add_output_metadata(
            metadata={"one": 1},
        )

    mats = _exec_asset(add)
    assert len(mats) == 1
    # working with core metadata repr values sucks, ie IntMetadataValue
    assert "one" in mats[0].metadata
    assert mats[0].tags

    #
    # may want to update this pattern to work better...
    #
    @asset
    def logged(context: AssetExecutionContext):
        context.log_event(
            AssetMaterialization(
                asset_key=context.asset_key_for_output(),
                metadata={"one": 1},
            )
        )

    mats = _exec_asset(logged)
    # ... currently get implicit materialization for output + logged event
    assert len(mats) == 2
    assert "one" in mats[0].metadata
    # Known gaps noted when this test was written: `mats[0].tags` is empty and
    # "one" is not in `mats[1].metadata`, so neither is asserted here.
    assert mats[1].tags

    #
    # main exploration
    #
    @asset
    def ret_untyped(context: AssetExecutionContext):
        return MaterializeResult(
            metadata={"one": 1},
        )

    mats = _exec_asset(ret_untyped)
    assert len(mats) == 1, mats
    assert "one" in mats[0].metadata
    assert mats[0].tags

    #
    # key mismatch
    #
    @asset
    def ret_mismatch(context: AssetExecutionContext):
        return MaterializeResult(
            asset_key="random",
            metadata={"one": 1},
        )

    with pytest.raises(
        DagsterInvariantViolationError,
        match="Asset key random not found in AssetsDefinition",
    ):
        # The call is expected to raise, so its return value is not bound.
        _exec_asset(ret_mismatch)


def test_return_materialization_multi_asset():
    """Exercise MaterializeResult with a two-output multi_asset.

    Scenarios: yielding one result per asset, omitting a required output,
    omitting ``asset_key`` when it cannot be inferred, returning a tuple of
    results, and returning a list of results (rejected).
    """

    #
    # yield successful
    #
    @multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
    def multi():
        yield MaterializeResult(asset_key="one", metadata={"one": 1})
        yield MaterializeResult(asset_key="two", metadata={"two": 2})

    mats = _exec_asset(multi)

    assert len(mats) == 2, mats
    first, second = mats
    assert "one" in first.metadata
    assert first.tags
    assert "two" in second.metadata
    assert second.tags

    #
    # missing a non optional out
    #
    @multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
    def missing():
        yield MaterializeResult(asset_key="one", metadata={"one": 1})

    # currently a less than ideal error
    missing_output_pattern = (
        'Core compute for op "missing" did not return an output for non-optional output "two"'
    )
    with pytest.raises(DagsterStepOutputNotFoundError, match=missing_output_pattern):
        _exec_asset(missing)

    #
    # missing asset_key
    #
    @multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
    def no_key():
        yield MaterializeResult(metadata={"one": 1})
        yield MaterializeResult(metadata={"two": 2})

    no_key_pattern = (
        "MaterializeResult did not include asset_key and it can not be inferred. Specify which"
        " asset_key, options are:"
    )
    with pytest.raises(DagsterInvariantViolationError, match=no_key_pattern):
        _exec_asset(no_key)

    #
    # return tuple success
    #
    @multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
    def ret_multi():
        result_one = MaterializeResult(asset_key="one", metadata={"one": 1})
        result_two = MaterializeResult(asset_key="two", metadata={"two": 2})
        return (result_one, result_two)

    mats = _exec_asset(ret_multi)

    assert len(mats) == 2, mats
    first, second = mats
    assert "one" in first.metadata
    assert first.tags
    assert "two" in second.metadata
    assert second.tags

    #
    # return list error
    #
    @multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
    def ret_list():
        result_one = MaterializeResult(asset_key="one", metadata={"one": 1})
        result_two = MaterializeResult(asset_key="two", metadata={"two": 2})
        return [result_one, result_two]

    # not the best
    list_return_pattern = (
        "When using multiple outputs, either yield each output, or return a tuple containing a"
        " value for each output."
    )
    with pytest.raises(DagsterInvariantViolationError, match=list_return_pattern):
        _exec_asset(ret_list)

0 comments on commit 94edbff

Please sign in to comment.