Skip to content

Commit

Permalink
Use u64 for block height and try stream
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Jun 12, 2024
1 parent 71d6c59 commit 8e0ae5a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 44 deletions.
77 changes: 42 additions & 35 deletions block-streamer/src/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,45 @@ use crate::graphql::client::{get_bitmaps_exact, get_bitmaps_wildcard};

#[derive(Debug, Default, PartialEq)]
pub struct Base64Bitmap {
pub start_block_height: usize,
pub start_block_height: u64,
pub base64: String,
}

impl From<&get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex> for Base64Bitmap {
fn from(
impl TryFrom<&get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex> for Base64Bitmap {
type Error = anyhow::Error;
fn try_from(
query_item: &get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex,
) -> Self {
Self {
) -> anyhow::Result<Self> {
Ok(Self {
base64: query_item.bitmap.clone(),
start_block_height: usize::try_from(query_item.first_block_height).unwrap(),
}
start_block_height: u64::try_from(query_item.first_block_height)?,
})
}
}

impl From<&get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex>
impl TryFrom<&get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex>
for Base64Bitmap
{
fn from(
type Error = anyhow::Error;
fn try_from(
query_item: &get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex,
) -> Self {
Self {
) -> anyhow::Result<Self> {
Ok(Self {
base64: query_item.bitmap.clone(),
start_block_height: usize::try_from(query_item.first_block_height).unwrap(),
}
start_block_height: u64::try_from(query_item.first_block_height)?,
})
}
}

#[derive(Debug, Default, PartialEq)]
pub struct Bitmap {
pub start_block_height: usize,
pub start_block_height: u64,
pub bitmap: Vec<u8>,
}

#[derive(Default)]
struct EliasGammaDecoded {
pub value: usize,
pub value: u64,
pub last_bit_index: usize,
}

Expand Down Expand Up @@ -73,12 +75,12 @@ impl BitmapOperator {
bytes: &[u8],
start_bit_index: usize,
end_bit_index: usize,
) -> u32 {
let mut number: u32 = 0;
) -> u64 {
let mut number: u64 = 0;
// Read bits from right to left
for curr_bit_index in (start_bit_index..=end_bit_index).rev() {
if self.get_bit(bytes, curr_bit_index) {
number |= 1u32 << (end_bit_index - curr_bit_index);
number |= 1u64 << (end_bit_index - curr_bit_index);
}
}

Expand All @@ -101,29 +103,31 @@ impl BitmapOperator {
None
}

fn decode_elias_gamma_entry(&self, bytes: &[u8], start_bit_index: usize) -> EliasGammaDecoded {
fn decode_elias_gamma_entry(
&self,
bytes: &[u8],
start_bit_index: usize,
) -> anyhow::Result<EliasGammaDecoded> {
if bytes.is_empty() {
return EliasGammaDecoded::default();
return Ok(EliasGammaDecoded::default());
}
let first_bit_index = match self.index_of_first_set_bit(bytes, start_bit_index) {
Some(index) => index,
None => {
return EliasGammaDecoded::default();
return Ok(EliasGammaDecoded::default());
}
};
let zero_count: usize = first_bit_index - start_bit_index;
let remainder: usize = if zero_count == 0 {
let remainder: u64 = if zero_count == 0 {
0
} else {
self.read_integer_from_binary(bytes, first_bit_index + 1, first_bit_index + zero_count)
.try_into()
.unwrap()
};

EliasGammaDecoded {
value: 2_usize.pow(zero_count.try_into().unwrap()) + remainder,
Ok(EliasGammaDecoded {
value: 2_u64.pow(zero_count.try_into().unwrap()) + remainder,
last_bit_index: first_bit_index + zero_count,
}
})
}

fn decompress_bitmap(&self, compressed_bitmap: &[u8]) -> Vec<u8> {
Expand All @@ -135,15 +139,18 @@ impl BitmapOperator {
let mut decompressed_bit_index = 0;

while compressed_bit_index < compressed_bit_length {
let decoded_elias_gamma =
self.decode_elias_gamma_entry(compressed_bitmap, compressed_bit_index);
let decoded_elias_gamma = self
.decode_elias_gamma_entry(compressed_bitmap, compressed_bit_index)
.unwrap();
if decoded_elias_gamma.value == 0 {
break;
}

compressed_bit_index = decoded_elias_gamma.last_bit_index + 1;
let mut bit_index_offset: usize = 0;
while current_bit_value && (bit_index_offset < decoded_elias_gamma.value) {
while current_bit_value
&& (bit_index_offset < usize::try_from(decoded_elias_gamma.value).unwrap())
{
while decompressed_bit_index + bit_index_offset >= (decompressed_bytes.len() * 8) {
decompressed_bytes.push(0b00000000);
}
Expand All @@ -156,7 +163,7 @@ impl BitmapOperator {
bit_index_offset += 1;
}

decompressed_bit_index += decoded_elias_gamma.value;
decompressed_bit_index += usize::try_from(decoded_elias_gamma.value).unwrap();
current_bit_value = !current_bit_value;
}

Expand All @@ -172,7 +179,7 @@ impl BitmapOperator {
.start_block_height
.checked_sub(bitmap_to_update.start_block_height)
{
Some(result) => result,
Some(result) => usize::try_from(result)?,
None => {
return Err(anyhow!(
"Start block height in bitmap was lower than provided lowest block height",
Expand Down Expand Up @@ -200,7 +207,7 @@ impl BitmapOperator {
pub fn merge_bitmaps(
&self,
bitmaps_to_merge: &Vec<Base64Bitmap>,
smallest_start_block_height: usize,
smallest_start_block_height: u64,
) -> anyhow::Result<Bitmap> {
let mut merged_bitmap: Bitmap = Bitmap {
bitmap: Vec::new(),
Expand Down Expand Up @@ -294,7 +301,7 @@ mod tests {
fn decode_elias_gamma() {
let operator = BitmapOperator::new();
let bytes: &[u8; 2] = &[0b00000000, 0b00110110];
let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6);
let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6).unwrap();
assert_eq!(decoded_eg.value, 27);
assert_eq!(decoded_eg.last_bit_index, 14);
}
Expand All @@ -303,7 +310,7 @@ mod tests {
fn decode_empty_elias_gamma() {
let operator = BitmapOperator::new();
let bytes: &[u8; 2] = &[0b00000000, 0b00000000];
let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0);
let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0).unwrap();
assert_eq!(decoded_eg.value, 0);
assert_eq!(decoded_eg.last_bit_index, 0);
}
Expand Down
18 changes: 9 additions & 9 deletions block-streamer/src/block_height_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::bitmap::{Base64Bitmap, BitmapOperator};
use crate::graphql::client::GraphQLClient;
use crate::rules::types::ChainId;
use anyhow::Context;
use async_stream::stream;
use async_stream::try_stream;
use chrono::{DateTime, Duration, TimeZone, Utc};
use futures::stream::{BoxStream, Stream};
use futures::StreamExt;
Expand Down Expand Up @@ -137,27 +137,27 @@ impl BlockHeightStreamImpl {
&'a self,
start_date: DateTime<Utc>,
contract_pattern_type: ContractPatternType,
) -> BoxStream<'a, usize> {
Box::pin(stream! {
) -> BoxStream<'a, anyhow::Result<u64>> {
Box::pin(try_stream! {
let mut current_date = start_date;
while current_date <= Utc::now() {
let current_date_string = current_date.format("%Y-%m-%d").to_string();
let bitmaps_from_query: Vec<Base64Bitmap> = match contract_pattern_type {
ContractPatternType::Exact(ref pattern) => {
let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string.clone()).await.unwrap();
query_result.iter().map(|result_item| Base64Bitmap::from(result_item)).collect()
query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect()
},
ContractPatternType::Wildcard(ref pattern) => {
let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string.clone()).await.unwrap();
query_result.iter().map(|result_item| Base64Bitmap::from(result_item)).collect()
query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect()
},
};
if !bitmaps_from_query.is_empty() {
let starting_block_height = bitmaps_from_query.iter().map(|item| item.start_block_height).min().unwrap();
let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap();
for index in 0..(bitmap_for_day.bitmap.len() * 8) {
if self.bitmap_operator.get_bit(&bitmap_for_day.bitmap, index) {
yield starting_block_height + index;
yield starting_block_height + u64::try_from(index)?;
}
}
}
Expand All @@ -170,7 +170,7 @@ impl BlockHeightStreamImpl {
&'a self,
start_block_height: near_indexer_primitives::types::BlockHeight,
contract_pattern: &str,
) -> anyhow::Result<BoxStream<'a, usize>> {
) -> anyhow::Result<BoxStream<'a, anyhow::Result<u64>>> {
let start_date = self.get_nearest_block_date(start_block_height).await?;
let contract_pattern_type = self.parse_contract_pattern(contract_pattern);

Expand Down Expand Up @@ -365,7 +365,7 @@ mod tests {
.await
.unwrap();
let mut result_heights = vec![];
while let Some(height) = stream.next().await {
while let Some(Ok(height)) = stream.next().await {
result_heights.push(height);
}
assert_eq!(result_heights, vec![1]);
Expand Down Expand Up @@ -443,7 +443,7 @@ mod tests {
.await
.unwrap();
let mut result_heights = vec![];
while let Some(height) = stream.next().await {
while let Some(Ok(height)) = stream.next().await {
result_heights.push(height);
}
assert_eq!(result_heights, vec![1, 5, 10, 15, 100, 105]);
Expand Down

0 comments on commit 8e0ae5a

Please sign in to comment.