diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index 441de5e06..6f741aad6 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -6,43 +6,45 @@ use crate::graphql::client::{get_bitmaps_exact, get_bitmaps_wildcard}; #[derive(Debug, Default, PartialEq)] pub struct Base64Bitmap { - pub start_block_height: usize, + pub start_block_height: u64, pub base64: String, } -impl From<&get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex> for Base64Bitmap { - fn from( +impl TryFrom<&get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex> for Base64Bitmap { + type Error = anyhow::Error; + fn try_from( query_item: &get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, - ) -> Self { - Self { + ) -> anyhow::Result { + Ok(Self { base64: query_item.bitmap.clone(), - start_block_height: usize::try_from(query_item.first_block_height).unwrap(), - } + start_block_height: u64::try_from(query_item.first_block_height)?, + }) } } -impl From<&get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex> +impl TryFrom<&get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex> for Base64Bitmap { - fn from( + type Error = anyhow::Error; + fn try_from( query_item: &get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, - ) -> Self { - Self { + ) -> anyhow::Result { + Ok(Self { base64: query_item.bitmap.clone(), - start_block_height: usize::try_from(query_item.first_block_height).unwrap(), - } + start_block_height: u64::try_from(query_item.first_block_height)?, + }) } } #[derive(Debug, Default, PartialEq)] pub struct Bitmap { - pub start_block_height: usize, + pub start_block_height: u64, pub bitmap: Vec, } #[derive(Default)] struct EliasGammaDecoded { - pub value: usize, + pub value: u64, pub last_bit_index: usize, } @@ -73,12 +75,12 @@ impl BitmapOperator { bytes: &[u8], start_bit_index: usize, end_bit_index: usize, - ) -> u32 { - let mut number: u32 = 0; + ) -> u64 { + let mut number: u64 = 0; // Read bits from right to left for curr_bit_index in (start_bit_index..=end_bit_index).rev() { if self.get_bit(bytes, curr_bit_index) { - number |= 1u32 << (end_bit_index - curr_bit_index); + number |= 1u64 << (end_bit_index - curr_bit_index); } } @@ -101,29 +103,31 @@ impl BitmapOperator { None } - fn decode_elias_gamma_entry(&self, bytes: &[u8], start_bit_index: usize) -> EliasGammaDecoded { + fn decode_elias_gamma_entry( + &self, + bytes: &[u8], + start_bit_index: usize, + ) -> anyhow::Result { if bytes.is_empty() { - return EliasGammaDecoded::default(); + return Ok(EliasGammaDecoded::default()); } let first_bit_index = match self.index_of_first_set_bit(bytes, start_bit_index) { Some(index) => index, None => { - return EliasGammaDecoded::default(); + return Ok(EliasGammaDecoded::default()); } }; let zero_count: usize = first_bit_index - start_bit_index; - let remainder: usize = if zero_count == 0 { + let remainder: u64 = if zero_count == 0 { 0 } else { self.read_integer_from_binary(bytes, first_bit_index + 1, first_bit_index + zero_count) - .try_into() - .unwrap() }; - EliasGammaDecoded { - value: 2_usize.pow(zero_count.try_into().unwrap()) + remainder, + Ok(EliasGammaDecoded { + value: 2_u64.pow(zero_count.try_into().unwrap()) + remainder, last_bit_index: first_bit_index + zero_count, - } + }) } fn decompress_bitmap(&self, compressed_bitmap: &[u8]) -> Vec { @@ -135,15 +139,18 @@ impl BitmapOperator { let mut decompressed_bit_index = 0; while compressed_bit_index < compressed_bit_length { - let decoded_elias_gamma = - self.decode_elias_gamma_entry(compressed_bitmap, compressed_bit_index); + let decoded_elias_gamma = self + .decode_elias_gamma_entry(compressed_bitmap, compressed_bit_index) + .unwrap(); if decoded_elias_gamma.value == 0 { break; } compressed_bit_index = decoded_elias_gamma.last_bit_index + 1; let mut bit_index_offset: usize = 0; - while current_bit_value && (bit_index_offset < decoded_elias_gamma.value) { + while current_bit_value + && (bit_index_offset < usize::try_from(decoded_elias_gamma.value).unwrap()) + { while decompressed_bit_index + bit_index_offset >= (decompressed_bytes.len() * 8) { decompressed_bytes.push(0b00000000); } @@ -156,7 +163,7 @@ impl BitmapOperator { bit_index_offset += 1; } - decompressed_bit_index += decoded_elias_gamma.value; + decompressed_bit_index += usize::try_from(decoded_elias_gamma.value).unwrap(); current_bit_value = !current_bit_value; } @@ -172,7 +179,7 @@ impl BitmapOperator { .start_block_height .checked_sub(bitmap_to_update.start_block_height) { - Some(result) => result, + Some(result) => usize::try_from(result)?, None => { return Err(anyhow!( "Start block height in bitmap was lower than provided lowest block height", @@ -200,7 +207,7 @@ impl BitmapOperator { pub fn merge_bitmaps( &self, bitmaps_to_merge: &Vec, - smallest_start_block_height: usize, + smallest_start_block_height: u64, ) -> anyhow::Result { let mut merged_bitmap: Bitmap = Bitmap { bitmap: Vec::new(), @@ -294,7 +301,7 @@ mod tests { fn decode_elias_gamma() { let operator = BitmapOperator::new(); let bytes: &[u8; 2] = &[0b00000000, 0b00110110]; - let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6); + let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6).unwrap(); assert_eq!(decoded_eg.value, 27); assert_eq!(decoded_eg.last_bit_index, 14); } @@ -303,7 +310,7 @@ mod tests { fn decode_empty_elias_gamma() { let operator = BitmapOperator::new(); let bytes: &[u8; 2] = &[0b00000000, 0b00000000]; - let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0); + let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0).unwrap(); assert_eq!(decoded_eg.value, 0); assert_eq!(decoded_eg.last_bit_index, 0); } diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index fcc66bbf9..4e8562ebe 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -2,7 +2,7 @@ use crate::bitmap::{Base64Bitmap, BitmapOperator}; use crate::graphql::client::GraphQLClient; use crate::rules::types::ChainId; use anyhow::Context; -use async_stream::stream; +use async_stream::try_stream; use chrono::{DateTime, Duration, TimeZone, Utc}; use futures::stream::{BoxStream, Stream}; use futures::StreamExt; @@ -137,19 +137,19 @@ impl BlockHeightStreamImpl { &'a self, start_date: DateTime, contract_pattern_type: ContractPatternType, - ) -> BoxStream<'a, usize> { - Box::pin(stream! { + ) -> BoxStream<'a, anyhow::Result> { + Box::pin(try_stream! { let mut current_date = start_date; while current_date <= Utc::now() { let current_date_string = current_date.format("%Y-%m-%d").to_string(); let bitmaps_from_query: Vec = match contract_pattern_type { ContractPatternType::Exact(ref pattern) => { let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string.clone()).await.unwrap(); - query_result.iter().map(|result_item| Base64Bitmap::from(result_item)).collect() + query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect() }, ContractPatternType::Wildcard(ref pattern) => { let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string.clone()).await.unwrap(); - query_result.iter().map(|result_item| Base64Bitmap::from(result_item)).collect() + query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect() }, }; if !bitmaps_from_query.is_empty() { @@ -157,7 +157,7 @@ impl BlockHeightStreamImpl { let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap(); for index in 0..(bitmap_for_day.bitmap.len() * 8) { if self.bitmap_operator.get_bit(&bitmap_for_day.bitmap, index) { - yield starting_block_height + index; + yield starting_block_height + u64::try_from(index)?; } } } @@ -170,7 +170,7 @@ impl BlockHeightStreamImpl { &'a self, start_block_height: near_indexer_primitives::types::BlockHeight, contract_pattern: &str, - ) -> anyhow::Result> { + ) -> anyhow::Result>> { let start_date = self.get_nearest_block_date(start_block_height).await?; let contract_pattern_type = self.parse_contract_pattern(contract_pattern); @@ -365,7 +365,7 @@ mod tests { .await .unwrap(); let mut result_heights = vec![]; - while let Some(height) = stream.next().await { + while let Some(Ok(height)) = stream.next().await { result_heights.push(height); } assert_eq!(result_heights, vec![1]); @@ -443,7 +443,7 @@ mod tests { .await .unwrap(); let mut result_heights = vec![]; - while let Some(height) = stream.next().await { + while let Some(Ok(height)) = stream.next().await { result_heights.push(height); } assert_eq!(result_heights, vec![1, 5, 10, 15, 100, 105]);