Skip to content

Commit

Permalink
[bug] Fix issue with partition mapping handling in the EntityMatches …
Browse files Browse the repository at this point in the history
…condition (#25278)

## Summary & Motivation

As title.

The inference logic that determines the direction in which we should
map partitions does not work for self-dependent assets: such an asset
is always upstream of itself, so from_key will always be a parent of
to_key, and we will always take the first branch.

This solves the issue by adding an optional direction parameter to the
EntityMatches condition and threading it through to the underlying
mapping function, allowing us to determine which direction we should
map our partitions in.

## How I Tested These Changes

## Changelog

Fixed an issue which could cause incorrect evaluation results when using
self-dependent partition mappings with `AutomationConditions` that
operate over dependencies.
  • Loading branch information
OwenKephart authored Oct 15, 2024
1 parent dfb8e60 commit abf0d54
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, AbstractSet, Dict, NamedTuple, Optional, Type, TypeVar
from typing import TYPE_CHECKING, AbstractSet, Dict, Literal, NamedTuple, Optional, Type, TypeVar

from dagster import _check as check
from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue
Expand Down Expand Up @@ -219,26 +219,26 @@ def compute_parent_subset(
check.invariant(
parent_key in self.asset_graph.get(subset.key).parent_entity_keys,
)
return self.compute_mapped_subset(parent_key, subset)
return self.compute_mapped_subset(parent_key, subset, direction="up")

def compute_child_subset(
self, child_key: T_EntityKey, subset: EntitySubset[U_EntityKey]
) -> EntitySubset[T_EntityKey]:
check.invariant(
child_key in self.asset_graph.get(subset.key).child_entity_keys,
)
return self.compute_mapped_subset(child_key, subset)
return self.compute_mapped_subset(child_key, subset, direction="down")

def compute_mapped_subset(
self, to_key: T_EntityKey, from_subset: EntitySubset
self, to_key: T_EntityKey, from_subset: EntitySubset, direction: Literal["up", "down"]
) -> EntitySubset[T_EntityKey]:
from_key = from_subset.key
from_partitions_def = self.asset_graph.get(from_key).partitions_def
to_partitions_def = self.asset_graph.get(to_key).partitions_def

partition_mapping = self.asset_graph.get_partition_mapping(from_key, to_key)

if from_key in self.asset_graph.get(to_key).parent_entity_keys:
if direction == "down":
if from_partitions_def is None or to_partitions_def is None:
return (
self.get_empty_subset(key=to_key)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import operator
from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Generic, NamedTuple, TypeVar, Union
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Generic,
Literal,
NamedTuple,
TypeVar,
Union,
)

from typing_extensions import Self

Expand Down Expand Up @@ -105,8 +115,10 @@ def compute_child_subset(self, child_key: U_EntityKey) -> "EntitySubset[U_Entity
return self._asset_graph_view.compute_child_subset(child_key, self)

@cached_method
def compute_mapped_subset(self, to_key: U_EntityKey) -> "EntitySubset[U_EntityKey]":
return self._asset_graph_view.compute_mapped_subset(to_key, self)
def compute_mapped_subset(
self, to_key: U_EntityKey, direction: Literal["up", "down"]
) -> "EntitySubset[U_EntityKey]":
return self._asset_graph_view.compute_mapped_subset(to_key, self, direction=direction)

@property
def size(self) -> int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ def _handle_execution_set(self, result: AutomationResult[AssetKey]) -> None:
# evaluated it may have had a different requested subset. however, because
# all these neighbors must be executed as a unit, we need to union together
# the subset of all required neighbors
neighbor_true_subset = result.true_subset.compute_mapped_subset(neighbor_key)
neighbor_true_subset = result.true_subset.compute_mapped_subset(
neighbor_key, direction="up"
)
if neighbor_key in self.current_results_by_key:
self.current_results_by_key[
neighbor_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,28 @@ def name(self) -> str:
return self.key.to_user_string()

def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]:
to_candidate_subset = context.candidate_subset.compute_mapped_subset(self.key)
# if the key we're mapping to is a child of the key we're mapping from and is not
# self-dependent, use the downstream mapping function, otherwise use upstream
if (
self.key in context.asset_graph.get(context.key).child_entity_keys
and self.key != context.key
):
directions = ("down", "up")
else:
directions = ("up", "down")

to_candidate_subset = context.candidate_subset.compute_mapped_subset(
self.key, direction=directions[0]
)
to_context = context.for_child_condition(
child_condition=self.operand, child_index=0, candidate_subset=to_candidate_subset
)

to_result = self.operand.evaluate(to_context)

true_subset = to_result.true_subset.compute_mapped_subset(context.key)
true_subset = to_result.true_subset.compute_mapped_subset(
context.key, direction=directions[1]
)
return AutomationResult(context=context, true_subset=true_subset, child_results=[to_result])


Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
from dagster import (
AssetDep,
AssetKey,
AssetMaterialization,
AutomationCondition,
DailyPartitionsDefinition,
Definitions,
DimensionPartitionMapping,
MultiPartitionMapping,
MultiPartitionsDefinition,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
evaluate_automation_conditions,
)
Expand Down Expand Up @@ -149,3 +156,51 @@ def B() -> None: ...
defs=_get_defs(four_partitions), instance=instance, cursor=result.cursor
)
assert result.total_requested == 0


def test_eager_multi_partitioned_self_dependency() -> None:
    """Eager condition on a multi-partitioned asset that depends on itself.

    ``child`` depends on ``parent`` and on its own previous ``time``
    partition (offset of -1 on the ``time`` dimension). Materializing a
    single ``parent`` partition must not request ``child``, because the
    previous partition of ``child`` is still missing.
    """
    partitions_def = MultiPartitionsDefinition(
        {
            "time": DailyPartitionsDefinition(start_date="2024-08-01"),
            "static": StaticPartitionsDefinition(["a", "b", "c"]),
        }
    )

    # maps each partition of `child` onto the prior day's partition of itself
    previous_day_mapping = MultiPartitionMapping(
        {
            "time": DimensionPartitionMapping(
                "time", TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
            ),
        }
    )
    self_dependency = AssetDep("child", partition_mapping=previous_day_mapping)

    # eager, but not restricted to the latest time window
    condition = AutomationCondition.eager().without(
        AutomationCondition.in_latest_time_window()
    )

    @asset(partitions_def=partitions_def)
    def parent() -> None: ...

    @asset(
        deps=[parent, self_dependency],
        partitions_def=partitions_def,
        automation_condition=condition,
    )
    def child() -> None: ...

    defs = Definitions(assets=[parent, child])

    with instance_for_test() as instance:
        # first tick: no events have been reported, so nothing is requested
        first_tick = evaluate_automation_conditions(defs=defs, instance=instance)
        assert first_tick.total_requested == 0

        # report a materialization for one upstream partition
        upstream_event = AssetMaterialization("parent", partition="a|2024-08-16")
        instance.report_runless_asset_event(upstream_event)

        second_tick = evaluate_automation_conditions(
            defs=defs, instance=instance, cursor=first_tick.cursor
        )
        # downstream stays blocked: the previous partition of child is still missing
        assert second_tick.total_requested == 0

0 comments on commit abf0d54

Please sign in to comment.