feat: Cancel historical backfill process before starting anew (#363)
This PR updates Coordinator such that **only one** Historical Backfill
process can exist for a given Indexer Function. Currently, these
processes have no awareness of each other, meaning a developer can
trigger multiple simultaneous backfills, all writing to the same Redis
Stream. The major changes are detailed below.

## Streamer/Historical backfill tasks
In this PR, I've introduced the `historical_block_processing::Streamer`
struct. It encapsulates the async task which is spawned to handle the
Historical Backfill. I went with "Streamer", as I envision this becoming
the single process which handles both historical and real-time messages
for a given Indexer, as described in
#216 (comment).

`Streamer` is responsible for starting the `async` task, and provides a
basic API for interacting with it. Right now, that only includes
cancelling it. Cancelling a `Streamer` will drop the existing future,
preventing any new messages from being added, and then delete the Redis
Stream, removing all existing messages.
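For illustration, here's a minimal, self-contained sketch of the cancellation pattern `Streamer` uses internally (a `tokio_util` `CancellationToken` raced against the work via `tokio::select!`). The `do_backfill`/`cleanup` functions below are stand-ins for the real backfill and Redis Stream deletion, not actual Coordinator code:

```rust
use tokio_util::sync::CancellationToken;

// Stand-ins for the real historical backfill and Redis Stream cleanup.
async fn do_backfill() { /* push historical messages onto the stream */ }
async fn cleanup() { /* delete the Redis Stream */ }

#[tokio::main]
async fn main() {
    let cancellation_token = CancellationToken::new();
    let child_token = cancellation_token.clone();

    // The spawned task races the backfill against cancellation.
    let handle = tokio::spawn(async move {
        tokio::select! {
            _ = child_token.cancelled() => cleanup().await,
            _ = do_backfill() => {}
        }
    });

    // Cancelling drops the in-flight backfill future, then runs cleanup.
    cancellation_token.cancel();
    handle.await.unwrap();
}
```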

## Indexer Registry Mutex/Async
`indexer_registry` is the in-memory representation of the given Indexer
Registry. Currently, we pass around the `MutexGuard` to mutate this
data, resulting in the lock being held for much longer than needed,
blocking all other consumers from taking the lock. There isn't much
contention for this lock, so it isn't a problem right now, but it could
become one in future.

Adding in the `Streamer` `Mutex` made mitigating this problem trivial,
so I went ahead and made some changes. I've updated the code so that we
pass around the `Mutex` itself, rather than the guard, allowing us to
hold the lock for the least amount of time possible. This required
updating most methods within `indexer_registry` to `async`, so that we
could take the `async` `lock`.

With the `Mutex` being used rather than the `MutexGuard`, it seemed
sensible to add `indexer_registry` to `QueryApiContext`, rather than
passing it around individually.
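For context, here's a rough sketch of the locking pattern this enables, assuming a shared `Arc<tokio::sync::Mutex<...>>`. The `IndexerRegistry` alias and `register_function` helper are simplified stand-ins, not the real types:

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

// Simplified stand-in for the real registry type.
type IndexerRegistry = HashMap<String, String>;

// Receiving the Mutex (not a MutexGuard) means each call site takes the
// async lock only for the statements that actually touch the registry.
async fn register_function(registry: &Arc<Mutex<IndexerRegistry>>, name: String, code: String) {
    let mut registry_lock = registry.lock().await;
    registry_lock.insert(name, code);
    // Guard dropped here; other consumers can take the lock immediately.
}

#[tokio::main]
async fn main() {
    let indexer_registry = Arc::new(Mutex::new(IndexerRegistry::new()));
    register_function(&indexer_registry, "account.near/fn".to_string(), "...".to_string()).await;
    println!("{} function(s) registered", indexer_registry.lock().await.len());
}
```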
morgsmccauley authored Nov 7, 2023
1 parent 6b53ca2 commit 0476306
Showing 5 changed files with 152 additions and 102 deletions.
1 change: 1 addition & 0 deletions indexer/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions indexer/queryapi_coordinator/Cargo.toml
@@ -17,6 +17,7 @@ prometheus = "0.13.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"
tokio = { version = "1.1", features = ["sync", "time", "macros", "rt-multi-thread"] }
tokio-util = "0.6.7"
tokio-stream = { version = "0.1" }
tracing = "0.1.34"

112 changes: 79 additions & 33 deletions indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -17,45 +17,91 @@ pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_a
pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes.
pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;

pub fn spawn_historical_message_thread(
block_height: BlockHeight,
new_indexer_function: &IndexerFunction,
redis_connection_manager: &storage::ConnectionManager,
s3_client: &S3Client,
chain_id: &ChainId,
json_rpc_client: &JsonRpcClient,
) -> Option<JoinHandle<i64>> {
let redis_connection_manager = redis_connection_manager.clone();
let s3_client = s3_client.clone();
let chain_id = chain_id.clone();
let json_rpc_client = json_rpc_client.clone();

new_indexer_function.start_block_height.map(|_| {
let new_indexer_function_copy = new_indexer_function.clone();
tokio::spawn(async move {
process_historical_messages_or_handle_error(
block_height,
new_indexer_function_copy,
&redis_connection_manager,
&s3_client,
&chain_id,
&json_rpc_client,
)
.await
})
})
pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
cancellation_token: tokio_util::sync::CancellationToken,
}

/// Represents the async task used to process and push historical messages
pub struct Streamer {
task: Option<Task>,
}

impl Streamer {
pub fn new() -> Self {
Streamer { task: None }
}

pub fn start(
&mut self,
current_block_height: BlockHeight,
indexer: IndexerFunction,
redis_connection_manager: storage::ConnectionManager,
s3_client: S3Client,
chain_id: ChainId,
json_rpc_client: JsonRpcClient,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("Streamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

let handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
storage::del(
&redis_connection_manager,
storage::generate_historical_stream_key(&indexer.get_full_name()),
)
.await
},
_ = process_historical_messages_or_handle_error(
current_block_height,
indexer.clone(),
&redis_connection_manager,
&s3_client,
&chain_id,
&json_rpc_client,
) => {
Ok(())
}
}
});

self.task = Some(Task {
handle,
cancellation_token,
});

Ok(())
}

pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(task) = self.task.take() {
task.cancellation_token.cancel();
task.handle.await??;

return Ok(());
}

Err(anyhow::anyhow!(
"Attempted to cancel already cancelled, or not started, Streamer"
))
}
}

pub(crate) async fn process_historical_messages_or_handle_error(
block_height: BlockHeight,
current_block_height: BlockHeight,
indexer_function: IndexerFunction,
redis_connection_manager: &storage::ConnectionManager,
s3_client: &S3Client,
chain_id: &ChainId,
json_rpc_client: &JsonRpcClient,
) -> i64 {
match process_historical_messages(
block_height,
current_block_height,
indexer_function,
redis_connection_manager,
s3_client,
Expand All @@ -77,15 +123,15 @@ pub(crate) async fn process_historical_messages_or_handle_error(
}
}
pub(crate) async fn process_historical_messages(
block_height: BlockHeight,
current_block_height: BlockHeight,
indexer_function: IndexerFunction,
redis_connection_manager: &storage::ConnectionManager,
s3_client: &S3Client,
chain_id: &ChainId,
json_rpc_client: &JsonRpcClient,
) -> anyhow::Result<i64> {
let start_block = indexer_function.start_block_height.unwrap();
let block_difference: i64 = (block_height - start_block) as i64;
let block_difference: i64 = (current_block_height - start_block) as i64;
match block_difference {
i64::MIN..=-1 => {
bail!("Skipping back fill, start_block_height is greater than current block height: {:?} {:?}",
Expand All @@ -100,7 +146,7 @@ pub(crate) async fn process_historical_messages(
1..=i64::MAX => {
tracing::info!(
target: crate::INDEXER,
"Back filling {block_difference} blocks from {start_block} to current block height {block_height}: {:?} {:?}",
"Back filling {block_difference} blocks from {start_block} to current block height {current_block_height}: {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name
);
Expand Down Expand Up @@ -129,7 +175,7 @@ pub(crate) async fn process_historical_messages(
let mut blocks_between_indexed_and_current_block: Vec<BlockHeight> =
filter_matching_unindexed_blocks_from_lake(
last_indexed_block,
block_height,
current_block_height,
&indexer_function,
s3_client,
chain_id,
81 changes: 39 additions & 42 deletions indexer/queryapi_coordinator/src/indexer_registry.rs
@@ -8,25 +8,13 @@ use near_lake_framework::near_indexer_primitives::views::QueryRequest;
use serde_json::{json, Value};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use tokio::sync::MutexGuard;
use tokio::task::JoinHandle;
use unescape::unescape;

use crate::indexer_reducer;
use crate::indexer_reducer::FunctionCallInfo;
use crate::indexer_types::{IndexerFunction, IndexerRegistry};
use indexer_rule_type::indexer_rule::{IndexerRule, IndexerRuleKind, MatchingRule, Status};

pub(crate) fn registry_as_vec_of_indexer_functions(
registry: &IndexerRegistry,
) -> Vec<IndexerFunction> {
registry
.values()
.flat_map(|fns| fns.values())
.cloned()
.collect()
}

struct RegistryFunctionInvocation {
pub account_id: AccountId,
pub function_name: String,
Expand Down Expand Up @@ -99,20 +87,18 @@ pub(crate) fn build_registry_from_json(raw_registry: Value) -> IndexerRegistry {

/// Returns spawned start_from_block threads
pub(crate) async fn index_registry_changes(
block_height: BlockHeight,
registry: &mut MutexGuard<'_, IndexerRegistry>,
current_block_height: BlockHeight,
context: &QueryApiContext<'_>,
) -> Vec<JoinHandle<i64>> {
index_and_process_remove_calls(registry, context);
) -> anyhow::Result<()> {
index_and_process_remove_calls(context).await;

index_and_process_register_calls(block_height, registry, context)
index_and_process_register_calls(current_block_height, context).await
}

fn index_and_process_register_calls(
block_height: BlockHeight,
registry: &mut MutexGuard<IndexerRegistry>,
context: &QueryApiContext,
) -> Vec<JoinHandle<i64>> {
async fn index_and_process_register_calls(
current_block_height: BlockHeight,
context: &QueryApiContext<'_>,
) -> anyhow::Result<()> {
let registry_method_name = "register_indexer_function";
let registry_calls_rule =
build_registry_indexer_rule(registry_method_name, context.registry_contract_id);
@@ -122,7 +108,6 @@ fn index_and_process_register_calls(
context.chain_id,
context.streamer_message.block.header.height,
);
let mut spawned_start_from_block_threads = Vec::new();

if !registry_updates.is_empty() {
for update in registry_updates {
@@ -134,7 +119,8 @@
match new_indexer_function {
None => continue,
Some(mut new_indexer_function) => {
let fns = registry
let mut indexer_registry_lock = context.indexer_registry.lock().await;
let fns = indexer_registry_lock
.entry(new_indexer_function.account_id.clone())
.or_default();

@@ -145,7 +131,7 @@
tracing::info!(
target: crate::INDEXER,
"Block {}. Indexed creation call to {registry_method_name}: {:?} {:?}",
block_height,
current_block_height,
new_indexer_function.account_id.clone(),
new_indexer_function.function_name.clone()
);
@@ -156,7 +142,7 @@
tracing::info!(
target: crate::INDEXER,
"Block {}. Indexed update call to {registry_method_name}: {:?} {:?}",
block_height,
current_block_height,
new_indexer_function.account_id.clone(),
new_indexer_function.function_name.clone(),
);
@@ -168,32 +154,38 @@
}

if new_indexer_function.start_block_height.is_some() {
if let Some(thread) =
crate::historical_block_processing::spawn_historical_message_thread(
block_height,
&new_indexer_function,
context.redis_connection_manager,
context.s3_client,
context.chain_id,
context.json_rpc_client,
)
let mut streamers_lock = context.streamers.lock().await;

if let Some(mut existing_streamer) =
streamers_lock.remove(&new_indexer_function.get_full_name())
{
spawned_start_from_block_threads.push(thread);
existing_streamer.cancel().await?;
}

let mut streamer = crate::historical_block_processing::Streamer::new();

streamer.start(
current_block_height,
new_indexer_function.clone(),
context.redis_connection_manager.clone(),
context.s3_client.clone(),
context.chain_id.clone(),
context.json_rpc_client.clone(),
)?;

streamers_lock.insert(new_indexer_function.get_full_name(), streamer);
}

fns.insert(update.method_name.clone(), new_indexer_function);
}
};
}
}
spawned_start_from_block_threads

Ok(())
}

fn index_and_process_remove_calls(
registry: &mut MutexGuard<IndexerRegistry>,
context: &QueryApiContext,
) {
async fn index_and_process_remove_calls(context: &QueryApiContext<'_>) {
let registry_method_name = "remove_indexer_function";
let registry_calls_rule =
build_registry_indexer_rule(registry_method_name, context.registry_contract_id);
@@ -220,7 +212,12 @@
function_invocation.account_id.clone(),
function_invocation.function_name.clone(),
);
match registry.entry(function_invocation.account_id.clone()) {
match context
.indexer_registry
.lock()
.await
.entry(function_invocation.account_id.clone())
{
Entry::Vacant(_) => {}
Entry::Occupied(mut fns) => {
fns.get_mut()
