Skip to content

Commit

Permalink
Naive implementation of stream
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Jun 6, 2024
1 parent 505ef86 commit 9864651
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
8 changes: 4 additions & 4 deletions block-streamer/src/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ pub struct BitmapOperator {}

impl Base64Bitmap {
pub fn from_exact_query(
query_item: get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex,
query_item: &get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex,
) -> Self {
Self {
base64: query_item.bitmap,
base64: query_item.bitmap.clone(),
start_block_height: usize::try_from(query_item.first_block_height).unwrap(),
}
}

pub fn from_wildcard_query(
query_item: get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex,
query_item: &get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex,
) -> Self {
Self {
base64: query_item.bitmap,
base64: query_item.bitmap.clone(),
start_block_height: usize::try_from(query_item.first_block_height).unwrap(),
}
}
Expand Down
39 changes: 33 additions & 6 deletions block-streamer/src/block_height_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl BlockHeightStream {
}
}

fn next_day(date: DateTime<Utc>) -> DateTime<Utc> {
fn next_day(&self, date: DateTime<Utc>) -> DateTime<Utc> {
date + Duration::days(1)
}

Expand All @@ -105,23 +105,50 @@ impl BlockHeightStream {
ContractPatternType::Exact(exact_pattern)
}

fn generate_block_height_stream(&self) -> impl Stream<Item = usize> {
fn generate_block_height_stream(
&'static self,
start_date: DateTime<Utc>,
contract_pattern_type: ContractPatternType,
) -> impl Stream<Item = usize> {
stream! {
for i in 0..3 {
yield i;
let mut current_date = start_date;
while current_date <= Utc::now() {
let current_date_string = current_date.format("%m-%d-%Y").to_string();
let bitmaps_from_query: Vec<Base64Bitmap> = match contract_pattern_type {
// TODO: Implement pagination of query
ContractPatternType::Exact(ref pattern) => {
let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string, 1000, 0).await.unwrap();
query_result.iter().map(|result_item| Base64Bitmap::from_exact_query(result_item)).collect()
},
ContractPatternType::Wildcard(ref pattern) => {
let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string, 1000, 0).await.unwrap();
query_result.iter().map(|result_item| Base64Bitmap::from_wildcard_query(result_item)).collect()

},
};
let starting_block_height = bitmaps_from_query.iter().map(|item| item.start_block_height).min().unwrap();
let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap();
for index in 0..bitmap_for_day.bitmap.len() {
if self.bitmap_operator.get_bit(&bitmap_for_day.bitmap, index) {
yield starting_block_height + index;
}
}
current_date = self.next_day(current_date);
}
}
}

pub async fn list_matching_block_heights(
&self,
&'static self,
start_block_height: near_indexer_primitives::types::BlockHeight,
contract_pattern: &str,
) -> anyhow::Result<BoxStream<'static, usize>> {
let start_date = self.get_nearest_block_date(start_block_height).await?;
let contract_pattern_type = self.parse_contract_pattern(contract_pattern);

Ok(self.generate_block_height_stream().boxed())
Ok(self
.generate_block_height_stream(start_date, contract_pattern_type)
.boxed())
}
}

Expand Down

0 comments on commit 9864651

Please sign in to comment.