From 45dfb858205c69dd460bcb9b66a04047b608ad5a Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 8 Nov 2023 17:27:42 +1300 Subject: [PATCH] fix: Ensure Historical Stream is cleared before pushing new messages --- .../src/historical_block_processing.rs | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index f091c3f86..82118aea3 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -18,7 +18,7 @@ pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks ta pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20; pub struct Task { - handle: JoinHandle>, + handle: JoinHandle<()>, cancellation_token: tokio_util::sync::CancellationToken, } @@ -51,11 +51,11 @@ impl Streamer { let handle = tokio::spawn(async move { tokio::select! { _ = cancellation_token_clone.cancelled() => { - storage::del( - &redis_connection_manager, - storage::generate_historical_stream_key(&indexer.get_full_name()), - ) - .await + tracing::info!( + target: crate::INDEXER, + "Cancelling existing historical backfill for indexer: {:?}", + indexer.get_full_name(), + ); }, _ = process_historical_messages_or_handle_error( current_block_height, @@ -64,9 +64,7 @@ impl Streamer { &s3_client, &chain_id, &json_rpc_client, - ) => { - Ok(()) - } + ) => { } } }); @@ -81,7 +79,7 @@ impl Streamer { pub async fn cancel(&mut self) -> anyhow::Result<()> { if let Some(task) = self.task.take() { task.cancellation_token.cancel(); - task.handle.await??; + task.handle.await?; return Ok(()); } @@ -185,6 +183,11 @@ pub(crate) async fn process_historical_messages( blocks_from_index.append(&mut blocks_between_indexed_and_current_block); if !blocks_from_index.is_empty() { + storage::del( + redis_connection_manager, + storage::generate_historical_stream_key(&indexer_function.get_full_name()), + ) + .await?; storage::sadd( redis_connection_manager, storage::STREAMS_SET_KEY,