Skip to content

Commit

Permalink
clarity improvements for skip_on_parent_outdated rule (#16914)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Was browsing this code, and made some changes along the way to address
confusions I encountered.

## How I Tested These Changes
  • Loading branch information
sryza authored Sep 29, 2023
1 parent 202cb1d commit fa0e8d3
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationRe
partition_key=asset_partition.partition_key,
).parent_partitions

updated_parent_asset_partitions = context.instance_queryer.get_updated_parent_asset_partitions(
updated_parent_asset_partitions = context.instance_queryer.get_parent_asset_partitions_updated_after_child(
asset_partition,
parent_asset_partitions,
# do a precise check for updated parents, factoring in data versions, as long as
Expand Down Expand Up @@ -446,18 +446,16 @@ def description(self) -> str:
def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
asset_partitions_by_waiting_on_asset_keys = defaultdict(set)
for candidate in context.candidates:
unreconciled_ancestors = set()
outdated_ancestors = set()
# find the root cause of why this asset partition's parents are outdated (if any)
for parent in context.get_parents_that_will_not_be_materialized_on_current_tick(
asset_partition=candidate
):
unreconciled_ancestors.update(
context.instance_queryer.get_root_unreconciled_ancestors(
asset_partition=parent,
)
outdated_ancestors.update(
context.instance_queryer.get_outdated_ancestors(asset_partition=parent)
)
if unreconciled_ancestors:
asset_partitions_by_waiting_on_asset_keys[frozenset(unreconciled_ancestors)].add(
if outdated_ancestors:
asset_partitions_by_waiting_on_asset_keys[frozenset(outdated_ancestors)].add(
candidate
)
if asset_partitions_by_waiting_on_asset_keys:
Expand Down Expand Up @@ -553,7 +551,7 @@ def evaluate_for_asset(
).parent_partitions

updated_parent_partitions = (
context.instance_queryer.get_updated_parent_asset_partitions(
context.instance_queryer.get_parent_asset_partitions_updated_after_child(
candidate,
parent_partitions,
context.daemon_context.respect_materialization_data_versions,
Expand Down
45 changes: 22 additions & 23 deletions python_modules/dagster/dagster/_utils/caching_instance_queryer.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(

self._evaluation_time = evaluation_time if evaluation_time else pendulum.now("UTC")

self._root_unreconciled_ancestors_cache: Dict[AssetKeyPartitionKey, Set[AssetKey]] = {}
self._outdated_ancestors_cache: Dict[AssetKeyPartitionKey, Set[AssetKey]] = {}
self._respect_materialization_data_versions = (
self._instance.auto_materialize_respect_materialization_data_versions
)
Expand Down Expand Up @@ -757,9 +757,9 @@ def get_asset_partitions_updated_after_cursor(
asset_partitions (Optional[Sequence[AssetKeyPartitionKey]]): If supplied, will filter
the set of checked partitions to the given partitions.
after_cursor (Optional[int]): The cursor after which to look for updates.
use_asset_versions (bool): If True, will use data versions to filter out asset
partitions which were materialized, but not have not had their data versions
cahnged since the given cursor.
respect_materialization_data_versions (bool): If True, will use data versions to filter
out asset partitions which were materialized, but not have not had their data
versions changed since the given cursor.
NOTE: This boolean has been temporarily disabled
"""
if not self.asset_partition_has_materialization_or_observation(
Expand Down Expand Up @@ -800,13 +800,16 @@ def get_asset_partitions_updated_after_cursor(
asset_key, updated_after_cursor, after_cursor
)

def get_updated_parent_asset_partitions(
def get_parent_asset_partitions_updated_after_child(
self,
asset_partition: AssetKeyPartitionKey,
parent_asset_partitions: AbstractSet[AssetKeyPartitionKey],
respect_materialization_data_versions: bool,
ignored_parent_keys: AbstractSet[AssetKey],
) -> AbstractSet[AssetKeyPartitionKey]:
"""Returns values inside parent_asset_partitions that correspond to asset partitions that
have been updated since the latest materialization of asset_partition.
"""
parent_asset_partitions_by_key: Dict[AssetKey, Set[AssetKeyPartitionKey]] = defaultdict(set)
for parent in parent_asset_partitions:
parent_asset_partitions_by_key[parent.asset_key].add(parent)
Expand Down Expand Up @@ -851,15 +854,17 @@ def get_updated_parent_asset_partitions(
)
return updated_parents

def get_root_unreconciled_ancestors(
def get_outdated_ancestors(
self, *, asset_partition: AssetKeyPartitionKey
) -> AbstractSet[AssetKey]:
"""Return the set of root unreconciled ancestors of the given asset partition, i.e. the set
of ancestors of this asset partition whose parents have been updated more recently than
they have.
"""Return the set of assets that are ancestors of the given asset partition and have parents
that have been updated more recently than they have.
If two ancestors would be returned, but one of them is an ancestor of the other one, then
only the most upstream ancestor is included.
"""
if asset_partition in self._root_unreconciled_ancestors_cache:
return self._root_unreconciled_ancestors_cache[asset_partition]
if asset_partition in self._outdated_ancestors_cache:
return self._outdated_ancestors_cache[asset_partition]

if self.asset_graph.is_source(asset_partition.asset_key):
return set()
Expand Down Expand Up @@ -888,7 +893,7 @@ def get_root_unreconciled_ancestors(
for parent in parent_asset_partitions:
if (
parent not in visited
and parent not in self._root_unreconciled_ancestors_cache
and parent not in self._outdated_ancestors_cache
# do not evaluate self-dependency asset partitions
and parent.asset_key != current_partition.asset_key
):
Expand All @@ -911,7 +916,7 @@ def get_root_unreconciled_ancestors(
).parent_partitions

updated_parents: AbstractSet[AssetKeyPartitionKey] = (
self.get_updated_parent_asset_partitions(
self.get_parent_asset_partitions_updated_after_child(
asset_partition=current_partition,
parent_asset_partitions=parent_asset_partitions,
respect_materialization_data_versions=self._respect_materialization_data_versions,
Expand All @@ -920,17 +925,11 @@ def get_root_unreconciled_ancestors(
)
)

root_unreconciled_ancestors = (
{current_partition.asset_key} if updated_parents else set()
)
outdated_ancestors = {current_partition.asset_key} if updated_parents else set()

for parent in set(parent_asset_partitions) - updated_parents:
root_unreconciled_ancestors.update(
self._root_unreconciled_ancestors_cache.get(parent, set())
)
outdated_ancestors.update(self._outdated_ancestors_cache.get(parent, set()))

self._root_unreconciled_ancestors_cache[current_partition] = (
root_unreconciled_ancestors
)
self._outdated_ancestors_cache[current_partition] = outdated_ancestors

return self._root_unreconciled_ancestors_cache[asset_partition]
return self._outdated_ancestors_cache[asset_partition]

0 comments on commit fa0e8d3

Please sign in to comment.