diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index bc34c000..93878d61 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -1,12 +1,14 @@ #![cfg_attr(test, allow(dead_code))] +use std::time::{Duration, SystemTime}; + pub use block_streamer::StreamInfo; use anyhow::Context; use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{ start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest, - ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest, + ListStreamsRequest, ProcessingState, StartStreamRequest, Status, StopStreamRequest, }; use near_primitives::types::AccountId; use registry_types::StartBlock; @@ -235,6 +237,34 @@ impl BlockStreamsHandler { Ok(()) } + async fn ensure_healthy( + &self, + config: &IndexerConfig, + block_stream: &StreamInfo, + ) -> anyhow::Result<()> { + if let Some(health) = block_stream.health.as_ref() { + let updated_at = + SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); + + let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30); + let stalled = matches!( + health.processing_state.try_into(), + Ok(ProcessingState::Stalled) + ); + + if !stale && !stalled { + return Ok(()); + } + } + + tracing::info!("Restarting stalled block stream"); + + self.stop(block_stream.stream_id.clone()).await?; + self.resume_block_stream(config).await?; + + Ok(()) + } + pub async fn synchronise_block_stream( &self, config: &IndexerConfig, @@ -246,6 +276,7 @@ impl BlockStreamsHandler { if let Some(block_stream) = block_stream { if block_stream.version == config.get_registry_version() { + self.ensure_healthy(config, &block_stream).await?; return Ok(()); } diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 01c9cc4b..4e12ef26 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -5,7 +5,10 @@ pub use runner::ExecutorInfo; use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{ + ExecutionState, GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, + StopExecutorRequest, +}; use tonic::transport::channel::Channel; use tonic::Request; @@ -119,6 +122,28 @@ impl ExecutorsHandler { Ok(()) } + async fn ensure_healthy( + &self, + config: &IndexerConfig, + executor: ExecutorInfo, + ) -> anyhow::Result<()> { + if let Some(health) = executor.health { + if !matches!( + health.execution_state.try_into(), + Ok(ExecutionState::Stalled) + ) { + return Ok(()); + } + } + + tracing::info!("Restarting stalled executor"); + + self.stop(executor.executor_id).await?; + self.start(config).await?; + + Ok(()) + } + pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> { let executor = self .get(config.account_id.clone(), config.function_name.clone()) @@ -126,6 +151,7 @@ impl ExecutorsHandler { if let Some(executor) = executor { if executor.version == config.get_registry_version() { + self.ensure_healthy(config, executor).await?; return Ok(()); }