Skip to content

Commit

Permalink
fix: Ensure block streams are correctly resumed
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jul 16, 2024
1 parent 100ea2e commit 4248bfb
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
2 changes: 0 additions & 2 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub enum ProvisionedState {

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct OldIndexerState {
// store previous config to make comparison easier?
pub account_id: AccountId,
pub function_name: String,
pub block_stream_synced_at: Option<u64>,
Expand All @@ -28,7 +27,6 @@ pub struct OldIndexerState {

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct IndexerState {
// store previous config to make comparison easier?
pub account_id: AccountId,
pub function_name: String,
pub block_stream_synced_at: Option<u64>,
Expand Down
7 changes: 5 additions & 2 deletions coordinator/src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::indexer_config::IndexerConfig;
use crate::indexer_state::{IndexerState, IndexerStateManager};
use crate::redis::{KeyProvider, RedisClient};
use crate::registry::Registry;
use crate::utils::exponential_retry;

const LOOP_THROTTLE_MS: u64 = 1000;

Expand Down Expand Up @@ -81,7 +82,7 @@ impl<'a> LifecycleManager<'a> {
async fn handle_running(
&self,
config: Option<&IndexerConfig>,
state: &IndexerState,
state: &mut IndexerState,
) -> LifecycleState {
if config.is_none() {
return LifecycleState::Deleting;
Expand All @@ -103,6 +104,8 @@ impl<'a> LifecycleManager<'a> {
return LifecycleState::Running;
}

state.block_stream_synced_at = Some(config.get_registry_version());

if let Err(error) = self.executors_handler.synchronise_executor(config).await {
warn!(?error, "Failed to synchronise executor, retrying...");

Expand Down Expand Up @@ -267,7 +270,7 @@ impl<'a> LifecycleManager<'a> {
LifecycleState::Initializing => {
self.handle_initializing(config.as_ref(), &state).await
}
LifecycleState::Running => self.handle_running(config.as_ref(), &state).await,
LifecycleState::Running => self.handle_running(config.as_ref(), &mut state).await,
LifecycleState::Stopping => self.handle_stopping(config.as_ref()).await,
LifecycleState::Stopped => self.handle_stopped(config.as_ref(), &state).await,
LifecycleState::Repairing => self.handle_repairing(config.as_ref(), &state).await,
Expand Down

0 comments on commit 4248bfb

Please sign in to comment.