From 40d53e0e261fe047cbcd85588434d00a9e53904f Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 18 Jul 2024 08:26:43 +1200 Subject: [PATCH 1/3] feat: Restart unhealthy block streams --- coordinator/src/handlers/block_streams.rs | 25 ++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index bc34c000..d5859390 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -6,7 +6,7 @@ use anyhow::Context; use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{ start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest, - ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest, + ListStreamsRequest, ProcessingState, StartStreamRequest, Status, StopStreamRequest, }; use near_primitives::types::AccountId; use registry_types::StartBlock; @@ -235,6 +235,28 @@ impl BlockStreamsHandler { Ok(()) } + async fn ensure_healthy( + &self, + config: &IndexerConfig, + block_stream: &StreamInfo, + ) -> anyhow::Result<()> { + if let Some(health) = block_stream.health.as_ref() { + if !matches!( + health.processing_state.try_into(), + Ok(ProcessingState::Stalled) + ) { + return Ok(()); + } + } + + tracing::info!("Restarting stalled block stream"); + + self.stop(block_stream.stream_id.clone()).await?; + self.resume_block_stream(config).await?; + + Ok(()) + } + pub async fn synchronise_block_stream( &self, config: &IndexerConfig, @@ -246,6 +268,7 @@ impl BlockStreamsHandler { if let Some(block_stream) = block_stream { if block_stream.version == config.get_registry_version() { + self.ensure_healthy(config, &block_stream).await?; return Ok(()); } From cb236968bbc9c07ca8031c0a039d14f95bd0ac17 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 18 Jul 2024 08:52:17 +1200 Subject: [PATCH 2/3] feat: Restart unhealthy executors --- 
coordinator/src/handlers/executors.rs | 28 ++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 01c9cc4b..4e12ef26 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -5,7 +5,10 @@ pub use runner::ExecutorInfo; use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{ + ExecutionState, GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, + StopExecutorRequest, +}; use tonic::transport::channel::Channel; use tonic::Request; @@ -119,6 +122,28 @@ impl ExecutorsHandler { Ok(()) } + async fn ensure_healthy( + &self, + config: &IndexerConfig, + executor: ExecutorInfo, + ) -> anyhow::Result<()> { + if let Some(health) = executor.health { + if !matches!( + health.execution_state.try_into(), + Ok(ExecutionState::Stalled) + ) { + return Ok(()); + } + } + + tracing::info!("Restarting stalled executor"); + + self.stop(executor.executor_id).await?; + self.start(config).await?; + + Ok(()) + } + pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> { let executor = self .get(config.account_id.clone(), config.function_name.clone()) @@ -126,6 +151,7 @@ impl ExecutorsHandler { if let Some(executor) = executor { if executor.version == config.get_registry_version() { + self.ensure_healthy(config, executor).await?; return Ok(()); } From 5ce06ed4df4bcf3a9ecdd8a733946aa22e8d9aae Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 18 Jul 2024 09:24:22 +1200 Subject: [PATCH 3/3] feat: Restart block streams with stale data --- coordinator/src/handlers/block_streams.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index d5859390..93878d61 
100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -1,5 +1,7 @@ #![cfg_attr(test, allow(dead_code))] +use std::time::{Duration, SystemTime}; + pub use block_streamer::StreamInfo; use anyhow::Context; @@ -241,10 +243,16 @@ impl BlockStreamsHandler { block_stream: &StreamInfo, ) -> anyhow::Result<()> { if let Some(health) = block_stream.health.as_ref() { - if !matches!( + let updated_at = + SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); + + let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30); + let stalled = matches!( health.processing_state.try_into(), Ok(ProcessingState::Stalled) - ) { + ); + + if !stale && !stalled { return Ok(()); } }