diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d1e921954c01e..adc0f34ad9751 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -38,6 +38,7 @@ AssetKey, AssetMaterialization, AssetObservation, + CoercibleToAssetKey, ExpectationResult, UserEvent, ) @@ -1478,6 +1479,29 @@ def job_def(self) -> JobDefinition: """ return self.op_execution_context.job_def + @public + def latest_materialization_for_upstream_asset( + self, key: CoercibleToAssetKey + ) -> Optional[AssetMaterialization]: + """Get the most recent AssetMaterialization event for the key. The key must be an upstream + asset for the currently materializing asset. Information like metadata and tags can be found + on the AssetMaterialization. If the key is not an upstream asset of the currently + materializing asset, an error will be raised. If no AssetMaterialization exists for key, None + will be returned. + + Returns: Optional[AssetMaterialization] + """ + materialization_events = ( + self.op_execution_context._step_execution_context.upstream_asset_materialization_events # noqa: SLF001 + ) + if AssetKey.from_coercible(key) in materialization_events.keys(): + return materialization_events.get(AssetKey.from_coercible(key)) + + raise DagsterInvariantViolationError( + f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency" + " in order to call latest_materialization_for_upstream_asset." 
+ ) + ######## Deprecated methods @deprecated(**_get_deprecation_kwargs("dagster_run")) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 6962fe76caa76..4bddaac80c220 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -22,6 +22,7 @@ from dagster._core.definitions.events import ( AssetMaterialization, AssetObservation, + CoercibleToAssetKey, ExpectationResult, UserEvent, ) @@ -833,6 +834,13 @@ def for_type(self, dagster_type: DagsterType) -> TypeCheckContext: def observe_output(self, output_name: str, mapping_key: Optional[str] = None) -> None: self.op_execution_context.observe_output(output_name=output_name, mapping_key=mapping_key) + def latest_materialization_for_upstream_asset( + self, key: CoercibleToAssetKey + ) -> Optional[AssetMaterialization]: + raise DagsterInvalidPropertyError( + _property_msg("latest_materialization_for_upstream_asset", "method") + ) + def _validate_resource_requirements( resource_defs: Mapping[str, ResourceDefinition], op_def: OpDefinition diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 25acd4c272b72..85b67cf0d00cf 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -30,7 +30,7 @@ extract_data_version_from_entry, ) from dagster._core.definitions.dependency import OpNode -from dagster._core.definitions.events import AssetKey, AssetLineageInfo +from dagster._core.definitions.events import AssetKey, AssetLineageInfo, AssetMaterialization from dagster._core.definitions.hook_definition import HookDefinition from dagster._core.definitions.job_base import IJob from dagster._core.definitions.job_definition import JobDefinition @@ 
-571,6 +571,10 @@ def __init__( self._output_metadata: Dict[str, Any] = {} self._seen_outputs: Dict[str, Union[str, Set[str]]] = {} + self._upstream_asset_materialization_events: Dict[ + AssetKey, Optional[AssetMaterialization] + ] = {} + self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {} self._is_external_input_asset_version_info_loaded = False self._data_version_cache: Dict[AssetKey, "DataVersion"] = {} @@ -949,17 +953,28 @@ def get_data_version(self, asset_key: AssetKey) -> "DataVersion": def input_asset_records(self) -> Optional[Mapping[AssetKey, Optional["InputAssetVersionInfo"]]]: return self._input_asset_version_info + @property + def upstream_asset_materialization_events( + self, + ) -> Dict[AssetKey, Optional[AssetMaterialization]]: + return self._upstream_asset_materialization_events + @property def is_external_input_asset_version_info_loaded(self) -> bool: return self._is_external_input_asset_version_info_loaded def get_input_asset_version_info(self, key: AssetKey) -> Optional["InputAssetVersionInfo"]: if key not in self._input_asset_version_info: - self._fetch_input_asset_version_info(key) + self._fetch_input_asset_materialization_and_version_info(key) return self._input_asset_version_info[key] # "external" refers to records for inputs generated outside of this step - def fetch_external_input_asset_version_info(self) -> None: + def fetch_external_input_asset_materialization_and_version_info(self) -> None: + """Fetches the latest observation or materialization for each upstream dependency + in order to determine the version info. As a side effect we create a dictionary + of the materialization events so that the AssetContext can access the latest materialization + event. 
+ """ output_keys = self.get_output_asset_keys() all_dep_keys: List[AssetKey] = [] @@ -973,10 +988,10 @@ def fetch_external_input_asset_version_info(self) -> None: self._input_asset_version_info = {} for key in all_dep_keys: - self._fetch_input_asset_version_info(key) + self._fetch_input_asset_materialization_and_version_info(key) self._is_external_input_asset_version_info_loaded = True - def _fetch_input_asset_version_info(self, key: AssetKey) -> None: + def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> None: from dagster._core.definitions.data_version import ( extract_data_version_from_entry, ) @@ -984,6 +999,7 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None: event = self._get_input_asset_event(key) if event is None: self._input_asset_version_info[key] = None + self._upstream_asset_materialization_events[key] = None else: storage_id = event.storage_id # Input name will be none if this is an internal dep @@ -1011,6 +1027,12 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None: data_version = extract_data_version_from_entry(event.event_log_entry) else: data_version = extract_data_version_from_entry(event.event_log_entry) + # the AssetMaterialization fetched above is only accurate if the asset is not partitioned + # if the asset is partitioned, then the latest AssetMaterialization may be for a partition + # that is irrelevant to the current execution + self._upstream_asset_materialization_events[key] = ( + event.asset_materialization if event.asset_materialization else None + ) self._input_asset_version_info[key] = InputAssetVersionInfo( storage_id, data_version, event.run_id, event.timestamp ) diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index f0e75917644cf..0707fb3164fce 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ 
b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -458,7 +458,7 @@ def core_dagster_event_sequence_for_step( inputs = {} if step_context.is_sda_step: - step_context.fetch_external_input_asset_version_info() + step_context.fetch_external_input_asset_materialization_and_version_info() for step_input in step_context.step.step_inputs: input_def = step_context.op_def.input_def_named(step_input.name) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index 1fd029361981b..fca48fecadf55 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -9,6 +9,7 @@ DagsterInstance, Definitions, GraphDefinition, + MaterializeResult, OpExecutionContext, Output, asset, @@ -426,3 +427,33 @@ def a(context: AssetExecutionContext): assert context == AssetExecutionContext.get() assert materialize([a]).success + + +def test_upstream_metadata(): + # with output metadata + @asset + def upstream(context: AssetExecutionContext): + context.add_output_metadata({"foo": "bar"}) + + @asset + def downstream(context: AssetExecutionContext, upstream): + mat = context.latest_materialization_for_upstream_asset("upstream") + assert mat is not None + assert mat.metadata["foo"].value == "bar" + + materialize([upstream, downstream]) + + +def test_upstream_metadata_materialize_result(): + # with asset materialization + @asset + def upstream(): + return MaterializeResult(metadata={"foo": "bar"}) + + @asset + def downstream(context: AssetExecutionContext, upstream): + mat = context.latest_materialization_for_upstream_asset("upstream") + assert mat is not None + assert mat.metadata["foo"].value == "bar" + + materialize([upstream, downstream])