feat: Restart stalled Block Streams & Executors #891
@@ -5,7 +5,10 @@ pub use runner::ExecutorInfo;

 use anyhow::Context;
 use runner::runner_client::RunnerClient;
-use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest};
+use runner::{
+    ExecutionState, GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest,
+    StopExecutorRequest,
+};
 use tonic::transport::channel::Channel;
 use tonic::Request;
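The newly imported ExecutorInfo/ExecutionState types come from the runner gRPC definitions; on the wire the execution status is carried as a raw i32, which is why the new ensure_healthy method below converts it with try_into(). As a rough sketch of the shape such an enum typically takes (only the Stalled variant is confirmed by this diff; the other variants, values, and the exact conversion are assumptions):

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    #[repr(i32)]
    pub enum ExecutionState {
        Unspecified = 0, // assumed variant
        Running = 1,     // assumed variant
        Stalled = 2,     // matched by ensure_healthy below
    }

    impl TryFrom<i32> for ExecutionState {
        type Error = i32;

        // A fallible conversion along these lines is what makes
        // `health.execution_state.try_into()` yield Ok(..) only for known values.
        fn try_from(value: i32) -> Result<Self, Self::Error> {
            match value {
                0 => Ok(ExecutionState::Unspecified),
                1 => Ok(ExecutionState::Running),
                2 => Ok(ExecutionState::Stalled),
                other => Err(other),
            }
        }
    }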
@@ -119,13 +122,36 @@ impl ExecutorsHandler {
         Ok(())
     }

+    async fn ensure_healthy(
+        &self,
+        config: &IndexerConfig,
+        executor: ExecutorInfo,
+    ) -> anyhow::Result<()> {
+        if let Some(health) = executor.health {
+            if !matches!(
+                health.execution_state.try_into(),
+                Ok(ExecutionState::Stalled)
+            ) {
+                return Ok(());
+            }
+        }
+
+        tracing::info!("Restarting stalled executor");
+
+        self.stop(executor.executor_id).await?;
+        self.start(config).await?;
+
+        Ok(())
+    }
+
     pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> {
         let executor = self
             .get(config.account_id.clone(), config.function_name.clone())
             .await?;

         if let Some(executor) = executor {
             if executor.version == config.get_registry_version() {
+                self.ensure_healthy(config, executor).await?;
                 return Ok(());
             }

Review thread on the new ensure_healthy method:

Executors don't need a stale check since data is pushed rather than polled.

Could you explain why block streams do need a stale check? I'm a bit confused about the criteria under which our reported state data can become stale, and why Executors wouldn't be susceptible to it.

Health data is scraped differently for each: block streams are polled, while executors push data up. For the former, it felt natural to add the "scrape time" to the exported data, and therefore to check that within Coordinator. Thinking about this more, though, it's still possible for executor data to be stale, i.e. data hasn't been pushed up in a while. Perhaps we should be checking for that too.

Yeah, I feel a stale check is really an indication of mistrust in the Block Streamer and Executor working correctly, which is indeed valid. If we continue to tighten the accuracy of the status of the stream and executor, we can configure Coordinator to have a different timeout for each activity: say, 3 minutes for querying bitmaps, 15 seconds for publishing a block, 10 minutes for waiting for the stream size to reduce, and so on. Same for the executor: say, 5 minutes for executing code, 20 seconds for waiting on a block download, no timeout for waiting for a stream message, and so on. Then we would no longer rely on the block stream or executor simply giving heartbeats, where a failure case is that something crashed. Anyway, it's a thought; it involves more thorough logic in Coordinator, which could lead to more bugs if done wrong. Once we nail the simple stale timeout, perhaps we can think over what benefits we can gain by introducing more specific logic.
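To illustrate the "scrape time" idea from the thread above, here is a minimal sketch of what a staleness check on polled block stream health could look like. The names (BlockStreamHealth, scraped_at, STALE_HEALTH_LIMIT) and the 120-second limit are illustrative assumptions, not the actual Coordinator code:

    use std::time::{Duration, SystemTime};

    // Assumed limit; the follow-up comment below suggests keeping limits like
    // this together as named constants.
    const STALE_HEALTH_LIMIT: Duration = Duration::from_secs(120);

    // Hypothetical shape of polled health data that records when it was scraped.
    struct BlockStreamHealth {
        scraped_at: SystemTime,
    }

    // Health counts as stale when it has not been refreshed within the limit,
    // regardless of which state it reports.
    fn is_stale(health: &BlockStreamHealth) -> bool {
        health
            .scraped_at
            .elapsed()
            .map(|age| age > STALE_HEALTH_LIMIT)
            .unwrap_or(true)
    }

Coordinator could then treat a stale result the same way ensure_healthy treats a Stalled executor: stop the block stream and start it again.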
A second review comment:

We should make the stale seconds limit a constant so that we can configure these limits individually in the same place. I imagine we may end up with several more.
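A minimal sketch of that suggestion, with hypothetical names and values (neither is taken from the actual codebase):

    use std::time::Duration;

    // One place to declare every staleness limit so each can be tuned individually.
    pub const BLOCK_STREAM_STALE_LIMIT: Duration = Duration::from_secs(120);
    pub const EXECUTOR_STALE_LIMIT: Duration = Duration::from_secs(120);

The staleness checks could then reference these constants instead of inline numbers.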