Skip to content

Commit

Permalink
fix: Decreasing Processed Block Error (#988)
Browse files Browse the repository at this point in the history
Block streams are failing due to their start block being lower than the
current. This PR fixes this issue by seeding the health task with the
input start block. This should fix the issue in most cases. I've also
ensured that the handoff between receiver_blocks and lake also do not
repeat a block.
  • Loading branch information
darunrs committed Aug 6, 2024
1 parent e67d2da commit 3be5862
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,22 @@ impl BlockStream {
}
}

fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> {
fn start_health_monitoring_task(
&self,
redis: Arc<RedisClient>,
start_block_height: near_indexer_primitives::types::BlockHeight,
) -> JoinHandle<()> {
tokio::spawn({
let config = self.indexer_config.clone();
let health = self.health.clone();
let redis_stream = self.redis_stream.clone();
let stalled_timeout_seconds = 120;

async move {
let mut last_processed_block =
redis.get_last_processed_block(&config).await.unwrap();

let mut last_processed_block = Some(start_block_height - 1);
loop {
tokio::time::sleep(std::time::Duration::from_secs(120)).await;
tokio::time::sleep(std::time::Duration::from_secs(stalled_timeout_seconds))
.await;

let new_last_processed_block =
if let Ok(block) = redis.get_last_processed_block(&config).await {
Expand Down Expand Up @@ -183,6 +187,11 @@ impl BlockStream {
health_lock.processing_state = ProcessingState::Waiting;
}
Ordering::Equal => {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"No block has been processed for {stalled_timeout_seconds} seconds"
);
health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Greater => {
Expand Down Expand Up @@ -266,7 +275,8 @@ impl BlockStream {

let cancellation_token = tokio_util::sync::CancellationToken::new();

let monitor_handle = self.start_health_monitoring_task(redis.clone());
let monitor_handle =
self.start_health_monitoring_task(redis.clone(), start_block_height.clone());

let stream_handle = self.start_block_stream_task(
start_block_height,
Expand Down

0 comments on commit 3be5862

Please sign in to comment.