feat: Restart stalled Block Streams & Executors #891

Merged · 3 commits · Jul 18, 2024
33 changes: 32 additions & 1 deletion coordinator/src/handlers/block_streams.rs
@@ -1,12 +1,14 @@
#![cfg_attr(test, allow(dead_code))]

use std::time::{Duration, SystemTime};

pub use block_streamer::StreamInfo;

use anyhow::Context;
use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{
    start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest,
-    ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest,
+    ListStreamsRequest, ProcessingState, StartStreamRequest, Status, StopStreamRequest,
};
use near_primitives::types::AccountId;
use registry_types::StartBlock;
@@ -235,6 +237,34 @@ impl BlockStreamsHandler {
Ok(())
}

    async fn ensure_healthy(
        &self,
        config: &IndexerConfig,
        block_stream: &StreamInfo,
    ) -> anyhow::Result<()> {
        if let Some(health) = block_stream.health.as_ref() {
            let updated_at =
                SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs);

            let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30);

Collaborator:
We should make the stale seconds limit a constant so that we can configure them individually in the same place. I imagine we may end up with several more.
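
For illustration, a minimal sketch of that suggestion (the constant name is hypothetical, not part of this diff):

// Hypothetical: collect per-component staleness limits in one place so they
// can be tuned together as more of them appear.
const BLOCK_STREAM_STALE_LIMIT: Duration = Duration::from_secs(30);

let stale = updated_at.elapsed().unwrap_or_default() > BLOCK_STREAM_STALE_LIMIT;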

            let stalled = matches!(
                health.processing_state.try_into(),
                Ok(ProcessingState::Stalled)
            );

            if !stale && !stalled {
                return Ok(());
            }
        }

        tracing::info!("Restarting stalled block stream");

        self.stop(block_stream.stream_id.clone()).await?;
        self.resume_block_stream(config).await?;

        Ok(())
    }

    pub async fn synchronise_block_stream(
        &self,
        config: &IndexerConfig,
@@ -246,6 +276,7 @@

        if let Some(block_stream) = block_stream {
            if block_stream.version == config.get_registry_version() {
                self.ensure_healthy(config, &block_stream).await?;
                return Ok(());
            }

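As an aside on the staleness computation above: updated_at.elapsed() returns an Err when the timestamp lies in the future (e.g. clock skew), and unwrap_or_default() maps that to a zero duration, i.e. the stream is treated as fresh. A minimal standalone restatement of the check, under the same 30-second threshold:

use std::time::{Duration, UNIX_EPOCH};

fn is_stale(updated_at_timestamp_secs: u64, limit: Duration) -> bool {
    let updated_at = UNIX_EPOCH + Duration::from_secs(updated_at_timestamp_secs);
    // elapsed() errs if updated_at is in the future; treat that as fresh.
    updated_at.elapsed().unwrap_or_default() > limit
}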
28 changes: 27 additions & 1 deletion coordinator/src/handlers/executors.rs
@@ -5,7 +5,10 @@ pub use runner::ExecutorInfo

use anyhow::Context;
use runner::runner_client::RunnerClient;
-use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest};
+use runner::{
+    ExecutionState, GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest,
+    StopExecutorRequest,
+};
use tonic::transport::channel::Channel;
use tonic::Request;

@@ -119,13 +122,36 @@ impl ExecutorsHandler {
Ok(())
}

    async fn ensure_healthy(

Collaborator Author:
Executors don't need a stale check since data is pushed rather than polled

Collaborator:
Could you explain why block streams do need a stale check? I'm a bit confused about what criteria make our reported state data stale, and why Executors wouldn't be susceptible to the same thing.

Collaborator Author:
Health data is scraped differently for each: block streams are polled, while executors push data up. For the former, it felt natural to add the "scrape time" to the exported data, and therefore to check that within Coordinator.

Thinking about this more though, it's still possible for executor data to be stale, i.e. data hasn't been pushed up in a while. Perhaps we should be checking for that too.
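
A rough sketch of what that executor-side check could look like, assuming the executor health message gained an updated_at_timestamp_secs field like the block stream's (no such field exists in this PR):

// Hypothetical: ExecutorHealth does not carry a timestamp in this diff.
if let Some(health) = executor.health.as_ref() {
    let updated_at =
        SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs);
    // Treat health that hasn't been pushed for 30s as stale (threshold assumed).
    let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30);
    if stale {
        // restart, as for a stalled executor
    }
}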

Collaborator:
Yeah I feel a stale check is really an indication of mistrust in the Block Streamer and Executor working correctly, which is indeed valid. If we continue to tighten the accuracy of the status of the stream and executor, we can configure Coordinator to have a different timeout for each activity. Like, 3 min for querying bitmaps, 15 seconds for publishing a block, 10 minutes for waiting for stream size to reduce, etc. Same for executor. Say 5 minutes for executing code, 20 seconds for waiting block download, no timeout for waiting for stream message, etc.

Then, we no longer rely on the block stream or executor simply giving heartbeats, where a failure case is that something crashed. Anyway, it's a thought. It involves more thorough logic in Coordinator, which could lead to more bugs if done wrong. Once we nail the simple stale timeout, perhaps we can think over what benefits we can gain by introducing more specific logic.
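
One way those per-activity timeouts could be expressed, assuming ProcessingState grows variants along those lines (only Stalled is confirmed by this diff; the other variant names and the durations are illustrative):

// Hypothetical per-state staleness limits; variants other than Stalled are
// assumptions, and the durations echo the examples in the comment above.
fn stale_limit(state: ProcessingState) -> Option<Duration> {
    match state {
        ProcessingState::Stalled => Some(Duration::ZERO), // already known-bad
        ProcessingState::Running => Some(Duration::from_secs(15)), // publishing a block
        ProcessingState::Waiting => Some(Duration::from_secs(10 * 60)), // stream draining
        _ => None, // e.g. no timeout while waiting for work
    }
}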

        &self,
        config: &IndexerConfig,
        executor: ExecutorInfo,
    ) -> anyhow::Result<()> {
        if let Some(health) = executor.health {
            if !matches!(
                health.execution_state.try_into(),
                Ok(ExecutionState::Stalled)
            ) {
                return Ok(());
            }
        }

        tracing::info!("Restarting stalled executor");

        self.stop(executor.executor_id).await?;
        self.start(config).await?;

        Ok(())
    }

    pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> {
        let executor = self
            .get(config.account_id.clone(), config.function_name.clone())
            .await?;

        if let Some(executor) = executor {
            if executor.version == config.get_registry_version() {
                self.ensure_healthy(config, executor).await?;
                return Ok(());
            }
