From 35f3321e96fbb99b3371cee6525ded718f497131 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Tue, 21 Nov 2023 11:20:34 -0500 Subject: [PATCH 1/4] frontload queries --- .../_core/definitions/asset_daemon_context.py | 16 ++++++++++++++++ .../dagster/_utils/caching_instance_queryer.py | 2 ++ 2 files changed, 18 insertions(+) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index bef0387868853..1805195c3b00d 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -135,6 +135,7 @@ def __init__( self._verbose_log_fn = ( self._logger.info if os.getenv("ASSET_DAEMON_VERBOSE_LOGS") else self._logger.debug ) + self.prefetch_updated_parents() @property def instance_queryer(self) -> "CachingInstanceQueryer": @@ -172,6 +173,21 @@ def target_asset_keys_and_parents(self) -> AbstractSet[AssetKey]: def respect_materialization_data_versions(self) -> bool: return self._respect_materialization_data_versions + def prefetch_updated_parents(self) -> None: + """Pre-populate the cached values here to avoid situations in which the new latest_storage_id + value is calculated a long time after we calculate the set of updated parents for a given + asset, as this can cause us to miss materializations. + """ + for asset_key in self.target_asset_keys: + self.instance_queryer.asset_partitions_with_newly_updated_parents( + latest_storage_id=self.latest_storage_id, child_asset_key=asset_key + ) + self.get_latest_storage_id() + + @cached_method + def get_latest_storage_id(self) -> Optional[int]: + return self.instance_queryer.instance.event_log_storage.get_maximum_record_id() + @cached_method def _get_never_handled_and_newly_handled_root_asset_partitions( self, diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index 1d96dece12266..b5218cc6f5a84 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -494,8 +494,10 @@ def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]: def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool: return partition_key in self.get_dynamic_partitions(partitions_def_name) + @cached_method def asset_partitions_with_newly_updated_parents( self, + *, latest_storage_id: Optional[int], child_asset_key: AssetKey, map_old_time_partitions: bool = True, From fe180afb6be69cc19dc1ec372d25730e8402996e Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Tue, 21 Nov 2023 11:28:38 -0500 Subject: [PATCH 2/4] update --- .../dagster/dagster/_core/definitions/asset_daemon_context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 1805195c3b00d..a8b6fae80b3f6 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -543,7 +543,7 @@ def evaluate( return ( run_requests, self.cursor.with_updates( - latest_storage_id=self.instance_queryer.instance.event_log_storage.get_maximum_record_id(), + latest_storage_id=self.get_latest_storage_id(), to_materialize=to_materialize, to_discard=to_discard, asset_graph=self.asset_graph, From 1e50c074c1284684f91470879a23dd326e93d01d Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Mon, 27 Nov 2023 09:59:29 -0800 Subject: [PATCH 3/4] update order and method --- .../_core/definitions/asset_daemon_context.py | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index a8b6fae80b3f6..7d43f9f5a58f5 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -178,15 +178,31 @@ def prefetch_updated_parents(self) -> None: value is calculated a long time after we calculate the set of updated parents for a given asset, as this can cause us to miss materializations. """ + self.get_latest_storage_id() for asset_key in self.target_asset_keys: self.instance_queryer.asset_partitions_with_newly_updated_parents( latest_storage_id=self.latest_storage_id, child_asset_key=asset_key ) - self.get_latest_storage_id() @cached_method def get_latest_storage_id(self) -> Optional[int]: - return self.instance_queryer.instance.event_log_storage.get_maximum_record_id() + """Get the latest storage id across all target assets and parents. Use this method instead + of get_maximum_record_id() as this can generally be calculated from information already + cached in the instance queryer, and so does not require an additional query. + """ + return max( + filter( + None, + ( + self.instance_queryer.get_latest_materialization_or_observation_storage_id( + AssetKeyPartitionKey(asset_key=asset_key) + ) + for asset_key in self.target_asset_keys_and_parents + if not self.asset_graph.is_source(asset_key) + ), + ), + default=None, + ) @cached_method def _get_never_handled_and_newly_handled_root_asset_partitions( From 116a0d1dfd789f21af9193c8c0bcff485b981ff6 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Mon, 27 Nov 2023 16:44:49 -0800 Subject: [PATCH 4/4] update --- .../dagster/dagster/_core/definitions/asset_daemon_context.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 7d43f9f5a58f5..00f472de5e5f3 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -198,7 +198,6 @@ def get_latest_storage_id(self) -> Optional[int]: AssetKeyPartitionKey(asset_key=asset_key) ) for asset_key in self.target_asset_keys_and_parents - if not self.asset_graph.is_source(asset_key) ), ), default=None,