Skip to content

Commit

Permalink
fix: Avoid restarting block streams which have just started (#895)
Browse files Browse the repository at this point in the history
On Block Streamer startup, all Block Streams are started in one big
herd. Some of which can take a while to start processing. These end up
getting marked as "Stalled", and are therefore restarted.

This PR increases the monitoring scrape interval, so that Block Streams
have a longer window to prove they are "Processing", and therefore do
not get restarted. Also bumped the "stale" check in Coordinator, so they
are less likely to get marked as Stale.
  • Loading branch information
morgsmccauley authored Jul 18, 2024
1 parent f1c1757 commit 0177c26
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
2 changes: 1 addition & 1 deletion block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl BlockStream {
redis.get_last_processed_block(&config).await.unwrap();

loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
tokio::time::sleep(std::time::Duration::from_secs(15)).await;

let new_last_processed_block =
if let Ok(block) = redis.get_last_processed_block(&config).await {
Expand Down
12 changes: 8 additions & 4 deletions coordinator/src/handlers/block_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,21 +246,25 @@ impl BlockStreamsHandler {
let updated_at =
SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs);

let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30);
let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(60);
let stalled = matches!(
health.processing_state.try_into(),
Ok(ProcessingState::Stalled)
);

if !stale && !stalled {
return Ok(());
} else {
tracing::info!(stale, stalled, "Restarting stalled block stream");
}
} else {
tracing::info!("Restarting stalled block stream");
}

tracing::info!("Restarting stalled block stream");

self.stop(block_stream.stream_id.clone()).await?;
self.resume_block_stream(config).await?;

let height = self.get_continuation_block_height(config).await?;
self.start(height, config).await?;

Ok(())
}
Expand Down

0 comments on commit 0177c26

Please sign in to comment.