test: Cancel block_stream task to work around infinite lake loop (#856)
`near-lake-framework` will retry indefinitely, causing our tests to hang. This PR monitors the `last_processed_block` height and cancels the block stream task from the outside once the last desired block is reached.
morgsmccauley committed Jul 9, 2024
1 parent c64f8ff commit 8c40b2e
Showing 1 changed file with 73 additions and 40 deletions.
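For context, here is a minimal sketch of the cancellation pattern the tests adopt, assuming a hypothetical `process_blocks` task and `LAST_DESIRED_BLOCK` constant in place of the real `BlockStream` API: the stream task publishes its last processed height through a shared `AtomicU64`, the caller polls that counter from outside, and aborts the task once the target height is reached.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

// Assumed target height, mirroring the last block the tests wait for.
const LAST_DESIRED_BLOCK: u64 = 107_503_705;

// Stand-in for the lake-backed stream, which would otherwise retry forever.
async fn process_blocks(last_processed: Arc<AtomicU64>) {
    let mut height = 107_503_702;
    loop {
        last_processed.store(height, Ordering::Relaxed);
        height += 1;
        tokio::time::sleep(Duration::from_millis(5)).await;
    }
}

#[tokio::main]
async fn main() {
    let last_processed = Arc::new(AtomicU64::new(0));
    let handle = tokio::spawn(process_blocks(last_processed.clone()));

    // Poll the shared counter from outside the spawned task...
    while last_processed.load(Ordering::Relaxed) < LAST_DESIRED_BLOCK {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // ...then cancel the task, since the stream itself would never stop.
    handle.abort();
    assert!(handle.await.unwrap_err().is_cancelled());
}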
113 changes: 73 additions & 40 deletions block-streamer/src/block_stream.rs
@@ -346,11 +346,9 @@ mod tests {

use mockall::predicate;
use near_lake_framework::s3_client::GetObjectBytesError;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// FIX: near lake framework now infinitely retries - we need a way to stop it to allow the test
// to finish
#[ignore]
#[tokio::test]
async fn adds_matching_blocks_from_bitmap_and_lake() {
let contract_filter = "queryapi.dataplatform.near";
@@ -394,11 +392,11 @@
)
.returning(|_, _| {
Ok(vec![
crate::graphql::client::get_bitmaps_exact::GetBitmapsExactDataplatformNearReceiverBlocksBitmaps {
first_block_height: 107503702,
bitmap: "oA==".to_string(),
}
])
crate::graphql::client::get_bitmaps_exact::GetBitmapsExactDataplatformNearReceiverBlocksBitmaps {
first_block_height: 107503702,
bitmap: "oA==".to_string(),
}
])
});

mock_graphql_client
@@ -408,6 +406,9 @@
let mock_reciever_blocks_processor =
ReceiverBlocksProcessor::new(mock_graphql_client, mock_s3_client);

let last_processed_block_height = Arc::new(AtomicU64::new(0));
let last_processed_block_height_clone = last_processed_block_height.clone();

let mut mock_redis = crate::redis::RedisClient::default();
mock_redis
.expect_publish_block()
@@ -425,7 +426,10 @@
predicate::always(),
predicate::in_iter([107503702, 107503703, 107503704, 107503705]),
)
.returning(|_, _| Ok(()))
.returning(move |_, height| {
last_processed_block_height_clone.store(height, Ordering::Relaxed);
Ok(())
})
.times(4);
mock_redis
.expect_cache_streamer_message()
@@ -448,23 +452,33 @@
},
};

start_block_stream(
91940840,
&indexer_config,
std::sync::Arc::new(mock_redis),
std::sync::Arc::new(mock_reciever_blocks_processor),
mock_lake_s3_client,
&ChainId::Mainnet,
let mut block_stream = BlockStream::new(
indexer_config,
ChainId::Mainnet,
1,
"stream key".to_string(),
)
.await
.unwrap();
);

block_stream
.start(
91940840,
std::sync::Arc::new(mock_redis),
std::sync::Arc::new(mock_reciever_blocks_processor),
mock_lake_s3_client,
)
.unwrap();

loop {
if last_processed_block_height.load(Ordering::Relaxed) >= 107503705 {
break;
}

tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}

block_stream.cancel().await.unwrap();
}

// FIX: near lake framework now infinitely retries - we need a way to stop it to allow the test
// to finish
#[ignore]
#[tokio::test]
async fn skips_caching_of_lake_block_over_stream_size_limit() {
let mut mock_lake_s3_client = crate::lake_s3_client::SharedLakeS3Client::default();
@@ -479,7 +493,7 @@

mock_lake_s3_client
.expect_list_common_prefixes()
.returning(|_, _| Ok(vec![]));
.returning(|_, _| Ok(vec![107503704.to_string(), 107503705.to_string()]));

let mut mock_s3_client = crate::s3_client::S3Client::default();

@@ -504,6 +518,9 @@
let mock_reciever_blocks_processor =
ReceiverBlocksProcessor::new(mock_graphql_client, mock_s3_client);

let last_processed_block_height = Arc::new(AtomicU64::new(0));
let last_processed_block_height_clone = last_processed_block_height.clone();

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_publish_block()
@@ -521,7 +538,10 @@
predicate::always(),
predicate::in_iter([107503704, 107503705]),
)
.returning(|_, _| Ok(()))
.returning(move |_, height| {
last_processed_block_height_clone.store(height, Ordering::Relaxed);
Ok(())
})
.times(2);
mock_redis_client
.expect_cache_streamer_message()
@@ -544,18 +564,31 @@
},
};

start_block_stream(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_reciever_blocks_processor),
mock_lake_s3_client,
&ChainId::Mainnet,
let mut block_stream = BlockStream::new(
indexer_config,
ChainId::Mainnet,
1,
"stream key".to_string(),
)
.await
.unwrap();
);

block_stream
.start(
107503704,
Arc::new(mock_redis_client),
Arc::new(mock_reciever_blocks_processor),
mock_lake_s3_client,
)
.unwrap();

loop {
if last_processed_block_height.load(Ordering::Relaxed) >= 107503705 {
break;
}

tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}

block_stream.cancel().await.unwrap();
}

#[tokio::test]
@@ -589,8 +622,8 @@

process_bitmap_indexer_blocks(
107503704,
std::sync::Arc::new(mock_reciever_blocks_processor),
std::sync::Arc::new(mock_redis_client),
Arc::new(mock_reciever_blocks_processor),
Arc::new(mock_redis_client),
&indexer_config,
"stream key".to_string(),
)
@@ -629,8 +662,8 @@

process_bitmap_indexer_blocks(
107503704,
std::sync::Arc::new(mock_reciever_blocks_processor),
std::sync::Arc::new(mock_redis_client),
Arc::new(mock_reciever_blocks_processor),
Arc::new(mock_redis_client),
&indexer_config,
"stream key".to_string(),
)
@@ -669,8 +702,8 @@

process_bitmap_indexer_blocks(
107503704,
std::sync::Arc::new(mock_reciever_blocks_processor),
std::sync::Arc::new(mock_redis_client),
Arc::new(mock_reciever_blocks_processor),
Arc::new(mock_redis_client),
&indexer_config,
"stream key".to_string(),
)