Skip to content

Commit

Permalink
Add SkipOnNotAllParentsUpdatedSinceCronRule (#19553)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Adds a new SkipOnNotAllParentsUpdatedSinceCron rule. While this can certainly be used in a vacuum, the primary use case is for this to be paired with the `MaterializeOnCron` rule to produce accurate "schedule-like" behavior.

To explain more fully, it's fairly intuitive that if you have an asset in the middle of the graph, telling that asset to update at 9am every day is likely not going to result in good outcomes, as you have no idea if the parents will have been updated by that point in time or not.

You can help this situation by saying "if it's after 9am, only materialize if all of your parents have updated since the last time you ran", but this is brittle because those parents could have been updated yesterday (and so still contain 'old data')

This rule more accurately captures the true desires of someone setting up this sort of policy, by enforcing that those parents must have been updated more recently than a certain time of day.

It does this quite efficiently, all in all. The basic logic is to keep track of / build up the set of parent partitions updated since the previous tick iteratively.

On the first evaluation after a new cron schedule tick, we do an explicit query to the database to get the exact set of parent partitions updated since the previous tick (often this will be empty / cheap to get, assuming we do that evaluation pretty soon after we've rolled over to the new cron tick, there won't be time for a parent to have been materialized).

On subsequent evaluations (during the same cron schedule tick), we simply add in any newly-updated parent partitions since the previous evaluation (which we can get essentially for free as this information is used by a bunch of other rules and cached).

This means at any point in time, we can have an accurate set of parent partitions updated since the previous cron schedule tick, and so we can iterate through our candidate set and see which ones have all their parents in that set (or in the set of parents that will update this tick).

## How I Tested These Changes

Unit tests
  • Loading branch information
OwenKephart authored Feb 13, 2024
1 parent 4c247d3 commit fbf58b9
Show file tree
Hide file tree
Showing 7 changed files with 548 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import pendulum

from dagster._core.definitions.asset_condition import HistoricalAllPartitionsSubsetSentinel
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.metadata import MetadataValue
Expand Down Expand Up @@ -162,6 +163,18 @@ def previous_true_subset(self) -> AssetSubset:
return self.empty_subset()
return self.previous_evaluation.true_subset

@property
def previous_candidate_subset(self) -> AssetSubset:
if self.previous_evaluation is None:
return self.empty_subset()
candidate_subset = self.previous_evaluation.candidate_subset
if isinstance(candidate_subset, HistoricalAllPartitionsSubsetSentinel):
return AssetSubset.all(
self.asset_key, self.partitions_def, self.instance_queryer, self.evaluation_time
)
else:
return candidate_subset

@property
def previous_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetadata"]:
if self.previous_evaluation is None:
Expand Down
16 changes: 8 additions & 8 deletions python_modules/dagster/dagster/_core/definitions/asset_subset.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ def from_asset_partitions_set(
),
)

def __contains__(self, item: AssetKeyPartitionKey) -> bool:
if not self.is_partitioned:
return (
item.asset_key == self.asset_key and item.partition_key is None and self.bool_value
)
else:
return item.asset_key == self.asset_key and item.partition_key in self.subset_value


@whitelist_for_serdes(serializer=AssetSubsetSerializer)
class ValidAssetSubset(AssetSubset):
Expand Down Expand Up @@ -212,14 +220,6 @@ def __or__(self, other: AssetSubset) -> "ValidAssetSubset":
"""Returns an AssetSubset representing self.asset_partitions | other.asset_partitions."""
return self._oper(self.get_valid(other), operator.or_)

def __contains__(self, item: AssetKeyPartitionKey) -> bool:
if not self.is_partitioned:
return (
item.asset_key == self.asset_key and item.partition_key is None and self.bool_value
)
else:
return item.asset_key == self.asset_key and item.partition_key in self.subset_value

def get_valid(self, other: AssetSubset) -> "ValidAssetSubset":
"""Creates a ValidAssetSubset from the given AssetSubset by returning a copy of the given
AssetSubset if it is compatible with this AssetSubset, otherwise returns an empty subset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import dagster._check as check
from dagster._annotations import experimental, public
from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
from dagster._core.definitions.auto_materialize_rule_evaluation import (
AutoMaterializeDecisionType,
AutoMaterializeRuleSnapshot,
Expand All @@ -30,6 +30,8 @@
)
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
TimeWindowPartitionsDefinition,
get_time_partitions_def,
)
from dagster._core.storage.dagster_run import RunsFilter
Expand Down Expand Up @@ -182,6 +184,22 @@ def skip_on_not_all_parents_updated(
"""
return SkipOnNotAllParentsUpdatedRule(require_update_for_all_parent_partitions)

