Skip to content

Commit

Permalink
fix: Filter receiver blocks before starting block (#863)
Browse files Browse the repository at this point in the history
As we fetch blocks for a given day, there is possibility for that day to
include blocks lesser than the provided `start_block_height`. This PR
ensures that those blocks are filtered out.
  • Loading branch information
morgsmccauley committed Jul 11, 2024
1 parent 1318cb6 commit e697f37
Showing 1 changed file with 44 additions and 14 deletions.
58 changes: 44 additions & 14 deletions block-streamer/src/receiver_blocks/receiver_blocks_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ impl ReceiverBlocksProcessor {
}

for block_height in bitmap_for_day.iter() {
yield block_height;
if block_height >= start_block_height {
yield block_height;
}
}
}
}
Expand All @@ -199,7 +201,7 @@ mod tests {
use super::*;
use mockall::predicate;

use futures::StreamExt;
use futures::TryStreamExt;

fn exact_query_result(
first_block_height: i64,
Expand Down Expand Up @@ -314,14 +316,43 @@ mod tests {
let stream =
reciever_blocks_processor.stream_matching_block_heights(0, "someone.near".to_owned());

tokio::pin!(stream);
assert_eq!(stream.try_collect::<Vec<u64>>().await.unwrap(), vec![1]);
}

let mut result_heights = vec![];
while let Some(Ok(height)) = stream.next().await {
result_heights.push(height);
}
#[tokio::test]
async fn filters_blocks_before_start_block() {
let mut mock_s3_client = crate::s3_client::S3Client::default();
mock_s3_client
.expect_get_text_file()
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
&Utc::now().format("%Y-%m-%d").to_string(),
))
});

let mut mock_graphql_client = crate::graphql::client::GraphQLClient::default();
mock_graphql_client
.expect_get_bitmaps_exact()
.with(
predicate::eq(vec!["someone.near".to_string()]),
predicate::function(|date: &DateTime<Utc>| {
date.date_naive() == Utc::now().date_naive()
}),
)
.times(1)
// DecompressedBitmap: 0b11110000
.returning(move |_, _| Ok(vec![exact_query_result(1, "kA==")]));

let reciever_blocks_processor =
ReceiverBlocksProcessor::new(mock_graphql_client, mock_s3_client);

assert_eq!(result_heights, vec![1]);
let stream =
reciever_blocks_processor.stream_matching_block_heights(2, "someone.near".to_owned());

assert_eq!(
stream.try_collect::<Vec<u64>>().await.unwrap(),
vec![2, 3, 4]
);
}

#[tokio::test]
Expand Down Expand Up @@ -388,11 +419,10 @@ mod tests {

let stream = reciever_blocks_processor
.stream_matching_block_heights(0, "*.someone.near".to_string());
tokio::pin!(stream);
let mut result_heights = vec![];
while let Some(Ok(height)) = stream.next().await {
result_heights.push(height);
}
assert_eq!(result_heights, vec![1, 5, 10, 15, 100, 105]);

assert_eq!(
stream.try_collect::<Vec<u64>>().await.unwrap(),
vec![1, 5, 10, 15, 100, 105]
);
}
}

0 comments on commit e697f37

Please sign in to comment.