diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 135c53c568c88..8e4a9c9b4afa2 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -37,7 +37,11 @@ ) from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition -from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvalidInvocationError +from dagster._core.errors import ( + DagsterInvalidDefinitionError, + DagsterInvalidInvocationError, + DagsterInvariantViolationError, +) from dagster._utils import IHasInternalInit from dagster._utils.merger import merge_dicts from dagster._utils.warnings import ( @@ -848,7 +852,9 @@ def get_output_name_for_asset_key(self, key: AssetKey) -> str: if key == asset_key: return output_name - check.failed(f"Asset key {key.to_user_string()} not found in AssetsDefinition") + raise DagsterInvariantViolationError( + f"Asset key {key.to_user_string()} not found in AssetsDefinition" + ) def get_op_def_for_asset_key(self, key: AssetKey) -> OpDefinition: """If this is an op-backed asset, returns the op def. 
@experimental
class MaterializeResult(
    NamedTuple(
        "_MaterializeResult",
        [
            ("asset_key", PublicAttr[Optional[AssetKey]]),
            ("metadata", PublicAttr[Optional[MetadataUserInput]]),
        ],
    )
):
    """An object representing a successful materialization of an asset.

    These can be returned from @asset and @multi_asset decorated functions to pass metadata or
    to specify which assets were materialized.

    Attributes:
        asset_key (Optional[AssetKey]): Optional in @asset, required in @multi_asset to discern
            which asset this refers to.
        metadata (Optional[MetadataUserInput]): Metadata to record with the corresponding
            AssetMaterialization event.
    """

    def __new__(
        cls,
        *,  # enforce kwargs
        asset_key: Optional[CoercibleToAssetKey] = None,
        metadata: Optional[MetadataUserInput] = None,
    ):
        """Build a result, coercing ``asset_key`` to an ``AssetKey`` when one is supplied.

        Args:
            asset_key: Anything ``AssetKey.from_coercible`` accepts, or ``None`` to leave the
                key unspecified.
            metadata: Metadata to attach; stored exactly as given.
        """
        # Compare against None rather than relying on truthiness: an explicitly passed but
        # falsy key (e.g. "" or []) must not be silently treated as "no key given".
        # NOTE(review): how AssetKey.from_coercible handles such values is defined elsewhere.
        coerced_key = AssetKey.from_coercible(asset_key) if asset_key is not None else None

        return super().__new__(
            cls,
            asset_key=coerced_key,
            # NOTE: metadata is not validated here; it is passed through unchanged.
            metadata=metadata,
        )
metadata=dynamic_output.metadata, ) + elif isinstance(element, MaterializeResult): + yield element # coerced in to Output in outer iterator elif isinstance(element, Output): if annotation != inspect.Parameter.empty and not is_generic_output_annotation( annotation diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index de54ab64dddad..69212e5805e6d 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -16,7 +16,6 @@ import dagster._check as check from dagster._core.definitions import ( AssetCheckEvaluation, - AssetCheckResult, AssetKey, AssetMaterialization, AssetObservation, @@ -25,6 +24,7 @@ OutputDefinition, TypeCheck, ) +from dagster._core.definitions.asset_check_result import AssetCheckResult from dagster._core.definitions.asset_check_spec import AssetCheckSeverity from dagster._core.definitions.data_version import ( CODE_VERSION_TAG, @@ -47,6 +47,7 @@ MultiPartitionKey, get_tags_from_multi_partition_key, ) +from dagster._core.definitions.result import MaterializeResult from dagster._core.errors import ( DagsterAssetCheckFailedError, DagsterExecutionHandleOutputError, @@ -78,16 +79,47 @@ from .utils import op_execution_error_boundary -def _asset_check_results_to_outputs_and_evaluations( - step_context: StepExecutionContext, user_event_sequence: Iterator[OpOutputUnion] +def _process_asset_results_to_events( + step_context: StepExecutionContext, + user_event_sequence: Iterator[OpOutputUnion], ) -> Iterator[OpOutputUnion]: - """We convert each AssetCheckResult to two events: - - An Output, which allows downstream steps to depend on it - - An AssetCheckEvaluation, which combines the check result with information from the context - to create a full picture of the asset check's evaluation. 
+ """Handle converting MaterializeResult (& AssetCheckResult soon) to their appropriate events. + + MaterializeResults get converted to an Output event, which is later use to derive an AssetMaterialization. + + AssetCheckResult get converted to two events: + - An Output, which allows downstream steps to depend on it + - An AssetCheckEvaluation, which combines the check result with information from the context + to create a full picture of the asset check's evaluation. """ for user_event in user_event_sequence: - if isinstance(user_event, AssetCheckResult): + if isinstance(user_event, MaterializeResult): + assets_def = step_context.job_def.asset_layer.assets_def_for_node( + step_context.node_handle + ) + if not assets_def: + raise DagsterInvariantViolationError( + "MaterializeResult is only valid within asset computations, no backing" + " AssetsDefinition found." + ) + if user_event.asset_key: + asset_key = user_event.asset_key + else: + if len(assets_def.keys) != 1: + raise DagsterInvariantViolationError( + "MaterializeResult did not include asset_key and it can not be inferred." + f" Specify which asset_key, options are: {assets_def.keys}." + ) + asset_key = assets_def.key + + output_name = assets_def.get_output_name_for_asset_key(asset_key) + output = Output( + value=None, + output_name=output_name, + metadata=user_event.metadata, + ) + yield output + elif isinstance(user_event, AssetCheckResult): asset_check_evaluation = user_event.to_asset_check_evaluation(step_context) spec = check.not_none( step_context.job_def.asset_layer.get_spec_for_asset_check( @@ -446,11 +478,9 @@ def core_dagster_event_sequence_for_step( # It is important for this loop to be indented within the # timer block above in order for time to be recorded accurately. 
def _exec_asset(asset_def):
    """Materialize ``asset_def`` and return the materializations recorded for its node.

    Asserts that the run succeeded before reading the materializations.
    """
    result = materialize([asset_def])
    assert result.success
    return result.asset_materializations_for_node(asset_def.node_def.name)


def test_return_materialization():
    """Exercise the ways a single @asset can report materialization metadata."""

    #
    # status quo - use add_output_metadata
    #
    @asset
    def add(context: AssetExecutionContext):
        context.add_output_metadata(
            metadata={"one": 1},
        )

    mats = _exec_asset(add)
    assert len(mats) == 1
    # Only the key is asserted; the stored value is a core metadata value type
    # (e.g. IntMetadataValue) rather than the raw int.
    assert "one" in mats[0].metadata
    assert mats[0].tags

    #
    # may want to update this pattern to work better...
    #
    @asset
    def logged(context: AssetExecutionContext):
        context.log_event(
            AssetMaterialization(
                asset_key=context.asset_key_for_output(),
                metadata={"one": 1},
            )
        )

    mats = _exec_asset(logged)
    # ... currently get implicit materialization for output + logged event
    assert len(mats) == 2
    assert "one" in mats[0].metadata
    # NOTE: the two events are not merged. Asserting `mats[0].tags` fails, and so does
    # asserting `"one" in mats[1].metadata`; only the second event carries tags.
    assert mats[1].tags

    #
    # main exploration
    #
    @asset
    def ret_untyped(context: AssetExecutionContext):
        return MaterializeResult(
            metadata={"one": 1},
        )

    mats = _exec_asset(ret_untyped)
    assert len(mats) == 1, mats
    assert "one" in mats[0].metadata
    assert mats[0].tags

    #
    # key mismatch
    #
    @asset
    def ret_mismatch(context: AssetExecutionContext):
        return MaterializeResult(
            asset_key="random",
            metadata={"one": 1},
        )

    with pytest.raises(
        DagsterInvariantViolationError,
        match="Asset key random not found in AssetsDefinition",
    ):
        # The call is expected to raise, so its result is deliberately not bound.
        _exec_asset(ret_mismatch)


def test_return_materialization_multi_asset():
    """Exercise MaterializeResult with @multi_asset: yielded, returned, and error cases."""

    #
    # yield successful
    #
    @multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
    def multi():
        yield MaterializeResult(
            asset_key="one",
            metadata={"one": 1},
        )
        yield MaterializeResult(
            asset_key="two",
            metadata={"two": 2},
        )

    mats = _exec_asset(multi)

    assert len(mats) == 2, mats
    assert "one" in mats[0].metadata
    assert mats[0].tags
    assert "two" in mats[1].metadata
    assert mats[1].tags

    #
    # missing a non optional out
    #
    @multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
    def missing():
        yield MaterializeResult(
            asset_key="one",
            metadata={"one": 1},
        )

    # currently a less than ideal error
    with pytest.raises(
        DagsterStepOutputNotFoundError,
        match=(
            'Core compute for op "missing" did not return an output for non-optional output "two"'
        ),
    ):
        _exec_asset(missing)

    #
    # missing asset_key
    #
    @multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
    def no_key():
        yield MaterializeResult(
            metadata={"one": 1},
        )
        yield MaterializeResult(
            metadata={"two": 2},
        )

    with pytest.raises(
        DagsterInvariantViolationError,
        match=(
            "MaterializeResult did not include asset_key and it can not be inferred. Specify which"
            " asset_key, options are:"
        ),
    ):
        _exec_asset(no_key)

    #
    # return tuple success
    #
    @multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
    def ret_multi():
        return (
            MaterializeResult(
                asset_key="one",
                metadata={"one": 1},
            ),
            MaterializeResult(
                asset_key="two",
                metadata={"two": 2},
            ),
        )

    mats = _exec_asset(ret_multi)

    assert len(mats) == 2, mats
    assert "one" in mats[0].metadata
    assert mats[0].tags
    assert "two" in mats[1].metadata
    assert mats[1].tags

    #
    # return list error
    #
    @multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
    def ret_list():
        return [
            MaterializeResult(
                asset_key="one",
                metadata={"one": 1},
            ),
            MaterializeResult(
                asset_key="two",
                metadata={"two": 2},
            ),
        ]

    # not the best
    with pytest.raises(
        DagsterInvariantViolationError,
        match=(
            "When using multiple outputs, either yield each output, or return a tuple containing a"
            " value for each output."
        ),
    ):
        _exec_asset(ret_list)