From 911d74e79873be88fb4bcd772de0dbf727dd3ff2 Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Tue, 17 Oct 2023 09:27:14 -0700 Subject: [PATCH] [amp refactor 3/3] Refactor 'discard when backfill in progress' condition into skip rule (#17165) Refactors the 'discard when backfill in progress' condition into an `AutoMaterializeRule.skip_on_backfill_in_progress` skip rule. - Adds an `all_partitions` argument (default False) to enable skipping all partitions of the backfilling asset, regardless of whether the partitions are targeted by the backfill. If False, only skips the partitions targeted by the backfill. Also modifies the behavior of `AutoMaterializePolicy.with_rules` to replace any existing rule of the same type, so users don't have to write the following: ```python AutoMaterializePolicy.eager().without_rules( AutoMaterializeRule.skip_on_backfill_in_progress() ).with_rules(AutoMaterializeRule.skip_on_backfill_in_progress(True)) ``` and instead can just use `with_rules` to overwrite the original `skip_on_backfill_in_progress` rule: ```python AutoMaterializePolicy.eager().with_rules(AutoMaterializeRule.skip_on_backfill_in_progress(True)) ``` --- .../_core/definitions/asset_daemon_context.py | 10 +- .../definitions/auto_materialize_policy.py | 14 +- .../definitions/auto_materialize_rule.py | 54 ++++++++ .../scenarios/partition_scenarios.py | 120 ++++++++++++++++++ .../test_auto_materialize_policy.py | 21 +++ 5 files changed, 208 insertions(+), 11 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index deea05be4cd45..f2136cf196e87 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -72,6 +72,7 @@ def get_implicit_auto_materialize_policy( AutoMaterializeRule.skip_on_parent_outdated(), 
AutoMaterializeRule.skip_on_parent_missing(), AutoMaterializeRule.skip_on_required_but_nonexistent_parents(), + AutoMaterializeRule.skip_on_backfill_in_progress(), } if not bool(asset_graph.get_downstream_freshness_policies(asset_key=asset_key)): rules.add(AutoMaterializeRule.materialize_on_parent_updated()) @@ -350,15 +351,6 @@ def evaluate_asset( to_materialize.update(asset_partitions) self._logger.debug("Done evaluating materialize rule") - # These should be conditions, but aren't currently, so we just manually strip out things - # from our materialization set - for candidate in list(to_materialize): - if candidate in self.instance_queryer.get_active_backfill_target_asset_graph_subset(): - to_materialize.remove(candidate) - for rule_evaluation_data, asset_partitions in all_results: - all_results.remove((rule_evaluation_data, asset_partitions)) - all_results.append((rule_evaluation_data, asset_partitions - {candidate})) - skip_context = materialize_context._replace(candidates=to_materialize) for skip_rule in auto_materialize_policy.skip_rules: diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py index 2584acf0a46cb..fcd6f60a5d758 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py @@ -36,6 +36,7 @@ def before_unpack( AutoMaterializeRule.skip_on_parent_outdated(), AutoMaterializeRule.skip_on_parent_missing(), AutoMaterializeRule.skip_on_required_but_nonexistent_parents(), + AutoMaterializeRule.skip_on_backfill_in_progress(), } for backcompat_key, rule in backcompat_map.items(): if unpacked_dict.get(backcompat_key): @@ -177,6 +178,7 @@ def eager(max_materializations_per_minute: Optional[int] = 1) -> "AutoMaterializ AutoMaterializeRule.skip_on_parent_outdated(), AutoMaterializeRule.skip_on_parent_missing(), 
AutoMaterializeRule.skip_on_required_but_nonexistent_parents(), + AutoMaterializeRule.skip_on_backfill_in_progress(), }, max_materializations_per_minute=check.opt_int_param( max_materializations_per_minute, "max_materializations_per_minute" @@ -202,6 +204,7 @@ def lazy(max_materializations_per_minute: Optional[int] = 1) -> "AutoMaterialize AutoMaterializeRule.skip_on_parent_outdated(), AutoMaterializeRule.skip_on_parent_missing(), AutoMaterializeRule.skip_on_required_but_nonexistent_parents(), + AutoMaterializeRule.skip_on_backfill_in_progress(), }, max_materializations_per_minute=check.opt_int_param( max_materializations_per_minute, "max_materializations_per_minute" @@ -226,8 +229,15 @@ def without_rules(self, *rules_to_remove: "AutoMaterializeRule") -> "AutoMateria @public def with_rules(self, *rules_to_add: "AutoMaterializeRule") -> "AutoMaterializePolicy": - """Constructs a copy of this policy with the specified rules added.""" - return self._replace(rules=self.rules.union(set(rules_to_add))) + """Constructs a copy of this policy with the specified rules added. If an instance of a + provided rule with the same type exists on this policy, it will be replaced. 
+ """ + new_rule_types = {type(rule) for rule in rules_to_add} + return self._replace( + rules=set(rules_to_add).union( + {rule for rule in self.rules if type(rule) not in new_rule_types} + ) + ) @property def policy_type(self) -> AutoMaterializePolicyType: diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 3ad65e794c248..b7b0815835b7f 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -297,6 +297,20 @@ def skip_on_required_but_nonexistent_parents() -> "SkipOnRequiredButNonexistentP """ return SkipOnRequiredButNonexistentParentsRule() + @public + @staticmethod + def skip_on_backfill_in_progress( + all_partitions: bool = False, + ) -> "SkipOnBackfillInProgressRule": + """Skip an asset's partitions if targeted by an in-progress backfill. + + Attributes: + all_partitions (bool): If True, skips all partitions of the asset being backfilled, + regardless of whether the specific partition is targeted by a backfill. + If False, skips only partitions targeted by a backfill. Defaults to False. 
+ """ + return SkipOnBackfillInProgressRule(all_partitions) + def to_snapshot(self) -> AutoMaterializeRuleSnapshot: """Returns a serializable snapshot of this rule for historical evaluations.""" return AutoMaterializeRuleSnapshot.from_rule(self) @@ -676,6 +690,46 @@ def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationRe return [] +@whitelist_for_serdes +class SkipOnBackfillInProgressRule( + AutoMaterializeRule, + NamedTuple("_SkipOnBackfillInProgressRule", [("all_partitions", bool)]), +): + @property + def decision_type(self) -> AutoMaterializeDecisionType: + return AutoMaterializeDecisionType.SKIP + + @property + def description(self) -> str: + if self.all_partitions: + return "part of an asset targeted by an in-progress backfill" + else: + return "targeted by an in-progress backfill" + + def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults: + backfill_in_progress_candidates: AbstractSet[AssetKeyPartitionKey] = set() + + backfilling_subset = ( + context.instance_queryer.get_active_backfill_target_asset_graph_subset() + ) + + if self.all_partitions: + backfill_in_progress_candidates = { + candidate + for candidate in context.candidates + if candidate.asset_key in backfilling_subset.asset_keys + } + else: + backfill_in_progress_candidates = { + candidate for candidate in context.candidates if candidate in backfilling_subset + } + + if backfill_in_progress_candidates: + return [(None, backfill_in_progress_candidates)] + + return [] + + @whitelist_for_serdes class AutoMaterializeAssetEvaluation(NamedTuple): """Represents the results of the auto-materialize logic for a single asset. 
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py index 8808cd41b6f3a..ac84aa2166f46 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py @@ -27,6 +27,7 @@ run, run_request, single_asset_run, + with_auto_materialize_policy, ) daily_partitions_def = DailyPartitionsDefinition("2013-01-05") @@ -410,4 +411,123 @@ unevaluated_runs=[run(["asset2"])], expected_run_requests=[run_request(["asset3"], partition_key="2020-01-02")], ), + "test_skip_on_backfill_in_progress": AssetReconciliationScenario( + assets=with_auto_materialize_policy( + hourly_to_daily_partitions, + AutoMaterializePolicy.eager(max_materializations_per_minute=None).without_rules( + # Remove some rules to simplify the test + AutoMaterializeRule.skip_on_parent_outdated(), + AutoMaterializeRule.skip_on_required_but_nonexistent_parents(), + ), + ), + active_backfill_targets=[ + { + AssetKey("hourly"): TimeWindowPartitionsSubset( + hourly_partitions_def, + num_partitions=1, + included_partition_keys={"2013-01-05-04:00"}, + ) + } + ], + unevaluated_runs=[], + current_time=create_pendulum_time(year=2013, month=1, day=5, hour=5), + expected_run_requests=[ + run_request(["hourly"], partition_key="2013-01-05-00:00"), + run_request(["hourly"], partition_key="2013-01-05-01:00"), + run_request(["hourly"], partition_key="2013-01-05-02:00"), + run_request(["hourly"], partition_key="2013-01-05-03:00"), + ], + expected_evaluations=[ + AssetEvaluationSpec( + asset_key="hourly", + rule_evaluations=[ + ( + AutoMaterializeRuleEvaluation( + AutoMaterializeRule.materialize_on_missing().to_snapshot(), + evaluation_data=None, + ), + [ + "2013-01-05-00:00", + 
"2013-01-05-01:00", + "2013-01-05-02:00", + "2013-01-05-03:00", + "2013-01-05-04:00", + ], + ), + ( + AutoMaterializeRuleEvaluation( + AutoMaterializeRule.skip_on_backfill_in_progress().to_snapshot(), + evaluation_data=None, + ), + [ + "2013-01-05-04:00", + ], + ), + ], + num_requested=4, + num_skipped=1, + ), + ], + ), + "test_skip_entire_asset_on_backfill_in_progress": AssetReconciliationScenario( + assets=with_auto_materialize_policy( + hourly_to_daily_partitions, + AutoMaterializePolicy.eager(max_materializations_per_minute=None) + .without_rules( + # Remove some rules to simplify the test + AutoMaterializeRule.skip_on_parent_outdated(), + AutoMaterializeRule.skip_on_required_but_nonexistent_parents(), + ) + .with_rules(AutoMaterializeRule.skip_on_backfill_in_progress(all_partitions=True)), + ), + active_backfill_targets=[ + { + AssetKey("hourly"): TimeWindowPartitionsSubset( + hourly_partitions_def, + num_partitions=1, + included_partition_keys={"2013-01-05-04:00"}, + ) + } + ], + unevaluated_runs=[], + current_time=create_pendulum_time(year=2013, month=1, day=5, hour=5), + expected_run_requests=[], + expected_evaluations=[ + AssetEvaluationSpec( + asset_key="hourly", + rule_evaluations=[ + ( + AutoMaterializeRuleEvaluation( + AutoMaterializeRule.materialize_on_missing().to_snapshot(), + evaluation_data=None, + ), + [ + "2013-01-05-00:00", + "2013-01-05-01:00", + "2013-01-05-02:00", + "2013-01-05-03:00", + "2013-01-05-04:00", + ], + ), + ( + AutoMaterializeRuleEvaluation( + AutoMaterializeRule.skip_on_backfill_in_progress( + all_partitions=True + ).to_snapshot(), + evaluation_data=None, + ), + [ + "2013-01-05-00:00", + "2013-01-05-01:00", + "2013-01-05-02:00", + "2013-01-05-03:00", + "2013-01-05-04:00", + ], + ), + ], + num_requested=0, + num_skipped=5, + ), + ], + ), } diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_materialize_policy.py 
b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_materialize_policy.py index 62401df15ff76..50f8515e7d765 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_materialize_policy.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_materialize_policy.py @@ -29,6 +29,7 @@ def test_without_rules(): AutoMaterializeRule.skip_on_parent_outdated(), AutoMaterializeRule.skip_on_parent_missing(), AutoMaterializeRule.skip_on_required_but_nonexistent_parents(), + AutoMaterializeRule.skip_on_backfill_in_progress(), } ) @@ -42,6 +43,7 @@ def test_without_rules(): AutoMaterializeRule.skip_on_parent_outdated(), AutoMaterializeRule.skip_on_parent_missing(), AutoMaterializeRule.skip_on_required_but_nonexistent_parents(), + AutoMaterializeRule.skip_on_backfill_in_progress(), } ) @@ -77,11 +79,30 @@ def test_with_rules(): AutoMaterializeRule.skip_on_parent_missing(), AutoMaterializeRule.materialize_on_required_for_freshness(), AutoMaterializeRule.skip_on_required_but_nonexistent_parents(), + AutoMaterializeRule.skip_on_backfill_in_progress(), ) == AutoMaterializePolicy.eager() ) +def test_with_rules_override_existing_instance(): + simple_policy = AutoMaterializePolicy( + rules={ + AutoMaterializeRule.materialize_on_parent_updated(), + AutoMaterializeRule.skip_on_backfill_in_progress(), + } + ) + + simple_policy_with_override = simple_policy.with_rules( + AutoMaterializeRule.skip_on_backfill_in_progress(all_partitions=True), + ) + + assert simple_policy_with_override.rules == { + AutoMaterializeRule.skip_on_backfill_in_progress(all_partitions=True), + AutoMaterializeRule.materialize_on_parent_updated(), + } + + @pytest.mark.parametrize( "serialized_amp, expected_amp", [