@staticmethod
def skip_on_not_all_parents_updated_since_cron(
cron_schedule: str, timezone: str = "UTC"
) -> "SkipOnNotAllParentsUpdatedSinceCronRule":
"""Skip materializing an asset partition if any of its parents have not been updated since
the latest tick of the given cron schedule.
Args:
cron_schedule (str): A cron schedule string (e.g. "`0 * * * *`").
timezone (str): The timezone in which this cron schedule should be evaluated. Defaults
to "UTC".
"""
return SkipOnNotAllParentsUpdatedSinceCronRule(
cron_schedule=cron_schedule, timezone=timezone
)

@public
@staticmethod
def skip_on_required_but_nonexistent_parents() -> "SkipOnRequiredButNonexistentParentsRule":
Expand Down Expand Up @@ -820,6 +838,207 @@ def evaluate_for_asset(
return AssetConditionResult.create(context, true_subset, subsets_with_metadata)


@whitelist_for_serdes
class SkipOnNotAllParentsUpdatedSinceCronRule(
AutoMaterializeRule,
NamedTuple(
"_SkipOnNotAllParentsUpdatedSinceCronRule",
[("cron_schedule", str), ("timezone", str)],
),
):
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.SKIP

@property
def description(self) -> str:
return f"waiting until all upstream assets have updated since the last cron schedule tick of '{self.cron_schedule}' (timezone: {self.timezone})"

def passed_time_window(self, context: AssetConditionEvaluationContext) -> TimeWindow:
"""Returns the window of time that has passed between the previous two cron ticks. All
parent assets must contain all data from this time window in order for this asset to be
materialized.
"""
previous_ticks = reverse_cron_string_iterator(
end_timestamp=context.evaluation_time.timestamp(),
cron_string=self.cron_schedule,
execution_timezone=self.timezone,
)
end_time = next(previous_ticks)
start_time = next(previous_ticks)

return TimeWindow(start=start_time, end=end_time)

def get_parent_subset_updated_since_cron(
self,
context: AssetConditionEvaluationContext,
parent_asset_key: AssetKey,
passed_time_window: TimeWindow,
) -> ValidAssetSubset:
"""Returns the AssetSubset of a given parent asset that has been updated since the end of
the previous cron tick. If a value for this parent asset was computed on the previous
evaluation, and that evaluation happened within the same cron tick as the current evaluation,
then this value will be calculated incrementally from the previous value to avoid expensive
queries.
"""
if (
# first tick of evaluating this condition
context.previous_evaluation_state is None
or context.previous_evaluation_timestamp is None
# new cron tick has happened since the previous tick
or passed_time_window.end.timestamp() > context.previous_evaluation_timestamp
):
return context.instance_queryer.get_asset_subset_updated_after_time(
asset_key=parent_asset_key, after_time=passed_time_window.end
)
else:
# previous state still valid
previous_parent_subsets = (
context.previous_evaluation_state.get_extra_state(context.condition, list) or []
)
previous_parent_subset = next(
(s for s in previous_parent_subsets if s.asset_key == parent_asset_key),
context.empty_subset(),
)

# the set of asset partitions that have been updated since the previous evaluation
new_parent_subset = context.instance_queryer.get_asset_subset_updated_after_cursor(
asset_key=parent_asset_key, after_cursor=context.previous_max_storage_id
)
return new_parent_subset | previous_parent_subset

def get_parent_subsets_updated_since_cron_by_key(
self, context: AssetConditionEvaluationContext, passed_time_window: TimeWindow
) -> Mapping[AssetKey, ValidAssetSubset]:
"""Returns a mapping of parent asset keys to the AssetSubset of each parent that has been
updated since the end of the previous cron tick. Does not compute this value for time-window
partitioned parents, as their partitions encode the time windows they have processed.
"""
updated_subsets_by_key = {}
for parent_asset_key in context.asset_graph.get_parents(context.asset_key):
# no need to incrementally calculate updated time-window partitions definitions, as
# their partitions encode the time windows they have processed.
if isinstance(
context.asset_graph.get_partitions_def(parent_asset_key),
TimeWindowPartitionsDefinition,
):
continue
updated_subsets_by_key[parent_asset_key] = self.get_parent_subset_updated_since_cron(
context, parent_asset_key, passed_time_window
)
return updated_subsets_by_key

def parent_updated_since_cron(
self,
context: AssetConditionEvaluationContext,
passed_time_window: TimeWindow,
parent_asset_key: AssetKey,
child_asset_partition: AssetKeyPartitionKey,
updated_parent_subset: ValidAssetSubset,
) -> bool:
"""Returns if, for a given child asset partition, the given parent asset been updated with
information from the required time window.
"""
parent_partitions_def = context.asset_graph.get_partitions_def(parent_asset_key)

if isinstance(parent_partitions_def, TimeWindowPartitionsDefinition):
# for time window partitions definitions, we simply assert that all time partitions that
# were newly created between the previous cron ticks have been materialized
required_parent_partitions = parent_partitions_def.get_partition_keys_in_time_window(
time_window=passed_time_window
)

# for time window partitions definitions, we simply assert that all time partitions that
return all(
AssetKeyPartitionKey(parent_asset_key, partition_key)
in context.instance_queryer.get_materialized_asset_subset(
asset_key=parent_asset_key
)
for partition_key in required_parent_partitions
)
# for all other partitions definitions, we assert that all parent partition keys have
# been materialized since the previous cron tick
else:
if parent_partitions_def is None:
non_updated_parent_asset_partitions = updated_parent_subset.inverse(
parent_partitions_def
).asset_partitions
else:
parent_subset = context.asset_graph.get_parent_partition_keys_for_child(
child_asset_partition.partition_key,
parent_asset_key,
child_asset_partition.asset_key,
context.instance_queryer,
context.evaluation_time,
).partitions_subset

non_updated_parent_asset_partitions = (
ValidAssetSubset(parent_asset_key, parent_subset) - updated_parent_subset
).asset_partitions

return not any(
not context.will_update_asset_partition(p)
for p in non_updated_parent_asset_partitions
)

def evaluate_for_asset(
self, context: AssetConditionEvaluationContext
) -> "AssetConditionResult":
from .asset_condition import AssetConditionResult

passed_time_window = self.passed_time_window(context)
has_new_passed_time_window = passed_time_window.end.timestamp() > (
context.previous_evaluation_timestamp or 0
)
updated_subsets_by_key = self.get_parent_subsets_updated_since_cron_by_key(
context, passed_time_window
)

# only need to evaluate net-new candidates and candidates whose parents have updated, unless
# this is the first tick after a new cron schedule tick
subset_to_evaluate = (
(
context.candidates_not_evaluated_on_previous_tick_subset
| context.candidate_parent_has_or_will_update_subset
)
if not has_new_passed_time_window
else context.candidate_subset
)

# the set of candidates for whom all parents have been updated since the previous cron tick
all_parents_updated_subset = AssetSubset.from_asset_partitions_set(
context.asset_key,
context.partitions_def,
{
candidate
for candidate in subset_to_evaluate.asset_partitions
if all(
self.parent_updated_since_cron(
context,
passed_time_window,
parent_asset_key,
candidate,
updated_subsets_by_key.get(parent_asset_key, context.empty_subset()),
)
for parent_asset_key in context.asset_graph.get_parents(candidate.asset_key)
)
},
)
# if your parents were all updated since the previous cron tick on the previous evaluation,
# that will still be true unless a new cron tick has happened since the previous evaluation
if not has_new_passed_time_window:
all_parents_updated_subset = (
context.previous_candidate_subset.as_valid(context.partitions_def)
- context.previous_true_subset
) | all_parents_updated_subset

return AssetConditionResult.create(
context,
true_subset=context.candidate_subset - all_parents_updated_subset,
extra_state=list(updated_subsets_by_key.values()),
)


@whitelist_for_serdes
class SkipOnRequiredButNonexistentParentsRule(
AutoMaterializeRule, NamedTuple("_SkipOnRequiredButNonexistentParentsRule", [])
Expand Down
Loading

0 comments on commit fbf58b9

Please sign in to comment.