-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Cancel historical backfill process before starting anew #363
feat: Cancel historical backfill process before starting anew #363
Conversation
bf90a5f
to
de1e10b
Compare
} | ||
|
||
pub(crate) async fn process_historical_messages_or_handle_error( | ||
block_height: BlockHeight, | ||
current_block_height: BlockHeight, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got confused about what block_height
this actually represents, so ended up renaming it
use unescape::unescape; | ||
|
||
use crate::indexer_reducer; | ||
use crate::indexer_reducer::FunctionCallInfo; | ||
use crate::indexer_types::{IndexerFunction, IndexerRegistry}; | ||
use indexer_rule_type::indexer_rule::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; | ||
|
||
pub(crate) fn registry_as_vec_of_indexer_functions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now done inline
.lock() | ||
.await | ||
.entry(function_invocation.account_id.clone()) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also clean up streams here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good call. I'll do that in this issue #346
53d599c
to
b636916
Compare
This PR updates Coordinator such that only one Historical Backfill process can exist for a given Indexer Function. Currently, these processes do not have awareness of each other, meaning a developer can trigger multiple simultaneous backfills, with them all writing to the same Redis Stream. The major changes are detailed below.
Streamer/Historical backfill tasks
In this PR, I've introduced the
historical_block_processing::Streamer
struct. This is used to encapsulate the async task which is spawned for handling the Historical Backfill. I went with "Streamer", as I envision this becoming the single process which handles both historical/real-time messages for a given Indexer, as described in #216 (comment).Streamer
is responsible for starting theasync
task, and provides a basic API for interacting with it. Right now, that only includes cancelling it. Cancelling aStreamer
will drop the existing future, preventing any new messages from being added, and then delete the Redis Stream removing all existing messages.Indexer Registry Mutex/Async
indexer_registry
is the in-memory representation of the given Indexer Registry. Currently, we pass around theMutexGuard
to mutate this data, resulting in the lock being held for much longer than is needed, blocking all other consumers from taking the lock. There isn't much contention for this lock, so it isn't a problem right now, but could end up being one in future.Adding in the
Streamer
Mutex
made mitigating this problem trivial, so I went ahead and made some changes. I've updated the code so that we pass around theMutex
itself, rather than the guard, allowing us to the hold the lock for the least amount of time possible. This required updating most methods withinindexer_registry
toasync
, so that we could take theasync
lock
.With the
Mutex
being used rather than theMutexGuard
, it seemed sensible to addindexer_registry
toQueryAPIContext
, rather than passing it round individually.