Skip to content

Commit

Permalink
Resolve 2nd round comments
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Jun 14, 2024
1 parent 2d33af4 commit c92fd81
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 27 deletions.
23 changes: 9 additions & 14 deletions block-streamer/src/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct CompressedBitmap {

impl TryFrom<&Base64Bitmap> for CompressedBitmap {
type Error = anyhow::Error;
fn try_from(value: &Base64Bitmap) -> Result<Self, Self::Error> {
fn try_from(value: &Base64Bitmap) -> anyhow::Result<Self, Self::Error> {
Ok(Self {
bitmap: general_purpose::STANDARD.decode(value.base64.clone())?,
start_block_height: value.start_block_height,
Expand Down Expand Up @@ -203,18 +203,15 @@ impl DecompressedBitmap {
}
}

pub fn merge(&mut self, to_merge: &mut DecompressedBitmap) -> anyhow::Result<&mut Self> {
pub fn merge(&mut self, mut to_merge: DecompressedBitmap) -> anyhow::Result<&mut Self> {
if to_merge.start_block_height < self.start_block_height {
std::mem::swap(&mut self.bitmap, &mut to_merge.bitmap);
std::mem::swap(
&mut self.start_block_height,
&mut to_merge.start_block_height,
);
}
let block_height_difference = to_merge
.start_block_height
.checked_sub(self.start_block_height)
.ok_or_else(|| anyhow!("Caller of merge should have smaller start block height",))?;
let block_height_difference = to_merge.start_block_height - self.start_block_height;
let start_bit_index: usize = usize::try_from(block_height_difference)?;

for bit_index_offset in 0..(to_merge.bitmap.len() * 8) {
Expand Down Expand Up @@ -248,9 +245,7 @@ impl Iterator for DecompressedBitmapIter<'_> {
while self.bit_index < self.data.bitmap.len() * 8 {
if self.data.get_bit(self.bit_index) {
self.bit_index += 1;
return Some(
self.data.start_block_height + u64::try_from(self.bit_index - 1).ok()?,
);
return Some(self.data.start_block_height + (self.bit_index as u64) - 1);
}
self.bit_index += 1;
}
Expand Down Expand Up @@ -447,7 +442,7 @@ mod tests {
start_block_height: 14,
};

assert!(base_bitmap.merge(&mut to_merge).is_ok());
assert!(base_bitmap.merge(to_merge).is_ok());
assert_eq!(base_bitmap.bitmap, vec![0b11001110, 0b10011111]);
}

Expand All @@ -462,7 +457,7 @@ mod tests {
start_block_height: 14,
};

assert!(base_bitmap.merge(&mut to_merge).is_ok());
assert!(base_bitmap.merge(to_merge).is_ok());
assert_eq!(base_bitmap.bitmap, vec![0b11001110, 0b10011111]);
}

Expand All @@ -483,11 +478,11 @@ mod tests {
};

base_bitmap
.merge(&mut bitmap_a)
.merge(bitmap_a)
.unwrap()
.merge(&mut bitmap_b)
.merge(bitmap_b)
.unwrap()
.merge(&mut bitmap_c)
.merge(bitmap_c)
.unwrap();
assert_eq!(base_bitmap.bitmap, vec![0b11001100, 0b11000000]);
assert_eq!(base_bitmap.start_block_height, 10);
Expand Down
45 changes: 32 additions & 13 deletions block-streamer/src/bitmap_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,35 @@ impl BitmapProcessor {
date + Duration::days(1)
}

async fn query_base_64_bitmaps(
&self,
contract_pattern_type: &ContractPatternType,
current_date: &DateTime<Utc>,
) -> anyhow::Result<Vec<Base64Bitmap>> {
match contract_pattern_type {
ContractPatternType::Exact(ref pattern) => {
let query_result: Vec<_> = self
.graphql_client
.get_bitmaps_exact(pattern.clone(), &current_date)
.await?;
Ok(query_result
.iter()
.map(Base64Bitmap::try_from)
.collect::<anyhow::Result<Vec<_>>>()?)
}
ContractPatternType::Wildcard(ref pattern) => {
let query_result: Vec<_> = self
.graphql_client
.get_bitmaps_wildcard(pattern.clone(), &current_date)
.await?;
Ok(query_result
.iter()
.map(Base64Bitmap::try_from)
.collect::<anyhow::Result<Vec<_>>>()?)
}
}
}

fn stream_matching_block_heights<'b, 'a: 'b>(
&'a self,
start_block_height: near_indexer_primitives::types::BlockHeight,
Expand All @@ -138,24 +167,14 @@ impl BitmapProcessor {
let contract_pattern_type = ContractPatternType::from(contract_pattern.as_str());
let mut current_date = start_date;
while current_date <= Utc::now() {
let base_64_bitmaps: Vec<Base64Bitmap> = match contract_pattern_type {
ContractPatternType::Exact(ref pattern) => {
let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), &current_date).await?;
query_result.iter().map(Base64Bitmap::try_from).collect()?
},
ContractPatternType::Wildcard(ref pattern) => {
let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), &current_date).await?;
query_result.iter().map(Base64Bitmap::try_from).collect()?
},
};

let base_64_bitmaps: Vec<Base64Bitmap> = self.query_base_64_bitmaps(&contract_pattern_type, &current_date).await?;
let compressed_bitmaps: Vec<CompressedBitmap> = base_64_bitmaps.iter().map(CompressedBitmap::try_from).collect()?;
let decompressed_bitmaps: Vec<DecompressedBitmap> = compressed_bitmaps.iter().map(CompressedBitmap::decompress).collect()?;

let starting_block_height: u64 = decompressed_bitmaps.iter().map(|item| item.start_block_height).min().unwrap_or(decompressed_bitmaps[0].start_block_height);
let mut bitmap_for_day = DecompressedBitmap::new(starting_block_height, None);
for mut bitmap in decompressed_bitmaps {
let _ = bitmap_for_day.merge(&mut bitmap);
for bitmap in decompressed_bitmaps {
bitmap_for_day.merge(bitmap)?;
}

let mut bitmap_iter = bitmap_for_day.iter();
Expand Down

0 comments on commit c92fd81

Please sign in to comment.