diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 9ad0a10d8b7dc..5fc1cc029b97d 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -30,7 +30,11 @@ from dagster._core.definitions.partition_mapping import MultiPartitionMapping from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition -from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvalidInvocationError +from dagster._core.errors import ( + DagsterInvalidDefinitionError, + DagsterInvalidInvocationError, + DagsterInvariantViolationError, +) from dagster._utils import IHasInternalInit from dagster._utils.merger import merge_dicts from dagster._utils.warnings import ( @@ -805,7 +809,9 @@ def get_output_name_for_asset_key(self, key: AssetKey) -> str: if key == asset_key: return output_name - check.failed(f"Asset key {key.to_user_string()} not found in AssetsDefinition") + raise DagsterInvariantViolationError( + f"Asset key {key.to_user_string()} not found in AssetsDefinition" + ) def get_op_def_for_asset_key(self, key: AssetKey) -> OpDefinition: """If this is an op-backed asset, returns the op def. If it's a graph-backed asset, diff --git a/python_modules/dagster/dagster/_core/definitions/result.py b/python_modules/dagster/dagster/_core/definitions/result.py new file mode 100644 index 0000000000000..337c4e68253f1 --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/result.py @@ -0,0 +1,31 @@ +from typing import NamedTuple, Optional + +from .events import ( + AssetKey, + CoercibleToAssetKey, +) +from .metadata import MetadataUserInput + + +class MaterializeResult( + NamedTuple( + "_MaterializeResult", + [ + ("asset_key", Optional[AssetKey]), + ("metadata", Optional[MetadataUserInput]), + ], + ) +): + def __new__( + cls, + *, # enforce kwargs + asset_key: Optional[CoercibleToAssetKey] = None, + metadata: Optional[MetadataUserInput] = None, + ): + asset_key = AssetKey.from_coercible(asset_key) if asset_key else None + + return super().__new__( + cls, + asset_key=asset_key, + metadata=metadata, # check? + ) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 726cc858e7505..1f77f9374e86a 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -27,6 +27,7 @@ ) from dagster._core.definitions.asset_layer import AssetLayer from dagster._core.definitions.op_definition import OpComputeFunction +from dagster._core.definitions.result import MaterializeResult from dagster._core.errors import DagsterExecutionStepExecutionError, DagsterInvariantViolationError from dagster._core.events import DagsterEvent from dagster._core.execution.context.compute import OpExecutionContext @@ -94,6 +95,7 @@ def _validate_event(event: Any, step_context: StepExecutionContext) -> OpOutputU ExpectationResult, AssetObservation, DagsterEvent, + MaterializeResult, ), ): raise DagsterInvariantViolationError( diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 340dadfac59f9..f9a905368f3da 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -27,6 +27,7 @@ ) from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction from dagster._core.definitions.op_definition import OpDefinition +from dagster._core.definitions.result import MaterializeResult from dagster._core.errors import DagsterInvariantViolationError from dagster._core.types.dagster_type import DagsterTypeKind, is_generic_output_annotation from dagster._utils.warnings import disable_dagster_warnings @@ -253,6 +254,8 @@ def validate_and_coerce_op_result_to_iterator( mapping_key=dynamic_output.mapping_key, metadata=dynamic_output.metadata, ) + elif isinstance(element, MaterializeResult): + yield element # coerced in to Output in outer iterator elif isinstance(element, Output): if annotation != inspect.Parameter.empty and not is_generic_output_annotation( annotation diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 48328b125971b..643505ff79d45 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -44,6 +44,7 @@ MultiPartitionKey, get_tags_from_multi_partition_key, ) +from dagster._core.definitions.result import MaterializeResult from dagster._core.errors import ( DagsterExecutionHandleOutputError, DagsterInvariantViolationError, @@ -74,6 +75,42 @@ from .utils import op_execution_error_boundary +def _process_asset_results_to_events( + step_context: StepExecutionContext, + user_event_sequence: Iterator[OpOutputUnion], +) -> Iterator[OpOutputUnion]: + """Handle converting AssetResult (& AssetCheckResult soon) to their appropriate events.""" + for user_event in user_event_sequence: + if isinstance(user_event, MaterializeResult): + assets_def = step_context.job_def.asset_layer.assets_def_for_node( + step_context.node_handle + ) + if not assets_def: + raise DagsterInvariantViolationError( + "MaterializeResult is only valid within asset computations, no backing" + " AssetsDefinition found." + ) + if user_event.asset_key: + asset_key = user_event.asset_key + else: + if len(assets_def.keys) != 1: + raise DagsterInvariantViolationError( + "MaterializeResult did not include asset_key and it can not be inferred." + f" Specify which asset_key, options are: {assets_def.keys}." + ) + asset_key = assets_def.key + + output_name = assets_def.get_output_name_for_asset_key(asset_key) + output = Output( + value=None, + output_name=output_name, + metadata=user_event.metadata, + ) + yield output + else: + yield user_event + + def _step_output_error_checked_user_event_sequence( step_context: StepExecutionContext, user_event_sequence: Iterator[OpOutputUnion] ) -> Iterator[OpOutputUnion]: @@ -403,8 +440,9 @@ def core_dagster_event_sequence_for_step( # It is important for this loop to be indented within the # timer block above in order for time to be recorded accurately. - for user_event in check.generator( - _step_output_error_checked_user_event_sequence(step_context, user_event_sequence) + for user_event in _step_output_error_checked_user_event_sequence( + step_context, + _process_asset_results_to_events(step_context, user_event_sequence), ): if isinstance(user_event, DagsterEvent): yield user_event diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index b791a317b2775..2eba188c85aad 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -36,11 +36,14 @@ from dagster._core.definitions import AssetIn, SourceAsset, asset, multi_asset from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy +from dagster._core.definitions.events import AssetMaterialization +from dagster._core.definitions.result import MaterializeResult from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvalidInvocationError, DagsterInvalidPropertyError, DagsterInvariantViolationError, + DagsterStepOutputNotFoundError, ) from dagster._core.execution.context.compute import AssetExecutionContext from dagster._core.instance import DagsterInstance @@ -1544,13 +1547,8 @@ def test_asset_key_with_prefix(): def _exec_asset(asset_def): - asset_job = define_asset_job("testing", [asset_def]).resolve( - asset_graph=AssetGraph.from_assets([asset_def]) - ) - - result = asset_job.execute_in_process() + result = materialize([asset_def]) assert result.success - return result.asset_materializations_for_node(asset_def.node_def.name) @@ -1624,3 +1622,184 @@ def untyped(): match="has multiple outputs, but only one output was returned", ): untyped() + + +def test_return_materialization(): + # + # status quo - use add add_output_metadata + # + @asset + def add(context: AssetExecutionContext): + context.add_output_metadata( + metadata={"one": 1}, + ) + + mats = _exec_asset(add) + assert len(mats) == 1 + # working with core metadata repr values sucks, ie IntMetadataValue + assert "one" in mats[0].metadata + assert mats[0].tags + + # + # may want to update this pattern to work better... + # + @asset + def logged(context: AssetExecutionContext): + context.log_event( + AssetMaterialization( + asset_key=context.asset_key_for_output(), + metadata={"one": 1}, + ) + ) + + mats = _exec_asset(logged) + # ... currently get implicit materialization for output + logged event + assert len(mats) == 2 + assert "one" in mats[0].metadata + # assert mats[0].tags # fails + # assert "one" in mats[1].metadata # fails + assert mats[1].tags + + # + # main exploration + # + @asset + def ret_untyped(context: AssetExecutionContext): + return MaterializeResult( + metadata={"one": 1}, + ) + + mats = _exec_asset(ret_untyped) + assert len(mats) == 1, mats + assert "one" in mats[0].metadata + assert mats[0].tags + + # + # key mismatch + # + @asset + def ret_mismatch(context: AssetExecutionContext): + return MaterializeResult( + asset_key="random", + metadata={"one": 1}, + ) + + with pytest.raises( + DagsterInvariantViolationError, + match="Asset key random not found in AssetsDefinition", + ): + mats = _exec_asset(ret_mismatch) + + +def test_return_materialization_multi_asset(): + # + # yield successful + # + @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + def multi(): + yield MaterializeResult( + asset_key="one", + metadata={"one": 1}, + ) + yield MaterializeResult( + asset_key="two", + metadata={"two": 2}, + ) + + mats = _exec_asset(multi) + + assert len(mats) == 2, mats + assert "one" in mats[0].metadata + assert mats[0].tags + assert "two" in mats[1].metadata + assert mats[1].tags + + # + # missing a non optional out + # + @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + def missing(): + yield MaterializeResult( + asset_key="one", + metadata={"one": 1}, + ) + + # currently a less than ideal error + with pytest.raises( + DagsterStepOutputNotFoundError, + match=( + 'Core compute for op "missing" did not return an output for non-optional output "two"' + ), + ): + _exec_asset(missing) + + # + # missing asset_key + # + @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + def no_key(): + yield MaterializeResult( + metadata={"one": 1}, + ) + yield MaterializeResult( + metadata={"two": 2}, + ) + + with pytest.raises( + DagsterInvariantViolationError, + match=( + "MaterializeResult did not include asset_key and it can not be inferred. Specify which" + " asset_key, options are:" + ), + ): + _exec_asset(no_key) + + # + # return tuple success + # + @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + def ret_multi(): + return ( + MaterializeResult( + asset_key="one", + metadata={"one": 1}, + ), + MaterializeResult( + asset_key="two", + metadata={"two": 2}, + ), + ) + + mats = _exec_asset(ret_multi) + + assert len(mats) == 2, mats + assert "one" in mats[0].metadata + assert mats[0].tags + assert "two" in mats[1].metadata + assert mats[1].tags + + # + # return list error + # + @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + def ret_list(): + return [ + MaterializeResult( + asset_key="one", + metadata={"one": 1}, + ), + MaterializeResult( + asset_key="two", + metadata={"two": 2}, + ), + ] + + # not the best + with pytest.raises( + DagsterInvariantViolationError, + match=( + "When using multiple outputs, either yield each output, or return a tuple containing a" + " value for each output." + ), + ): + _exec_asset(ret_list)