From 9bb710d497b46a379a904b76ebdc80fac841ad91 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 18 Jul 2024 16:20:23 +1200 Subject: [PATCH 1/2] chore: Log why block stream was restarted --- coordinator/src/handlers/block_streams.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index 93878d61..0e59eff6 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -254,13 +254,17 @@ impl BlockStreamsHandler { if !stale && !stalled { return Ok(()); + } else { + tracing::info!(stale, stalled, "Restarting stalled block stream"); } + } else { + tracing::info!("Restarting stalled block stream"); } - tracing::info!("Restarting stalled block stream"); - self.stop(block_stream.stream_id.clone()).await?; - self.resume_block_stream(config).await?; + + let height = self.get_continuation_block_height(config).await?; + self.start(height, config).await?; Ok(()) } From 509679eab278e0e78c478a92738374c3834476e7 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 18 Jul 2024 16:36:13 +1200 Subject: [PATCH 2/2] fix: Dont report `Stalled` for streams which have just started --- block-streamer/src/block_stream.rs | 2 +- coordinator/src/handlers/block_streams.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index bf19038b..eff2a0d1 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -131,7 +131,7 @@ impl BlockStream { redis.get_last_processed_block(&config).await.unwrap(); loop { - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(15)).await; let new_last_processed_block = if let Ok(block) = redis.get_last_processed_block(&config).await { diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index 0e59eff6..9f71149b 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -246,7 +246,7 @@ impl BlockStreamsHandler { let updated_at = SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); - let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30); + let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(60); let stalled = matches!( health.processing_state.try_into(), Ok(ProcessingState::Stalled)