Skip to content

Commit

Permalink
[amp refactor 3/3] Refactor 'discard when backfill in progress' condi…
Browse files Browse the repository at this point in the history
…tion into skip rule (#17165)

Refactors the 'discard when backfill in progress' condition into an
`AutoMaterializeRule.skip_on_backfill_in_progress` skip rule.
- Adds an `all_partitions` argument (default False) to enable skipping
all partitions of the backfilling asset, regardless of whether the
partitions are targeted by the backfill. If False, only the partitions
targeted by the backfill are skipped.

Also modifies the behavior of `AutoMaterializePolicy.with_rules` so that
a newly added rule replaces any existing rule of the same type, so users
don't have to write the following:
```python
AutoMaterializePolicy.eager().without_rules(
    AutoMaterializeRule.skip_on_backfill_in_progress()
).with_rules(AutoMaterializeRule.skip_on_backfill_in_progress(True))
```
and instead can just use `with_rules` to overwrite the original
`skip_on_backfill_in_progress` rule:
```python
AutoMaterializePolicy.eager().with_rules(AutoMaterializeRule.skip_on_backfill_in_progress(True))
```
  • Loading branch information
clairelin135 authored Oct 17, 2023
1 parent caf571a commit 911d74e
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def get_implicit_auto_materialize_policy(
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_parent_missing(),
AutoMaterializeRule.skip_on_required_but_nonexistent_parents(),
AutoMaterializeRule.skip_on_backfill_in_progress(),
}
if not bool(asset_graph.get_downstream_freshness_policies(asset_key=asset_key)):
rules.add(AutoMaterializeRule.materialize_on_parent_updated())
Expand Down Expand Up @@ -350,15 +351,6 @@ def evaluate_asset(
to_materialize.update(asset_partitions)
self._logger.debug("Done evaluating materialize rule")

# These should be conditions, but aren't currently, so we just manually strip out things
# from our materialization set
for candidate in list(to_materialize):
if candidate in self.instance_queryer.get_active_backfill_target_asset_graph_subset():
to_materialize.remove(candidate)
for rule_evaluation_data, asset_partitions in all_results:
all_results.remove((rule_evaluation_data, asset_partitions))
all_results.append((rule_evaluation_data, asset_partitions - {candidate}))

skip_context = materialize_context._replace(candidates=to_materialize)

for skip_rule in auto_materialize_policy.skip_rules:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def before_unpack(
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_parent_missing(),
AutoMaterializeRule.skip_on_required_but_nonexistent_parents(),
AutoMaterializeRule.skip_on_backfill_in_progress(),
}
for backcompat_key, rule in backcompat_map.items():
if unpacked_dict.get(backcompat_key):
Expand Down Expand Up @@ -177,6 +178,7 @@ def eager(max_materializations_per_minute: Optional[int] = 1) -> "AutoMaterializ
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_parent_missing(),
AutoMaterializeRule.skip_on_required_but_nonexistent_parents(),
AutoMaterializeRule.skip_on_backfill_in_progress(),
},
max_materializations_per_minute=check.opt_int_param(
max_materializations_per_minute, "max_materializations_per_minute"
Expand All @@ -202,6 +204,7 @@ def lazy(max_materializations_per_minute: Optional[int] = 1) -> "AutoMaterialize
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_parent_missing(),
AutoMaterializeRule.skip_on_required_but_nonexistent_parents(),
AutoMaterializeRule.skip_on_backfill_in_progress(),
},
max_materializations_per_minute=check.opt_int_param(
max_materializations_per_minute, "max_materializations_per_minute"
Expand All @@ -226,8 +229,15 @@ def without_rules(self, *rules_to_remove: "AutoMaterializeRule") -> "AutoMateria

@public
def with_rules(self, *rules_to_add: "AutoMaterializeRule") -> "AutoMaterializePolicy":
"""Constructs a copy of this policy with the specified rules added."""
return self._replace(rules=self.rules.union(set(rules_to_add)))
"""Constructs a copy of this policy with the specified rules added. If an instance of a
provided rule with the same type exists on this policy, it will be replaced.
"""
new_rule_types = {type(rule) for rule in rules_to_add}
return self._replace(
rules=set(rules_to_add).union(
{rule for rule in self.rules if type(rule) not in new_rule_types}
)
)

@property
def policy_type(self) -> AutoMaterializePolicyType:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,20 @@ def skip_on_required_but_nonexistent_parents() -> "SkipOnRequiredButNonexistentP
"""
return SkipOnRequiredButNonexistentParentsRule()

@public
@staticmethod
def skip_on_backfill_in_progress(
    all_partitions: bool = False,
) -> "SkipOnBackfillInProgressRule":
    """Build a skip rule for asset partitions affected by an in-progress backfill.

    Args:
        all_partitions (bool): When True, every partition of an asset that is being
            backfilled is skipped, whether or not that particular partition is targeted
            by a backfill. When False, only the partitions targeted by a backfill are
            skipped. Defaults to False.

    Returns:
        SkipOnBackfillInProgressRule: The rule configured with ``all_partitions``.
    """
    return SkipOnBackfillInProgressRule(all_partitions=all_partitions)

def to_snapshot(self) -> AutoMaterializeRuleSnapshot:
"""Returns a serializable snapshot of this rule for historical evaluations."""
return AutoMaterializeRuleSnapshot.from_rule(self)
Expand Down Expand Up @@ -676,6 +690,46 @@ def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationRe
return []


@whitelist_for_serdes
class SkipOnBackfillInProgressRule(
    AutoMaterializeRule,
    NamedTuple("_SkipOnBackfillInProgressRule", [("all_partitions", bool)]),
):
    """Skip rule for candidates that overlap with an active backfill.

    The single field ``all_partitions`` selects how the overlap is decided: by asset key
    (True) or by exact membership in the backfill's target subset (False).
    """

    @property
    def decision_type(self) -> AutoMaterializeDecisionType:
        """This rule always produces a SKIP decision."""
        return AutoMaterializeDecisionType.SKIP

    @property
    def description(self) -> str:
        """Human-readable reason, worded according to ``all_partitions``."""
        return (
            "part of an asset targeted by an in-progress backfill"
            if self.all_partitions
            else "targeted by an in-progress backfill"
        )

    def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
        """Return the candidates that should be skipped because of an active backfill.

        The result is a one-element list pairing ``None`` evaluation data with the set of
        skipped candidates, or an empty list when no candidate is affected.
        """
        active_subset = context.instance_queryer.get_active_backfill_target_asset_graph_subset()

        skipped: AbstractSet[AssetKeyPartitionKey]
        if not self.all_partitions:
            # Only candidates that are themselves members of the backfill target subset.
            skipped = {
                asset_partition
                for asset_partition in context.candidates
                if asset_partition in active_subset
            }
        else:
            # Any candidate whose asset key appears in the backfill target subset.
            skipped = {
                asset_partition
                for asset_partition in context.candidates
                if asset_partition.asset_key in active_subset.asset_keys
            }

        return [(None, skipped)] if skipped else []


@whitelist_for_serdes
class AutoMaterializeAssetEvaluation(NamedTuple):
"""Represents the results of the auto-materialize logic for a single asset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
run,
run_request,
single_asset_run,
with_auto_materialize_policy,
)

daily_partitions_def = DailyPartitionsDefinition("2013-01-05")
Expand Down Expand Up @@ -410,4 +411,123 @@
unevaluated_runs=[run(["asset2"])],
expected_run_requests=[run_request(["asset3"], partition_key="2020-01-02")],
),
"test_skip_on_backfill_in_progress": AssetReconciliationScenario(
assets=with_auto_materialize_policy(
hourly_to_daily_partitions,
AutoMaterializePolicy.eager(max_materializations_per_minute=None).without_rules(
# Remove some rules to simplify the test
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_required_but_nonexistent_parents(),
),
),
active_backfill_targets=[
{
AssetKey("hourly"): TimeWindowPartitionsSubset(
hourly_partitions_def,
num_partitions=1,
included_partition_keys={"2013-01-05-04:00"},
)
}
],
unevaluated_runs=[],
current_time=create_pendulum_time(year=2013, month=1, day=5, hour=5),
expected_run_requests=[
run_request(["hourly"], partition_key="2013-01-05-00:00"),
run_request(["hourly"], partition_key="2013-01-05-01:00"),
run_request(["hourly"], partition_key="2013-01-05-02:00"),
run_request(["hourly"], partition_key="2013-01-05-03:00"),
],
expected_evaluations=[
AssetEvaluationSpec(
asset_key="hourly",
rule_evaluations=[
(
AutoMaterializeRuleEvaluation(
AutoMaterializeRule.materialize_on_missing().to_snapshot(),
evaluation_data=None,
),
[
"2013-01-05-00:00",
"2013-01-05-01:00",
"2013-01-05-02:00",
"2013-01-05-03:00",
"2013-01-05-04:00",
],
),
(
AutoMaterializeRuleEvaluation(
AutoMaterializeRule.skip_on_backfill_in_progress().to_snapshot(),
evaluation_data=None,
),
[
"2013-01-05-04:00",
],
),
],
num_requested=4,
num_skipped=1,
),
],
),
"test_skip_entire_asset_on_backfill_in_progress": AssetReconciliationScenario(
assets=with_auto_materialize_policy(
hourly_to_daily_partitions,
AutoMaterializePolicy.eager(max_materializations_per_minute=None)
.without_rules(
# Remove some rules to simplify the test
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_required_but_nonexistent_parents(),
)
.with_rules(AutoMaterializeRule.skip_on_backfill_in_progress(all_partitions=True)),
),
active_backfill_targets=[
{
AssetKey("hourly"): TimeWindowPartitionsSubset(
hourly_partitions_def,
num_partitions=1,
included_partition_keys={"2013-01-05-04:00"},
)
}
],
unevaluated_runs=[],
current_time=create_pendulum_time(year=2013, month=1, day=5, hour=5),
expected_run_requests=[],
expected_evaluations=[
AssetEvaluationSpec(
asset_key="hourly",
rule_evaluations=[
(
AutoMaterializeRuleEvaluation(
AutoMaterializeRule.materialize_on_missing().to_snapshot(),
evaluation_data=None,
),
[
"2013-01-05-00:00",
"2013-01-05-01:00",
"2013-01-05-02:00",
"2013-01-05-03:00",
"2013-01-05-04:00",
],
),
(
AutoMaterializeRuleEvaluation(
AutoMaterializeRule.skip_on_backfill_in_progress(
all_partitions=True
).to_snapshot(),
evaluation_data=None,
),
[
"2013-01-05-00:00",
"2013-01-05-01:00",
"2013-01-05-02:00",
"2013-01-05-03:00",
"2013-01-05-04:00",
],
),
],
num_requested=0,
num_skipped=5,
),
],
),
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def test_without_rules():
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_parent_missing(),
AutoMaterializeRule.skip_on_required_but_nonexistent_parents(),
AutoMaterializeRule.skip_on_backfill_in_progress(),
}
)

