From 53d599ca83104e3330168a3582421a9e03b8b058 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 7 Nov 2023 16:13:45 +1300 Subject: [PATCH] refactor: Encapsulate `Streamer` start within `impl` --- .../src/historical_block_processing.rs | 109 ++++++++++-------- .../src/indexer_registry.rs | 19 +-- 2 files changed, 70 insertions(+), 58 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 037490e99..9354e1400 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -17,67 +17,78 @@ pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_a pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes. pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20; +pub struct Task { + handle: JoinHandle>, + cancellation_token: tokio_util::sync::CancellationToken, +} + /// Represents the async task used to process and push historical messages pub struct Streamer { - handle: Option>, - cancellation_token: tokio_util::sync::CancellationToken, + task: Option, } impl Streamer { - pub async fn cancel(&mut self) -> anyhow::Result<()> { - if let Some(handle) = self.handle.take() { - tracing::info!(target: crate::INDEXER, "Cancelling historical process"); - - self.cancellation_token.cancel(); - handle.await.map_err(anyhow::Error::from)?; + pub fn new() -> Self { + Streamer { task: None } + } - return Ok(()); + pub fn start( + &mut self, + current_block_height: BlockHeight, + indexer: IndexerFunction, + redis_connection_manager: storage::ConnectionManager, + s3_client: S3Client, + chain_id: ChainId, + json_rpc_client: JsonRpcClient, + ) -> anyhow::Result<()> { + if self.task.is_some() { + return Err(anyhow::anyhow!("Streamer has already been started",)); } - Err(anyhow::anyhow!( - "Attempted to cancel already cancelled Streamer" - )) + let cancellation_token = tokio_util::sync::CancellationToken::new(); + let cancellation_token_clone = cancellation_token.clone(); + + let handle = tokio::spawn(async move { + tokio::select! { + _ = cancellation_token_clone.cancelled() => { + storage::del( + &redis_connection_manager, + storage::generate_historical_stream_key(&indexer.get_full_name()), + ) + .await + }, + _ = process_historical_messages_or_handle_error( + current_block_height, + indexer.clone(), + &redis_connection_manager, + &s3_client, + &chain_id, + &json_rpc_client, + ) => { + Ok(()) + } + } + }); + + self.task = Some(Task { + handle, + cancellation_token, + }); + + Ok(()) } -} -pub fn spawn_historical_message_thread( - current_block_height: BlockHeight, - indexer_function: &IndexerFunction, - redis_connection_manager: &storage::ConnectionManager, - s3_client: &S3Client, - chain_id: &ChainId, - json_rpc_client: &JsonRpcClient, -) -> Streamer { - let indexer = indexer_function.clone(); - let redis_connection_manager = redis_connection_manager.clone(); - let s3_client = s3_client.clone(); - let chain_id = chain_id.clone(); - let json_rpc_client = json_rpc_client.clone(); - - let cancellation_token = tokio_util::sync::CancellationToken::new(); - let cancellation_token_clone = cancellation_token.clone(); - - let handle = tokio::spawn(async move { - tokio::select! { - _ = cancellation_token_clone.cancelled() => { - 0 - }, - processed_blocks_count = process_historical_messages_or_handle_error( - current_block_height, - indexer, - &redis_connection_manager, - &s3_client, - &chain_id, - &json_rpc_client, - ) => { - processed_blocks_count - } + pub async fn cancel(&mut self) -> anyhow::Result<()> { + if let Some(task) = self.task.take() { + task.cancellation_token.cancel(); + task.handle.await??; + + return Ok(()); } - }); - Streamer { - handle: Some(handle), - cancellation_token, + Err(anyhow::anyhow!( + "Attempted to cancel already cancelled, or not started, Streamer" + )) } } diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs index 7bfd8d1e2..ddf8d0593 100644 --- a/indexer/queryapi_coordinator/src/indexer_registry.rs +++ b/indexer/queryapi_coordinator/src/indexer_registry.rs @@ -162,15 +162,16 @@ async fn index_and_process_register_calls( existing_streamer.cancel().await?; } - let streamer = - crate::historical_block_processing::spawn_historical_message_thread( - current_block_height, - &new_indexer_function, - context.redis_connection_manager, - context.s3_client, - context.chain_id, - context.json_rpc_client, - ); + let mut streamer = crate::historical_block_processing::Streamer::new(); + + streamer.start( + current_block_height, + new_indexer_function.clone(), + context.redis_connection_manager.clone(), + context.s3_client.clone(), + context.chain_id.clone(), + context.json_rpc_client.clone(), + )?; streamers_lock.insert(new_indexer_function.get_full_name(), streamer); }