From cb236968bbc9c07ca8031c0a039d14f95bd0ac17 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 18 Jul 2024 08:52:17 +1200 Subject: [PATCH] feat: Restart unhealthy executors --- coordinator/src/handlers/executors.rs | 28 ++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 01c9cc4b..4e12ef26 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -5,7 +5,10 @@ pub use runner::ExecutorInfo; use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{ + ExecutionState, GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, + StopExecutorRequest, +}; use tonic::transport::channel::Channel; use tonic::Request; @@ -119,6 +122,28 @@ impl ExecutorsHandler { Ok(()) } + async fn ensure_healthy( + &self, + config: &IndexerConfig, + executor: ExecutorInfo, + ) -> anyhow::Result<()> { + if let Some(health) = executor.health { + if !matches!( + health.execution_state.try_into(), + Ok(ExecutionState::Stalled) + ) { + return Ok(()); + } + } + + tracing::info!("Restarting stalled executor"); + + self.stop(executor.executor_id).await?; + self.start(config).await?; + + Ok(()) + } + pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> { let executor = self .get(config.account_id.clone(), config.function_name.clone()) @@ -126,6 +151,7 @@ impl ExecutorsHandler { if let Some(executor) = executor { if executor.version == config.get_registry_version() { + self.ensure_healthy(config, executor).await?; return Ok(()); }