From 928618ebb608015cc02a1f13496f20866104226e Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Fri, 23 Jun 2023 16:52:21 -0500 Subject: [PATCH] [prototype] return AssetMaterialization --- .../dagster/_core/definitions/events.py | 19 +- .../dagster/_core/execution/context/system.py | 22 +++ .../dagster/_core/execution/plan/compute.py | 2 +- .../_core/execution/plan/compute_generator.py | 18 +- .../_core/execution/plan/execute_step.py | 38 +++- .../asset_defs_tests/test_assets.py | 174 +++++++++++++++++- 6 files changed, 258 insertions(+), 15 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/events.py b/python_modules/dagster/dagster/_core/definitions/events.py index 0bba8e66ab33f..9d57ab5a89948 100644 --- a/python_modules/dagster/dagster/_core/definitions/events.py +++ b/python_modules/dagster/dagster/_core/definitions/events.py @@ -20,6 +20,7 @@ import dagster._seven as seven from dagster._annotations import PublicAttr, experimental_param, public from dagster._core.definitions.data_version import DataVersion +from dagster._core.errors import DagsterInvariantViolationError from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX, SYSTEM_TAG_PREFIX from dagster._serdes import whitelist_for_serdes from dagster._serdes.serdes import NamedTupleSerializer @@ -484,7 +485,7 @@ class AssetMaterialization( Args: asset_key (Union[str, List[str], AssetKey]): A key to identify the materialized asset across - job runs + job runs. Optional in cases when the key can be inferred from the current context. description (Optional[str]): A longer human-readable description of the materialized value. partition (Optional[str]): The name of the partition that was materialized. @@ -498,18 +499,32 @@ class AssetMaterialization( def __new__( cls, - asset_key: CoercibleToAssetKey, + asset_key: Optional[CoercibleToAssetKey] = None, description: Optional[str] = None, metadata: Optional[Mapping[str, RawMetadataValue]] = None, partition: Optional[str] = None, tags: Optional[Mapping[str, str]] = None, ): from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionKey + from dagster._core.execution.context.compute import get_execution_context if isinstance(asset_key, AssetKey): check.inst_param(asset_key, "asset_key", AssetKey) elif isinstance(asset_key, str): asset_key = AssetKey(parse_asset_key_string(asset_key)) + elif asset_key is None: + current_ctx = get_execution_context() + if current_ctx is None: + raise DagsterInvariantViolationError( + "Could not infer asset_key, not currently in the context of an execution." + ) + keys = current_ctx.selected_asset_keys + if len(keys) != 1: + raise DagsterInvariantViolationError( + f"Could not infer asset_key, there are {len(keys)} in the current execution" + " context. Specify the appropriate asset_key." + ) + asset_key = next(iter(keys)) else: check.sequence_param(asset_key, "asset_key", of_type=str) asset_key = AssetKey(asset_key) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 652232f92f7b7..2fe01167969b3 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -22,6 +22,7 @@ cast, ) + import dagster._check as check from dagster._annotations import public from dagster._core.definitions.data_version import ( @@ -547,6 +548,7 @@ def __init__( self._output_metadata: Dict[str, Any] = {} self._seen_outputs: Dict[str, Union[str, Set[str]]] = {} + self._seen_user_asset_mats = {} self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {} self._is_external_input_asset_version_info_loaded = False @@ -872,6 +874,26 @@ def is_sda_step(self) -> bool: return True return False + def asset_key_for_output(self, output_name: str) -> AssetKey: + # note: duped on AssetExecutionContext + asset_output_info = self.job_def.asset_layer.asset_info_for_output( + node_handle=self.node_handle, output_name=output_name + ) + if asset_output_info is None: + check.failed(f"Output '{output_name}' has no asset") + else: + return asset_output_info.key + + def observe_user_asset_mat(self, asset_key: AssetKey, event: "AssetMaterialization"): + # will need to store N events for partitions + self._seen_user_asset_mats[asset_key] = event + + def get_observed_user_asset_mat(self, asset_key: AssetKey) -> Optional["AssetMaterialization"]: + return self._seen_user_asset_mats.get(asset_key) + + def has_observed_user_asset_mats(self) -> bool: + return bool(self._seen_user_asset_mats) + def set_data_version(self, asset_key: AssetKey, data_version: "DataVersion") -> None: self._data_version_cache[asset_key] = data_version diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index b95a272e9e949..ec51580cd11dc 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -196,7 +196,7 @@ def execute_core_compute( op_output_names = {output.name for output in step.step_outputs} omitted_outputs = op_output_names.difference(emitted_result_names) - if omitted_outputs: + if omitted_outputs and not step_context.has_observed_user_asset_mats(): step_context.log.info( f"{step_context.op_def.node_type_str} '{step.node_handle}' did not fire " f"outputs {omitted_outputs!r}" diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 340dadfac59f9..bc79ec855bc95 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -17,6 +17,7 @@ from typing_extensions import get_args +import dagster._check as check from dagster._config.pythonic_config import Config from dagster._core.definitions import ( AssetMaterialization, @@ -205,17 +206,22 @@ def validate_and_coerce_op_result_to_iterator( # this happens when a user explicitly returns a generator in the op for event in result: yield event - elif isinstance(result, (AssetMaterialization, ExpectationResult)): + + # [A] yield it here... + # elif isinstance(result, AssetMaterialization): + # yield result + + elif isinstance(result, (ExpectationResult)): raise DagsterInvariantViolationError( f"Error in {context.describe_op()}: If you are " - "returning an AssetMaterialization " - "or an ExpectationResult from " - f"{context.op_def.node_type_str} you must yield them " + "returning an ExpectationResult from " + f"{context.op_def.node_type_str} you must yield it " "directly, or log them using the OpExecutionContext.log_event method to avoid " "ambiguity with an implied result from returning a " "value. Check out the docs on logging events here: " "https://docs.dagster.io/concepts/ops-jobs-graphs/op-events#op-events-and-exceptions" ) + elif result is not None and not output_defs: raise DagsterInvariantViolationError( f"Error in {context.describe_op()}: Unexpectedly returned output of type" @@ -253,6 +259,8 @@ def validate_and_coerce_op_result_to_iterator( mapping_key=dynamic_output.mapping_key, metadata=dynamic_output.metadata, ) + elif isinstance(element, AssetMaterialization): + yield element elif isinstance(element, Output): if annotation != inspect.Parameter.empty and not is_generic_output_annotation( annotation @@ -291,3 +299,5 @@ def validate_and_coerce_op_result_to_iterator( "https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#with-conditional-branching" ) yield Output(output_name=output_def.name, value=element) + else: + check.failed("do we ever hit this unhandled case?") diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 48328b125971b..7deb1bbcf2839 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -90,10 +90,16 @@ def _step_output_error_checked_user_event_sequence( output_names = list([output_def.name for output_def in step.step_outputs]) for user_event in user_event_sequence: - if not isinstance(user_event, (Output, DynamicOutput)): + if not isinstance(user_event, (Output, DynamicOutput, AssetMaterialization)): yield user_event continue + # [A] ... to swallow here + if isinstance(user_event, AssetMaterialization): + step_context.observe_user_asset_mat(user_event.asset_key, user_event) + # defer yielding til post resolve to apply tags + continue + # do additional processing on Outputs output = user_event if not step.has_step_output(cast(str, output.output_name)): @@ -198,6 +204,15 @@ def _step_output_error_checked_user_event_sequence( f'Emitting implicit Nothing for output "{step_output_def.name}" on {op_label}' ) yield Output(output_name=step_output_def.name, value=None) + + if step_context.is_sda_step and step_context.get_observed_user_asset_mat( + step_context.asset_key_for_output(step_output_def.name) + ): + # think its fine to omit log + # step_context.log.info( + # f"Emitting implicit Nothing for materialized asset {op_label}" + # ) + yield Output(output_name=step_output_def.name, value=None) elif not step_output_def.is_dynamic: raise DagsterStepOutputNotFoundError( f"Core compute for {op_label} did not return an output for non-optional " @@ -413,6 +428,7 @@ def core_dagster_event_sequence_for_step( yield evt # for now, I'm ignoring AssetMaterializations yielded manually, but we might want # to do something with these in the above path eventually + # ^ wat? elif isinstance(user_event, AssetMaterialization): yield DagsterEvent.asset_materialization(step_context, user_event) elif isinstance(user_event, AssetObservation): @@ -536,7 +552,11 @@ def _get_output_asset_materializations( if backfill_id: tags[BACKFILL_ID_TAG] = backfill_id - if asset_partitions: + user_event = step_context.get_observed_user_asset_mat(asset_key) + if asset_partitions and user_event: + # this will be a bit involved + check.failed("unhandled") + elif asset_partitions: for partition in asset_partitions: with disable_dagster_warnings(): tags.update( @@ -551,6 +571,20 @@ def _get_output_asset_materializations( metadata=all_metadata, tags=tags, ) + elif user_event: + if tags: + yield AssetMaterialization( + **{ # dirty mergey + **user_event._asdict(), + "tags": { + **(user_event.tags if user_event.tags else {}), + **tags, + }, + }, + ) + else: + yield user_event + else: with disable_dagster_warnings(): yield AssetMaterialization(asset_key=asset_key, metadata=all_metadata, tags=tags) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index b791a317b2775..903e569434a29 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -1,3 +1,4 @@ +import pytest import ast import datetime import tempfile @@ -36,11 +37,13 @@ from dagster._core.definitions import AssetIn, SourceAsset, asset, multi_asset from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy +from dagster._core.definitions.events import AssetMaterialization from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvalidInvocationError, DagsterInvalidPropertyError, DagsterInvariantViolationError, + DagsterStepOutputNotFoundError, ) from dagster._core.execution.context.compute import AssetExecutionContext from dagster._core.instance import DagsterInstance @@ -1544,13 +1547,8 @@ def test_asset_key_with_prefix(): def _exec_asset(asset_def): - asset_job = define_asset_job("testing", [asset_def]).resolve( - asset_graph=AssetGraph.from_assets([asset_def]) - ) - - result = asset_job.execute_in_process() + result = materialize([asset_def]) assert result.success - return result.asset_materializations_for_node(asset_def.node_def.name) @@ -1624,3 +1622,167 @@ def untyped(): match="has multiple outputs, but only one output was returned", ): untyped() + + +def test_return_materialization(): + # + # status quo - use add add_output_metadata + # + @asset + def add(context: AssetExecutionContext): + context.add_output_metadata( + metadata={"one": 1}, + ) + + mats = _exec_asset(add) + assert len(mats) == 1 + # working with core metadata repr values sucks, ie IntMetadataValue + assert "one" in mats[0].metadata + assert mats[0].tags + + # + # side quest: may want to update this pattern to work as well + # + @asset + def logged(context: AssetExecutionContext): + context.log_event( + AssetMaterialization( + metadata={"one": 1}, + ) + ) + + mats = _exec_asset(logged) + # should we change this? currently get implicit materialization for output + logged event + assert len(mats) == 2 + assert "one" in mats[0].metadata + # assert mats[0].tags # fails + # assert "one" in mats[1].metadata # fails + assert mats[1].tags + + # + # main exploration + # + @asset + def ret_untyped(context: AssetExecutionContext): + return AssetMaterialization( + metadata={"one": 1}, + ) + + mats = _exec_asset(ret_untyped) + assert len(mats) == 1, mats + assert "one" in mats[0].metadata + assert mats[0].tags + + +def test_return_materialization_multi_asset(): + # + # yield successful + # + @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + def multi(): + yield AssetMaterialization( + asset_key="one", + metadata={"one": 1}, + ) + yield AssetMaterialization( + asset_key="two", + metadata={"two": 2}, + ) + + mats = _exec_asset(multi) + + assert len(mats) == 2, mats + assert "one" in mats[0].metadata + assert mats[0].tags + assert "two" in mats[1].metadata + assert mats[1].tags + + # + # missing a non optional out + # + @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + def missing(): + yield AssetMaterialization( + asset_key="one", + metadata={"one": 1}, + ) + + # currently a less than ideal error + with pytest.raises( + DagsterStepOutputNotFoundError, + match=( + 'Core compute for op "missing" did not return an output for non-optional output "two"' + ), + ): + _exec_asset(missing) + + # + # missing asset_key + # + @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + def no_key(): + yield AssetMaterialization( + metadata={"one": 1}, + ) + yield AssetMaterialization( + metadata={"two": 2}, + ) + + with pytest.raises( + DagsterInvariantViolationError, + match=( + "Could not infer asset_key, there are 2 in the current execution context. Specify the" + " appropriate asset_key." + ), + ): + _exec_asset(no_key) + + # + # return tuple success + # + @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + def ret_multi(): + return ( + AssetMaterialization( + asset_key="one", + metadata={"one": 1}, + ), + AssetMaterialization( + asset_key="two", + metadata={"two": 2}, + ), + ) + + mats = _exec_asset(ret_multi) + + assert len(mats) == 2, mats + assert "one" in mats[0].metadata + assert mats[0].tags + assert "two" in mats[1].metadata + assert mats[1].tags + + # + # return list error + # + @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + def ret_list(): + return [ + AssetMaterialization( + asset_key="one", + metadata={"one": 1}, + ), + AssetMaterialization( + asset_key="two", + metadata={"two": 2}, + ), + ] + + # not the best + with pytest.raises( + DagsterInvariantViolationError, + match=( + "When using multiple outputs, either yield each output, or return a tuple containing a" + " value for each output." + ), + ): + _exec_asset(ret_list)