Skip to content

Commit

Permalink
[amp-refactor][8/n] Whitelist AssetDaemonCursor for serdes (#18954)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Here, we finally (mostly) free ourselves from the tyranny of the bespoke serdes protocol that we were using for the AssetDaemonCursor.

At a high level, a few things need to happen in order for this to be possible:

1. Before, we didn't actually bother passing the "extras" value through to the new cursor, because we didn't actually serialize the new cursor once it was created (and instead relied on the legacy method of keeping track of the values we cared about between ticks). Now, we need to actually wire that up.
2. Before, the legacy serdes protocol handled cases where a serialized subset became invalid between ticks (because we were using the SerializedPartitionsSubset class). Now, we need to have some separate handling on the AssetSubset class to handle similar scenarios gracefully.

This PR also adds in a description field to each of the Condition objects. This was mostly to help with debugging when working on this PR (makes it easier to understand the snapshots when they're printed out), but it's something we know we'll want in the future, so I just went ahead and kept it.

We do still have to keep around a somewhat nasty backcompat path to handle the first tick after the conversion from legacy cursor to new cursor, but this should only be hit a single time per deployment.

In theory, it'd be possible to just start from an empty cursor if it was not possible to deserialize it into the new scheme, but the consequence would be that:

a) If anything was requested on the tick directly before the conversion, it would get re-requested (as we'd have no recollection of requesting that asset)
b) The tick would take much longer, as we'd need to rebuild everything from scratch.

This backcompat path can be removed at some point in the future, once we're confident that a large percentage of users will be operating with the new cursor structure (e.g. some number of months).

## How I Tested These Changes
  • Loading branch information
OwenKephart authored and PedramNavid committed Jan 26, 2024
1 parent 7d3c5f3 commit 1823a74
Show file tree
Hide file tree
Showing 21 changed files with 604 additions and 580 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from dagster import AssetKey, RunRequest
from dagster._core.definitions.asset_daemon_cursor import (
AssetDaemonCursor,
LegacyAssetDaemonCursorWrapper,
)
from dagster._core.definitions.run_request import (
InstigatorType,
Expand All @@ -28,8 +27,10 @@
_PRE_SENSOR_AUTO_MATERIALIZE_INSTIGATOR_NAME,
_PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID,
_PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID,
asset_daemon_cursor_to_instigator_serialized_cursor,
)
from dagster._serdes import deserialize_value
from dagster._serdes.serdes import serialize_value
from dagster_graphql.test.utils import execute_dagster_graphql, infer_repository

from dagster_graphql_tests.graphql.graphql_context_test_suite import (
Expand Down Expand Up @@ -352,9 +353,9 @@ def test_automation_policy_sensor(self, graphql_context: WorkspaceRequestContext
status=InstigatorStatus.RUNNING,
instigator_data=SensorInstigatorData(
sensor_type=SensorType.AUTOMATION_POLICY,
cursor=LegacyAssetDaemonCursorWrapper(
AssetDaemonCursor.empty()._replace(evaluation_id=12345).serialize()
).to_compressed(),
cursor=asset_daemon_cursor_to_instigator_serialized_cursor(
AssetDaemonCursor.empty(12345)
),
),
)
)
Expand Down Expand Up @@ -708,7 +709,7 @@ def _test_get_evaluations_with_partitions(self, graphql_context: WorkspaceReques

def _test_current_evaluation_id(self, graphql_context: WorkspaceRequestContext):
graphql_context.instance.daemon_cursor_storage.set_cursor_values(
{_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY: AssetDaemonCursor.empty().serialize()}
{_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY: serialize_value(AssetDaemonCursor.empty(0))}
)

results = execute_dagster_graphql(
Expand All @@ -728,9 +729,7 @@ def _test_current_evaluation_id(self, graphql_context: WorkspaceRequestContext):
graphql_context.instance.daemon_cursor_storage.set_cursor_values(
{
_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY: (
AssetDaemonCursor.empty()
.with_updates(0, set(), set(), set(), {}, 42, None, [], 0) # type: ignore
.serialize()
serialize_value(AssetDaemonCursor.empty(0).with_updates(0, 1.0, [], []))
)
}
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import functools
import hashlib
from abc import ABC, abstractmethod
from abc import ABC, abstractmethod, abstractproperty
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand Down Expand Up @@ -148,6 +148,10 @@ def evaluate(
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
raise NotImplementedError()

@abstractproperty
def description(self) -> str:
raise NotImplementedError()

def __and__(self, other: "AssetCondition") -> "AssetCondition":
# group AndAutomationConditions together
if isinstance(self, AndAssetCondition):
Expand Down Expand Up @@ -192,7 +196,7 @@ def snapshot(self) -> AssetConditionSnapshot:
"""Returns a snapshot of this condition that can be used for serialization."""
return AssetConditionSnapshot(
class_name=self.__class__.__name__,
description=str(self),
description=self.description,
unique_id=self.unique_id,
)

Expand All @@ -205,25 +209,29 @@ class RuleCondition(

@property
def unique_id(self) -> str:
parts = [self.rule.__class__.__name__, self.rule.description]
parts = [self.rule.__class__.__name__, self.description]
return hashlib.md5("".join(parts).encode()).hexdigest()

@property
def description(self) -> str:
return self.rule.description

def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Evaluating rule: {self.rule.to_snapshot()}"
)
true_subset, subsets_with_metadata = self.rule.evaluate_for_asset(context)
true_subset, subsets_with_metadata, extras = self.rule.evaluate_for_asset(context)
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Rule returned {true_subset.size} partitions"
f"Rule returned {true_subset.size} partitions" f"{true_subset}"
)
return AssetConditionEvaluation(
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
subsets_with_metadata=subsets_with_metadata,
), [AssetConditionCursorExtras(condition_snapshot=self.snapshot, extras={})]
), [AssetConditionCursorExtras(condition_snapshot=self.snapshot, extras=extras)]


class AndAssetCondition(
Expand All @@ -232,6 +240,10 @@ class AndAssetCondition(
):
"""This class represents the condition that all of its children evaluate to true."""

@property
def description(self) -> str:
return "All of"

def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
Expand All @@ -258,6 +270,10 @@ class OrAssetCondition(
):
"""This class represents the condition that any of its children evaluate to true."""

@property
def description(self) -> str:
return "Any of"

def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
Expand Down Expand Up @@ -290,6 +306,10 @@ def __new__(cls, children: Sequence[AssetCondition]):
check.invariant(len(children) == 1)
return super().__new__(cls, children)

@property
def description(self) -> str:
return "Not"

@property
def child(self) -> AssetCondition:
return self.children[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,18 @@ def parent_will_update_subset(self) -> AssetSubset:
subset |= parent_subset._replace(asset_key=self.asset_key)
return subset

@property
@functools.cached_property
@root_property
def new_max_storage_id(self) -> Optional[int]:
"""Returns the new max storage ID for this asset, if any."""
# TODO: This is not a good way of doing this, as it opens us up to potential race conditions,
# but in the interest of keeping this PR simple, I'm leaving this logic as is. In the next
# PR, I'll update the code to return a "maximum observed record id" from inside the
# `get_asset_partitions_updated_after_cursor` call.
return self.instance_queryer.instance.event_log_storage.get_maximum_record_id()

@functools.cached_property
@root_property
def materialized_since_previous_tick_subset(self) -> AssetSubset:
"""Returns the set of asset partitions that were materialized since the previous tick."""
return AssetSubset.from_asset_partitions_set(
Expand All @@ -160,44 +171,28 @@ def materialized_since_previous_tick_subset(self) -> AssetSubset:
self.instance_queryer.get_asset_partitions_updated_after_cursor(
self.asset_key,
asset_partitions=None,
after_cursor=self.cursor.previous_max_storage_id if self.cursor else None,
after_cursor=self.cursor.previous_max_storage_id,
respect_materialization_data_versions=False,
),
)

@property
def previous_tick_requested_or_discarded_subset(self) -> AssetSubset:
if not self.cursor.previous_evaluation:
@root_property
def previous_tick_requested_subset(self) -> AssetSubset:
"""The set of asset partitions that were requested (or discarded) on the previous tick."""
previous_evaluation = self.cursor.previous_evaluation
if previous_evaluation is None:
return self.empty_subset()
return self.cursor.previous_evaluation.get_requested_or_discarded_subset(
self.root_context.condition
)

return previous_evaluation.get_requested_or_discarded_subset(self.condition)

@property
def materialized_requested_or_discarded_since_previous_tick_subset(self) -> AssetSubset:
"""Returns the set of asset partitions that were materialized since the previous tick."""
return (
self.materialized_since_previous_tick_subset
| self.previous_tick_requested_or_discarded_subset
)
return self.materialized_since_previous_tick_subset | self.previous_tick_requested_subset

@property
def never_materialized_requested_or_discarded_root_subset(self) -> AssetSubset:
if self.asset_key not in self.asset_graph.root_materializable_or_observable_asset_keys:
return self.empty_subset()

handled_subset = (
self.cursor.get_extras_value(self.condition, "handled_subset", AssetSubset)
or self.empty_subset()
)
unhandled_subset = handled_subset.inverse(
self.partitions_def,
dynamic_partitions_store=self.instance_queryer,
current_time=self.evaluation_time,
)
return unhandled_subset - self.materialized_since_previous_tick_subset

@property
@functools.cached_property
@root_property
def parent_has_updated_subset(self) -> AssetSubset:
"""Returns the set of asset partitions whose parents have updated since the last time this
condition was evaluated.
Expand All @@ -206,7 +201,7 @@ def parent_has_updated_subset(self) -> AssetSubset:
self.asset_key,
self.partitions_def,
self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents(
latest_storage_id=self.previous_max_storage_id,
latest_storage_id=self.cursor.previous_max_storage_id,
child_asset_key=self.root_context.asset_key,
map_old_time_partitions=False,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
get_time_partitions_def,
)
from dagster._core.instance import DynamicPartitionsStore
from dagster._utils.cached_method import cached_method

from ... import PartitionKeyRange
from ..storage.tags import ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG
Expand Down Expand Up @@ -134,10 +133,6 @@ def cursor(self) -> AssetDaemonCursor:
def asset_graph(self) -> AssetGraph:
return self.instance_queryer.asset_graph

@property
def latest_storage_id(self) -> Optional[int]:
return self.cursor.latest_storage_id

@property
def auto_materialize_asset_keys(self) -> AbstractSet[AssetKey]:
return self._auto_materialize_asset_keys
Expand Down Expand Up @@ -177,46 +172,6 @@ def prefetch(self) -> None:
)
self.instance_queryer.prefetch_asset_records(self.asset_records_to_prefetch)
self._logger.info("Done prefetching asset records.")
self._logger.info(
f"Calculated a new latest_storage_id value of {self.get_new_latest_storage_id()}.\n"
f"Precalculating updated parents for {len(self.auto_materialize_asset_keys)} assets using previous "
f"latest_storage_id of {self.latest_storage_id}."
)
for asset_key in self.auto_materialize_asset_keys:
self.instance_queryer.asset_partitions_with_newly_updated_parents(
latest_storage_id=self.latest_storage_id, child_asset_key=asset_key
)
self._logger.info("Done precalculating updated parents.")

@cached_method
def get_new_latest_storage_id(self) -> Optional[int]:
"""Get the latest storage id across all cached asset records. We use this method as it uses
identical data to what is used to calculate assets with updated parents, and therefore
avoids certain classes of race conditions.
"""
new_latest_storage_id = self.latest_storage_id
for asset_key in self.auto_materialize_asset_keys_and_parents:
# ignore non-observable sources
if self.asset_graph.is_source(asset_key) and not self.asset_graph.is_observable(
asset_key
):
continue
# ignore cases where we know for sure there's no new event
if not self.instance_queryer.asset_partition_has_materialization_or_observation(
AssetKeyPartitionKey(asset_key), after_cursor=self.latest_storage_id
):
continue
# get the latest overall storage id for this asset key
asset_latest_storage_id = (
self.instance_queryer.get_latest_materialization_or_observation_storage_id(
AssetKeyPartitionKey(asset_key)
)
)
new_latest_storage_id = max(
filter(None, [new_latest_storage_id, asset_latest_storage_id]), default=None
)

return new_latest_storage_id

def evaluate_asset(
self,
Expand All @@ -241,11 +196,11 @@ def evaluate_asset(
self.asset_graph.auto_materialize_policies_by_key.get(asset_key)
).to_asset_condition()

asset_cursor = self.cursor.asset_cursor_for_key(asset_key, self.asset_graph)
asset_cursor = self.cursor.get_asset_cursor(asset_key)

context = AssetConditionEvaluationContext.create(
asset_key=asset_key,
cursor=self.cursor.asset_cursor_for_key(asset_key, self.asset_graph),
cursor=asset_cursor,
condition=asset_condition,
instance_queryer=self.instance_queryer,
data_time_resolver=self.data_time_resolver,
Expand All @@ -254,9 +209,15 @@ def evaluate_asset(
expected_data_time_mapping=expected_data_time_mapping,
)

evaluation, condition_cursor = asset_condition.evaluate(context)
evaluation, extras = asset_condition.evaluate(context)

new_asset_cursor = asset_cursor.with_updates(context, evaluation)
new_asset_cursor = AssetConditionCursor(
asset_key=asset_key,
previous_max_storage_id=context.new_max_storage_id,
previous_evaluation_timestamp=context.evaluation_time.timestamp(),
previous_evaluation=evaluation,
extras=extras,
)

expected_data_time = get_expected_data_time_for_asset_key(
context, will_materialize=evaluation.true_subset.size > 0
Expand Down Expand Up @@ -365,24 +326,21 @@ def evaluate(
return (
run_requests,
self.cursor.with_updates(
latest_storage_id=self.get_new_latest_storage_id(),
evaluation_id=self._evaluation_id,
asset_cursors=asset_cursors,
newly_observe_requested_asset_keys=[
asset_key
for run_request in auto_observe_run_requests
for asset_key in cast(Sequence[AssetKey], run_request.asset_selection)
],
observe_request_timestamp=observe_request_timestamp,
evaluations=evaluations,
evaluation_time=self.instance_queryer.evaluation_time,
asset_cursors=asset_cursors,
evaluation_timestamp=self.instance_queryer.evaluation_time.timestamp(),
),
# only record evaluations where something changed
[
evaluation
for evaluation in evaluations
if not evaluation.equivalent_to_stored_evaluation(
self.cursor.latest_evaluation_by_asset_key.get(evaluation.asset_key)
self.cursor.get_previous_evaluation(evaluation.asset_key)
)
],
)
Expand Down
Loading

0 comments on commit 1823a74

Please sign in to comment.