From f676625c6826af3b731037d64c84cbcda8e88bfc Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 6 Aug 2024 12:59:44 -0700 Subject: [PATCH 1/3] Fix decreasing block error and add logs --- block-streamer/src/block_stream.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 561e2fc7..2e4a61e0 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -120,18 +120,22 @@ impl BlockStream { } } - fn start_health_monitoring_task(&self, redis: Arc) -> JoinHandle<()> { + fn start_health_monitoring_task( + &self, + redis: Arc, + start_block_height: near_indexer_primitives::types::BlockHeight, + ) -> JoinHandle<()> { tokio::spawn({ let config = self.indexer_config.clone(); let health = self.health.clone(); let redis_stream = self.redis_stream.clone(); + let stalled_timeout_seconds = 120; async move { - let mut last_processed_block = - redis.get_last_processed_block(&config).await.unwrap(); - + let mut last_processed_block = Some(start_block_height); loop { - tokio::time::sleep(std::time::Duration::from_secs(120)).await; + tokio::time::sleep(std::time::Duration::from_secs(stalled_timeout_seconds)) + .await; let new_last_processed_block = if let Ok(block) = redis.get_last_processed_block(&config).await { @@ -183,6 +187,11 @@ impl BlockStream { health_lock.processing_state = ProcessingState::Waiting; } Ordering::Equal => { + tracing::error!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "No block has been processed for {stalled_timeout_seconds} seconds" + ); health_lock.processing_state = ProcessingState::Stalled; } Ordering::Greater => { @@ -266,7 +275,8 @@ impl BlockStream { let cancellation_token = tokio_util::sync::CancellationToken::new(); - let monitor_handle = self.start_health_monitoring_task(redis.clone()); + let monitor_handle = + self.start_health_monitoring_task(redis.clone(), start_block_height.clone()); let stream_handle = self.start_block_stream_task( start_block_height, From cca0df586b42e2fe3d23c5fbcf5ba420fbd9381e Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 6 Aug 2024 13:03:03 -0700 Subject: [PATCH 2/3] Address off by one issues --- block-streamer/src/block_stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 2e4a61e0..9e536070 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -132,7 +132,7 @@ impl BlockStream { let stalled_timeout_seconds = 120; async move { - let mut last_processed_block = Some(start_block_height); + let mut last_processed_block = Some(start_block_height - 1); loop { tokio::time::sleep(std::time::Duration::from_secs(stalled_timeout_seconds)) .await; @@ -352,7 +352,7 @@ pub(crate) async fn start_block_stream( .context("Failed while fetching and streaming bitmap indexer blocks")?; let last_indexed_near_lake_block = process_near_lake_blocks( - last_bitmap_indexer_block, + last_bitmap_indexer_block + 1, lake_s3_client, lake_prefetch_size, redis, From 7981794c4a5db407a73767f710875e1470601c42 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 6 Aug 2024 15:06:35 -0700 Subject: [PATCH 3/3] Remove +1 on lake handoff start block and use warn instead of error --- block-streamer/src/block_stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 9e536070..9489eaad 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -187,7 +187,7 @@ impl BlockStream { health_lock.processing_state = ProcessingState::Waiting; } Ordering::Equal => { - tracing::error!( + tracing::warn!( account_id = config.account_id.as_str(), function_name = config.function_name, "No block has been processed for {stalled_timeout_seconds} seconds" @@ -352,7 +352,7 @@ pub(crate) async fn start_block_stream( .context("Failed while fetching and streaming bitmap indexer blocks")?; let last_indexed_near_lake_block = process_near_lake_blocks( - last_bitmap_indexer_block + 1, + last_bitmap_indexer_block, lake_s3_client, lake_prefetch_size, redis,