From 8ef52d07f1e826c3b74fff3b5289915e219495ac Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 6 Aug 2024 12:59:44 -0700 Subject: [PATCH] Fix decreasing block error and add logs --- block-streamer/src/block_stream.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 561e2fc7..2e4a61e0 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -120,18 +120,22 @@ impl BlockStream { } } - fn start_health_monitoring_task(&self, redis: Arc) -> JoinHandle<()> { + fn start_health_monitoring_task( + &self, + redis: Arc, + start_block_height: near_indexer_primitives::types::BlockHeight, + ) -> JoinHandle<()> { tokio::spawn({ let config = self.indexer_config.clone(); let health = self.health.clone(); let redis_stream = self.redis_stream.clone(); + let stalled_timeout_seconds = 120; async move { - let mut last_processed_block = - redis.get_last_processed_block(&config).await.unwrap(); - + let mut last_processed_block = Some(start_block_height); loop { - tokio::time::sleep(std::time::Duration::from_secs(120)).await; + tokio::time::sleep(std::time::Duration::from_secs(stalled_timeout_seconds)) + .await; let new_last_processed_block = if let Ok(block) = redis.get_last_processed_block(&config).await { @@ -183,6 +187,11 @@ impl BlockStream { health_lock.processing_state = ProcessingState::Waiting; } Ordering::Equal => { + tracing::error!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "No block has been processed for {stalled_timeout_seconds} seconds" + ); health_lock.processing_state = ProcessingState::Stalled; } Ordering::Greater => { @@ -266,7 +275,8 @@ impl BlockStream { let cancellation_token = tokio_util::sync::CancellationToken::new(); - let monitor_handle = self.start_health_monitoring_task(redis.clone()); + let monitor_handle = + self.start_health_monitoring_task(redis.clone(), start_block_height.clone()); let stream_handle = self.start_block_stream_task( start_block_height,