Expand All @@ -42,6 +43,7 @@ def test_without_rules():
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_parent_missing(),
AutoMaterializeRule.skip_on_required_but_nonexistent_parents(),
AutoMaterializeRule.skip_on_backfill_in_progress(),
}
)

Expand Down Expand Up @@ -77,11 +79,30 @@ def test_with_rules():
AutoMaterializeRule.skip_on_parent_missing(),
AutoMaterializeRule.materialize_on_required_for_freshness(),
AutoMaterializeRule.skip_on_required_but_nonexistent_parents(),
AutoMaterializeRule.skip_on_backfill_in_progress(),
)
== AutoMaterializePolicy.eager()
)


def test_with_rules_override_existing_instance():
    """`with_rules` swaps out an existing rule of the same type instead of keeping both."""
    parent_updated_rule = AutoMaterializeRule.materialize_on_parent_updated()
    default_backfill_rule = AutoMaterializeRule.skip_on_backfill_in_progress()
    all_partitions_backfill_rule = AutoMaterializeRule.skip_on_backfill_in_progress(
        all_partitions=True
    )

    base_policy = AutoMaterializePolicy(rules={parent_updated_rule, default_backfill_rule})
    overridden_policy = base_policy.with_rules(all_partitions_backfill_rule)

    # The default backfill rule must be gone; only the overriding instance remains.
    assert overridden_policy.rules == {all_partitions_backfill_rule, parent_updated_rule}


@pytest.mark.parametrize(
"serialized_amp, expected_amp",
[
Expand Down

0 comments on commit 911d74e

Please sign in to comment.