Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[auto-materialize] Frontload parent updated queries #18189

Merged
merged 4 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def __init__(
self._verbose_log_fn = (
self._logger.info if os.getenv("ASSET_DAEMON_VERBOSE_LOGS") else self._logger.debug
)
self.prefetch_updated_parents()

@property
def instance_queryer(self) -> "CachingInstanceQueryer":
Expand Down Expand Up @@ -172,6 +173,36 @@ def target_asset_keys_and_parents(self) -> AbstractSet[AssetKey]:
def respect_materialization_data_versions(self) -> bool:
return self._respect_materialization_data_versions

def prefetch_updated_parents(self) -> None:
"""Pre-populate the cached values here to avoid situations in which the new latest_storage_id
value is calculated a long time after we calculate the set of updated parents for a given
asset, as this can cause us to miss materializations.
"""
self.get_latest_storage_id()
for asset_key in self.target_asset_keys:
self.instance_queryer.asset_partitions_with_newly_updated_parents(
latest_storage_id=self.latest_storage_id, child_asset_key=asset_key
)

@cached_method
def get_latest_storage_id(self) -> Optional[int]:
"""Get the latest storage id across all target assets and parents. Use this method instead
of get_maximum_record_id() as this can generally be calculated from information already
cached in the instance queryer, and so does not require an additional query.
"""
return max(
filter(
None,
(
self.instance_queryer.get_latest_materialization_or_observation_storage_id(
AssetKeyPartitionKey(asset_key=asset_key)
)
for asset_key in self.target_asset_keys_and_parents
),
),
default=None,
)

@cached_method
def _get_never_handled_and_newly_handled_root_asset_partitions(
self,
Expand Down Expand Up @@ -527,7 +558,7 @@ def evaluate(
return (
run_requests,
self.cursor.with_updates(
latest_storage_id=self.instance_queryer.instance.event_log_storage.get_maximum_record_id(),
latest_storage_id=self.get_latest_storage_id(),
to_materialize=to_materialize,
to_discard=to_discard,
asset_graph=self.asset_graph,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,10 @@ def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
return partition_key in self.get_dynamic_partitions(partitions_def_name)

@cached_method
def asset_partitions_with_newly_updated_parents(
self,
*,
latest_storage_id: Optional[int],
child_asset_key: AssetKey,
map_old_time_partitions: bool = True,
Expand Down