Skip to content

Commit

Permalink
Better cursor handling
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 19, 2024
1 parent b9f618a commit e0f49ab
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import datetime
import functools
from dataclasses import dataclass
from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Mapping, Optional, Sequence, Tuple

from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
Expand Down Expand Up @@ -151,16 +151,6 @@ def parent_will_update_subset(self) -> AssetSubset:
subset |= parent_subset._replace(asset_key=self.asset_key)
return subset

@functools.cached_property
@root_property
def new_max_storage_id(self) -> Optional[int]:
"""Returns the new max storage ID for this asset, if any."""
# TODO: This is not a good way of doing this, as it opens us up to potential race conditions,
# but in the interest of keeping this PR simple, I'm leaving this logic as is. In the next
# PR, I'll update the code to return a "maximum observed record id" from inside the
# `get_asset_partitions_updated_after_cursor` call.
return self.instance_queryer.instance.event_log_storage.get_maximum_record_id()

@functools.cached_property
@root_property
def materialized_since_previous_tick_subset(self) -> AssetSubset:
Expand Down Expand Up @@ -193,19 +183,35 @@ def materialized_requested_or_discarded_since_previous_tick_subset(self) -> Asse

@functools.cached_property
@root_property
def parent_has_updated_subset(self) -> AssetSubset:
def _parent_has_updated_subset_and_new_latest_storage_id(
self,
) -> Tuple[AssetSubset, Optional[int]]:
"""Returns the set of asset partitions whose parents have updated since the last time this
condition was evaluated.
"""
return AssetSubset.from_asset_partitions_set(
self.asset_key,
self.partitions_def,
self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents(
latest_storage_id=self.cursor.previous_max_storage_id,
child_asset_key=self.root_context.asset_key,
map_old_time_partitions=False,
),
(
asset_partitions,
cursor,
) = self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents_and_new_cursor(
latest_storage_id=self.cursor.previous_max_storage_id,
child_asset_key=self.root_context.asset_key,
map_old_time_partitions=False,
)
return AssetSubset.from_asset_partitions_set(
self.asset_key, self.partitions_def, asset_partitions
), cursor

@property
@root_property
def parent_has_updated_subset(self) -> AssetSubset:
subset, _ = self._parent_has_updated_subset_and_new_latest_storage_id
return subset

@property
@root_property
def new_max_storage_id(self) -> AssetSubset:
_, storage_id = self._parent_has_updated_subset_and_new_latest_storage_id
return storage_id

@property
def candidate_parent_has_or_will_update_subset(self) -> AssetSubset:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1378,10 +1378,10 @@ def execute_asset_backfill_iteration_inner(

parent_materialized_asset_partitions = set().union(
*(
instance_queryer.asset_partitions_with_newly_updated_parents(
instance_queryer.asset_partitions_with_newly_updated_parents_and_new_cursor(
latest_storage_id=asset_backfill_data.latest_storage_id,
child_asset_key=asset_key,
)
)[0]
for asset_key in asset_backfill_data.target_subset.asset_keys
)
)
Expand Down
38 changes: 33 additions & 5 deletions python_modules/dagster/dagster/_utils/caching_instance_queryer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
Expand Down Expand Up @@ -237,6 +238,11 @@ def get_latest_materialization_or_observation_storage_id(
Args:
asset_partition (AssetKeyPartitionKey): The asset partition to query.
"""
if asset_partition.partition_key is None:
record = self._get_latest_materialization_or_observation_record(
asset_partition=asset_partition
)
return record.storage_id if record else None
return self._get_latest_materialization_or_observation_storage_ids_by_asset_partition(
asset_key=asset_partition.asset_key
).get(asset_partition)
Expand Down Expand Up @@ -495,29 +501,36 @@ def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) ->
return partition_key in self.get_dynamic_partitions(partitions_def_name)

@cached_method
def asset_partitions_with_newly_updated_parents(
def asset_partitions_with_newly_updated_parents_and_new_cursor(
self,
*,
latest_storage_id: Optional[int],
child_asset_key: AssetKey,
map_old_time_partitions: bool = True,
) -> AbstractSet[AssetKeyPartitionKey]:
) -> Tuple[AbstractSet[AssetKeyPartitionKey], Optional[int]]:
"""Finds asset partitions of the given child whose parents have been materialized since
latest_storage_id.
"""
if self.asset_graph.is_source(child_asset_key):
return set()
return set(), latest_storage_id

child_partitions_def = self.asset_graph.get_partitions_def(child_asset_key)
child_time_partitions_def = get_time_partitions_def(child_partitions_def)

child_asset_partitions_with_updated_parents = set()

max_storage_ids = [
self.get_latest_materialization_or_observation_storage_id(
AssetKeyPartitionKey(child_asset_key)
)
]
for parent_asset_key in self.asset_graph.get_parents(child_asset_key):
# ignore non-observable sources
if self.asset_graph.is_source(parent_asset_key) and not self.asset_graph.is_observable(
parent_asset_key
):
continue

# if the parent has not been updated at all since the latest_storage_id, then skip
if not self.get_asset_partitions_updated_after_cursor(
asset_key=parent_asset_key,
Expand All @@ -527,6 +540,13 @@ def asset_partitions_with_newly_updated_parents(
):
continue

# keep track of the maximum storage id that we've seen for a given parent
max_storage_ids.append(
self.get_latest_materialization_or_observation_storage_id(
AssetKeyPartitionKey(parent_asset_key)
)
)

parent_partitions_def = self.asset_graph.get_partitions_def(parent_asset_key)
if parent_partitions_def is None:
latest_parent_record = check.not_none(
Expand Down Expand Up @@ -564,7 +584,10 @@ def asset_partitions_with_newly_updated_parents(
# we know a parent updated, and because the parent has a partitions def and the
# child does not, the child could not have been materialized in the same run
if child_partitions_def is None:
return {AssetKeyPartitionKey(child_asset_key)}
child_asset_partitions_with_updated_parents = {
AssetKeyPartitionKey(child_asset_key)
}
break
# the set of asset partitions which have been updated since the latest storage id
parent_partitions_subset = self.get_partitions_subset_updated_after_cursor(
asset_key=parent_asset_key, after_cursor=latest_storage_id
Expand Down Expand Up @@ -623,7 +646,12 @@ def asset_partitions_with_newly_updated_parents(
):
child_asset_partitions_with_updated_parents.add(child_asset_partition)

return child_asset_partitions_with_updated_parents
# the new latest storage id will be the greatest observed storage id among this asset and
# its parents
new_latest_storage_id = max(
filter(None, [latest_storage_id, *max_storage_ids]), default=None
)
return (child_asset_partitions_with_updated_parents, new_latest_storage_id)

####################
# RECONCILIATION
Expand Down

0 comments on commit e0f49ab

Please sign in to comment.