From 94da5e07deb6068e7b8d2031ec2534943a17fa6a Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 31 Jul 2024 13:44:39 -0700 Subject: [PATCH 1/2] Increase timeout --- block-streamer/src/block_stream.rs | 2 +- coordinator/src/handlers/block_streams.rs | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index a9af3f10..c9a25ac6 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -131,7 +131,7 @@ impl BlockStream { redis.get_last_processed_block(&config).await.unwrap(); loop { - tokio::time::sleep(std::time::Duration::from_secs(15)).await; + tokio::time::sleep(std::time::Duration::from_secs(120)).await; let new_last_processed_block = if let Ok(block) = redis.get_last_processed_block(&config).await { diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index 01bb88a8..d79e3163 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -257,7 +257,11 @@ impl BlockStreamsHandler { if !stale && !stalled { return Ok(()); } else { - tracing::info!(stale, stalled, "Restarting stalled block stream"); + tracing::info!( + stale, + stalled, + "Restarting stalled block stream after {RESTART_TIMEOUT_SECONDS} seconds" + ); } } else { tracing::info!( From c1922b0a704791e9007bfbf5f49f21b9e8b1661d Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 31 Jul 2024 15:08:26 -0700 Subject: [PATCH 2/2] Increase coordinator stale timeout --- coordinator/src/handlers/block_streams.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index d79e3163..b3e9fb5c 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -248,7 +248,7 @@ impl BlockStreamsHandler { let updated_at = SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); - let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(60); + let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(180); let stalled = matches!( health.processing_state.try_into(), Ok(ProcessingState::Stalled)