diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 4d2e23e394fe8..4fbd4d1db24d0 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1459,17 +1459,26 @@ def job_def(self) -> JobDefinition: return self.op_execution_context.job_def @public - def latest_materialization_event( + def latest_materialization_for_upstream_asset( self, key: CoercibleToAssetKey ) -> Optional[AssetMaterialization]: - """Get the most recent AssetMaterialization event for the key. Information like metadata and tags - can be found on the AssetMaterialization. If the key is not an upstream asset of the currently - materializing asset, None will be returned. + """Get the most recent AssetMaterialization event for the key. The key must be an upstream + asset for the currently materializing asset. Information like metadata and tags can be found + on the AssetMaterialization. If the key is not an upstream asset of the currently + materializing asset, an error will be raised. If no AssetMaterialization exists for key, None + will be returned. Returns: Optional[AssetMaterialization] """ - return self.op_execution_context._step_execution_context.upstream_asset_materialization_events.get( # noqa: SLF001 - AssetKey.from_coercible(key) + materialization_events = ( + self.op_execution_context._step_execution_context.upstream_asset_materialization_events # noqa: SLF001 + ) + if AssetKey.from_coercible(key) in materialization_events.keys(): + return materialization_events.get(AssetKey.from_coercible(key)) + + raise DagsterInvariantViolationError( + f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency" + "in order to call latest_materialization_for_upstream_asset." ) ######## Deprecated methods