diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py index 973c6f6b593ab..c739fd0965e17 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py @@ -18,6 +18,7 @@ import pendulum +from dagster._core.definitions.asset_condition import HistoricalAllPartitionsSubsetSentinel from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey from dagster._core.definitions.metadata import MetadataValue @@ -162,6 +163,18 @@ def previous_true_subset(self) -> AssetSubset: return self.empty_subset() return self.previous_evaluation.true_subset + @property + def previous_candidate_subset(self) -> AssetSubset: + if self.previous_evaluation is None: + return self.empty_subset() + candidate_subset = self.previous_evaluation.candidate_subset + if isinstance(candidate_subset, HistoricalAllPartitionsSubsetSentinel): + return AssetSubset.all( + self.asset_key, self.partitions_def, self.instance_queryer, self.evaluation_time + ) + else: + return candidate_subset + @property def previous_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetadata"]: if self.previous_evaluation is None: diff --git a/python_modules/dagster/dagster/_core/definitions/asset_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_subset.py index a8930e2c72ad0..f0c393208071f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_subset.py @@ -160,6 +160,14 @@ def from_asset_partitions_set( ), ) + def __contains__(self, item: AssetKeyPartitionKey) -> bool: + if not self.is_partitioned: + return ( + item.asset_key == self.asset_key and item.partition_key is None and self.bool_value + ) + else: + return item.asset_key == self.asset_key and item.partition_key in self.subset_value + @whitelist_for_serdes(serializer=AssetSubsetSerializer) class ValidAssetSubset(AssetSubset): @@ -212,14 +220,6 @@ def __or__(self, other: AssetSubset) -> "ValidAssetSubset": """Returns an AssetSubset representing self.asset_partitions | other.asset_partitions.""" return self._oper(self.get_valid(other), operator.or_) - def __contains__(self, item: AssetKeyPartitionKey) -> bool: - if not self.is_partitioned: - return ( - item.asset_key == self.asset_key and item.partition_key is None and self.bool_value - ) - else: - return item.asset_key == self.asset_key and item.partition_key in self.subset_value - def get_valid(self, other: AssetSubset) -> "ValidAssetSubset": """Creates a ValidAssetSubset from the given AssetSubset by returning a copy of the given AssetSubset if it is compatible with this AssetSubset, otherwise returns an empty subset. 
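Note on the `__contains__` hoist above: moving the method from `ValidAssetSubset` up to `AssetSubset` lets membership checks run against any subset, including deserialized historical ones that were never validated against the current partitions definition. A minimal sketch of the semantics, using simplified stand-in types (`MiniAssetSubset` and the string-keyed `AssetKeyPartitionKey` here are illustrative, not the real Dagster classes):

```python
# The real AssetSubset wraps either a bool (unpartitioned) or a PartitionsSubset
# (partitioned); this stand-in uses a plain set of partition keys instead.
from dataclasses import dataclass
from typing import NamedTuple, Optional, Set, Union


class AssetKeyPartitionKey(NamedTuple):
    asset_key: str
    partition_key: Optional[str] = None


@dataclass
class MiniAssetSubset:
    asset_key: str
    value: Union[bool, Set[str]]  # bool if unpartitioned, set of partition keys otherwise

    @property
    def is_partitioned(self) -> bool:
        return not isinstance(self.value, bool)

    def __contains__(self, item: AssetKeyPartitionKey) -> bool:
        if not self.is_partitioned:
            # an unpartitioned subset contains its (key, None) pair iff its value is True
            return item.asset_key == self.asset_key and item.partition_key is None and self.value
        # a partitioned subset contains exactly the partition keys it tracks
        return item.asset_key == self.asset_key and item.partition_key in self.value


unpartitioned = MiniAssetSubset("my_asset", True)
partitioned = MiniAssetSubset("daily_asset", {"2020-01-01", "2020-01-02"})
assert AssetKeyPartitionKey("my_asset") in unpartitioned
assert AssetKeyPartitionKey("daily_asset", "2020-01-01") in partitioned
assert AssetKeyPartitionKey("daily_asset", "2020-01-03") not in partitioned
```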
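Relatedly, the new `previous_candidate_subset` property has to expand the `HistoricalAllPartitionsSubsetSentinel` stored by earlier evaluations. A sketch of that sentinel-expansion pattern, under simplified, hypothetical types (not the real Dagster serdes classes):

```python
# Rather than serializing an exhaustive partition set in the cursor, a sentinel marks
# "all partitions at evaluation time" and is expanded lazily against the *current*
# partitions definition.
from dataclasses import dataclass
from typing import FrozenSet, Union


class AllPartitionsSentinel:
    """Compact marker stored in place of a full partition-key set."""


@dataclass
class StoredEvaluation:
    candidate_subset: Union[AllPartitionsSentinel, FrozenSet[str]]


def expand_candidate_subset(
    stored: StoredEvaluation, current_partition_keys: FrozenSet[str]
) -> FrozenSet[str]:
    if isinstance(stored.candidate_subset, AllPartitionsSentinel):
        # expand against today's partitions definition rather than a stale snapshot,
        # so partitions added since the previous evaluation are included
        return current_partition_keys
    return stored.candidate_subset


stored = StoredEvaluation(candidate_subset=AllPartitionsSentinel())
assert expand_candidate_subset(stored, frozenset({"a", "b"})) == frozenset({"a", "b"})
```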
diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 69ae0fa271d54..1f1cc534a059b 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -17,7 +17,7 @@ import dagster._check as check from dagster._annotations import experimental, public -from dagster._core.definitions.asset_subset import AssetSubset +from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset from dagster._core.definitions.auto_materialize_rule_evaluation import ( AutoMaterializeDecisionType, AutoMaterializeRuleSnapshot, @@ -30,6 +30,8 @@ ) from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition from dagster._core.definitions.time_window_partitions import ( + TimeWindow, + TimeWindowPartitionsDefinition, get_time_partitions_def, ) from dagster._core.storage.dagster_run import RunsFilter @@ -182,6 +184,22 @@ def skip_on_not_all_parents_updated( """ return SkipOnNotAllParentsUpdatedRule(require_update_for_all_parent_partitions) + @staticmethod + def skip_on_not_all_parents_updated_since_cron( + cron_schedule: str, timezone: str = "UTC" + ) -> "SkipOnNotAllParentsUpdatedSinceCronRule": + """Skip materializing an asset partition if any of its parents have not been updated since + the latest tick of the given cron schedule. + + Args: + cron_schedule (str): A cron schedule string (e.g. "0 * * * *"). + timezone (str): The timezone in which this cron schedule should be evaluated. Defaults + to "UTC". + """ + return SkipOnNotAllParentsUpdatedSinceCronRule( + cron_schedule=cron_schedule, timezone=timezone + ) + @public @staticmethod def skip_on_required_but_nonexistent_parents() -> "SkipOnRequiredButNonexistentParentsRule": @@ -820,6 +838,207 @@ def evaluate_for_asset( return AssetConditionResult.create(context, true_subset, subsets_with_metadata) +@whitelist_for_serdes +class SkipOnNotAllParentsUpdatedSinceCronRule( + AutoMaterializeRule, + NamedTuple( + "_SkipOnNotAllParentsUpdatedSinceCronRule", + [("cron_schedule", str), ("timezone", str)], + ), +): + @property + def decision_type(self) -> AutoMaterializeDecisionType: + return AutoMaterializeDecisionType.SKIP + + @property + def description(self) -> str: + return f"waiting until all upstream assets have updated since the last cron schedule tick of '{self.cron_schedule}' (timezone: {self.timezone})" + + def passed_time_window(self, context: AssetConditionEvaluationContext) -> TimeWindow: + """Returns the window of time between the previous two ticks of the cron schedule. All + parent assets must contain all data from this time window in order for this asset to be + materialized. + """ + previous_ticks = reverse_cron_string_iterator( + end_timestamp=context.evaluation_time.timestamp(), + cron_string=self.cron_schedule, + execution_timezone=self.timezone, + ) + end_time = next(previous_ticks) + start_time = next(previous_ticks) + + return TimeWindow(start=start_time, end=end_time) + + def get_parent_subset_updated_since_cron( + self, + context: AssetConditionEvaluationContext, + parent_asset_key: AssetKey, + passed_time_window: TimeWindow, + ) -> ValidAssetSubset: + """Returns the AssetSubset of a given parent asset that has been updated since the end of + the previous cron tick. 
If a value for this parent asset was computed on the previous + evaluation, and that evaluation happened within the same cron tick as the current evaluation, + then this value will be calculated incrementally from the previous value to avoid expensive + queries. + """ + if ( + # first tick of evaluating this condition + context.previous_evaluation_state is None + or context.previous_evaluation_timestamp is None + # new cron tick has happened since the previous tick + or passed_time_window.end.timestamp() > context.previous_evaluation_timestamp + ): + return context.instance_queryer.get_asset_subset_updated_after_time( + asset_key=parent_asset_key, after_time=passed_time_window.end + ) + else: + # previous state still valid + previous_parent_subsets = ( + context.previous_evaluation_state.get_extra_state(context.condition, list) or [] + ) + previous_parent_subset = next( + (s for s in previous_parent_subsets if s.asset_key == parent_asset_key), + context.empty_subset(), + ) + + # the set of asset partitions that have been updated since the previous evaluation + new_parent_subset = context.instance_queryer.get_asset_subset_updated_after_cursor( + asset_key=parent_asset_key, after_cursor=context.previous_max_storage_id + ) + return new_parent_subset | previous_parent_subset + + def get_parent_subsets_updated_since_cron_by_key( + self, context: AssetConditionEvaluationContext, passed_time_window: TimeWindow + ) -> Mapping[AssetKey, ValidAssetSubset]: + """Returns a mapping of parent asset keys to the AssetSubset of each parent that has been + updated since the end of the previous cron tick. Does not compute this value for time-window + partitioned parents, as their partitions encode the time windows they have processed. + """ + updated_subsets_by_key = {} + for parent_asset_key in context.asset_graph.get_parents(context.asset_key): + # no need to incrementally calculate updated subsets for time-window partitioned + # parents, as their partitions encode the time windows they have processed. + if isinstance( + context.asset_graph.get_partitions_def(parent_asset_key), + TimeWindowPartitionsDefinition, + ): + continue + updated_subsets_by_key[parent_asset_key] = self.get_parent_subset_updated_since_cron( + context, parent_asset_key, passed_time_window + ) + return updated_subsets_by_key + + def parent_updated_since_cron( + self, + context: AssetConditionEvaluationContext, + passed_time_window: TimeWindow, + parent_asset_key: AssetKey, + child_asset_partition: AssetKeyPartitionKey, + updated_parent_subset: ValidAssetSubset, + ) -> bool: + """Returns whether, for a given child asset partition, the given parent asset has been + updated with information from the required time window. 
+ """ + parent_partitions_def = context.asset_graph.get_partitions_def(parent_asset_key) + + if isinstance(parent_partitions_def, TimeWindowPartitionsDefinition): + # for time window partitions definitions, we simply assert that all time partitions that + # were newly created between the previous two cron ticks have been materialized + required_parent_partitions = parent_partitions_def.get_partition_keys_in_time_window( + time_window=passed_time_window + ) + + return all( + AssetKeyPartitionKey(parent_asset_key, partition_key) + in context.instance_queryer.get_materialized_asset_subset( + asset_key=parent_asset_key + ) + for partition_key in required_parent_partitions + ) + # for all other partitions definitions, we assert that all parent partition keys have + # either been materialized since the previous cron tick, or will be materialized this tick + else: + if parent_partitions_def is None: + non_updated_parent_asset_partitions = updated_parent_subset.inverse( + parent_partitions_def + ).asset_partitions + else: + parent_subset = context.asset_graph.get_parent_partition_keys_for_child( + child_asset_partition.partition_key, + parent_asset_key, + child_asset_partition.asset_key, + context.instance_queryer, + context.evaluation_time, + ).partitions_subset + + non_updated_parent_asset_partitions = ( + ValidAssetSubset(parent_asset_key, parent_subset) - updated_parent_subset + ).asset_partitions + + return all( + context.will_update_asset_partition(p) + for p in non_updated_parent_asset_partitions + ) + + def evaluate_for_asset( + self, context: AssetConditionEvaluationContext + ) -> "AssetConditionResult": + from .asset_condition import AssetConditionResult + + passed_time_window = self.passed_time_window(context) + has_new_passed_time_window = passed_time_window.end.timestamp() > ( + context.previous_evaluation_timestamp or 0 + ) + updated_subsets_by_key = self.get_parent_subsets_updated_since_cron_by_key( + context, passed_time_window + ) + + # only need to evaluate net-new candidates and candidates whose parents have updated, unless + # this is the first tick after a new cron schedule tick + subset_to_evaluate = ( + ( + context.candidates_not_evaluated_on_previous_tick_subset + | context.candidate_parent_has_or_will_update_subset + ) + if not has_new_passed_time_window + else context.candidate_subset + ) + + # the set of candidates for which all parents have been updated since the previous cron tick + all_parents_updated_subset = AssetSubset.from_asset_partitions_set( + context.asset_key, + context.partitions_def, + { + candidate + for candidate in subset_to_evaluate.asset_partitions + if all( + self.parent_updated_since_cron( + context, + passed_time_window, + parent_asset_key, + candidate, + updated_subsets_by_key.get(parent_asset_key, context.empty_subset()), + ) + for parent_asset_key in context.asset_graph.get_parents(candidate.asset_key) + ) + }, + ) + # if all of a candidate's parents were updated since the previous cron tick as of the + # previous evaluation, that remains true unless a new cron tick has happened since then + if not has_new_passed_time_window: + all_parents_updated_subset = ( + context.previous_candidate_subset.as_valid(context.partitions_def) + - context.previous_true_subset + ) | all_parents_updated_subset + + return AssetConditionResult.create( + context, + true_subset=context.candidate_subset - all_parents_updated_subset, + extra_state=list(updated_subsets_by_key.values()), + ) + + 
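The `passed_time_window` helper above walks a reverse cron iterator twice to obtain the two most recent ticks. A sketch of the same computation using the third-party `croniter` package as a stand-in for Dagster's internal `reverse_cron_string_iterator` (boundary behavior when the evaluation time falls exactly on a tick may differ from the internal iterator):

```python
# The window spanned by the two most recent cron ticks at or before the evaluation time.
from datetime import datetime, timezone
from typing import Tuple

from croniter import croniter  # pip install croniter


def passed_time_window(cron_schedule: str, evaluation_time: datetime) -> Tuple[datetime, datetime]:
    ticks = croniter(cron_schedule, evaluation_time)
    end = ticks.get_prev(datetime)    # most recent tick before the evaluation time
    start = ticks.get_prev(datetime)  # the tick before that
    return start, end


# At 00:05 the most recent fully elapsed hourly window is 23:00 -> 00:00.
start, end = passed_time_window("0 * * * *", datetime(2020, 1, 1, 0, 5, tzinfo=timezone.utc))
assert (start.hour, end.hour) == (23, 0)
```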
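The incremental branch of `get_parent_subset_updated_since_cron` is the heart of the rule: within a single cron tick, it unions the subset stored by the previous evaluation with only the events recorded past the stored cursor, instead of re-scanning from the tick boundary. A self-contained sketch of that pattern over an in-memory event log (all names here are illustrative, not Dagster APIs):

```python
from typing import List, Optional, Set, Tuple

EVENT_LOG: List[Tuple[int, float, str]] = [
    (1, 100.0, "p1"),  # (storage_id, timestamp, partition_key)
    (2, 110.0, "p2"),
    (3, 130.0, "p1"),
]


def updated_after_time(after_ts: float) -> Set[str]:
    # full scan; this is the expensive path the incremental branch avoids
    return {pk for _, ts, pk in EVENT_LOG if ts > after_ts}


def updated_after_cursor(cursor: int) -> Set[str]:
    return {pk for sid, _, pk in EVENT_LOG if sid > cursor}


def updated_since_cron(
    previous_subset: Optional[Set[str]],  # subset stored by the previous evaluation
    previous_cursor: int,                 # max storage_id seen by the previous evaluation
    previous_eval_ts: Optional[float],    # when the previous evaluation ran
    window_end_ts: float,                 # the most recent cron tick
) -> Set[str]:
    if previous_subset is None or previous_eval_ts is None or window_end_ts > previous_eval_ts:
        # first evaluation, or a new cron tick has elapsed: recompute from scratch
        return updated_after_time(window_end_ts)
    # same cron tick as before: union the stored subset with only the new events
    return previous_subset | updated_after_cursor(previous_cursor)


# First evaluation after the 105.0 tick scans the log from the tick boundary...
assert updated_since_cron(None, 0, None, 105.0) == {"p1", "p2"}
# ...and a later evaluation in the same tick extends the stored subset incrementally.
assert updated_since_cron({"p2"}, 2, 120.0, 105.0) == {"p1", "p2"}
```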
@whitelist_for_serdes class SkipOnRequiredButNonexistentParentsRule( AutoMaterializeRule, NamedTuple("_SkipOnRequiredButNonexistentParentsRule", []) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index 5cf3327a3c750..728160d379396 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -20,7 +20,7 @@ import dagster._check as check from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.asset_graph_subset import AssetGraphSubset -from dagster._core.definitions.asset_subset import AssetSubset +from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset from dagster._core.definitions.data_version import ( DATA_VERSION_TAG, DataVersion, @@ -39,6 +39,7 @@ DagsterDefinitionChangedDeserializationError, DagsterInvalidDefinitionError, ) +from dagster._core.event_api import AssetRecordsFilter, EventRecordsFilter from dagster._core.events import DagsterEventType from dagster._core.instance import DagsterInstance, DynamicPartitionsStore from dagster._core.storage.dagster_run import ( @@ -197,8 +198,6 @@ def _get_latest_materialization_or_observation_record( observable source assets, this will be an AssetObservation, otherwise it will be an AssetMaterialization. """ - from dagster._core.event_api import EventRecordsFilter - # in the simple case, just use the asset record if ( before_cursor is None @@ -350,8 +349,6 @@ def next_version_record( after_cursor: Optional[int], data_version: Optional[DataVersion], ) -> Optional["EventLogRecord"]: - from dagster._core.event_api import EventRecordsFilter - for record in self.instance.get_event_records( EventRecordsFilter( event_type=DagsterEventType.ASSET_OBSERVATION, @@ -612,9 +609,9 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( } break # the set of asset partitions which have been updated since the latest storage id - parent_partitions_subset = self.get_partitions_subset_updated_after_cursor( + parent_partitions_subset = self.get_asset_subset_updated_after_cursor( asset_key=parent_asset_key, after_cursor=latest_storage_id - ) + ).subset_value # we are mapping from the partitions of the parent asset to the partitions of # the child asset partition_mapping = self.asset_graph.get_partition_mapping( @@ -813,32 +810,66 @@ def get_asset_partitions_updated_after_cursor( ) @cached_method - def get_partitions_subset_updated_after_cursor( + def get_asset_subset_updated_after_cursor( self, *, asset_key: AssetKey, after_cursor: Optional[int] - ) -> PartitionsSubset: - """Returns a PartitionsSubset representing the set of partitions that have been updated - after the given cursor. This subset will contain only valid partition keys for the given - asset. 
- """ - new_asset_partitions = self.get_asset_partitions_updated_after_cursor( - asset_key, - asset_partitions=None, - after_cursor=after_cursor, - respect_materialization_data_versions=False, - ) - partitions_def = check.not_none(self.asset_graph.get_partitions_def(asset_key)) - return partitions_def.subset_with_partition_keys( - [ - asset_partition.partition_key - for asset_partition in new_asset_partitions - if asset_partition.partition_key is not None + ) -> ValidAssetSubset: + """Returns the AssetSubset of the given asset that has been updated after the given cursor.""" + partitions_def = self.asset_graph.get_partitions_def(asset_key) + if partitions_def is None: + return ValidAssetSubset( + asset_key, + value=self.asset_partition_has_materialization_or_observation( + AssetKeyPartitionKey(asset_key), after_cursor=after_cursor + ), + ) + else: + new_asset_partitions = { + ap + for ap in self.get_asset_partitions_updated_after_cursor( + asset_key, + asset_partitions=None, + after_cursor=after_cursor, + respect_materialization_data_versions=False, + ) + if ap.partition_key is not None and partitions_def.has_partition_key( - partition_key=asset_partition.partition_key, + partition_key=ap.partition_key, dynamic_partitions_store=self, current_time=self.evaluation_time, ) - ] + } + return AssetSubset.from_asset_partitions_set( + asset_key, partitions_def, new_asset_partitions + ) + + @cached_method + def get_asset_subset_updated_after_time( + self, *, asset_key: AssetKey, after_time: datetime + ) -> ValidAssetSubset: + """Returns the AssetSubset of the given asset that has been updated after the given time.""" + partitions_def = self.asset_graph.get_partitions_def(asset_key) + + method = ( + self.instance.fetch_materializations + if self._event_type_for_key(asset_key) == DagsterEventType.ASSET_MATERIALIZATION + else self.instance.fetch_observations + ) + first_event_after_time = next( + iter( + method( + AssetRecordsFilter(asset_key=asset_key, after_timestamp=after_time.timestamp()), + limit=1, + ascending=True, + ).records + ), + None, ) + if not first_event_after_time: + return AssetSubset.empty(asset_key, partitions_def=partitions_def) + else: + return self.get_asset_subset_updated_after_cursor( + asset_key=asset_key, after_cursor=first_event_after_time.storage_id - 1 + ) def get_parent_asset_partitions_updated_after_child( self, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon.py index a6746b0297142..d52084893fa9e 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon.py @@ -55,7 +55,11 @@ two_partitions_def, ) from .updated_scenarios.basic_scenarios import basic_scenarios -from .updated_scenarios.cron_scenarios import basic_hourly_cron_rule, get_cron_policy +from .updated_scenarios.cron_scenarios import ( + basic_hourly_cron_rule, + basic_hourly_cron_schedule, + get_cron_policy, +) from .updated_scenarios.partition_scenarios import partition_scenarios @@ -95,7 +99,7 @@ def _get_threadpool_executor(instance: DagsterInstance): AssetDaemonScenario( id="basic_hourly_cron_unpartitioned", initial_state=one_asset.with_asset_properties( - auto_materialize_policy=get_cron_policy(basic_hourly_cron_rule) + auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule) 
).with_current_time("2020-01-01T00:05"), execution_fn=lambda state: state.evaluate_tick() .assert_requested_runs(run_request(["A"])) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/cron_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/cron_scenarios.py index 666665e368aa2..98559ef004f80 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/cron_scenarios.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/cron_scenarios.py @@ -1,7 +1,9 @@ import pytest from dagster import AutoMaterializePolicy, AutoMaterializeRule from dagster._check import ParameterCheckError -from dagster._core.definitions.auto_materialize_rule import WaitingOnAssetsRuleEvaluationData +from dagster._core.definitions.auto_materialize_rule import ( + WaitingOnAssetsRuleEvaluationData, +) from ..asset_daemon_scenario import AssetDaemonScenario, AssetRuleEvaluationSpec, hour_partition_key from ..base_scenario import run_request @@ -13,28 +15,40 @@ one_asset_depends_on_two, three_assets_not_subsettable, time_partitions_start_str, + two_partitions_def, ) def get_cron_policy( - cron_rule: AutoMaterializeRule, + cron_schedule: str, + cron_timezone: str = "UTC", + all_partitions: bool = False, max_materializations_per_minute: int = 1, + use_cron_skip_rule: bool = False, ): return AutoMaterializePolicy( - rules={cron_rule, AutoMaterializeRule.skip_on_not_all_parents_updated()}, + rules={ + AutoMaterializeRule.materialize_on_cron(cron_schedule, cron_timezone, all_partitions), + AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron( + cron_schedule, cron_timezone + ) + if use_cron_skip_rule + else AutoMaterializeRule.skip_on_not_all_parents_updated(), + }, max_materializations_per_minute=max_materializations_per_minute, ) +basic_hourly_cron_schedule = "0 * * * *" basic_hourly_cron_rule = AutoMaterializeRule.materialize_on_cron( - cron_schedule="0 * * * *", timezone="UTC" + cron_schedule=basic_hourly_cron_schedule, timezone="UTC" ) cron_scenarios = [ AssetDaemonScenario( id="basic_hourly_cron_unpartitioned", initial_state=one_asset.with_asset_properties( - auto_materialize_policy=get_cron_policy(basic_hourly_cron_rule) + auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule) ).with_current_time("2020-01-01T00:05"), execution_fn=lambda state: state.evaluate_tick() .assert_requested_runs(run_request(["A"])) @@ -60,7 +74,7 @@ def get_cron_policy( AssetDaemonScenario( id="basic_hourly_cron_unpartitioned_multi_asset", initial_state=three_assets_not_subsettable.with_asset_properties( - auto_materialize_policy=get_cron_policy(basic_hourly_cron_rule) + auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule) ).with_current_time("2020-01-01T00:05"), execution_fn=lambda state: state.evaluate_tick() .assert_requested_runs(run_request(["A", "B", "C"])) @@ -78,7 +92,7 @@ def get_cron_policy( id="basic_hourly_cron_partitioned", initial_state=one_asset.with_asset_properties( partitions_def=hourly_partitions_def, - auto_materialize_policy=get_cron_policy(basic_hourly_cron_rule), + auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule), ) .with_current_time(time_partitions_start_str) .with_current_time_advanced(days=1, minutes=5), @@ -108,11 +122,7 @@ def get_cron_policy( AssetDaemonScenario( id="basic_hourly_cron_partitioned_with_timezone", 
initial_state=one_asset.with_asset_properties( - auto_materialize_policy=get_cron_policy( - AutoMaterializeRule.materialize_on_cron( - cron_schedule="@daily", timezone="America/Los_Angeles" - ) - ), + auto_materialize_policy=get_cron_policy("@daily", cron_timezone="America/Los_Angeles"), partitions_def=daily_partitions_def, ).with_current_time("2020-01-02T12:00"), execution_fn=lambda state: state.evaluate_tick() @@ -133,7 +143,7 @@ def get_cron_policy( AssetDaemonScenario( id="hourly_cron_unpartitioned_wait_for_parents", initial_state=one_asset_depends_on_two.with_asset_properties( - keys="C", auto_materialize_policy=get_cron_policy(basic_hourly_cron_rule) + keys="C", auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule) ).with_current_time("2020-01-01T00:05"), execution_fn=lambda state: state.evaluate_tick() # don't materialize C because we're waiting for A and B @@ -201,7 +211,7 @@ def get_cron_policy( .with_asset_properties( keys="C", auto_materialize_policy=get_cron_policy( - basic_hourly_cron_rule, max_materializations_per_minute=100 + basic_hourly_cron_schedule, max_materializations_per_minute=100 ), ) .with_current_time(time_partitions_start_str), @@ -356,7 +366,8 @@ def get_cron_policy( id="hourly_cron_all_partitions", initial_state=one_asset.with_asset_properties( auto_materialize_policy=get_cron_policy( - basic_hourly_cron_rule._replace(all_partitions=True), + basic_hourly_cron_schedule, + all_partitions=True, max_materializations_per_minute=100, ), partitions_def=hourly_partitions_def, @@ -383,7 +394,8 @@ def get_cron_policy( initial_state=one_asset.with_asset_properties( partitions_def=dynamic_partitions_def, auto_materialize_policy=get_cron_policy( - basic_hourly_cron_rule._replace(all_partitions=True), + basic_hourly_cron_schedule, + all_partitions=True, max_materializations_per_minute=100, ), ), @@ -414,7 +426,7 @@ def get_cron_policy( initial_state=one_asset.with_asset_properties( partitions_def=dynamic_partitions_def, auto_materialize_policy=get_cron_policy( - basic_hourly_cron_rule._replace(all_partitions=False), + basic_hourly_cron_schedule, max_materializations_per_minute=100, ), ), @@ -435,6 +447,217 @@ def get_cron_policy( run_request("A", partition_key="5"), ), ), + AssetDaemonScenario( + id="hourly_cron_unpartitioned_wait_for_parents_with_cron_skip", + initial_state=one_asset_depends_on_two.with_asset_properties( + keys="C", + auto_materialize_policy=get_cron_policy( + basic_hourly_cron_schedule, use_cron_skip_rule=True + ), + ).with_current_time("2020-01-01T00:05"), + execution_fn=lambda state: state.evaluate_tick() + # don't materialize C because we're waiting for A and B + .assert_requested_runs() + .with_runs(run_request("A")) + .with_current_time_advanced(seconds=30) + .evaluate_tick() + # now just waiting on B + .assert_requested_runs() + .with_runs(run_request("B")) + .with_current_time_advanced(seconds=30) + .evaluate_tick() + .assert_requested_runs(run_request(["C"])) + # next tick should not request any more runs + .with_current_time_advanced(seconds=30) + .evaluate_tick() + .assert_requested_runs() + .assert_evaluation("C", []) + # even if both parents update, still on the same cron schedule tick + .with_runs(run_request(["A", "B"])) + .with_current_time_advanced(seconds=30) + .evaluate_tick() + .assert_requested_runs() + # moved to a new cron schedule tick; still do not request a run, because parents have + # not been updated since the cron schedule tick + .with_current_time_advanced(minutes=60) + .evaluate_tick() + .assert_requested_runs() 
+ .with_runs(run_request("B")) + .with_current_time_advanced(seconds=30) + .evaluate_tick() + # still waiting on A + .assert_requested_runs() + .with_runs(run_request("A")) + .with_current_time_advanced(seconds=30) + .evaluate_tick() + # now C can go + .assert_requested_runs(run_request("C")) + .with_current_time_advanced(seconds=30) + .evaluate_tick() + # no more runs + .assert_requested_runs(), + ), + AssetDaemonScenario( + id="hourly_cron_unpartitioned_wait_for_parents_with_cron_skip_single_run", + initial_state=one_asset_depends_on_two.with_asset_properties( + # all assets get this policy + auto_materialize_policy=get_cron_policy( + basic_hourly_cron_schedule, use_cron_skip_rule=True + ), + ).with_current_time("2020-01-01T00:05"), + execution_fn=lambda state: state.evaluate_tick() + .assert_requested_runs(run_request(["A", "B", "C"])) + .with_current_time_advanced(seconds=30) + .evaluate_tick() + .assert_requested_runs() + .with_current_time_advanced(hours=1) + .evaluate_tick() + .assert_requested_runs(run_request(["A", "B", "C"])) + .with_current_time_advanced(seconds=30) + .evaluate_tick() + .assert_requested_runs(), + ), + AssetDaemonScenario( + id="hourly_cron_partitioned_wait_for_parents_with_cron_skip", + initial_state=one_asset_depends_on_two.with_asset_properties( + partitions_def=hourly_partitions_def, + ) + .with_asset_properties( + keys="C", + auto_materialize_policy=get_cron_policy( + basic_hourly_cron_schedule, + max_materializations_per_minute=100, + use_cron_skip_rule=True, + ), + ) + .with_current_time(time_partitions_start_str) + .with_current_time_advanced(hours=10), + execution_fn=lambda state: state.evaluate_tick() + # No runs, because we're waiting for A and B + .assert_requested_runs() + # A is materialized, still waiting for B + .with_current_time_advanced(seconds=30) + .with_runs(run_request("A", hour_partition_key(state.current_time, delta=0))) + .evaluate_tick() + .assert_requested_runs() + # B is materialized, but it's an old partition, so still waiting for the correct data + .with_current_time_advanced(seconds=30) + .with_runs(run_request("B", hour_partition_key(state.current_time, delta=-3))) + .evaluate_tick() + .assert_requested_runs() + # B is materialized with the latest partition + .with_current_time_advanced(seconds=30) + .with_runs(run_request("B", hour_partition_key(state.current_time, delta=0))) + .evaluate_tick() + .assert_requested_runs( + run_request("C", hour_partition_key(state.current_time, delta=0)), + ) + # no new runs the next tick + .with_current_time_advanced(seconds=30) + .evaluate_tick() + .assert_requested_runs() + # a new hour, no new runs until both A and B are materialized + .with_current_time_advanced(hours=1) + .with_runs(run_request("B", hour_partition_key(state.current_time, delta=1))) + .evaluate_tick() + .assert_requested_runs() + # now A is materialized, so C kicks off + .with_current_time_advanced(seconds=30) + .with_runs(run_request("A", hour_partition_key(state.current_time, delta=1))) + .evaluate_tick() + .assert_requested_runs( + run_request("C", hour_partition_key(state.current_time, delta=1)), + ), + ), + AssetDaemonScenario( + id="daily_unpartitioned_downstream_of_hourly_and_static_with_cron_skip", + initial_state=one_asset_depends_on_two.with_asset_properties( + keys="A", partitions_def=two_partitions_def + ) + .with_asset_properties( + keys="B", + partitions_def=hourly_partitions_def, + ) + .with_asset_properties( + keys="C", + auto_materialize_policy=get_cron_policy( + "0 0 * * *", # daily + 
use_cron_skip_rule=True, + ), + ) + .with_current_time(time_partitions_start_str) + .with_current_time_advanced(days=5, minutes=1), + execution_fn=lambda state: state.evaluate_tick() + # No runs, because we're waiting for A and B + .assert_requested_runs() + # One partition of A is materialized + .with_current_time_advanced(seconds=30) + .with_runs(run_request("A", "1")) + .evaluate_tick() + .assert_requested_runs() + # Most hours of B are materialized + .with_current_time_advanced(seconds=30) + .with_runs( + *[run_request("B", hour_partition_key(state.current_time, delta=-i)) for i in range(20)] + ) + .evaluate_tick() + .assert_requested_runs() + # Many hours later, the other partition of A is materialized, still waiting for the + # remaining partitions of B + .with_current_time_advanced(hours=15) + .with_runs(run_request("A", "2")) + .evaluate_tick() + .assert_requested_runs() + # some new partitions of B are materialized, still waiting for older ones within the time + # window + .with_current_time_advanced(seconds=30) + .with_runs( + *[ + run_request("B", hour_partition_key(state.current_time, delta=i + 1)) + for i in range(10) + ] + ) + .evaluate_tick() + .assert_requested_runs() + # remaining partitions of B are materialized, C kicks off + .with_current_time_advanced(seconds=30) + .with_runs( + *[ + run_request("B", hour_partition_key(state.current_time, delta=-i)) + for i in range(20, 24) + ] + ) + .evaluate_tick() + .assert_requested_runs(run_request("C")) + # next tick, no new runs + .with_current_time_advanced(seconds=30) + .evaluate_tick() + .assert_requested_runs() + # now a new day, no new runs until both A and B are materialized + .with_current_time_advanced(hours=10) + .evaluate_tick() + .assert_requested_runs() + # all of yesterday's partitions of B are materialized, but still waiting for A + .with_current_time_advanced(seconds=30) + .with_runs( + *[ + run_request("B", hour_partition_key(state.current_time, delta=i + 1)) + for i in range(10, 24) + ] + ) + .evaluate_tick() + .assert_requested_runs() + # now one partition of A is updated + .with_current_time_advanced(seconds=30) + .with_runs(run_request("A", "2")) + .evaluate_tick() + .assert_requested_runs() + # now other partition of A is updated, C kicks off + .with_current_time_advanced(seconds=30) + .with_runs(run_request("A", "1")) + .evaluate_tick() + .assert_requested_runs(run_request("C")), + ), ] diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/cursor_migration_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/cursor_migration_scenarios.py index eb3df4686aac2..c2ee951913f23 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/cursor_migration_scenarios.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/cursor_migration_scenarios.py @@ -3,6 +3,7 @@ from dagster_tests.definitions_tests.auto_materialize_tests.updated_scenarios.cron_scenarios import ( basic_hourly_cron_rule, + basic_hourly_cron_schedule, get_cron_policy, ) @@ -114,7 +115,7 @@ AssetDaemonScenario( id="basic_hourly_cron_unpartitioned_migrate", initial_state=one_asset.with_asset_properties( - auto_materialize_policy=get_cron_policy(basic_hourly_cron_rule) + auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule) ).with_current_time("2020-01-01T00:05"), execution_fn=lambda state: state.evaluate_tick() 
.assert_requested_runs(run_request(["A"])) @@ -144,7 +145,7 @@ id="basic_hourly_cron_partitioned_migrate", initial_state=one_asset.with_asset_properties( partitions_def=hourly_partitions_def, - auto_materialize_policy=get_cron_policy(basic_hourly_cron_rule), + auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule), ) .with_current_time(time_partitions_start_str) .with_current_time_advanced(days=1, minutes=5),
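For reference, the new `get_asset_subset_updated_after_time` in the instance queryer reduces a time bound to a cursor bound: fetch the first event strictly after the timestamp (ascending, limit 1), then delegate to the cached cursor-based lookup with that event's `storage_id - 1` so the boundary event itself is included. A sketch of that translation over an in-memory log (the log and helper names are illustrative stand-ins for the queryer and event storage):

```python
from typing import List, Optional, Set, Tuple

EVENTS: List[Tuple[int, float, str]] = [
    (1, 100.0, "p1"),  # (storage_id, timestamp, partition_key), ascending by storage_id
    (2, 110.0, "p2"),
    (3, 130.0, "p1"),
]


def first_storage_id_after(after_ts: float) -> Optional[int]:
    # mirrors fetch_materializations(..., limit=1, ascending=True)
    return next((sid for sid, ts, _ in EVENTS if ts > after_ts), None)


def updated_after_cursor(cursor: int) -> Set[str]:
    # the cursor-based path, which can be cached and reused across evaluations
    return {pk for sid, _, pk in EVENTS if sid > cursor}


def updated_after_time(after_ts: float) -> Set[str]:
    first_sid = first_storage_id_after(after_ts)
    if first_sid is None:
        return set()  # nothing has happened since the time bound
    # back the cursor off by one so the boundary event itself is included
    return updated_after_cursor(first_sid - 1)


assert updated_after_time(105.0) == {"p1", "p2"}
assert updated_after_time(200.0) == set()
```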
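End to end, the new skip rule is meant to pair with `materialize_on_cron` on the same schedule, mirroring `get_cron_policy(..., use_cron_skip_rule=True)` in the test scenarios above. A usage sketch against the `AutoMaterializePolicy` rules API as it appears in this diff (the asset body and the `max_materializations_per_minute` value are illustrative):

```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

cron_schedule = "0 * * * *"  # hourly

policy = AutoMaterializePolicy(
    rules={
        # request a materialization once per cron tick...
        AutoMaterializeRule.materialize_on_cron(cron_schedule, timezone="UTC"),
        # ...but skip until every parent has been updated since that tick
        AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron(cron_schedule, "UTC"),
    },
    max_materializations_per_minute=1,
)


@asset(auto_materialize_policy=policy)
def downstream_asset() -> None:
    ...
```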