From e697f374bfc5c6f816a428001bfb50f3f34da920 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Fri, 12 Jul 2024 08:37:35 +1200 Subject: [PATCH] fix: Filter receiver blocks before starting block (#863) As we fetch blocks for a given day, there is possibility for that day to include blocks lesser than the provided `start_block_height`. This PR ensures that those blocks are filtered out. --- .../receiver_blocks_processor.rs | 58 ++++++++++++++----- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs b/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs index 5dcb28dc..29dfb55e 100644 --- a/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs +++ b/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs @@ -187,7 +187,9 @@ impl ReceiverBlocksProcessor { } for block_height in bitmap_for_day.iter() { - yield block_height; + if block_height >= start_block_height { + yield block_height; + } } } } @@ -199,7 +201,7 @@ mod tests { use super::*; use mockall::predicate; - use futures::StreamExt; + use futures::TryStreamExt; fn exact_query_result( first_block_height: i64, @@ -314,14 +316,43 @@ mod tests { let stream = reciever_blocks_processor.stream_matching_block_heights(0, "someone.near".to_owned()); - tokio::pin!(stream); + assert_eq!(stream.try_collect::>().await.unwrap(), vec![1]); + } - let mut result_heights = vec![]; - while let Some(Ok(height)) = stream.next().await { - result_heights.push(height); - } + #[tokio::test] + async fn filters_blocks_before_start_block() { + let mut mock_s3_client = crate::s3_client::S3Client::default(); + mock_s3_client + .expect_get_text_file() + .returning(move |_, _| { + Ok(crate::test_utils::generate_block_with_timestamp( + &Utc::now().format("%Y-%m-%d").to_string(), + )) + }); + + let mut mock_graphql_client = crate::graphql::client::GraphQLClient::default(); + mock_graphql_client + .expect_get_bitmaps_exact() + .with( + predicate::eq(vec!["someone.near".to_string()]), + predicate::function(|date: &DateTime| { + date.date_naive() == Utc::now().date_naive() + }), + ) + .times(1) + // DecompressedBitmap: 0b11110000 + .returning(move |_, _| Ok(vec![exact_query_result(1, "kA==")])); + + let reciever_blocks_processor = + ReceiverBlocksProcessor::new(mock_graphql_client, mock_s3_client); - assert_eq!(result_heights, vec![1]); + let stream = + reciever_blocks_processor.stream_matching_block_heights(2, "someone.near".to_owned()); + + assert_eq!( + stream.try_collect::>().await.unwrap(), + vec![2, 3, 4] + ); } #[tokio::test] @@ -388,11 +419,10 @@ mod tests { let stream = reciever_blocks_processor .stream_matching_block_heights(0, "*.someone.near".to_string()); - tokio::pin!(stream); - let mut result_heights = vec![]; - while let Some(Ok(height)) = stream.next().await { - result_heights.push(height); - } - assert_eq!(result_heights, vec![1, 5, 10, 15, 100, 105]); + + assert_eq!( + stream.try_collect::>().await.unwrap(), + vec![1, 5, 10, 15, 100, 105] + ); } }