Skip to content

Commit

Permalink
feat: Cancel existing historical process before starting another
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Nov 7, 2023
1 parent de1e10b commit 60eb8a1
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 9 deletions.
24 changes: 20 additions & 4 deletions indexer/queryapi_coordinator/src/historical_block_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,27 @@ pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_a
pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes.
pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;


/// Represents the async task used to process and push historical messages
pub struct Streamer {
handle: JoinHandle<i64>,
pub cancellation_token: tokio_util::sync::CancellationToken,
handle: Option<JoinHandle<i64>>,
cancellation_token: tokio_util::sync::CancellationToken,
}

impl Streamer {
pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(handle) = self.handle.take() {
tracing::info!(target: crate::INDEXER, "Cancelling historical process");

self.cancellation_token.cancel();
handle.await.map_err(anyhow::Error::from)?;

return Ok(());
}

Err(anyhow::anyhow!(
"Attempted to cancel already cancelled Streamer"
))
}
}

pub fn spawn_historical_message_thread(
Expand Down Expand Up @@ -60,7 +76,7 @@ pub fn spawn_historical_message_thread(
});

Streamer {
handle,
handle: Some(handle),
cancellation_token,
}
}
Expand Down
17 changes: 13 additions & 4 deletions indexer/queryapi_coordinator/src/indexer_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,16 @@ pub(crate) fn build_registry_from_json(raw_registry: Value) -> IndexerRegistry {
pub(crate) async fn index_registry_changes(
current_block_height: BlockHeight,
context: &QueryApiContext<'_>,
) {
) -> anyhow::Result<()> {
index_and_process_remove_calls(context).await;

index_and_process_register_calls(current_block_height, context).await;
index_and_process_register_calls(current_block_height, context).await
}

async fn index_and_process_register_calls(
current_block_height: BlockHeight,
context: &QueryApiContext<'_>,
) {
) -> anyhow::Result<()> {
let registry_method_name = "register_indexer_function";
let registry_calls_rule =
build_registry_indexer_rule(registry_method_name, context.registry_contract_id);
Expand Down Expand Up @@ -154,6 +154,14 @@ async fn index_and_process_register_calls(
}

if new_indexer_function.start_block_height.is_some() {
let mut streamers_lock = context.streamers.lock().await;

if let Some(mut existing_streamer) =
streamers_lock.remove(&new_indexer_function.get_full_name())
{
existing_streamer.cancel().await?;
}

let streamer =
crate::historical_block_processing::spawn_historical_message_thread(
current_block_height,
Expand All @@ -164,7 +172,6 @@ async fn index_and_process_register_calls(
context.json_rpc_client,
);

let mut streamers_lock = context.streamers.lock().await;
streamers_lock.insert(new_indexer_function.get_full_name(), streamer);
}

Expand All @@ -173,6 +180,8 @@ async fn index_and_process_register_calls(
};
}
}

Ok(())
}

async fn index_and_process_remove_calls(context: &QueryApiContext<'_>) {
Expand Down
2 changes: 1 addition & 1 deletion indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async fn handle_streamer_message(context: QueryApiContext<'_>) -> anyhow::Result
)
.await?;

indexer_registry::index_registry_changes(block_height, &context).await;
indexer_registry::index_registry_changes(block_height, &context).await?;

while let Some(indexer_function_with_matches) =
indexer_function_filter_matches_futures.next().await
Expand Down

0 comments on commit 60eb8a1

Please sign in to comment.