feat: Handle `StartBlock` options within Coordinator & Block Streamer #553
@@ -1,3 +1,5 @@
#![cfg_attr(test, allow(dead_code))]
Cargo thinks that the real implementation is "unused", and the mock version is used instead. This suppresses those warnings.
@@ -0,0 +1,5 @@
mod handler;
Moved the synchronisation logic out of `main.rs` and into its own mod. Moved the "handler" struct next to it too.
Ok(())
}

#[tracing::instrument(
Creates a `tracing::span` which automatically adds this context to all log messages emitted within/below this function.
block-streamer/src/block_stream.rs
Outdated
)
.await
.unwrap_or_else(|err| {
tracing::warn!(
There's currently an issue with the Delta Lake pipeline, which highlighted that any failure here kills the entire block stream. Changed it to capture/log these errors, and then just continue with Near Lake Framework. Not ideal, but better than just stopping.
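The log-and-continue behaviour described in this comment can be sketched roughly as below. This is a minimal, synchronous illustration using only the standard library, not the code from this PR: `process_delta_lake_blocks`, `last_indexed_or_start`, the `fail` flag and the fallback value are all hypothetical, and `eprintln!` stands in for `tracing::warn!`.

```rust
/// Hypothetical stand-in for the Delta Lake backfill step. It returns the
/// last block height it processed, or an error if the pipeline is unavailable.
fn process_delta_lake_blocks(start: u64, fail: bool) -> Result<u64, String> {
    if fail {
        Err("delta lake unavailable".to_string())
    } else {
        Ok(start + 100)
    }
}

/// Assumed fallback: on failure, log the error and fall back to `start`,
/// so the caller can carry on streaming instead of aborting.
fn last_indexed_or_start(start: u64, result: Result<u64, String>) -> u64 {
    result.unwrap_or_else(|err| {
        // Stand-in for `tracing::warn!` in the real code.
        eprintln!("failed to process Delta Lake blocks, continuing from {start}: {err:?}");
        start
    })
}
```

With these assumptions, a successful backfill from height 10 yields 110, while a failed one yields 10 and the stream simply continues from the original start height.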
account_id = account_id.as_str(),
function_name,
version = indexer_config.get_registry_version(),
"failed to sync block stream: {err:?}"
Swallow errors, they will be retried on the next control loop
@@ -230,6 +239,20 @@ async fn merge_streams(
}
}

async fn update_stream_version(
We need to distinguish between "new" and "migrated" streams, as they are handled differently
Awesome work on the migration! LGTM! Just some small comments.
redis_client
.get_last_published_block(indexer_config)
.await?
.ok_or(anyhow::anyhow!("Indexer has no `last_published_block`"))
What does this do? Is our intended behavior to return some other block height available to us? Like the version?
This fails the block stream creation. I didn't want to make any assumptions about where to start the block stream from as we may be incorrect, resulting in undesired behaviour 🤔
That makes sense to me!
stream.account_id == account_id.to_string()
&& &stream.function_name == function_name
})
.map(|index| active_executors.swap_remove(index));
Now that I think about it, what's the behavior if the account and function name are the same for multiple indexers? Does `swap_remove` stop at the first instance?
`.position` finds the first occurrence, so yes, this will take the first executor. Runner prevents duplicate executors though, right? In saying that, we should probably handle the case here.
Runner sort of does it through the deterministic hashing. I agree that it'd be nicer to have that called out in Coordinator since it would indicate an issue with our contract. And we can verify it here, instead of relying on our hashing mechanism in Runner, which might change.
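One way the duplicate check discussed in this thread could look is sketched below, assuming a simplified `ExecutorInfo` type with only the two fields compared in the diff. The struct, the `take_unique_executor` helper and the `String` error type are hypothetical; the real code may prefer `anyhow` and its own executor type.

```rust
/// Simplified, hypothetical stand-in for the executor type used by Coordinator.
#[derive(Debug, Clone, PartialEq)]
struct ExecutorInfo {
    account_id: String,
    function_name: String,
}

/// Removes and returns the executor matching the account/function pair.
/// Returns an error if more than one executor matches, since that would
/// point to an inconsistency upstream rather than something to silently pick from.
fn take_unique_executor(
    active_executors: &mut Vec<ExecutorInfo>,
    account_id: &str,
    function_name: &str,
) -> Result<Option<ExecutorInfo>, String> {
    let is_match =
        |e: &ExecutorInfo| e.account_id == account_id && e.function_name == function_name;

    // Count matches first so duplicates are reported instead of ignored.
    let matches = active_executors.iter().filter(|e| is_match(e)).count();
    if matches > 1 {
        return Err(format!(
            "{matches} executors found for {account_id}/{function_name}"
        ));
    }

    // `position` returns the index of the first (here: only) match, and
    // `swap_remove` removes it in O(1) without preserving order.
    let index = active_executors.iter().position(|e| is_match(e));
    Ok(index.map(|i| active_executors.swap_remove(i)))
}
```

The extra pass over the list is cheap for the sizes involved and keeps the check in Coordinator, rather than relying on Runner's hashing to rule duplicates out.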
This PR adds support for the updated Registry types, including the new `StartBlock` config option, across the Block Streamer/Coordinator.

With `StartBlock`, the logic within Coordinator is much more straightforward: we no longer need to guess whether we should "continue" or "start over", as that is built in to the configuration options.

This only really affects the handling of Block Streams; the executor flow remains the same: always restart when a new version is published.

To summarise the Block Stream synchronisation process:
- `StartBlock::Continue` - Resumes the process, keeping the current data in the Redis Stream, and starts the Block Stream from `last_published_block`
- `StartBlock::Latest` - Starts a new process, clears the Redis Stream, and starts the Block Stream from the registry version, essentially being latest
- `StartBlock::Height(u64)` - Starts a new process, clears the Redis Stream, and starts the Block Stream from the configured height

Additionally, Accounts/Indexers which have just been migrated, and also streams which have been stopped but have unchanged versions (e.g. after a Block Streamer restart), will be treated the same as `StartBlock::Continue`.
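The summary above can be sketched as a single decision function. This is an illustration only, using the standard library: the `StartBlock` variants follow the description, but `StreamPlan`, `plan_block_stream`, the parameter names and the `String` error type are hypothetical and not taken from the PR.

```rust
/// Variants as named in the description; the real type lives in the Registry types.
#[derive(Debug, Clone, PartialEq)]
enum StartBlock {
    Continue,
    Latest,
    Height(u64),
}

/// Hypothetical summary of what Coordinator would do for a Block Stream.
#[derive(Debug, PartialEq)]
struct StreamPlan {
    clear_redis_stream: bool,
    start_block_height: u64,
}

/// Maps a `StartBlock` option to a plan. `Continue` fails when there is no
/// `last_published_block`, rather than guessing where to start from.
fn plan_block_stream(
    start_block: &StartBlock,
    registry_version: u64,
    last_published_block: Option<u64>,
) -> Result<StreamPlan, String> {
    match start_block {
        StartBlock::Continue => {
            let height = last_published_block
                .ok_or_else(|| "Indexer has no `last_published_block`".to_string())?;
            Ok(StreamPlan { clear_redis_stream: false, start_block_height: height })
        }
        StartBlock::Latest => Ok(StreamPlan {
            clear_redis_stream: true,
            start_block_height: registry_version,
        }),
        StartBlock::Height(height) => Ok(StreamPlan {
            clear_redis_stream: true,
            start_block_height: *height,
        }),
    }
}
```

Under these assumptions, migrated indexers and stopped streams with unchanged versions would be passed through as `StartBlock::Continue`, so they keep their Redis Stream data and resume from `last_published_block`.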