Skip to content

Commit

Permalink
refactor: Encapsulate Streamer start within impl
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Nov 7, 2023
1 parent 60eb8a1 commit 53d599c
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 58 deletions.
109 changes: 60 additions & 49 deletions indexer/queryapi_coordinator/src/historical_block_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,67 +17,78 @@ pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_a
pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes.
pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
cancellation_token: tokio_util::sync::CancellationToken,
}

/// Represents the async task used to process and push historical messages
pub struct Streamer {
handle: Option<JoinHandle<i64>>,
cancellation_token: tokio_util::sync::CancellationToken,
task: Option<Task>,
}

impl Streamer {
pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(handle) = self.handle.take() {
tracing::info!(target: crate::INDEXER, "Cancelling historical process");

self.cancellation_token.cancel();
handle.await.map_err(anyhow::Error::from)?;
pub fn new() -> Self {
Streamer { task: None }
}

return Ok(());
pub fn start(
&mut self,
current_block_height: BlockHeight,
indexer: IndexerFunction,
redis_connection_manager: storage::ConnectionManager,
s3_client: S3Client,
chain_id: ChainId,
json_rpc_client: JsonRpcClient,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("Streamer has already been started",));
}

Err(anyhow::anyhow!(
"Attempted to cancel already cancelled Streamer"
))
let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

let handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
storage::del(
&redis_connection_manager,
storage::generate_historical_stream_key(&indexer.get_full_name()),
)
.await
},
_ = process_historical_messages_or_handle_error(
current_block_height,
indexer.clone(),
&redis_connection_manager,
&s3_client,
&chain_id,
&json_rpc_client,
) => {
Ok(())
}
}
});

self.task = Some(Task {
handle,
cancellation_token,
});

Ok(())
}
}

pub fn spawn_historical_message_thread(
current_block_height: BlockHeight,
indexer_function: &IndexerFunction,
redis_connection_manager: &storage::ConnectionManager,
s3_client: &S3Client,
chain_id: &ChainId,
json_rpc_client: &JsonRpcClient,
) -> Streamer {
let indexer = indexer_function.clone();
let redis_connection_manager = redis_connection_manager.clone();
let s3_client = s3_client.clone();
let chain_id = chain_id.clone();
let json_rpc_client = json_rpc_client.clone();

let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

let handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
0
},
processed_blocks_count = process_historical_messages_or_handle_error(
current_block_height,
indexer,
&redis_connection_manager,
&s3_client,
&chain_id,
&json_rpc_client,
) => {
processed_blocks_count
}
pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(task) = self.task.take() {
task.cancellation_token.cancel();
task.handle.await??;

return Ok(());
}
});

Streamer {
handle: Some(handle),
cancellation_token,
Err(anyhow::anyhow!(
"Attempted to cancel already cancelled, or not started, Streamer"
))
}
}

Expand Down
19 changes: 10 additions & 9 deletions indexer/queryapi_coordinator/src/indexer_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,16 @@ async fn index_and_process_register_calls(
existing_streamer.cancel().await?;
}

let streamer =
crate::historical_block_processing::spawn_historical_message_thread(
current_block_height,
&new_indexer_function,
context.redis_connection_manager,
context.s3_client,
context.chain_id,
context.json_rpc_client,
);
let mut streamer = crate::historical_block_processing::Streamer::new();

streamer.start(
current_block_height,
new_indexer_function.clone(),
context.redis_connection_manager.clone(),
context.s3_client.clone(),
context.chain_id.clone(),
context.json_rpc_client.clone(),
)?;

streamers_lock.insert(new_indexer_function.get_full_name(), streamer);
}
Expand Down

0 comments on commit 53d599c

Please sign in to comment.