From 98646516f50806016346dd0fc702123b8ce897a9 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 5 Jun 2024 20:04:26 -0700 Subject: [PATCH] Naive implementation of stream --- block-streamer/src/bitmap.rs | 8 ++--- block-streamer/src/block_height_stream.rs | 39 +++++++++++++++++++---- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index bac3dd735..6b90f5f18 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -26,19 +26,19 @@ pub struct BitmapOperator {} impl Base64Bitmap { pub fn from_exact_query( - query_item: get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, + query_item: &get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, ) -> Self { Self { - base64: query_item.bitmap, + base64: query_item.bitmap.clone(), start_block_height: usize::try_from(query_item.first_block_height).unwrap(), } } pub fn from_wildcard_query( - query_item: get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, + query_item: &get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, ) -> Self { Self { - base64: query_item.bitmap, + base64: query_item.bitmap.clone(), start_block_height: usize::try_from(query_item.first_block_height).unwrap(), } } diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index 9d53ff72b..94988f1be 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -81,7 +81,7 @@ impl BlockHeightStream { } } - fn next_day(date: DateTime) -> DateTime { + fn next_day(&self, date: DateTime) -> DateTime { date + Duration::days(1) } @@ -105,23 +105,50 @@ impl BlockHeightStream { ContractPatternType::Exact(exact_pattern) } - fn generate_block_height_stream(&self) -> impl Stream { + fn generate_block_height_stream( + &'static self, + start_date: DateTime, + contract_pattern_type: ContractPatternType, + ) -> impl Stream { stream! { - for i in 0..3 { - yield i; + let mut current_date = start_date; + while current_date <= Utc::now() { + let current_date_string = current_date.format("%m-%d-%Y").to_string(); + let bitmaps_from_query: Vec = match contract_pattern_type { + // TODO: Implement pagination of query + ContractPatternType::Exact(ref pattern) => { + let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string, 1000, 0).await.unwrap(); + query_result.iter().map(|result_item| Base64Bitmap::from_exact_query(result_item)).collect() + }, + ContractPatternType::Wildcard(ref pattern) => { + let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string, 1000, 0).await.unwrap(); + query_result.iter().map(|result_item| Base64Bitmap::from_wildcard_query(result_item)).collect() + + }, + }; + let starting_block_height = bitmaps_from_query.iter().map(|item| item.start_block_height).min().unwrap(); + let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap(); + for index in 0..bitmap_for_day.bitmap.len() { + if self.bitmap_operator.get_bit(&bitmap_for_day.bitmap, index) { + yield starting_block_height + index; + } + } + current_date = self.next_day(current_date); } } } pub async fn list_matching_block_heights( - &self, + &'static self, start_block_height: near_indexer_primitives::types::BlockHeight, contract_pattern: &str, ) -> anyhow::Result> { let start_date = self.get_nearest_block_date(start_block_height).await?; let contract_pattern_type = self.parse_contract_pattern(contract_pattern); - Ok(self.generate_block_height_stream().boxed()) + Ok(self + .generate_block_height_stream(start_date, contract_pattern_type) + .boxed()) } }