Skip to content

Commit

Permalink
asset materialization event
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 3, 2024
1 parent c22be8d commit fc771b4
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
AssetKey,
AssetMaterialization,
AssetObservation,
CoercibleToAssetKey,
ExpectationResult,
UserEvent,
)
Expand Down Expand Up @@ -1457,6 +1458,14 @@ def job_def(self) -> JobDefinition:
"""
return self.op_execution_context.job_def

@public
def latest_materialization_event(
self, key: CoercibleToAssetKey
) -> Optional[AssetMaterialization]:
return self._step_execution_context.latest_materialization_event.get(
AssetKey.from_coercible(key)
)

######## Deprecated methods

@deprecated(**_get_deprecation_kwargs("dagster_run"))
Expand Down Expand Up @@ -1893,6 +1902,3 @@ def enter_execution_context(
_current_asset_execution_context: ContextVar[Optional[AssetExecutionContext]] = ContextVar(
"_current_asset_execution_context", default=None
)


# TODO - remove
21 changes: 16 additions & 5 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
extract_data_version_from_entry,
)
from dagster._core.definitions.dependency import OpNode
from dagster._core.definitions.events import AssetKey, AssetLineageInfo
from dagster._core.definitions.events import AssetKey, AssetLineageInfo, AssetMaterialization
from dagster._core.definitions.hook_definition import HookDefinition
from dagster._core.definitions.job_base import IJob
from dagster._core.definitions.job_definition import JobDefinition
Expand Down Expand Up @@ -571,6 +571,8 @@ def __init__(
self._output_metadata: Dict[str, Any] = {}
self._seen_outputs: Dict[str, Union[str, Set[str]]] = {}

self.latest_materialization_event: Dict[AssetKey, Optional[AssetMaterialization]] = {}

self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {}
self._is_external_input_asset_version_info_loaded = False
self._data_version_cache: Dict[AssetKey, "DataVersion"] = {}
Expand Down Expand Up @@ -955,11 +957,16 @@ def is_external_input_asset_version_info_loaded(self) -> bool:

def get_input_asset_version_info(self, key: AssetKey) -> Optional["InputAssetVersionInfo"]:
if key not in self._input_asset_version_info:
self._fetch_input_asset_version_info(key)
self._fetch_input_asset_materialization_and_version_info(key)
return self._input_asset_version_info[key]

# "external" refers to records for inputs generated outside of this step
def fetch_external_input_asset_version_info(self) -> None:
def fetch_external_input_asset_materialization_and_version_info(self) -> None:
"""Fetches the latest observation or materialization for each upstream dependency
in order to determine the version info. As a side effect we create a dictionary
of the materialization events so that the AssetContext can access the latest materialization
event.
"""
output_keys = self.get_output_asset_keys()

all_dep_keys: List[AssetKey] = []
Expand All @@ -973,18 +980,22 @@ def fetch_external_input_asset_version_info(self) -> None:

self._input_asset_version_info = {}
for key in all_dep_keys:
self._fetch_input_asset_version_info(key)
self._fetch_input_asset_materialization_and_version_info(key)
self._is_external_input_asset_version_info_loaded = True

def _fetch_input_asset_version_info(self, key: AssetKey) -> None:
def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> None:
from dagster._core.definitions.data_version import (
extract_data_version_from_entry,
)

event = self._get_input_asset_event(key)
if event is None:
self._input_asset_version_info[key] = None
self.latest_materialization_event[key] = None
else:
self.latest_materialization_event[key] = (
event.asset_materialization if event.asset_materialization else None
)
storage_id = event.storage_id
# Input name will be none if this is an internal dep
input_name = self.job_def.asset_layer.input_for_asset_key(self.node_handle, key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ def core_dagster_event_sequence_for_step(
inputs = {}

if step_context.is_sda_step:
step_context.fetch_external_input_asset_version_info()
step_context.fetch_external_input_asset_materialization_and_version_info()

for step_input in step_context.step.step_inputs:
input_def = step_context.op_def.input_def_named(step_input.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
DagsterInstance,
Definitions,
GraphDefinition,
MaterializeResult,
OpExecutionContext,
Output,
asset,
Expand Down Expand Up @@ -426,3 +427,33 @@ def a(context: AssetExecutionContext):
assert context == AssetExecutionContext.get()

assert materialize([a]).success


def test_upstream_metadata():
# with output metadata
@asset
def upstream(context: AssetExecutionContext):
context.add_output_metadata({"foo": "bar"})

@asset
def downstream(context: AssetExecutionContext, upstream):
mat = context.latest_materialization_event("upstream")
assert mat is not None
assert mat.metadata["foo"].value == "bar"

materialize([upstream, downstream])


def test_upstream_metadata_materialize_result():
# with asset materialization
@asset
def upstream():
return MaterializeResult(metadata={"foo": "bar"})

@asset
def downstream(context: AssetExecutionContext, upstream):
mat = context.latest_materialization_event("upstream")
assert mat is not None
assert mat.metadata["foo"].value == "bar"

materialize([upstream, downstream])

0 comments on commit fc771b4

Please sign in to comment.