From 06532c8f64aed0601b6477cb3b1cd8d866ba727f Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 11 Jul 2024 11:32:23 +1200 Subject: [PATCH 1/2] fix: Filter receiver blocks before starting block As we fetch blocks for a given day, there is possibility for that day to include blocks lesser than the provided `start_block_height`. This PR ensures that those blocks are filtered out. --- .../receiver_blocks_processor.rs | 55 ++++++++++++++----- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs b/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs index 5dcb28dc..7c9a25d5 100644 --- a/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs +++ b/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs @@ -187,7 +187,9 @@ impl ReceiverBlocksProcessor { } for block_height in bitmap_for_day.iter() { - yield block_height; + if block_height > start_block_height { + yield block_height; + } } } } @@ -199,7 +201,7 @@ mod tests { use super::*; use mockall::predicate; - use futures::StreamExt; + use futures::TryStreamExt; fn exact_query_result( first_block_height: i64, @@ -314,14 +316,40 @@ mod tests { let stream = reciever_blocks_processor.stream_matching_block_heights(0, "someone.near".to_owned()); - tokio::pin!(stream); + assert_eq!(stream.try_collect::>().await.unwrap(), vec![1]); + } - let mut result_heights = vec![]; - while let Some(Ok(height)) = stream.next().await { - result_heights.push(height); - } + #[tokio::test] + async fn filters_blocks_before_start_block() { + let mut mock_s3_client = crate::s3_client::S3Client::default(); + mock_s3_client + .expect_get_text_file() + .returning(move |_, _| { + Ok(crate::test_utils::generate_block_with_timestamp( + &Utc::now().format("%Y-%m-%d").to_string(), + )) + }); + + let mut mock_graphql_client = crate::graphql::client::GraphQLClient::default(); + mock_graphql_client + .expect_get_bitmaps_exact() + .with( + predicate::eq(vec!["someone.near".to_string()]), + predicate::function(|date: &DateTime| { + date.date_naive() == Utc::now().date_naive() + }), + ) + .times(1) + // DecompressedBitmap: 0b11110000 + .returning(move |_, _| Ok(vec![exact_query_result(1, "kA==")])); + + let reciever_blocks_processor = + ReceiverBlocksProcessor::new(mock_graphql_client, mock_s3_client); + + let stream = + reciever_blocks_processor.stream_matching_block_heights(2, "someone.near".to_owned()); - assert_eq!(result_heights, vec![1]); + assert_eq!(stream.try_collect::>().await.unwrap(), vec![3, 4]); } #[tokio::test] @@ -388,11 +416,10 @@ mod tests { let stream = reciever_blocks_processor .stream_matching_block_heights(0, "*.someone.near".to_string()); - tokio::pin!(stream); - let mut result_heights = vec![]; - while let Some(Ok(height)) = stream.next().await { - result_heights.push(height); - } - assert_eq!(result_heights, vec![1, 5, 10, 15, 100, 105]); + + assert_eq!( + stream.try_collect::>().await.unwrap(), + vec![1, 5, 10, 15, 100, 105] + ); } } From 47588f2d4f7371d197273834a77059d348c34c94 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Fri, 12 Jul 2024 08:33:43 +1200 Subject: [PATCH 2/2] fix: include start block if matched --- .../src/receiver_blocks/receiver_blocks_processor.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs b/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs index 7c9a25d5..29dfb55e 100644 --- a/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs +++ b/block-streamer/src/receiver_blocks/receiver_blocks_processor.rs @@ -187,7 +187,7 @@ impl ReceiverBlocksProcessor { } for block_height in bitmap_for_day.iter() { - if block_height > start_block_height { + if block_height >= start_block_height { yield block_height; } } @@ -349,7 +349,10 @@ mod tests { let stream = reciever_blocks_processor.stream_matching_block_heights(2, "someone.near".to_owned()); - assert_eq!(stream.try_collect::>().await.unwrap(), vec![3, 4]); + assert_eq!( + stream.try_collect::>().await.unwrap(), + vec![2, 3, 4] + ); } #[tokio::test]