diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index aca243fc85ac1..2fafc2383e4de 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1462,7 +1462,13 @@ def job_def(self) -> JobDefinition: def latest_materialization_event( self, key: CoercibleToAssetKey ) -> Optional[AssetMaterialization]: - return self._step_execution_context.latest_materialization_event.get( + """Get the most recent AssetMaterialization event for the key. Information like metadata and tags + can be found on the AssetMaterialization. If the key is not an upstream asset of the currently + materializing asset, None will be returned. + + Returns: Optional[AssetMaterialization] + """ + return self._step_execution_context.upstream_asset_materialization_events.get( AssetKey.from_coercible(key) ) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 65fefc1148894..bbf09a89fd4a1 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -571,7 +571,9 @@ def __init__( self._output_metadata: Dict[str, Any] = {} self._seen_outputs: Dict[str, Union[str, Set[str]]] = {} - self.latest_materialization_event: Dict[AssetKey, Optional[AssetMaterialization]] = {} + self._upstream_asset_materialization_events: Dict[ + AssetKey, Optional[AssetMaterialization] + ] = {} self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {} self._is_external_input_asset_version_info_loaded = False @@ -951,6 +953,12 @@ def get_data_version(self, asset_key: AssetKey) -> "DataVersion": def input_asset_records(self) -> Optional[Mapping[AssetKey, Optional["InputAssetVersionInfo"]]]: return self._input_asset_version_info + @property + def upstream_asset_materialization_events( + self, + ) -> Dict[AssetKey, Optional[AssetMaterialization]]: + return self._upstream_asset_materialization_events + @property def is_external_input_asset_version_info_loaded(self) -> bool: return self._is_external_input_asset_version_info_loaded @@ -991,9 +999,9 @@ def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> event = self._get_input_asset_event(key) if event is None: self._input_asset_version_info[key] = None - self.latest_materialization_event[key] = None + self._upstream_asset_materialization_events[key] = None else: - self.latest_materialization_event[key] = ( + self._upstream_asset_materialization_events[key] = ( event.asset_materialization if event.asset_materialization else None ) storage_id = event.storage_id