diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs index 8c0663e1..1200648f 100644 --- a/coordinator/src/indexer_state.rs +++ b/coordinator/src/indexer_state.rs @@ -18,7 +18,6 @@ pub enum ProvisionedState { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub struct OldIndexerState { - // store previous config to make comparison easier? pub account_id: AccountId, pub function_name: String, pub block_stream_synced_at: Option, @@ -28,7 +27,6 @@ pub struct OldIndexerState { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub struct IndexerState { - // store previous config to make comparison easier? pub account_id: AccountId, pub function_name: String, pub block_stream_synced_at: Option, diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs index 5dda5133..7649defb 100644 --- a/coordinator/src/lifecycle.rs +++ b/coordinator/src/lifecycle.rs @@ -7,6 +7,7 @@ use crate::indexer_config::IndexerConfig; use crate::indexer_state::{IndexerState, IndexerStateManager}; use crate::redis::{KeyProvider, RedisClient}; use crate::registry::Registry; +use crate::utils::exponential_retry; const LOOP_THROTTLE_MS: u64 = 1000; @@ -81,7 +82,7 @@ impl<'a> LifecycleManager<'a> { async fn handle_running( &self, config: Option<&IndexerConfig>, - state: &IndexerState, + state: &mut IndexerState, ) -> LifecycleState { if config.is_none() { return LifecycleState::Deleting; @@ -103,6 +104,8 @@ impl<'a> LifecycleManager<'a> { return LifecycleState::Running; } + state.block_stream_synced_at = Some(config.get_registry_version()); + if let Err(error) = self.executors_handler.synchronise_executor(config).await { warn!(?error, "Failed to synchronise executor, retrying..."); @@ -267,7 +270,7 @@ impl<'a> LifecycleManager<'a> { LifecycleState::Initializing => { self.handle_initializing(config.as_ref(), &state).await } - LifecycleState::Running => self.handle_running(config.as_ref(), &state).await, + LifecycleState::Running => self.handle_running(config.as_ref(), &mut state).await, LifecycleState::Stopping => self.handle_stopping(config.as_ref()).await, LifecycleState::Stopped => self.handle_stopped(config.as_ref(), &state).await, LifecycleState::Repairing => self.handle_repairing(config.as_ref(), &state).await,