From 60eb8a13810d89951c798ad0ad956f2458f230d1 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 7 Nov 2023 15:36:20 +1300 Subject: [PATCH] feat: Cancel existing historical process before starting another --- .../src/historical_block_processing.rs | 24 +++++++++++++++---- .../src/indexer_registry.rs | 17 +++++++++---- indexer/queryapi_coordinator/src/main.rs | 2 +- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 038f1d224..037490e99 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -17,11 +17,27 @@ pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_a pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes. pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20; - /// Represents the async task used to process and push historical messages pub struct Streamer { - handle: JoinHandle, - pub cancellation_token: tokio_util::sync::CancellationToken, + handle: Option>, + cancellation_token: tokio_util::sync::CancellationToken, +} + +impl Streamer { + pub async fn cancel(&mut self) -> anyhow::Result<()> { + if let Some(handle) = self.handle.take() { + tracing::info!(target: crate::INDEXER, "Cancelling historical process"); + + self.cancellation_token.cancel(); + handle.await.map_err(anyhow::Error::from)?; + + return Ok(()); + } + + Err(anyhow::anyhow!( + "Attempted to cancel already cancelled Streamer" + )) + } } pub fn spawn_historical_message_thread( @@ -60,7 +76,7 @@ pub fn spawn_historical_message_thread( }); Streamer { - handle, + handle: Some(handle), cancellation_token, } } diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs index e3487e0db..7bfd8d1e2 100644 --- a/indexer/queryapi_coordinator/src/indexer_registry.rs +++ b/indexer/queryapi_coordinator/src/indexer_registry.rs @@ -89,16 +89,16 @@ pub(crate) fn build_registry_from_json(raw_registry: Value) -> IndexerRegistry { pub(crate) async fn index_registry_changes( current_block_height: BlockHeight, context: &QueryApiContext<'_>, -) { +) -> anyhow::Result<()> { index_and_process_remove_calls(context).await; - index_and_process_register_calls(current_block_height, context).await; + index_and_process_register_calls(current_block_height, context).await } async fn index_and_process_register_calls( current_block_height: BlockHeight, context: &QueryApiContext<'_>, -) { +) -> anyhow::Result<()> { let registry_method_name = "register_indexer_function"; let registry_calls_rule = build_registry_indexer_rule(registry_method_name, context.registry_contract_id); @@ -154,6 +154,14 @@ async fn index_and_process_register_calls( } if new_indexer_function.start_block_height.is_some() { + let mut streamers_lock = context.streamers.lock().await; + + if let Some(mut existing_streamer) = + streamers_lock.remove(&new_indexer_function.get_full_name()) + { + existing_streamer.cancel().await?; + } + let streamer = crate::historical_block_processing::spawn_historical_message_thread( current_block_height, @@ -164,7 +172,6 @@ async fn index_and_process_register_calls( context.json_rpc_client, ); - let mut streamers_lock = context.streamers.lock().await; streamers_lock.insert(new_indexer_function.get_full_name(), streamer); } @@ -173,6 +180,8 @@ async fn index_and_process_register_calls( }; } } + + Ok(()) } async fn index_and_process_remove_calls(context: &QueryApiContext<'_>) { diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index 04c078bf2..fe88af9c7 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -151,7 +151,7 @@ async fn handle_streamer_message(context: QueryApiContext<'_>) -> anyhow::Result ) .await?; - indexer_registry::index_registry_changes(block_height, &context).await; + indexer_registry::index_registry_changes(block_height, &context).await?; while let Some(indexer_function_with_matches) = indexer_function_filter_matches_futures.next().await