diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index bf19038b..eff2a0d1 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -131,7 +131,7 @@ impl BlockStream { redis.get_last_processed_block(&config).await.unwrap(); loop { - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(15)).await; let new_last_processed_block = if let Ok(block) = redis.get_last_processed_block(&config).await { diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index 93878d61..9f71149b 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -246,7 +246,7 @@ impl BlockStreamsHandler { let updated_at = SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); - let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30); + let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(60); let stalled = matches!( health.processing_state.try_into(), Ok(ProcessingState::Stalled) @@ -254,13 +254,17 @@ impl BlockStreamsHandler { if !stale && !stalled { return Ok(()); + } else { + tracing::info!(stale, stalled, "Restarting stalled block stream"); } + } else { + tracing::info!("Restarting stalled block stream"); } - tracing::info!("Restarting stalled block stream"); - self.stop(block_stream.stream_id.clone()).await?; - self.resume_block_stream(config).await?; + + let height = self.get_continuation_block_height(config).await?; + self.start(height, config).await?; Ok(()) }