diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py
index bef0387868853..00f472de5e5f3 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py
@@ -135,6 +135,7 @@ def __init__(
         self._verbose_log_fn = (
             self._logger.info if os.getenv("ASSET_DAEMON_VERBOSE_LOGS") else self._logger.debug
         )
+        self.prefetch_updated_parents()
 
     @property
     def instance_queryer(self) -> "CachingInstanceQueryer":
@@ -172,6 +173,36 @@ def target_asset_keys_and_parents(self) -> AbstractSet[AssetKey]:
     def respect_materialization_data_versions(self) -> bool:
         return self._respect_materialization_data_versions
 
+    def prefetch_updated_parents(self) -> None:
+        """Pre-populate the cached values here to avoid situations in which the new latest_storage_id
+        value is calculated a long time after we calculate the set of updated parents for a given
+        asset, as this can cause us to miss materializations.
+        """
+        self.get_latest_storage_id()
+        for asset_key in self.target_asset_keys:
+            self.instance_queryer.asset_partitions_with_newly_updated_parents(
+                latest_storage_id=self.latest_storage_id, child_asset_key=asset_key
+            )
+
+    @cached_method
+    def get_latest_storage_id(self) -> Optional[int]:
+        """Get the latest storage id across all target assets and parents. Use this method instead
+        of get_maximum_record_id() as this can generally be calculated from information already
+        cached in the instance queryer, and so does not require an additional query.
+        """
+        return max(
+            filter(
+                None,
+                (
+                    self.instance_queryer.get_latest_materialization_or_observation_storage_id(
+                        AssetKeyPartitionKey(asset_key=asset_key)
+                    )
+                    for asset_key in self.target_asset_keys_and_parents
+                ),
+            ),
+            default=None,
+        )
+
     @cached_method
     def _get_never_handled_and_newly_handled_root_asset_partitions(
         self,
@@ -527,7 +558,7 @@ def evaluate(
         return (
             run_requests,
             self.cursor.with_updates(
-                latest_storage_id=self.instance_queryer.instance.event_log_storage.get_maximum_record_id(),
+                latest_storage_id=self.get_latest_storage_id(),
                 to_materialize=to_materialize,
                 to_discard=to_discard,
                 asset_graph=self.asset_graph,
diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py
index 1d96dece12266..b5218cc6f5a84 100644
--- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py
+++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py
@@ -494,8 +494,10 @@ def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
     def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
         return partition_key in self.get_dynamic_partitions(partitions_def_name)
 
+    @cached_method
     def asset_partitions_with_newly_updated_parents(
         self,
+        *,
         latest_storage_id: Optional[int],
         child_asset_key: AssetKey,
         map_old_time_partitions: bool = True,