Make AssetDaemonCursor whitelisted for serdes
OwenKephart committed Jan 19, 2024
1 parent 36f827a commit ef07600
Showing 21 changed files with 604 additions and 580 deletions.
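The commit title refers to Dagster's serdes (serialization/deserialization) machinery. As a rough sketch of what "whitelisted for serdes" means (ExampleCursor below is hypothetical; whitelist_for_serdes, serialize_value, and deserialize_value are the real entry points this diff switches to), decorating a NamedTuple registers it with the serializer so it can be round-tripped as a string:

# Hypothetical sketch: a minimal whitelisted NamedTuple, analogous in spirit
# to AssetDaemonCursor after this commit.
from typing import NamedTuple

from dagster._serdes import deserialize_value, serialize_value, whitelist_for_serdes


@whitelist_for_serdes
class ExampleCursor(NamedTuple("_ExampleCursor", [("evaluation_id", int)])):
    @staticmethod
    def empty(evaluation_id: int = 0) -> "ExampleCursor":
        return ExampleCursor(evaluation_id=evaluation_id)


# Round-trip through the generic serdes functions rather than a bespoke
# serialize()/deserialize() pair on the class itself.
blob = serialize_value(ExampleCursor.empty(12345))
restored = deserialize_value(blob, ExampleCursor)
assert restored.evaluation_id == 12345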
(first changed file)
@@ -6,7 +6,6 @@
from dagster import AssetKey, RunRequest
from dagster._core.definitions.asset_daemon_cursor import (
AssetDaemonCursor,
LegacyAssetDaemonCursorWrapper,
)
from dagster._core.definitions.run_request import (
InstigatorType,
@@ -28,8 +27,10 @@
_PRE_SENSOR_AUTO_MATERIALIZE_INSTIGATOR_NAME,
_PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID,
_PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID,
asset_daemon_cursor_to_instigator_serialized_cursor,
)
from dagster._serdes import deserialize_value
from dagster._serdes.serdes import serialize_value
from dagster_graphql.test.utils import execute_dagster_graphql, infer_repository

from dagster_graphql_tests.graphql.graphql_context_test_suite import (
@@ -352,9 +353,9 @@ def test_automation_policy_sensor(self, graphql_context: WorkspaceRequestContext):
status=InstigatorStatus.RUNNING,
instigator_data=SensorInstigatorData(
sensor_type=SensorType.AUTOMATION_POLICY,
cursor=LegacyAssetDaemonCursorWrapper(
AssetDaemonCursor.empty()._replace(evaluation_id=12345).serialize()
).to_compressed(),
cursor=asset_daemon_cursor_to_instigator_serialized_cursor(
AssetDaemonCursor.empty(12345)
),
),
)
)
@@ -708,7 +709,7 @@ def _test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequestContext):

def _test_current_evaluation_id(self, graphql_context: WorkspaceRequestContext):
graphql_context.instance.daemon_cursor_storage.set_cursor_values(
{_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY: AssetDaemonCursor.empty().serialize()}
{_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY: serialize_value(AssetDaemonCursor.empty(0))}
)

results = execute_dagster_graphql(
@@ -728,9 +729,7 @@ def _test_current_evaluation_id(self, graphql_context: WorkspaceRequestContext):
graphql_context.instance.daemon_cursor_storage.set_cursor_values(
{
_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY: (
AssetDaemonCursor.empty()
.with_updates(0, set(), set(), set(), {}, 42, None, [], 0) # type: ignore
.serialize()
serialize_value(AssetDaemonCursor.empty(0).with_updates(0, 1.0, [], []))
)
}
)
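The test changes in this file show the calling-convention shift the commit makes: the cursor no longer serializes itself behind LegacyAssetDaemonCursorWrapper; tests now call the generic serialize_value and, for the sensor-facing form, the new asset_daemon_cursor_to_instigator_serialized_cursor helper. A hedged sketch of the new usage follows; the import path for the helper is an assumption, since the diff only shows that it lives alongside the _PRE_SENSOR_* constants.

# Sketch of the new cursor serialization calls exercised by these tests.
# The dagster._daemon.asset_daemon import path is assumed, not shown in the diff.
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
from dagster._daemon.asset_daemon import asset_daemon_cursor_to_instigator_serialized_cursor
from dagster._serdes import serialize_value

# Daemon-side storage form: plain serdes, with the evaluation id now passed
# to empty() instead of patched in via _replace().
daemon_cursor_str = serialize_value(AssetDaemonCursor.empty(0))

# Sensor/instigator-side form: a dedicated helper replaces the legacy wrapper.
sensor_cursor_str = asset_daemon_cursor_to_instigator_serialized_cursor(
    AssetDaemonCursor.empty(12345)
)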
(next changed file)
@@ -1,6 +1,6 @@
import functools
import hashlib
from abc import ABC, abstractmethod
from abc import ABC, abstractmethod, abstractproperty
from typing import (
TYPE_CHECKING,
AbstractSet,
@@ -148,6 +152,10 @@ def evaluate(
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
raise NotImplementedError()

@abstractproperty
def description(self) -> str:
raise NotImplementedError()

def __and__(self, other: "AssetCondition") -> "AssetCondition":
# group AndAutomationConditions together
if isinstance(self, AndAssetCondition):
@@ -192,7 +196,7 @@ def snapshot(self) -> AssetConditionSnapshot:
"""Returns a snapshot of this condition that can be used for serialization."""
return AssetConditionSnapshot(
class_name=self.__class__.__name__,
description=str(self),
description=self.description,
unique_id=self.unique_id,
)

@@ -205,25 +209,29 @@ class RuleCondition(

@property
def unique_id(self) -> str:
parts = [self.rule.__class__.__name__, self.rule.description]
parts = [self.rule.__class__.__name__, self.description]
return hashlib.md5("".join(parts).encode()).hexdigest()

@property
def description(self) -> str:
return self.rule.description

def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Evaluating rule: {self.rule.to_snapshot()}"
)
true_subset, subsets_with_metadata = self.rule.evaluate_for_asset(context)
true_subset, subsets_with_metadata, extras = self.rule.evaluate_for_asset(context)
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Rule returned {true_subset.size} partitions"
f"Rule returned {true_subset.size} partitions" f"{true_subset}"
)
return AssetConditionEvaluation(
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
subsets_with_metadata=subsets_with_metadata,
), [AssetConditionCursorExtras(condition_snapshot=self.snapshot, extras={})]
), [AssetConditionCursorExtras(condition_snapshot=self.snapshot, extras=extras)]


class AndAssetCondition(
@@ -232,6 +240,10 @@ class AndAssetCondition(
):
"""This class represents the condition that all of its children evaluate to true."""

@property
def description(self) -> str:
return "All of"

def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
@@ -258,6 +270,10 @@ class OrAssetCondition(
):
"""This class represents the condition that any of its children evaluate to true."""

@property
def description(self) -> str:
return "Any of"

def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
@@ -290,6 +306,10 @@ def __new__(cls, children: Sequence[AssetCondition]):
check.invariant(len(children) == 1)
return super().__new__(cls, children)

@property
def description(self) -> str:
return "Not"

@property
def child(self) -> AssetCondition:
return self.children[0]
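The changes in this file give every AssetCondition a description, used both in the snapshot (in place of str(self)) and in RuleCondition's unique_id. A standalone sketch of the pattern, using only the standard library and shortened class names; note that abstractproperty has been deprecated since Python 3.3 in favor of stacking @property and @abstractmethod, which is the form used here:

# Standalone sketch of the description pattern (hypothetical class names).
from abc import ABC, abstractmethod


class Condition(ABC):
    @property
    @abstractmethod
    def description(self) -> str:
        raise NotImplementedError()


class AndCondition(Condition):
    """Evaluates to true where all children evaluate to true."""

    @property
    def description(self) -> str:
        return "All of"


class NotCondition(Condition):
    """Evaluates to true where its single child evaluates to false."""

    @property
    def description(self) -> str:
        return "Not"


# The snapshot then records description instead of str(self).
assert AndCondition().description == "All of"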
(next changed file)
@@ -151,7 +151,18 @@ def parent_will_update_subset(self) -> AssetSubset:
subset |= parent_subset._replace(asset_key=self.asset_key)
return subset

@property
@functools.cached_property
@root_property
def new_max_storage_id(self) -> Optional[int]:
"""Returns the new max storage ID for this asset, if any."""
# TODO: This is not a good way of doing this, as it opens us up to potential race conditions,
# but in the interest of keeping this PR simple, I'm leaving this logic as is. In the next
# PR, I'll update the code to return a "maximum observed record id" from inside the
# `get_asset_partitions_updated_after_cursor` call.
return self.instance_queryer.instance.event_log_storage.get_maximum_record_id()

@functools.cached_property
@root_property
def materialized_since_previous_tick_subset(self) -> AssetSubset:
"""Returns the set of asset partitions that were materialized since the previous tick."""
return AssetSubset.from_asset_partitions_set(
@@ -160,44 +171,28 @@ def materialized_since_previous_tick_subset(self) -> AssetSubset:
self.instance_queryer.get_asset_partitions_updated_after_cursor(
self.asset_key,
asset_partitions=None,
after_cursor=self.cursor.previous_max_storage_id if self.cursor else None,
after_cursor=self.cursor.previous_max_storage_id,
respect_materialization_data_versions=False,
),
)

@property
def previous_tick_requested_or_discarded_subset(self) -> AssetSubset:
if not self.cursor.previous_evaluation:
@root_property
def previous_tick_requested_subset(self) -> AssetSubset:
"""The set of asset partitions that were requested (or discarded) on the previous tick."""
previous_evaluation = self.cursor.previous_evaluation
if previous_evaluation is None:
return self.empty_subset()
return self.cursor.previous_evaluation.get_requested_or_discarded_subset(
self.root_context.condition
)

return previous_evaluation.get_requested_or_discarded_subset(self.condition)

@property
def materialized_requested_or_discarded_since_previous_tick_subset(self) -> AssetSubset:
"""Returns the set of asset partitions that were materialized since the previous tick."""
return (
self.materialized_since_previous_tick_subset
| self.previous_tick_requested_or_discarded_subset
)
return self.materialized_since_previous_tick_subset | self.previous_tick_requested_subset

@property
def never_materialized_requested_or_discarded_root_subset(self) -> AssetSubset:
if self.asset_key not in self.asset_graph.root_materializable_or_observable_asset_keys:
return self.empty_subset()

handled_subset = (
self.cursor.get_extras_value(self.condition, "handled_subset", AssetSubset)
or self.empty_subset()
)
unhandled_subset = handled_subset.inverse(
self.partitions_def,
dynamic_partitions_store=self.instance_queryer,
current_time=self.evaluation_time,
)
return unhandled_subset - self.materialized_since_previous_tick_subset

@property
@functools.cached_property
@root_property
def parent_has_updated_subset(self) -> AssetSubset:
"""Returns the set of asset partitions whose parents have updated since the last time this
condition was evaluated.
@@ -206,7 +201,7 @@ def parent_has_updated_subset(self) -> AssetSubset:
self.asset_key,
self.partitions_def,
self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents(
latest_storage_id=self.previous_max_storage_id,
latest_storage_id=self.cursor.previous_max_storage_id,
child_asset_key=self.root_context.asset_key,
map_old_time_partitions=False,
),
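Several derived subsets in this file move from plain @property to functools.cached_property (stacked with the repo-specific root_property decorator), so each value, including the new new_max_storage_id snapshot, is computed once per tick and reused across rule evaluations. A minimal sketch of just the caching behavior, with a hypothetical stand-in for the event log storage:

# Minimal caching sketch; FakeEventLogStorage and EvaluationContext are
# hypothetical stand-ins, not the repo's classes.
import functools
from typing import Optional


class FakeEventLogStorage:
    def __init__(self, max_record_id: Optional[int]) -> None:
        self._max_record_id = max_record_id
        self.calls = 0

    def get_maximum_record_id(self) -> Optional[int]:
        self.calls += 1
        return self._max_record_id


class EvaluationContext:
    def __init__(self, storage: FakeEventLogStorage) -> None:
        self.storage = storage

    @functools.cached_property
    def new_max_storage_id(self) -> Optional[int]:
        # Queried once, then memoized on the instance, so every rule
        # evaluated on this tick sees the same storage-id snapshot.
        return self.storage.get_maximum_record_id()


storage = FakeEventLogStorage(max_record_id=42)
ctx = EvaluationContext(storage)
assert ctx.new_max_storage_id == 42
assert ctx.new_max_storage_id == 42
assert storage.calls == 1  # cached after the first access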
(next changed file)
@@ -31,7 +31,6 @@
get_time_partitions_def,
)
from dagster._core.instance import DynamicPartitionsStore
from dagster._utils.cached_method import cached_method

from ... import PartitionKeyRange
from ..storage.tags import ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG
@@ -134,10 +133,6 @@ def cursor(self) -> AssetDaemonCursor:
def asset_graph(self) -> AssetGraph:
return self.instance_queryer.asset_graph

@property
def latest_storage_id(self) -> Optional[int]:
return self.cursor.latest_storage_id

@property
def auto_materialize_asset_keys(self) -> AbstractSet[AssetKey]:
return self._auto_materialize_asset_keys
@@ -177,46 +172,6 @@ def prefetch(self) -> None:
)
self.instance_queryer.prefetch_asset_records(self.asset_records_to_prefetch)
self._logger.info("Done prefetching asset records.")
self._logger.info(
f"Calculated a new latest_storage_id value of {self.get_new_latest_storage_id()}.\n"
f"Precalculating updated parents for {len(self.auto_materialize_asset_keys)} assets using previous "
f"latest_storage_id of {self.latest_storage_id}."
)
for asset_key in self.auto_materialize_asset_keys:
self.instance_queryer.asset_partitions_with_newly_updated_parents(
latest_storage_id=self.latest_storage_id, child_asset_key=asset_key
)
self._logger.info("Done precalculating updated parents.")

@cached_method
def get_new_latest_storage_id(self) -> Optional[int]:
"""Get the latest storage id across all cached asset records. We use this method as it uses
identical data to what is used to calculate assets with updated parents, and therefore
avoids certain classes of race conditions.
"""
new_latest_storage_id = self.latest_storage_id
for asset_key in self.auto_materialize_asset_keys_and_parents:
# ignore non-observable sources
if self.asset_graph.is_source(asset_key) and not self.asset_graph.is_observable(
asset_key
):
continue
# ignore cases where we know for sure there's no new event
if not self.instance_queryer.asset_partition_has_materialization_or_observation(
AssetKeyPartitionKey(asset_key), after_cursor=self.latest_storage_id
):
continue
# get the latest overall storage id for this asset key
asset_latest_storage_id = (
self.instance_queryer.get_latest_materialization_or_observation_storage_id(
AssetKeyPartitionKey(asset_key)
)
)
new_latest_storage_id = max(
filter(None, [new_latest_storage_id, asset_latest_storage_id]), default=None
)

return new_latest_storage_id

def evaluate_asset(
self,
@@ -241,11 +196,11 @@ def evaluate_asset(
self.asset_graph.auto_materialize_policies_by_key.get(asset_key)
).to_asset_condition()

asset_cursor = self.cursor.asset_cursor_for_key(asset_key, self.asset_graph)
asset_cursor = self.cursor.get_asset_cursor(asset_key)

context = AssetConditionEvaluationContext.create(
asset_key=asset_key,
cursor=self.cursor.asset_cursor_for_key(asset_key, self.asset_graph),
cursor=asset_cursor,
condition=asset_condition,
instance_queryer=self.instance_queryer,
data_time_resolver=self.data_time_resolver,
@@ -254,9 +209,15 @@ def evaluate_asset(
expected_data_time_mapping=expected_data_time_mapping,
)

evaluation, condition_cursor = asset_condition.evaluate(context)
evaluation, extras = asset_condition.evaluate(context)

new_asset_cursor = asset_cursor.with_updates(context, evaluation)
new_asset_cursor = AssetConditionCursor(
asset_key=asset_key,
previous_max_storage_id=context.new_max_storage_id,
previous_evaluation_timestamp=context.evaluation_time.timestamp(),
previous_evaluation=evaluation,
extras=extras,
)

expected_data_time = get_expected_data_time_for_asset_key(
context, will_materialize=evaluation.true_subset.size > 0
@@ -365,24 +326,21 @@ def evaluate(
return (
run_requests,
self.cursor.with_updates(
latest_storage_id=self.get_new_latest_storage_id(),
evaluation_id=self._evaluation_id,
asset_cursors=asset_cursors,
newly_observe_requested_asset_keys=[
asset_key
for run_request in auto_observe_run_requests
for asset_key in cast(Sequence[AssetKey], run_request.asset_selection)
],
observe_request_timestamp=observe_request_timestamp,
evaluations=evaluations,
evaluation_time=self.instance_queryer.evaluation_time,
asset_cursors=asset_cursors,
evaluation_timestamp=self.instance_queryer.evaluation_time.timestamp(),
),
# only record evaluations where something changed
[
evaluation
for evaluation in evaluations
if not evaluation.equivalent_to_stored_evaluation(
self.cursor.latest_evaluation_by_asset_key.get(evaluation.asset_key)
self.cursor.get_previous_evaluation(evaluation.asset_key)
)
],
)
(remaining changed files not shown)
