Skip to content

Commit

Permalink
better naming
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 3, 2024
1 parent fc771b4 commit 6b030e7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1462,7 +1462,13 @@ def job_def(self) -> JobDefinition:
def latest_materialization_event(
self, key: CoercibleToAssetKey
) -> Optional[AssetMaterialization]:
return self._step_execution_context.latest_materialization_event.get(
"""Get the most recent AssetMaterialization event for the key. Information like metadata and tags
can be found on the AssetMaterialization. If the key is not an upstream asset of the currently
materializing asset, None will be returned.
Returns: Optional[AssetMaterialization]
"""
return self._step_execution_context.upstream_asset_materialization_events.get(
AssetKey.from_coercible(key)
)

Expand Down
14 changes: 11 additions & 3 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,9 @@ def __init__(
self._output_metadata: Dict[str, Any] = {}
self._seen_outputs: Dict[str, Union[str, Set[str]]] = {}

self.latest_materialization_event: Dict[AssetKey, Optional[AssetMaterialization]] = {}
self._upstream_asset_materialization_events: Dict[
AssetKey, Optional[AssetMaterialization]
] = {}

self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {}
self._is_external_input_asset_version_info_loaded = False
Expand Down Expand Up @@ -951,6 +953,12 @@ def get_data_version(self, asset_key: AssetKey) -> "DataVersion":
def input_asset_records(self) -> Optional[Mapping[AssetKey, Optional["InputAssetVersionInfo"]]]:
return self._input_asset_version_info

@property
def upstream_asset_materialization_events(
self,
) -> Dict[AssetKey, Optional[AssetMaterialization]]:
return self._upstream_asset_materialization_events

@property
def is_external_input_asset_version_info_loaded(self) -> bool:
return self._is_external_input_asset_version_info_loaded
Expand Down Expand Up @@ -991,9 +999,9 @@ def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) ->
event = self._get_input_asset_event(key)
if event is None:
self._input_asset_version_info[key] = None
self.latest_materialization_event[key] = None
self._upstream_asset_materialization_events[key] = None
else:
self.latest_materialization_event[key] = (
self._upstream_asset_materialization_events[key] = (
event.asset_materialization if event.asset_materialization else None
)
storage_id = event.storage_id
Expand Down

0 comments on commit 6b030e7

Please sign in to comment.