Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make upstream asset materialization events available on the context #18971

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
AssetKey,
AssetMaterialization,
AssetObservation,
CoercibleToAssetKey,
ExpectationResult,
UserEvent,
)
Expand Down Expand Up @@ -1478,6 +1479,29 @@ def job_def(self) -> JobDefinition:
"""
return self.op_execution_context.job_def

@public
def latest_materialization_for_upstream_asset(
self, key: CoercibleToAssetKey
) -> Optional[AssetMaterialization]:
"""Get the most recent AssetMaterialization event for the key. The key must be an upstream
asset for the currently materializing asset. Information like metadata and tags can be found
on the AssetMaterialization. If the key is not an upstream asset of the currently
materializing asset, an error will be raised. If no AssetMaterialization exists for key, None
will be returned.

Returns: Optional[AssetMaterialization]
"""
materialization_events = (
self.op_execution_context._step_execution_context.upstream_asset_materialization_events # noqa: SLF001
)
if AssetKey.from_coercible(key) in materialization_events.keys():
return materialization_events.get(AssetKey.from_coercible(key))

raise DagsterInvariantViolationError(
f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream dependency"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"a direct upstream dependency"

"in order to call latest_materialization_for_upstream_asset."
)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My one concern is naming, in that it will perhaps lure the programmer into thinking that this will fetch the latest materialization event for any asset, rather than only the immeadiate upstream.

My recommendation is

latest_materialization_for_upstream_asset

and then hard error with a clear error message if it is a key that is not immediately upstream.

I think the current behavior in this PR qualifies as "surprising" and will lead to subtle errors.

######## Deprecated methods

@deprecated(**_get_deprecation_kwargs("dagster_run"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dagster._core.definitions.events import (
AssetMaterialization,
AssetObservation,
CoercibleToAssetKey,
ExpectationResult,
UserEvent,
)
Expand Down Expand Up @@ -833,6 +834,13 @@ def for_type(self, dagster_type: DagsterType) -> TypeCheckContext:
def observe_output(self, output_name: str, mapping_key: Optional[str] = None) -> None:
self.op_execution_context.observe_output(output_name=output_name, mapping_key=mapping_key)

def latest_materialization_for_upstream_asset(
self, key: CoercibleToAssetKey
) -> Optional[AssetMaterialization]:
raise DagsterInvalidPropertyError(
_property_msg("latest_materialization_for_upstream_asset", "method")
)


def _validate_resource_requirements(
resource_defs: Mapping[str, ResourceDefinition], op_def: OpDefinition
Expand Down
32 changes: 27 additions & 5 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
extract_data_version_from_entry,
)
from dagster._core.definitions.dependency import OpNode
from dagster._core.definitions.events import AssetKey, AssetLineageInfo
from dagster._core.definitions.events import AssetKey, AssetLineageInfo, AssetMaterialization
from dagster._core.definitions.hook_definition import HookDefinition
from dagster._core.definitions.job_base import IJob
from dagster._core.definitions.job_definition import JobDefinition
Expand Down Expand Up @@ -571,6 +571,10 @@ def __init__(
self._output_metadata: Dict[str, Any] = {}
self._seen_outputs: Dict[str, Union[str, Set[str]]] = {}

self._upstream_asset_materialization_events: Dict[
AssetKey, Optional[AssetMaterialization]
] = {}

self._input_asset_version_info: Dict[AssetKey, Optional["InputAssetVersionInfo"]] = {}
self._is_external_input_asset_version_info_loaded = False
self._data_version_cache: Dict[AssetKey, "DataVersion"] = {}
Expand Down Expand Up @@ -949,17 +953,28 @@ def get_data_version(self, asset_key: AssetKey) -> "DataVersion":
def input_asset_records(self) -> Optional[Mapping[AssetKey, Optional["InputAssetVersionInfo"]]]:
return self._input_asset_version_info

@property
def upstream_asset_materialization_events(
self,
) -> Dict[AssetKey, Optional[AssetMaterialization]]:
return self._upstream_asset_materialization_events

@property
def is_external_input_asset_version_info_loaded(self) -> bool:
return self._is_external_input_asset_version_info_loaded

def get_input_asset_version_info(self, key: AssetKey) -> Optional["InputAssetVersionInfo"]:
if key not in self._input_asset_version_info:
self._fetch_input_asset_version_info(key)
self._fetch_input_asset_materialization_and_version_info(key)
return self._input_asset_version_info[key]

# "external" refers to records for inputs generated outside of this step
def fetch_external_input_asset_version_info(self) -> None:
def fetch_external_input_asset_materialization_and_version_info(self) -> None:
"""Fetches the latest observation or materialization for each upstream dependency
in order to determine the version info. As a side effect we create a dictionary
of the materialization events so that the AssetContext can access the latest materialization
event.
"""
output_keys = self.get_output_asset_keys()

all_dep_keys: List[AssetKey] = []
Expand All @@ -973,17 +988,18 @@ def fetch_external_input_asset_version_info(self) -> None:

self._input_asset_version_info = {}
for key in all_dep_keys:
self._fetch_input_asset_version_info(key)
self._fetch_input_asset_materialization_and_version_info(key)
self._is_external_input_asset_version_info_loaded = True

def _fetch_input_asset_version_info(self, key: AssetKey) -> None:
def _fetch_input_asset_materialization_and_version_info(self, key: AssetKey) -> None:
from dagster._core.definitions.data_version import (
extract_data_version_from_entry,
)

event = self._get_input_asset_event(key)
if event is None:
self._input_asset_version_info[key] = None
self._upstream_asset_materialization_events[key] = None
else:
storage_id = event.storage_id
# Input name will be none if this is an internal dep
Expand Down Expand Up @@ -1011,6 +1027,12 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None:
data_version = extract_data_version_from_entry(event.event_log_entry)
else:
data_version = extract_data_version_from_entry(event.event_log_entry)
# the AssetMaterialization fetched above is only accurate if the asset it not partitioned
# if the asset is partitioned, then the latest AssetMaterialization may be for a partition
# that is irrelevant to the current execution
self._upstream_asset_materialization_events[key] = (
event.asset_materialization if event.asset_materialization else None
)
self._input_asset_version_info[key] = InputAssetVersionInfo(
storage_id, data_version, event.run_id, event.timestamp
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def core_dagster_event_sequence_for_step(
inputs = {}

if step_context.is_sda_step:
step_context.fetch_external_input_asset_version_info()
step_context.fetch_external_input_asset_materialization_and_version_info()

for step_input in step_context.step.step_inputs:
input_def = step_context.op_def.input_def_named(step_input.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
DagsterInstance,
Definitions,
GraphDefinition,
MaterializeResult,
OpExecutionContext,
Output,
asset,
Expand Down Expand Up @@ -426,3 +427,33 @@ def a(context: AssetExecutionContext):
assert context == AssetExecutionContext.get()

assert materialize([a]).success


def test_upstream_metadata():
# with output metadata
@asset
def upstream(context: AssetExecutionContext):
context.add_output_metadata({"foo": "bar"})

@asset
def downstream(context: AssetExecutionContext, upstream):
mat = context.latest_materialization_for_upstream_asset("upstream")
assert mat is not None
assert mat.metadata["foo"].value == "bar"

materialize([upstream, downstream])


def test_upstream_metadata_materialize_result():
# with asset materialization
@asset
def upstream():
return MaterializeResult(metadata={"foo": "bar"})

@asset
def downstream(context: AssetExecutionContext, upstream):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use deps 🤦

mat = context.latest_materialization_for_upstream_asset("upstream")
assert mat is not None
assert mat.metadata["foo"].value == "bar"

materialize([upstream, downstream])