diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index e52f8d23..b9d0b0f9 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -346,11 +346,9 @@ mod tests { use mockall::predicate; use near_lake_framework::s3_client::GetObjectBytesError; + use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; - // FIX: near lake framework now infinitely retires - we need a way to stop it to allow the test - // to finish - #[ignore] #[tokio::test] async fn adds_matching_blocks_from_bitmap_and_lake() { let contract_filter = "queryapi.dataplatform.near"; @@ -394,11 +392,11 @@ mod tests { ) .returning(|_, _| { Ok(vec![ - crate::graphql::client::get_bitmaps_exact::GetBitmapsExactDataplatformNearReceiverBlocksBitmaps { - first_block_height: 107503702, - bitmap: "oA==".to_string(), - } -]) + crate::graphql::client::get_bitmaps_exact::GetBitmapsExactDataplatformNearReceiverBlocksBitmaps { + first_block_height: 107503702, + bitmap: "oA==".to_string(), + } + ]) }); mock_graphql_client @@ -408,6 +406,9 @@ mod tests { let mock_reciever_blocks_processor = ReceiverBlocksProcessor::new(mock_graphql_client, mock_s3_client); + let last_processed_block_height = Arc::new(AtomicU64::new(0)); + let last_processed_block_height_clone = last_processed_block_height.clone(); + let mut mock_redis = crate::redis::RedisClient::default(); mock_redis .expect_publish_block() @@ -425,7 +426,10 @@ mod tests { predicate::always(), predicate::in_iter([107503702, 107503703, 107503704, 107503705]), ) - .returning(|_, _| Ok(())) + .returning(move |_, height| { + last_processed_block_height_clone.store(height, Ordering::Relaxed); + Ok(()) + }) .times(4); mock_redis .expect_cache_streamer_message() @@ -448,23 +452,33 @@ mod tests { }, }; - start_block_stream( - 91940840, - &indexer_config, - std::sync::Arc::new(mock_redis), - std::sync::Arc::new(mock_reciever_blocks_processor), - mock_lake_s3_client, - &ChainId::Mainnet, + let mut block_stream = BlockStream::new( + indexer_config, + ChainId::Mainnet, 1, "stream key".to_string(), - ) - .await - .unwrap(); + ); + + block_stream + .start( + 91940840, + std::sync::Arc::new(mock_redis), + std::sync::Arc::new(mock_reciever_blocks_processor), + mock_lake_s3_client, + ) + .unwrap(); + + loop { + if last_processed_block_height.load(Ordering::Relaxed) == 107503705 { + break; + } + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + block_stream.cancel().await.unwrap(); } - // FIX: near lake framework now infinitely retires - we need a way to stop it to allow the test - // to finish - #[ignore] #[tokio::test] async fn skips_caching_of_lake_block_over_stream_size_limit() { let mut mock_lake_s3_client = crate::lake_s3_client::SharedLakeS3Client::default(); @@ -479,7 +493,7 @@ mod tests { mock_lake_s3_client .expect_list_common_prefixes() - .returning(|_, _| Ok(vec![])); + .returning(|_, _| Ok(vec![107503704.to_string(), 107503705.to_string()])); let mut mock_s3_client = crate::s3_client::S3Client::default(); @@ -504,6 +518,9 @@ mod tests { let mock_reciever_blocks_processor = ReceiverBlocksProcessor::new(mock_graphql_client, mock_s3_client); + let last_processed_block_height = Arc::new(AtomicU64::new(0)); + let last_processed_block_height_clone = last_processed_block_height.clone(); + let mut mock_redis_client = crate::redis::RedisClient::default(); mock_redis_client .expect_publish_block() @@ -521,7 +538,10 @@ mod tests { predicate::always(), predicate::in_iter([107503704, 107503705]), ) - .returning(|_, _| Ok(())) + .returning(move |_, height| { + last_processed_block_height_clone.store(height, Ordering::Relaxed); + Ok(()) + }) .times(2); mock_redis_client .expect_cache_streamer_message() @@ -544,18 +564,31 @@ mod tests { }, }; - start_block_stream( - 107503704, - &indexer_config, - std::sync::Arc::new(mock_redis_client), - std::sync::Arc::new(mock_reciever_blocks_processor), - mock_lake_s3_client, - &ChainId::Mainnet, + let mut block_stream = BlockStream::new( + indexer_config, + ChainId::Mainnet, 1, "stream key".to_string(), - ) - .await - .unwrap(); + ); + + block_stream + .start( + 107503704, + Arc::new(mock_redis_client), + Arc::new(mock_reciever_blocks_processor), + mock_lake_s3_client, + ) + .unwrap(); + + loop { + if last_processed_block_height.load(Ordering::Relaxed) == 107503705 { + break; + } + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + block_stream.cancel().await.unwrap(); } #[tokio::test] @@ -589,8 +622,8 @@ mod tests { process_bitmap_indexer_blocks( 107503704, - std::sync::Arc::new(mock_reciever_blocks_processor), - std::sync::Arc::new(mock_redis_client), + Arc::new(mock_reciever_blocks_processor), + Arc::new(mock_redis_client), &indexer_config, "stream key".to_string(), ) @@ -629,8 +662,8 @@ mod tests { process_bitmap_indexer_blocks( 107503704, - std::sync::Arc::new(mock_reciever_blocks_processor), - std::sync::Arc::new(mock_redis_client), + Arc::new(mock_reciever_blocks_processor), + Arc::new(mock_redis_client), &indexer_config, "stream key".to_string(), ) @@ -669,8 +702,8 @@ mod tests { process_bitmap_indexer_blocks( 107503704, - std::sync::Arc::new(mock_reciever_blocks_processor), - std::sync::Arc::new(mock_redis_client), + Arc::new(mock_reciever_blocks_processor), + Arc::new(mock_redis_client), &indexer_config, "stream key".to_string(), )