fix: Decreasing Processed Block Error #988

Merged (3 commits, Aug 6, 2024)

Changes from 2 commits
24 changes: 17 additions & 7 deletions block-streamer/src/block_stream.rs
@@ -120,18 +120,22 @@ impl BlockStream {
         }
     }
 
-    fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> {
+    fn start_health_monitoring_task(
+        &self,
+        redis: Arc<RedisClient>,
+        start_block_height: near_indexer_primitives::types::BlockHeight,
+    ) -> JoinHandle<()> {
         tokio::spawn({
             let config = self.indexer_config.clone();
             let health = self.health.clone();
             let redis_stream = self.redis_stream.clone();
+            let stalled_timeout_seconds = 120;
 
             async move {
-                let mut last_processed_block =
-                    redis.get_last_processed_block(&config).await.unwrap();
-
+                let mut last_processed_block = Some(start_block_height - 1);
                 loop {
-                    tokio::time::sleep(std::time::Duration::from_secs(120)).await;
+                    tokio::time::sleep(std::time::Duration::from_secs(stalled_timeout_seconds))
+                        .await;
 
                     let new_last_processed_block =
                         if let Ok(block) = redis.get_last_processed_block(&config).await {
@@ -183,6 +187,11 @@ impl BlockStream {
                         health_lock.processing_state = ProcessingState::Waiting;
                     }
                     Ordering::Equal => {
+                        tracing::error!(
+                            account_id = config.account_id.as_str(),
+                            function_name = config.function_name,
+                            "No block has been processed for {stalled_timeout_seconds} seconds"
+                        );
                         health_lock.processing_state = ProcessingState::Stalled;
                     }
                     Ordering::Greater => {

Review comments on `tracing::error!(`:

Collaborator: Let's use `warn`. `error` should be reserved for unrecoverable situations.

Collaborator (author): Gotcha.
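The stalled check above can be reduced to a small decision function: compare the block height seen on the previous tick with the height seen now. A minimal sketch follows; the type and function names are hypothetical (not the block-streamer's actual API), and the handling of the `Ordering::Less` case, which is only partially visible in this diff, is an assumption here.

```rust
use std::cmp::Ordering;

#[derive(Debug, PartialEq)]
enum StreamState {
    Running,
    Waiting,
    Stalled,
}

// Hypothetical reduction of the monitor loop's decision: `previous` is the
// height observed on the last tick, `current` is the height observed now.
fn classify(previous: Option<u64>, current: Option<u64>) -> StreamState {
    match (previous, current) {
        // No block has ever been processed: the stream is still waiting.
        (None, None) => StreamState::Waiting,
        // First block observed since the last tick: progress was made.
        (None, Some(_)) => StreamState::Running,
        // A previously seen height is now gone: treat as waiting again.
        (Some(_), None) => StreamState::Waiting,
        (Some(prev), Some(cur)) => match cur.cmp(&prev) {
            // Same height for a whole interval: no progress, flag as stalled.
            Ordering::Equal => StreamState::Stalled,
            // Height advanced: healthy.
            Ordering::Greater => StreamState::Running,
            // Height went backwards: the "decreasing processed block" case
            // this PR addresses; mapped to Stalled here as an assumption.
            Ordering::Less => StreamState::Stalled,
        },
    }
}

fn main() {
    // Seeding the baseline at start_block_height - 1, as the patch does,
    // means the very first processed block already counts as progress.
    let start_block_height: u64 = 100;
    let baseline = Some(start_block_height - 1);
    assert_eq!(classify(baseline, Some(100)), StreamState::Running);
    assert_eq!(classify(Some(100), Some(100)), StreamState::Stalled);
    assert_eq!(classify(None, None), StreamState::Waiting);
}
```

Seeding from `start_block_height - 1` rather than reading Redis at startup also removes the `unwrap()` that would panic when no last-processed key exists yet.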
@@ -266,7 +275,8 @@ impl BlockStream {
 
         let cancellation_token = tokio_util::sync::CancellationToken::new();
 
-        let monitor_handle = self.start_health_monitoring_task(redis.clone());
+        let monitor_handle =
+            self.start_health_monitoring_task(redis.clone(), start_block_height.clone());
 
         let stream_handle = self.start_block_stream_task(
             start_block_height,
@@ -342,7 +352,7 @@ pub(crate) async fn start_block_stream(
         .context("Failed while fetching and streaming bitmap indexer blocks")?;
 
     let last_indexed_near_lake_block = process_near_lake_blocks(
-        last_bitmap_indexer_block,
+        last_bitmap_indexer_block + 1,
         lake_s3_client,
         lake_prefetch_size,
         redis,

Review comments on `last_bitmap_indexer_block + 1`:

Collaborator (author, @darunrs, Aug 6, 2024): I did a +1 here too, as I suspect that the handoff results in repeat processing of that particular block. The input is passed directly as the start block height argument for the stream. I believe the stream is capable of skipping forward if the specific block does not exist, since it uses list operations before getting the files.

Collaborator: Have you confirmed this, or is it just a suspicion?

Collaborator: I believe we use `start_after`, so the previous implementation was correct?

Collaborator (author): Gotcha. I did see a repeat in queryapi_indexer, but I wasn't 100% sure. I'll keep it as is and investigate that separately.
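The disagreement in the last review thread hinges on whether the lake listing uses start-after or start-at semantics for the handoff block. A minimal sketch of the distinction, using hypothetical helpers over an in-memory sorted slice standing in for S3 object keys:

```rust
// Exclusive semantics, like S3 ListObjectsV2's StartAfter: results strictly
// greater than the given height.
fn list_start_after(heights: &[u64], after: u64) -> Vec<u64> {
    heights.iter().copied().filter(|&h| h > after).collect()
}

// Inclusive semantics: results at or above the given height.
fn list_start_at(heights: &[u64], at: u64) -> Vec<u64> {
    heights.iter().copied().filter(|&h| h >= at).collect()
}

fn main() {
    let indexed = [100, 101, 102, 103];
    let last_bitmap_indexer_block = 101;

    // With start-after semantics, passing the last handled height already
    // excludes it, so adding +1 would skip height 102 entirely.
    assert_eq!(
        list_start_after(&indexed, last_bitmap_indexer_block),
        vec![102, 103]
    );
    assert_eq!(
        list_start_after(&indexed, last_bitmap_indexer_block + 1),
        vec![103]
    );

    // With start-at semantics, the +1 is what prevents reprocessing 101.
    assert_eq!(
        list_start_at(&indexed, last_bitmap_indexer_block + 1),
        vec![102, 103]
    );
}
```

This is why the reviewer's `start_after` observation matters: under exclusive listing the original code was already correct, and the `+1` would silently drop one block at the bitmap-to-lake handoff.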