Skip to content

Commit

Permalink
AssetConditionCursor
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 9, 2024
1 parent ed2b2ca commit 05629cd
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 127 deletions.
61 changes: 42 additions & 19 deletions python_modules/dagster/dagster/_core/definitions/asset_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
)

import dagster._check as check
from dagster._core.definitions.asset_daemon_cursor import (
AssetConditionCursorExtras,
)
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.metadata import MetadataMapping, MetadataValue
from dagster._serdes.serdes import whitelist_for_serdes
Expand Down Expand Up @@ -72,7 +75,7 @@ def equivalent_to_stored_evaluation(self, other: Optional["AssetConditionEvaluat
and self.child_evaluations == other.child_evaluations
)

def discard_subset(self, condition: "AssetCondition") -> Optional[AssetSubset]:
def discarded_subset(self, condition: "AssetCondition") -> Optional[AssetSubset]:
not_discard_condition = condition.not_discard_condition
if not not_discard_condition or len(self.child_evaluations) != 3:
return None
Expand All @@ -81,6 +84,12 @@ def discard_subset(self, condition: "AssetCondition") -> Optional[AssetSubset]:
discard_evaluation = not_discard_evaluation.child_evaluations[0]
return discard_evaluation.true_subset

def get_requested_or_discarded_subset(self, condition: "AssetCondition") -> AssetSubset:
discarded_subset = self.discarded_subset(condition)
if discarded_subset is None:
return self.true_subset
return self.true_subset | discarded_subset

def for_child(self, child_condition: "AssetCondition") -> Optional["AssetConditionEvaluation"]:
"""Returns the evaluation of a given child condition by finding the child evaluation that
has an identical hash to the given condition.
Expand Down Expand Up @@ -129,7 +138,9 @@ def unique_id(self) -> str:
return hashlib.md5("".join(parts).encode()).hexdigest()

@abstractmethod
def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation:
def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
raise NotImplementedError()

def __and__(self, other: "AssetCondition") -> "AssetCondition":
Expand Down Expand Up @@ -192,7 +203,9 @@ def unique_id(self) -> str:
parts = [self.rule.__class__.__name__, self.rule.description]
return hashlib.md5("".join(parts).encode()).hexdigest()

def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation:
def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Evaluating rule: {self.rule.to_snapshot()}"
)
Expand All @@ -205,7 +218,7 @@ def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEv
true_subset=true_subset,
candidate_subset=context.candidate_subset,
subsets_with_metadata=subsets_with_metadata,
)
), [AssetConditionCursorExtras(condition_snapshot=self.snapshot, extras={})]


class AndAssetCondition(
Expand All @@ -214,20 +227,24 @@ class AndAssetCondition(
):
"""This class represents the condition that all of its children evaluate to true."""

def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation:
def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
child_evaluations: List[AssetConditionEvaluation] = []
child_extras: List[AssetConditionCursorExtras] = []
true_subset = context.candidate_subset
for child in self.children:
child_context = context.for_child(condition=child, candidate_subset=true_subset)
result = child.evaluate(child_context)
child_evaluations.append(result)
true_subset &= result.true_subset
child_evaluation, child_extra = child.evaluate(child_context)
child_evaluations.append(child_evaluation)
child_extras.extend(child_extra)
true_subset &= child_evaluation.true_subset
return AssetConditionEvaluation(
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
child_evaluations=child_evaluations,
)
), child_extras


class OrAssetCondition(
Expand All @@ -236,22 +253,26 @@ class OrAssetCondition(
):
"""This class represents the condition that any of its children evaluate to true."""

def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation:
def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
child_evaluations: List[AssetConditionEvaluation] = []
child_extras: List[AssetConditionCursorExtras] = []
true_subset = context.empty_subset()
for child in self.children:
child_context = context.for_child(
condition=child, candidate_subset=context.candidate_subset
)
result = child.evaluate(child_context)
child_evaluations.append(result)
true_subset |= result.true_subset
child_evaluation, child_extra = child.evaluate(child_context)
child_evaluations.append(child_evaluation)
child_extras.extend(child_extra)
true_subset |= child_evaluation.true_subset
return AssetConditionEvaluation(
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
child_evaluations=child_evaluations,
)
), child_extras


class NotAssetCondition(
Expand All @@ -268,16 +289,18 @@ def __new__(cls, children: Sequence[AssetCondition]):
def child(self) -> AssetCondition:
return self.children[0]

def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation:
def evaluate(
self, context: AssetConditionEvaluationContext
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
child_context = context.for_child(
condition=self.child, candidate_subset=context.candidate_subset
)
result = self.child.evaluate(child_context)
true_subset = context.candidate_subset - result.true_subset
child_evaluation, child_extras = self.child.evaluate(child_context)
true_subset = context.candidate_subset - child_evaluation.true_subset

return AssetConditionEvaluation(
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
child_evaluations=[result],
)
child_evaluations=[child_evaluation],
), child_extras
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer

from .asset_daemon_cursor import AssetDaemonAssetCursor
from .asset_daemon_cursor import AssetConditionCursor
from .asset_graph import AssetGraph
from .asset_subset import AssetSubset

if TYPE_CHECKING:
from dagster._core.definitions.asset_condition import AssetSubsetWithMetadata

from .asset_condition import AssetCondition, AssetConditionEvaluation
from .asset_condition import AssetCondition, AssetConditionEvaluation, AssetSubsetWithMetadata
from .asset_daemon_context import AssetDaemonContext


Expand All @@ -41,8 +39,8 @@ class AssetConditionEvaluationContext:

asset_key: AssetKey
condition: "AssetCondition"
asset_cursor: Optional[AssetDaemonAssetCursor]
previous_evaluation: Optional["AssetConditionEvaluation"]
cursor: AssetConditionCursor
previous_condition_evaluation: Optional["AssetConditionEvaluation"]
candidate_subset: AssetSubset

instance_queryer: CachingInstanceQueryer
Expand All @@ -58,21 +56,23 @@ class AssetConditionEvaluationContext:
def create(
asset_key: AssetKey,
condition: "AssetCondition",
asset_cursor: Optional[AssetDaemonAssetCursor],
cursor: AssetConditionCursor,
instance_queryer: CachingInstanceQueryer,
data_time_resolver: CachingDataTimeResolver,
daemon_context: "AssetDaemonContext",
evaluation_results_by_key: Mapping[AssetKey, "AssetConditionEvaluation"],
expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]],
) -> "AssetConditionEvaluationContext":
partitions_def = instance_queryer.asset_graph.get_partitions_def(asset_key)

return AssetConditionEvaluationContext(
asset_key=asset_key,
condition=condition,
asset_cursor=asset_cursor,
previous_evaluation=asset_cursor.previous_evaluation if asset_cursor else None,
cursor=cursor,
previous_condition_evaluation=cursor.previous_evaluation,
candidate_subset=AssetSubset.all(
asset_key,
instance_queryer.asset_graph.get_partitions_def(asset_key),
partitions_def,
instance_queryer,
instance_queryer.evaluation_time,
),
Expand All @@ -89,10 +89,10 @@ def for_child(
return dataclasses.replace(
self,
condition=condition,
candidate_subset=candidate_subset,
previous_evaluation=self.previous_evaluation.for_child(condition)
if self.previous_evaluation
previous_condition_evaluation=self.previous_condition_evaluation.for_child(condition)
if self.previous_condition_evaluation
else None,
candidate_subset=candidate_subset,
root_ref=self.root_context,
)

Expand All @@ -116,15 +116,23 @@ def evaluation_time(self) -> datetime.datetime:

@property
def previous_max_storage_id(self) -> Optional[int]:
if not self.asset_cursor:
return None
return self.asset_cursor.previous_max_storage_id
return self.cursor.previous_max_storage_id

@property
def previous_evaluation_timestamp(self) -> Optional[float]:
if not self.asset_cursor:
return None
return self.asset_cursor.previous_evaluation_timestamp
return self.cursor.previous_evaluation_timestamp

@property
def previous_true_subset(self) -> AssetSubset:
if self.previous_condition_evaluation is None:
return self.empty_subset()
return self.previous_condition_evaluation.true_subset

@property
def previous_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetadata"]:
if self.previous_condition_evaluation is None:
return []
return self.previous_condition_evaluation.subsets_with_metadata

@functools.cached_property
@root_property
Expand All @@ -143,16 +151,7 @@ def parent_will_update_subset(self) -> AssetSubset:
subset |= parent_subset._replace(asset_key=self.asset_key)
return subset

@functools.cached_property
@root_property
def previous_tick_requested_subset(self) -> AssetSubset:
"""Returns the set of asset partitions that were requested on the previous tick."""
if not self.previous_evaluation:
return self.empty_subset()
return self.previous_evaluation.true_subset

@functools.cached_property
@root_property
@property
def materialized_since_previous_tick_subset(self) -> AssetSubset:
"""Returns the set of asset partitions that were materialized since the previous tick."""
return AssetSubset.from_asset_partitions_set(
Expand All @@ -161,35 +160,35 @@ def materialized_since_previous_tick_subset(self) -> AssetSubset:
self.instance_queryer.get_asset_partitions_updated_after_cursor(
self.asset_key,
asset_partitions=None,
after_cursor=self.asset_cursor.previous_max_storage_id
if self.asset_cursor
else None,
after_cursor=self.cursor.previous_max_storage_id if self.cursor else None,
respect_materialization_data_versions=False,
),
)

@functools.cached_property
@root_property
@property
def previous_tick_requested_or_discarded_subset(self) -> AssetSubset:
if not self.cursor.previous_evaluation:
return self.empty_subset()
return self.cursor.previous_evaluation.get_requested_or_discarded_subset(
self.root_context.condition
)

@property
def materialized_requested_or_discarded_since_previous_tick_subset(self) -> AssetSubset:
"""Returns the set of asset partitions that were materialized since the previous tick."""
if not self.previous_evaluation:
return self.materialized_since_previous_tick_subset
return (
self.materialized_since_previous_tick_subset
| self.previous_evaluation.true_subset
| (self.previous_evaluation.discard_subset(self.condition) or self.empty_subset())
| self.previous_tick_requested_or_discarded_subset
)

@functools.cached_property
@root_property
@property
def never_materialized_requested_or_discarded_root_subset(self) -> AssetSubset:
if self.asset_key not in self.asset_graph.root_materializable_or_observable_asset_keys:
return self.empty_subset()

handled_subset = (
self.asset_cursor.materialized_requested_or_discarded_subset
if self.asset_cursor
else self.empty_subset()
self.cursor.get_extras_value(self.condition, "handled_subset", AssetSubset)
or self.empty_subset()
)
unhandled_subset = handled_subset.inverse(
self.partitions_def,
Expand All @@ -199,7 +198,6 @@ def never_materialized_requested_or_discarded_root_subset(self) -> AssetSubset:
return unhandled_subset - self.materialized_since_previous_tick_subset

@property
@root_property
def parent_has_updated_subset(self) -> AssetSubset:
"""Returns the set of asset partitions whose parents have updated since the last time this
condition was evaluated.
Expand Down Expand Up @@ -228,21 +226,13 @@ def candidates_not_evaluated_on_previous_tick_subset(self) -> AssetSubset:
"""Returns the set of candidates for this tick which were not candidates on the previous
tick.
"""
if not self.previous_evaluation or not self.previous_evaluation.candidate_subset:
if not self.previous_condition_evaluation:
return self.candidate_subset
return self.candidate_subset - self.previous_evaluation.candidate_subset

@property
def previous_tick_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetadata"]:
"""Returns the RuleEvaluationResults calculated on the previous tick for this condition."""
return self.previous_evaluation.subsets_with_metadata if self.previous_evaluation else []

@property
def previous_tick_true_subset(self) -> AssetSubset:
"""Returns the set of asset partitions that were true for this condition on the previous tick."""
if not self.previous_evaluation:
# when the candidate_subset is None, this indicates that the entire asset was evaluated
# for this condition on the previous tick
elif self.previous_condition_evaluation.candidate_subset is None:
return self.empty_subset()
return self.previous_evaluation.true_subset
return self.candidate_subset - self.previous_condition_evaluation.candidate_subset

def materializable_in_same_run(self, child_key: AssetKey, parent_key: AssetKey) -> bool:
"""Returns whether a child asset can be materialized in the same run as a parent asset."""
Expand Down Expand Up @@ -296,28 +286,3 @@ def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) ->

def empty_subset(self) -> AssetSubset:
return AssetSubset.empty(self.asset_key, self.partitions_def)

def get_new_asset_cursor(
self, evaluation: "AssetConditionEvaluation"
) -> AssetDaemonAssetCursor:
"""Returns a new AssetDaemonAssetCursor based on the current cursor and the results of
this tick's evaluation.
"""
previous_handled_subset = (
self.asset_cursor.materialized_requested_or_discarded_subset
if self.asset_cursor
else self.empty_subset()
)
new_handled_subset = (
previous_handled_subset
| self.materialized_requested_or_discarded_since_previous_tick_subset
| evaluation.true_subset
| (evaluation.discard_subset(self.condition) or self.empty_subset())
)
return AssetDaemonAssetCursor(
asset_key=self.asset_key,
previous_max_storage_id=self.daemon_context.get_new_latest_storage_id(),
previous_evaluation=evaluation,
previous_evaluation_timestamp=self.evaluation_time.timestamp(),
materialized_requested_or_discarded_subset=new_handled_subset,
)
Loading

0 comments on commit 05629cd

Please sign in to comment.