diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index 1a68afaa5..ce9cb859d 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -964,6 +964,7 @@ version = "0.1.0" dependencies = [ "actix-web", "anyhow", + "async-stream", "async-trait", "aws-config", "aws-sdk-s3", @@ -982,6 +983,7 @@ dependencies = [ "prometheus", "prost 0.12.4", "redis", + "regex", "registry-types", "reqwest", "serde", diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index d44903786..2d529d480 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -6,9 +6,11 @@ edition = "2021" [dependencies] actix-web = "4.5.1" anyhow = "1.0.57" +async-stream = "0.3.5" async-trait = "0.1.74" aws-config = { version = "1.1.3", features = ["behavior-version-latest"] } aws-sdk-s3 = "1.13.0" +base64 = "0.22.1" borsh = "0.10.2" cached = "0.49.3" chrono = "0.4.25" @@ -20,6 +22,7 @@ near-lake-framework = "0.7.8" prometheus = "0.13.3" prost = "0.12.3" redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] } +regex = "1.10.4" reqwest = { version = "^0.11.0", features = ["json"] } serde = { version = "1", features = ["derive"] } serde_json = "1.0.55" @@ -33,7 +36,6 @@ tonic = "0.10.2" wildmatch = "2.1.1" registry-types = { path = "../registry/types" } -base64 = "0.22.1" [build-dependencies] tonic-build = "0.10" diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index a4e921974..55dbc95f6 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -1,65 +1,101 @@ use anyhow::anyhow; use base64::{engine::general_purpose, Engine as _}; +use std::convert::TryFrom; +use crate::graphql::client::{get_bitmaps_exact, get_bitmaps_wildcard}; + +#[derive(Debug, Default, PartialEq)] pub struct Base64Bitmap { - pub start_block_height: usize, + pub start_block_height: u64, pub base64: String, } -pub struct Bitmap { - pub start_block_height: usize, - pub bitmap: Vec, +impl TryFrom<&get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex> for Base64Bitmap { + type Error = anyhow::Error; + fn try_from( + query_item: &get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, + ) -> anyhow::Result { + Ok(Self { + base64: query_item.bitmap.clone(), + start_block_height: u64::try_from(query_item.first_block_height)?, + }) + } +} + +impl TryFrom<&get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex> + for Base64Bitmap +{ + type Error = anyhow::Error; + fn try_from( + query_item: &get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, + ) -> anyhow::Result { + Ok(Self { + base64: query_item.bitmap.clone(), + start_block_height: u64::try_from(query_item.first_block_height)?, + }) + } } #[derive(Default)] struct EliasGammaDecoded { - pub value: usize, + pub value: u64, pub last_bit_index: usize, } -pub struct BitmapOperator {} +#[derive(Debug, Default, PartialEq)] +pub struct CompressedBitmap { + pub start_block_height: u64, + pub bitmap: Vec, +} + +impl TryFrom<&Base64Bitmap> for CompressedBitmap { + type Error = anyhow::Error; + fn try_from(value: &Base64Bitmap) -> anyhow::Result { + Ok(Self { + bitmap: general_purpose::STANDARD.decode(value.base64.clone())?, + start_block_height: value.start_block_height, + }) + } +} -#[cfg_attr(test, mockall::automock)] -impl BitmapOperator { - pub fn new() -> Self { - Self {} +impl CompressedBitmap { + pub fn new(start_block_height: u64, bitmap: Vec) -> Self { + Self { + start_block_height, + bitmap, + } } - pub fn get_bit(&self, bytes: &[u8], bit_index: usize) -> bool { + pub fn get_bit(&self, bit_index: usize) -> bool { let byte_index: usize = bit_index / 8; let bit_index_in_byte: usize = bit_index % 8; - (bytes[byte_index] & (1u8 << (7 - bit_index_in_byte))) > 0 + (self.bitmap[byte_index] & (1u8 << (7 - bit_index_in_byte))) > 0 } - fn set_bit(&self, bytes: &mut [u8], bit_index: usize, bit_value: bool, write_zero: bool) { + fn set_bit(&mut self, bit_index: usize, bit_value: bool, write_zero: bool) { if !bit_value && write_zero { - bytes[bit_index / 8] &= !(1u8 << (7 - (bit_index % 8))); + self.bitmap[bit_index / 8] &= !(1u8 << (7 - (bit_index % 8))); } else if bit_value { - bytes[bit_index / 8] |= 1u8 << (7 - (bit_index % 8)); + self.bitmap[bit_index / 8] |= 1u8 << (7 - (bit_index % 8)); } } - fn read_integer_from_binary( - &self, - bytes: &[u8], - start_bit_index: usize, - end_bit_index: usize, - ) -> u32 { - let mut number: u32 = 0; + fn read_integer_from_binary(&self, start_bit_index: usize, end_bit_index: usize) -> u64 { + let mut number: u64 = 0; // Read bits from right to left for curr_bit_index in (start_bit_index..=end_bit_index).rev() { - if self.get_bit(bytes, curr_bit_index) { - number |= 1u32 << (end_bit_index - curr_bit_index); + if self.get_bit(curr_bit_index) { + number |= 1u64 << (end_bit_index - curr_bit_index); } } number } - fn index_of_first_set_bit(&self, bytes: &[u8], start_bit_index: usize) -> Option { + fn index_of_first_set_bit(&self, start_bit_index: usize) -> Option { let mut first_bit_index: usize = start_bit_index % 8; - for (byte_index, byte) in bytes.iter().enumerate().skip(start_bit_index / 8) { + for (byte_index, byte) in self.bitmap.iter().enumerate().skip(start_bit_index / 8) { if *byte > 0 { for bit_index in first_bit_index..=7 { if *byte & (1u8 << (7 - bit_index)) > 0 { @@ -73,123 +109,147 @@ impl BitmapOperator { None } - fn decode_elias_gamma_entry(&self, bytes: &[u8], start_bit_index: usize) -> EliasGammaDecoded { - if bytes.is_empty() { - return EliasGammaDecoded::default(); + fn decode_elias_gamma_entry( + &self, + start_bit_index: usize, + ) -> anyhow::Result { + if self.bitmap.is_empty() { + return Ok(EliasGammaDecoded::default()); } - let first_bit_index = match self.index_of_first_set_bit(bytes, start_bit_index) { + let first_bit_index = match self.index_of_first_set_bit(start_bit_index) { Some(index) => index, None => { - return EliasGammaDecoded::default(); + return Ok(EliasGammaDecoded::default()); } }; let zero_count: usize = first_bit_index - start_bit_index; - let remainder: usize = if zero_count == 0 { + let remainder: u64 = if zero_count == 0 { 0 } else { - self.read_integer_from_binary(bytes, first_bit_index + 1, first_bit_index + zero_count) - .try_into() - .unwrap() + self.read_integer_from_binary(first_bit_index + 1, first_bit_index + zero_count) }; - EliasGammaDecoded { - value: 2_usize.pow(zero_count.try_into().unwrap()) + remainder, + Ok(EliasGammaDecoded { + value: 2_u64.pow(zero_count.try_into()?) + remainder, last_bit_index: first_bit_index + zero_count, - } + }) } - fn decompress_bitmap(&self, compressed_bitmap: &[u8]) -> Vec { - let compressed_bit_length: usize = compressed_bitmap.len() * 8; - let mut current_bit_value: bool = (compressed_bitmap[0] & 0b10000000) > 0; - let mut decompressed_bytes: Vec = Vec::new(); + pub fn decompress(&self) -> anyhow::Result { + let compressed_bit_length: usize = self.bitmap.len() * 8; + let mut current_bit_value: bool = (self.bitmap[0] & 0b10000000) > 0; + let mut decompressed: DecompressedBitmap = + DecompressedBitmap::new(self.start_block_height, None); let mut compressed_bit_index = 1; let mut decompressed_bit_index = 0; while compressed_bit_index < compressed_bit_length { - let decoded_elias_gamma = - self.decode_elias_gamma_entry(compressed_bitmap, compressed_bit_index); + let decoded_elias_gamma = self.decode_elias_gamma_entry(compressed_bit_index)?; if decoded_elias_gamma.value == 0 { break; } compressed_bit_index = decoded_elias_gamma.last_bit_index + 1; let mut bit_index_offset: usize = 0; - while current_bit_value && (bit_index_offset < decoded_elias_gamma.value) { - while decompressed_bit_index + bit_index_offset >= (decompressed_bytes.len() * 8) { - decompressed_bytes.push(0b00000000); + while current_bit_value + && (bit_index_offset < usize::try_from(decoded_elias_gamma.value)?) + { + while decompressed_bit_index + bit_index_offset >= (decompressed.bitmap.len() * 8) { + decompressed.bitmap.push(0b00000000); } - self.set_bit( - &mut decompressed_bytes, - decompressed_bit_index + bit_index_offset, - true, - true, - ); + decompressed.set_bit(decompressed_bit_index + bit_index_offset, true, true); bit_index_offset += 1; } - decompressed_bit_index += decoded_elias_gamma.value; + decompressed_bit_index += usize::try_from(decoded_elias_gamma.value)?; current_bit_value = !current_bit_value; } - decompressed_bytes + Ok(decompressed) } +} - fn merge_bitmap( - &self, - bitmap_to_update: &mut Bitmap, - bitmap_to_merge: &Bitmap, - ) -> anyhow::Result<()> { - let start_bit_index: usize = match bitmap_to_merge - .start_block_height - .checked_sub(bitmap_to_update.start_block_height) - { - Some(result) => result, - None => { - return Err(anyhow!( - "Start block height in bitmap was lower than provided lowest block height", - )) - } - }; +#[derive(Debug, Default, PartialEq)] +pub struct DecompressedBitmap { + pub start_block_height: u64, + pub bitmap: Vec, +} - for bit_index_offset in 0..(bitmap_to_merge.bitmap.len() * 8) { - let decompressed_bit_value = self.get_bit(&bitmap_to_merge.bitmap, bit_index_offset); - while start_bit_index + bit_index_offset >= bitmap_to_update.bitmap.len() * 8 { - bitmap_to_update.bitmap.push(0b00000000); - } +impl DecompressedBitmap { + pub fn new(start_block_height: u64, bitmap: Option>) -> Self { + Self { + start_block_height, + bitmap: bitmap.unwrap_or(vec![]), + } + } + + pub fn iter(&self) -> DecompressedBitmapIter { + DecompressedBitmapIter::new(self) + } + + pub fn get_bit(&self, bit_index: usize) -> bool { + let byte_index: usize = bit_index / 8; + let bit_index_in_byte: usize = bit_index % 8; + + (self.bitmap[byte_index] & (1u8 << (7 - bit_index_in_byte))) > 0 + } + + fn set_bit(&mut self, bit_index: usize, bit_value: bool, write_zero: bool) { + if !bit_value && write_zero { + self.bitmap[bit_index / 8] &= !(1u8 << (7 - (bit_index % 8))); + } else if bit_value { + self.bitmap[bit_index / 8] |= 1u8 << (7 - (bit_index % 8)); + } + } - self.set_bit( - &mut bitmap_to_update.bitmap, - start_bit_index + bit_index_offset, - decompressed_bit_value, - false, + pub fn merge(&mut self, mut to_merge: DecompressedBitmap) -> anyhow::Result<&mut Self> { + if to_merge.start_block_height < self.start_block_height { + std::mem::swap(&mut self.bitmap, &mut to_merge.bitmap); + std::mem::swap( + &mut self.start_block_height, + &mut to_merge.start_block_height, ); } + let block_height_difference = to_merge.start_block_height - self.start_block_height; + let start_bit_index: usize = usize::try_from(block_height_difference)?; + + for bit_index_offset in 0..(to_merge.bitmap.len() * 8) { + let bit_value = to_merge.get_bit(bit_index_offset); + while start_bit_index + bit_index_offset >= self.bitmap.len() * 8 { + self.bitmap.push(0b00000000); + } + + self.set_bit(start_bit_index + bit_index_offset, bit_value, false); + } - Ok(()) + Ok(self) } +} - pub fn merge_bitmaps( - &self, - bitmaps_to_merge: &Vec, - smallest_start_block_height: usize, - ) -> anyhow::Result { - let mut merged_bitmap: Bitmap = Bitmap { - bitmap: Vec::new(), - start_block_height: smallest_start_block_height, - }; +pub struct DecompressedBitmapIter<'a> { + data: &'a DecompressedBitmap, + bit_index: usize, +} - for compressed_base64_bitmap in bitmaps_to_merge { - let decoded_bitmap: Vec = - general_purpose::STANDARD.decode(compressed_base64_bitmap.base64.clone())?; - let decompressed_bitmap: Bitmap = Bitmap { - bitmap: self.decompress_bitmap(&decoded_bitmap), - start_block_height: compressed_base64_bitmap.start_block_height, - }; - self.merge_bitmap(&mut merged_bitmap, &decompressed_bitmap)?; - } +impl<'a> DecompressedBitmapIter<'a> { + fn new(data: &'a DecompressedBitmap) -> Self { + Self { data, bit_index: 0 } + } +} + +impl Iterator for DecompressedBitmapIter<'_> { + type Item = u64; - Ok(merged_bitmap) + fn next(&mut self) -> Option { + while self.bit_index < self.data.bitmap.len() * 8 { + if self.data.get_bit(self.bit_index) { + self.bit_index += 1; + return Some(self.data.start_block_height + (self.bit_index as u64) - 1); + } + self.bit_index += 1; + } + None } } @@ -198,13 +258,26 @@ mod tests { use super::*; #[test] - fn get_bit_from_bytes() { - let operator: BitmapOperator = BitmapOperator::new(); - let bytes: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001]; + fn decode_base_64() { + let base64 = Base64Bitmap { + base64: "wA==".to_string(), + start_block_height: 10, + }; + + assert_eq!( + CompressedBitmap::try_from(&base64).unwrap().bitmap, + vec![0b11000000] + ); + } + + #[test] + fn get_bit() { + let bytes = vec![0b00000001, 0b00000000, 0b00001001]; + let bitmap = DecompressedBitmap::new(0, Some(bytes)); let results: Vec = [7, 8, 9, 15, 19, 20, 22, 23] .iter() .map(|index| { - return operator.get_bit(bytes, *index); + return bitmap.get_bit(*index); }) .collect(); assert_eq!( @@ -214,101 +287,144 @@ mod tests { } #[test] - fn set_bit_in_bytes() { - let operator: BitmapOperator = BitmapOperator::new(); - let correct_bytes: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001]; - let test_bytes: &mut [u8; 3] = &mut [0b10000000, 0b10000000, 0b00001001]; - operator.set_bit(test_bytes, 0, false, true); - operator.set_bit(test_bytes, 7, true, true); - operator.set_bit(test_bytes, 8, false, true); - operator.set_bit(test_bytes, 12, false, false); - assert_eq!(correct_bytes, test_bytes); + fn iterate_decompressed_bitmap() { + let bytes = vec![0b00000001, 0b00000000, 0b00001001]; + let bitmap = DecompressedBitmap::new(0, Some(bytes)); + let results: Vec = bitmap.iter().collect(); + assert_eq!(results, [7, 20, 23]); + } + + #[test] + fn set_bit() { + let test_bytes = vec![0b10000000, 0b10000000, 0b00001001]; + let mut bitmap = DecompressedBitmap::new(0, Some(test_bytes.clone())); + let correct_bytes = vec![0b00000001, 0b00000000, 0b00001001]; + bitmap.set_bit(0, false, true); + bitmap.set_bit(7, true, true); + bitmap.set_bit(8, false, true); + bitmap.set_bit(12, false, false); + assert_eq!(correct_bytes, bitmap.bitmap); } #[test] fn get_unsigned_integer_from_binary_sequence() { - let operator: BitmapOperator = BitmapOperator::new(); - let bytes: &[u8; 3] = &[0b11111110, 0b10010100, 0b10001101]; - assert_eq!(operator.read_integer_from_binary(bytes, 6, 16), 1321); + let bytes = vec![0b11111110, 0b10010100, 0b10001101]; + let bitmap = CompressedBitmap::new(0, bytes); + assert_eq!(bitmap.read_integer_from_binary(6, 16), 1321); } #[test] fn get_index_of_first_set_bit() { - let operator: BitmapOperator = BitmapOperator::new(); - let bytes: &[u8; 4] = &[0b00000001, 0b10000000, 0b00000001, 0b00000000]; + let bytes = vec![0b00000001, 0b10000000, 0b00000001, 0b00000000]; + let bitmap = CompressedBitmap::new(0, bytes); assert_eq!( - operator.index_of_first_set_bit(bytes, 4).unwrap(), + bitmap.index_of_first_set_bit(4).unwrap(), 7, "Should get index 7 when starting from 4", ); assert_eq!( - operator.index_of_first_set_bit(bytes, 7).unwrap(), + bitmap.index_of_first_set_bit(7).unwrap(), 7, "Should get index 7 when starting from 7", ); assert_eq!( - operator.index_of_first_set_bit(bytes, 8).unwrap(), + bitmap.index_of_first_set_bit(8).unwrap(), 8, "Should get index 8 when starting from 8", ); assert_eq!( - operator.index_of_first_set_bit(bytes, 17).unwrap(), + bitmap.index_of_first_set_bit(17).unwrap(), 23, "Should get index 23 when starting from 17", ); assert!( - operator.index_of_first_set_bit(bytes, 25).is_none(), + bitmap.index_of_first_set_bit(25).is_none(), "Should get None when starting from 25", ); } #[test] fn decode_elias_gamma() { - let operator: BitmapOperator = BitmapOperator::new(); - let bytes: &[u8; 2] = &[0b00000000, 0b00110110]; - let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6); + let bytes = vec![0b00000000, 0b00110110]; + let bitmap = CompressedBitmap::new(0, bytes); + let decoded_eg: EliasGammaDecoded = bitmap.decode_elias_gamma_entry(6).unwrap(); assert_eq!(decoded_eg.value, 27); assert_eq!(decoded_eg.last_bit_index, 14); } #[test] fn decode_empty_elias_gamma() { - let operator: BitmapOperator = BitmapOperator::new(); - let bytes: &[u8; 2] = &[0b00000000, 0b00000000]; - let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0); + let bytes = vec![0b00000000, 0b00000000]; + let bitmap = CompressedBitmap::new(0, bytes); + let decoded_eg: EliasGammaDecoded = bitmap.decode_elias_gamma_entry(0).unwrap(); assert_eq!(decoded_eg.value, 0); assert_eq!(decoded_eg.last_bit_index, 0); } #[test] - fn decode_compressed_bitmap() { - let operator: BitmapOperator = BitmapOperator::new(); - assert_eq!(operator.decompress_bitmap(&[0b10100000]), &[0b11000000]); - assert_eq!(operator.decompress_bitmap(&[0b00100100]), &[0b00110000]); - assert_eq!(operator.decompress_bitmap(&[0b10010000]), &[0b11110000]); + fn decompress_many_compressed_bitmaps() { + assert_eq!( + CompressedBitmap::new(0, vec![0b10100000]) + .decompress() + .unwrap() + .bitmap, + vec![0b11000000] + ); + assert_eq!( + CompressedBitmap::new(0, vec![0b00100100]) + .decompress() + .unwrap() + .bitmap, + vec![0b00110000] + ); + assert_eq!( + CompressedBitmap::new(0, vec![0b10010000]) + .decompress() + .unwrap() + .bitmap, + vec![0b11110000] + ); assert_eq!( - operator.decompress_bitmap(&[0b10110010, 0b01000000]), - &[0b11100001] + CompressedBitmap::new(0, vec![0b10110010, 0b01000000]) + .decompress() + .unwrap() + .bitmap, + vec![0b11100001] ); assert_eq!( - operator.decompress_bitmap(&[0b01010001, 0b01010000]), - &[0b01100000, 0b11000000] + CompressedBitmap::new(0, vec![0b01010001, 0b01010000]) + .decompress() + .unwrap() + .bitmap, + vec![0b01100000, 0b11000000] ); assert_eq!( - operator.decompress_bitmap(&[0b01111111, 0b11111111, 0b11111000]), - &[0b01010101, 0b01010101, 0b01010000] + CompressedBitmap::new(0, vec![0b01111111, 0b11111111, 0b11111000]) + .decompress() + .unwrap() + .bitmap, + vec![0b01010101, 0b01010101, 0b01010000] ); assert_eq!( - operator.decompress_bitmap(&[0b11010101, 0b11010101, 0b11010100]), - &[0b10010001, 0b00100010, 0b01000000] + CompressedBitmap::new(0, vec![0b11010101, 0b11010101, 0b11010100]) + .decompress() + .unwrap() + .bitmap, + vec![0b10010001, 0b00100010, 0b01000000] ); assert_eq!( - operator.decompress_bitmap(&[0b00000111, 0b11100000]), - &[0b00000000, 0b00000000, 0b00000000, 0b00000001] + CompressedBitmap::new(0, vec![0b00000111, 0b11100000]) + .decompress() + .unwrap() + .bitmap, + vec![0b00000000, 0b00000000, 0b00000000, 0b00000001] ); assert_eq!( - operator.decompress_bitmap(&[0b11000001, 0b11011011]), - &[ + CompressedBitmap::new(0, vec![0b11000001, 0b11011011]) + .decompress() + .unwrap() + .bitmap, + vec![ 0b10000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00001110 ] @@ -316,43 +432,59 @@ mod tests { } #[test] - fn merge_two_decompressed_bitmaps() { - let operator: BitmapOperator = BitmapOperator::new(); - let mut base_bitmap: Bitmap = Bitmap { + fn merge_two_bitmaps() { + let mut base_bitmap: DecompressedBitmap = DecompressedBitmap { + bitmap: vec![0b11001010, 0b10001111], + start_block_height: 10, + }; + let mut to_merge: DecompressedBitmap = DecompressedBitmap { + bitmap: vec![0b11100001], + start_block_height: 14, + }; + + assert!(base_bitmap.merge(to_merge).is_ok()); + assert_eq!(base_bitmap.bitmap, vec![0b11001110, 0b10011111]); + } + + #[test] + fn merge_two_bitmaps_with_swap() { + let mut to_merge: DecompressedBitmap = DecompressedBitmap { bitmap: vec![0b11001010, 0b10001111], start_block_height: 10, }; - let compressed_bitmap: Bitmap = Bitmap { - bitmap: vec![0b11100001], // Decompresses to 11100001 + let mut base_bitmap: DecompressedBitmap = DecompressedBitmap { + bitmap: vec![0b11100001], start_block_height: 14, }; - assert!(operator - .merge_bitmap(&mut base_bitmap, &compressed_bitmap) - .is_ok()); + assert!(base_bitmap.merge(to_merge).is_ok()); assert_eq!(base_bitmap.bitmap, vec![0b11001110, 0b10011111]); } #[test] fn merge_multiple_bitmaps_together() { - let operator: BitmapOperator = BitmapOperator::new(); - let test_bitmaps_to_merge: Vec = vec![ - Base64Bitmap { - base64: "oA==".to_string(), // Decompresses to 11000000 - start_block_height: 10, - }, - Base64Bitmap { - base64: "oA==".to_string(), - start_block_height: 14, - }, - Base64Bitmap { - base64: "oA==".to_string(), - start_block_height: 18, - }, - ]; - - let merged_bitmap = operator.merge_bitmaps(&test_bitmaps_to_merge, 10).unwrap(); - assert_eq!(merged_bitmap.bitmap, vec![0b11001100, 0b11000000]); - assert_eq!(merged_bitmap.start_block_height, 10); + let mut base_bitmap = DecompressedBitmap::new(200, None); + let mut bitmap_a = DecompressedBitmap { + bitmap: vec![0b11000000], + start_block_height: 18, + }; + let mut bitmap_b = DecompressedBitmap { + bitmap: vec![0b11000000], + start_block_height: 10, + }; + let mut bitmap_c = DecompressedBitmap { + bitmap: vec![0b11000000], + start_block_height: 14, + }; + + base_bitmap + .merge(bitmap_a) + .unwrap() + .merge(bitmap_b) + .unwrap() + .merge(bitmap_c) + .unwrap(); + assert_eq!(base_bitmap.bitmap, vec![0b11001100, 0b11000000]); + assert_eq!(base_bitmap.start_block_height, 10); } } diff --git a/block-streamer/src/bitmap_processor.rs b/block-streamer/src/bitmap_processor.rs new file mode 100644 index 000000000..79bf12597 --- /dev/null +++ b/block-streamer/src/bitmap_processor.rs @@ -0,0 +1,436 @@ +use crate::bitmap::{Base64Bitmap, CompressedBitmap, DecompressedBitmap}; +use crate::graphql::client::GraphQLClient; +use crate::rules::types::ChainId; +use anyhow::Context; +use async_stream::try_stream; +use chrono::{DateTime, Duration, TimeZone, Utc}; +use futures::stream::{BoxStream, Stream}; +use futures::StreamExt; +use near_lake_framework::near_indexer_primitives; +use regex::Regex; + +const MAX_S3_RETRY_COUNT: u8 = 20; + +#[derive(Debug, Eq, PartialEq)] +enum ContractPatternType { + Exact(Vec), + Wildcard(String), +} + +impl ContractPatternType { + fn strip_wildcard_if_root_account(receiver_id: String) -> anyhow::Result { + let wildcard_root_account_regex = Regex::new(r"^\*\.([a-zA-Z0-9]+)$")?; + if wildcard_root_account_regex.is_match(&receiver_id) { + return Ok(receiver_id + .split('.') + .nth(1) + .unwrap_or(&receiver_id) + .to_string()); + } + Ok(receiver_id) + } +} + +impl From<&str> for ContractPatternType { + fn from(contract_pattern: &str) -> Self { + // If receiver_id is of pattern *.SOME_ROOT_ACCOUNT such as *.near, we can reduce this to + // "near" as we store bitmaps for root accounts like near ,tg, and so on. + let cleaned_contract_pattern: String = contract_pattern + .split(',') + .map(|receiver| receiver.trim()) + .map(str::to_string) + .map(|receiver| { + ContractPatternType::strip_wildcard_if_root_account(receiver.clone()) + .unwrap_or(receiver) + }) + .collect::>() + .join(","); + + if cleaned_contract_pattern.chars().any(|c| c == '*') { + let wildcard_pattern = cleaned_contract_pattern + .replace(',', "|") + .replace('.', "\\.") + .replace('*', ".*"); + return ContractPatternType::Wildcard(wildcard_pattern); + } + + let exact_pattern = cleaned_contract_pattern + .split(',') + .map(str::to_string) + .collect(); + ContractPatternType::Exact(exact_pattern) + } +} + +pub struct BitmapProcessor { + graphql_client: GraphQLClient, + s3_client: crate::s3_client::S3Client, + chain_id: ChainId, +} + +impl BitmapProcessor { + pub fn new(graphql_client: GraphQLClient, s3_client: crate::s3_client::S3Client) -> Self { + Self { + graphql_client, + s3_client, + chain_id: ChainId::Mainnet, + } + } + + fn get_lake_bucket(&self) -> String { + match self.chain_id { + ChainId::Mainnet => "near-lake-data-mainnet".to_string(), + ChainId::Testnet => "near-lake-data-testnet".to_string(), + } + } + + pub async fn get_nearest_block_date(&self, block_height: u64) -> anyhow::Result> { + let mut current_block_height = block_height; + let mut retry_count = 1; + loop { + let block_key = format!("{:0>12}/block.json", current_block_height); + match self + .s3_client + .get_text_file(&self.get_lake_bucket(), &block_key) + .await + { + Ok(text) => { + let block: near_indexer_primitives::views::BlockView = + serde_json::from_str(&text)?; + return Ok(Utc.timestamp_nanos(block.header.timestamp_nanosec as i64)); + } + + Err(e) => { + if e.root_cause() + .downcast_ref::() + .is_some() + { + retry_count += 1; + if retry_count > MAX_S3_RETRY_COUNT { + anyhow::bail!("Exceeded maximum retries to fetch block from S3"); + } + + tracing::debug!( + "Block {} not found on S3, attempting to fetch next block", + current_block_height + ); + current_block_height += 1; + continue; + } + + return Err(e).context("Failed to fetch block from S3"); + } + } + } + } + + fn next_day(&self, date: DateTime) -> DateTime { + date + Duration::days(1) + } + + async fn query_base_64_bitmaps( + &self, + contract_pattern_type: &ContractPatternType, + current_date: &DateTime, + ) -> anyhow::Result> { + match contract_pattern_type { + ContractPatternType::Exact(ref pattern) => { + let query_result: Vec<_> = self + .graphql_client + .get_bitmaps_exact(pattern.clone(), ¤t_date) + .await?; + Ok(query_result + .iter() + .map(Base64Bitmap::try_from) + .collect::>>()?) + } + ContractPatternType::Wildcard(ref pattern) => { + let query_result: Vec<_> = self + .graphql_client + .get_bitmaps_wildcard(pattern.clone(), ¤t_date) + .await?; + Ok(query_result + .iter() + .map(Base64Bitmap::try_from) + .collect::>>()?) + } + } + } + + fn stream_matching_block_heights<'b, 'a: 'b>( + &'a self, + start_block_height: near_indexer_primitives::types::BlockHeight, + contract_pattern: String, + ) -> impl futures::Stream> + 'b { + try_stream! { + let start_date = self.get_nearest_block_date(start_block_height).await?; + let contract_pattern_type = ContractPatternType::from(contract_pattern.as_str()); + let mut current_date = start_date; + while current_date <= Utc::now() { + let base_64_bitmaps: Vec = self.query_base_64_bitmaps(&contract_pattern_type, ¤t_date).await?; + let compressed_bitmaps: Vec = base_64_bitmaps.iter().map(CompressedBitmap::try_from).collect()?; + let decompressed_bitmaps: Vec = compressed_bitmaps.iter().map(CompressedBitmap::decompress).collect()?; + + let starting_block_height: u64 = decompressed_bitmaps.iter().map(|item| item.start_block_height).min().unwrap_or(decompressed_bitmaps[0].start_block_height); + let mut bitmap_for_day = DecompressedBitmap::new(starting_block_height, None); + for bitmap in decompressed_bitmaps { + bitmap_for_day.merge(bitmap)?; + } + + let mut bitmap_iter = bitmap_for_day.iter(); + while let Some(block_height) = bitmap_iter.next() { + yield block_height; + } + current_date = self.next_day(current_date); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use mockall::predicate; + + fn exact_query_result( + first_block_height: i64, + bitmap: &str, + ) -> crate::graphql::client::get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex + { + crate::graphql::client::get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex { + first_block_height, + bitmap: bitmap.to_string(), + } + } + + fn wildcard_query_result( + first_block_height: i64, + bitmap: &str + ) -> crate::graphql::client::get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex{ + crate::graphql::client::get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex { + first_block_height, + bitmap: bitmap.to_string(), + } + } + + fn generate_block_with_timestamp(date: &str) -> String { + let naive_date = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + + let date_time_utc = chrono::Utc.from_utc_datetime(&naive_date).timestamp() * 1_000_000_000; + + format!( + r#"{{ + "author": "someone", + "header": {{ + "approvals": [], + "block_merkle_root": "ERiC7AJ2zbVz1HJHThR5NWDDN9vByhwdjcVfivmpY5B", + "block_ordinal": 92102682, + "challenges_result": [], + "challenges_root": "11111111111111111111111111111111", + "chunk_headers_root": "MDiJxDyvUQaZRKmUwa5jgQuV6XjwVvnm4tDrajCxwvz", + "chunk_mask": [], + "chunk_receipts_root": "n84wEo7kTKTCJsyqBZ2jndhjrAMeJAXMwKvnJR7vCuy", + "chunk_tx_root": "D8j64GMKBMvUfvnuHtWUyDtMHM5mJ2pA4G5VmYYJvo5G", + "chunks_included": 4, + "epoch_id": "2RMQiomr6CSSwUWpmB62YohxHbfadrHfcsaa3FVb4J9x", + "epoch_sync_data_hash": null, + "gas_price": "100000000", + "hash": "FA1z9RVm9fX3g3mgP3NToZGwWeeXYn8bvZs4nwwTgCpD", + "height": 102162333, + "last_ds_final_block": "Ax2a3MSYuv2hgybnCbpNJMdYmPrHDHdA2hHTUrBkD915", + "last_final_block": "8xkwjn6Lb6UhMBhxcbVQBf3318GafkdaXoHA8Jako1nn", + "latest_protocol_version": 62, + "next_bp_hash": "dmW84aEj2iVJMLwJodJwTfAyeA1LJaHEthvnoAsvTPt", + "next_epoch_id": "C9TDDYthANoduoTBZS7WYDsBSe9XCm4M2F9hRoVXVXWY", + "outcome_root": "6WxzWLVp4b4bFbxHzu18apVfXLvHGKY7CHoqD2Eq3TFJ", + "prev_hash": "Ax2a3MSYuv2hgybnCbpNJMdYmPrHDHdA2hHTUrBkD915", + "prev_height": 102162332, + "prev_state_root": "Aq2ndkyDiwroUWN69Ema9hHtnr6dPHoEBRNyfmd8v4gB", + "random_value": "7ruuMyDhGtTkYaCGYMy7PirPiM79DXa8GhVzQW1pHRoz", + "rent_paid": "0", + "signature": "ed25519:5gYYaWHkAEK5etB8tDpw7fmehkoYSprUxKPygaNqmhVDFCMkA1n379AtL1BBkQswLAPxWs1BZvypFnnLvBtHRknm", + "timestamp": 1695921400989555700, + "timestamp_nanosec": "{}", + "total_supply": "1155783047679681223245725102954966", + "validator_proposals": [], + "validator_reward": "0" + }}, + "chunks": [] + }}"#, + date_time_utc + ) + } + + #[test] + fn parse_exact_contract_patterns() { + let sample_patterns = vec![ + "near", + "*.near", + "near, someone.tg", + "*.near, someone.tg, *.tg", + "a.near, b.near, a.b, a.b.c.near", + ]; + + assert_eq!( + ContractPatternType::from(sample_patterns[0]), + ContractPatternType::Exact(vec!["near".to_string()]) + ); + assert_eq!( + ContractPatternType::from(sample_patterns[1]), + ContractPatternType::Exact(vec!["near".to_string()]) + ); + assert_eq!( + ContractPatternType::from(sample_patterns[2]), + ContractPatternType::Exact(vec!["near".to_string(), "someone.tg".to_string()],) + ); + assert_eq!( + ContractPatternType::from(sample_patterns[3]), + ContractPatternType::Exact(vec![ + "near".to_string(), + "someone.tg".to_string(), + "tg".to_string() + ],) + ); + assert_eq!( + ContractPatternType::from(sample_patterns[4]), + ContractPatternType::Exact(vec![ + "a.near".to_string(), + "b.near".to_string(), + "a.b".to_string(), + "a.b.c.near".to_string(), + ]) + ); + } + + #[test] + fn parse_wildcard_contract_patterns() { + let sample_patterns = vec![ + "*.someone.near", + "near, someone.*.tg", + "a.near, b.*, *.b, a.*.c.near", + ]; + + assert_eq!( + ContractPatternType::from(sample_patterns[0]), + ContractPatternType::Wildcard(".*\\.someone\\.near".to_string()) + ); + assert_eq!( + ContractPatternType::from(sample_patterns[1]), + ContractPatternType::Wildcard("near|someone\\..*\\.tg".to_string()) + ); + assert_eq!( + ContractPatternType::from(sample_patterns[2]), + ContractPatternType::Wildcard("a\\.near|b\\..*|b|a\\..*\\.c\\.near".to_string()) + ); + } + + #[tokio::test] + async fn collect_block_heights_from_one_day() { + let mut mock_s3_client = crate::s3_client::S3Client::default(); + mock_s3_client + .expect_get_text_file() + .returning(move |_, _| { + Ok(generate_block_with_timestamp( + &Utc::now().format("%Y-%m-%d").to_string(), + )) + }); + + let mut mock_graphql_client = crate::graphql::client::GraphQLClient::default(); + let mock_query_result_item = exact_query_result(1, "wA=="); + let mock_query_result = vec![mock_query_result_item]; + mock_graphql_client + .expect_get_bitmaps_exact() + .with( + predicate::eq(vec!["someone.near".to_string()]), + predicate::function(|date: &DateTime| { + date.date_naive() == Utc::now().date_naive() + }), + ) + .times(1) + .returning(move |_, _| Ok(mock_query_result.clone())); + + let bitmap_processor = BitmapProcessor::new(mock_graphql_client, mock_s3_client); + + let stream = bitmap_processor.stream_matching_block_heights(0, "someone.near".to_owned()); + tokio::pin!(stream); + let mut result_heights = vec![]; + while let Some(Ok(height)) = stream.next().await { + result_heights.push(height); + } + assert_eq!(result_heights, vec![1]); + } + + #[tokio::test] + async fn collect_block_heights_from_past_three_days() { + let mut mock_s3_client = crate::s3_client::S3Client::default(); + mock_s3_client + .expect_get_text_file() + .returning(move |_, _| { + Ok(generate_block_with_timestamp( + &(Utc::now() - Duration::days(2)) + .format("%Y-%m-%d") + .to_string(), + )) + }); + + let mut mock_graphql_client = crate::graphql::client::GraphQLClient::default(); + mock_graphql_client + .expect_get_bitmaps_wildcard() + .with( + predicate::eq(".*\\.someone\\.near".to_string()), + predicate::function(|date: &DateTime| { + date.date_naive() == (Utc::now() - Duration::days(2)).date_naive() + }), + ) + .times(1) + .returning(move |_, _| { + Ok(vec![ + wildcard_query_result(1, "wA=="), + wildcard_query_result(5, "wA=="), + ]) + }); + mock_graphql_client + .expect_get_bitmaps_wildcard() + .with( + predicate::eq(".*\\.someone\\.near".to_string()), + predicate::function(|date: &DateTime| { + date.date_naive() == (Utc::now() - Duration::days(1)).date_naive() + }), + ) + .times(1) + .returning(move |_, _| { + Ok(vec![ + wildcard_query_result(10, "wA=="), + wildcard_query_result(15, "wA=="), + ]) + }); + mock_graphql_client + .expect_get_bitmaps_wildcard() + .with( + predicate::eq(".*\\.someone\\.near".to_string()), + predicate::function(|date: &DateTime| { + date.date_naive() == Utc::now().date_naive() + }), + ) + .times(1) + .returning(move |_, _| { + Ok(vec![ + wildcard_query_result(100, "wA=="), + wildcard_query_result(105, "wA=="), + ]) + }); + let bitmap_processor = BitmapProcessor::new(mock_graphql_client, mock_s3_client); + + let stream = + bitmap_processor.stream_matching_block_heights(0, "*.someone.near".to_string()); + tokio::pin!(stream); + let mut result_heights = vec![]; + while let Some(Ok(height)) = stream.next().await { + result_heights.push(height); + } + assert_eq!(result_heights, vec![1, 5, 10, 15, 100, 105]); + } +} diff --git a/block-streamer/src/graphql/client.rs b/block-streamer/src/graphql/client.rs index eca7cee80..6f109603f 100644 --- a/block-streamer/src/graphql/client.rs +++ b/block-streamer/src/graphql/client.rs @@ -1,8 +1,10 @@ use ::reqwest; +use chrono::{DateTime, Utc}; use graphql_client::{GraphQLQuery, Response}; // TODO: Use Dataplatform account const HASURA_ACCOUNT: &str = "darunrs_near"; +const QUERY_LIMIT: i64 = 1000; #[allow(clippy::upper_case_acronyms)] type Date = String; @@ -11,27 +13,32 @@ type Date = String; #[graphql( schema_path = "graphql/darunrs_near/schema.graphql", query_path = "graphql/darunrs_near/get_bitmaps_exact.graphql", - response_derives = "Debug", + response_derives = "Debug,Clone", normalization = "rust" )] -struct GetBitmapsExact; +pub struct GetBitmapsExact; #[derive(GraphQLQuery)] #[graphql( schema_path = "graphql/darunrs_near/schema.graphql", query_path = "graphql/darunrs_near/get_bitmaps_wildcard.graphql", - response_derives = "Debug", + response_derives = "Debug,Clone", normalization = "rust" )] -struct GetBitmapsWildcard; +pub struct GetBitmapsWildcard; -pub struct GraphQLClient { +#[cfg(not(test))] +pub use GraphQLClientImpl as GraphQLClient; +#[cfg(test)] +pub use MockGraphQLClientImpl as GraphQLClient; + +pub struct GraphQLClientImpl { client: reqwest::Client, graphql_endpoint: String, } #[cfg_attr(test, mockall::automock)] -impl GraphQLClient { +impl GraphQLClientImpl { pub fn new(graphql_endpoint: String) -> Self { Self { client: reqwest::Client::new(), @@ -58,61 +65,99 @@ impl GraphQLClient { pub async fn get_bitmaps_exact( &self, receiver_ids: Vec, - block_date: String, - limit: i64, - offset: i64, + block_date: &DateTime, ) -> anyhow::Result> { - self.post_graphql::(get_bitmaps_exact::Variables { - receiver_ids: Some(receiver_ids), - block_date: Some(block_date), - limit: Some(limit), - offset: Some(offset), - }) - .await? - .data - .ok_or(anyhow::anyhow!("No bitmaps were returned")) - .map(|data| data.darunrs_near_bitmap_v5_actions_index) + let mut all_query_results: Vec< + get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, + > = vec![]; + let mut offset = 0; + let mut has_more = true; + while has_more { + let mut query_result = self + .post_graphql::(get_bitmaps_exact::Variables { + receiver_ids: Some(receiver_ids.clone()), + block_date: Some(block_date.format("%Y-%m-%d").to_string()), + limit: Some(QUERY_LIMIT), + offset: Some(offset), + }) + .await? + .data + .ok_or(anyhow::anyhow!( + "Query response is malformed. Missing data field." + )) + .map(|data| data.darunrs_near_bitmap_v5_actions_index)?; + + has_more = query_result.len() >= QUERY_LIMIT as usize; + offset += QUERY_LIMIT; + + all_query_results.append(&mut query_result); + } + + Ok(all_query_results) } pub async fn get_bitmaps_wildcard( &self, receiver_ids: String, - block_date: String, - limit: i64, - offset: i64, + block_date: &DateTime, ) -> anyhow::Result> { - self.post_graphql::(get_bitmaps_wildcard::Variables { - receiver_ids: Some(receiver_ids), - block_date: Some(block_date), - limit: Some(limit), - offset: Some(offset), - }) - .await? - .data - .ok_or(anyhow::anyhow!("No bitmaps were returned")) - .map(|data| data.darunrs_near_bitmap_v5_actions_index) + let mut all_query_results: Vec< + get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, + > = vec![]; + let mut offset = 0; + let mut has_more = true; + while has_more { + let mut query_result = self + .post_graphql::(get_bitmaps_wildcard::Variables { + receiver_ids: Some(receiver_ids.clone()), + block_date: Some(block_date.format("%Y-%m-%d").to_string()), + limit: Some(QUERY_LIMIT), + offset: Some(offset), + }) + .await? + .data + .ok_or(anyhow::anyhow!( + "Query response is malformed. Missing data field." + )) + .map(|data| data.darunrs_near_bitmap_v5_actions_index)?; + + has_more = query_result.len() >= QUERY_LIMIT as usize; + offset += QUERY_LIMIT; + + all_query_results.append(&mut query_result); + } + + Ok(all_query_results) } } // TODO: Remove Unit tests after bitmap query is integrated into the main application #[cfg(test)] mod tests { + use chrono::{NaiveDateTime, TimeZone}; + use super::*; const HASURA_ENDPOINT: &str = "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app/v1/graphql"; + fn utc_date_time_from_date_string(date: &str) -> DateTime { + let naive_date_time: NaiveDateTime = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + TimeZone::from_utc_datetime(&chrono::Utc, &naive_date_time) + } + #[tokio::test] async fn test_get_bitmaps_exact() { - let client = GraphQLClient::new(HASURA_ENDPOINT.to_string()); + let client = GraphQLClientImpl::new(HASURA_ENDPOINT.to_string()); let receiver_ids = vec!["app.nearcrowd.near".to_string()]; - let block_date = "2024-03-21".to_string(); - let limit = 10; - let offset = 0; + let block_date: DateTime = utc_date_time_from_date_string("2024-03-21"); let response = client - .get_bitmaps_exact(receiver_ids, block_date, limit, offset) + .get_bitmaps_exact(receiver_ids, &block_date) .await .unwrap(); assert_eq!(response[0].first_block_height, 115130287); @@ -122,13 +167,11 @@ mod tests { #[ignore] #[tokio::test] async fn test_get_bitmaps_wildcard() { - let client = GraphQLClient::new(HASURA_ENDPOINT.to_string()); + let client = GraphQLClientImpl::new(HASURA_ENDPOINT.to_string()); let receiver_ids = "app.nearcrowd.near".to_string(); - let block_date = "2024-03-21".to_string(); - let limit = 10; - let offset = 0; + let block_date: DateTime = utc_date_time_from_date_string("2024-03-21"); let response = client - .get_bitmaps_wildcard(receiver_ids, block_date, limit, offset) + .get_bitmaps_wildcard(receiver_ids, &block_date) .await .unwrap(); assert_eq!(response[0].first_block_height, 115130287); diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index d8e0a3c05..f14c65706 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -1,6 +1,7 @@ use tracing_subscriber::prelude::*; mod bitmap; +mod bitmap_processor; mod block_stream; mod delta_lake_client; mod graphql; diff --git a/coordinator/Cargo.lock b/coordinator/Cargo.lock index 032dae9cf..5b118095c 100644 --- a/coordinator/Cargo.lock +++ b/coordinator/Cargo.lock @@ -926,6 +926,7 @@ version = "0.1.0" dependencies = [ "actix-web", "anyhow", + "async-stream", "async-trait", "aws-config", "aws-sdk-s3", @@ -941,6 +942,7 @@ dependencies = [ "prometheus", "prost 0.12.3", "redis 0.21.7", + "regex", "registry-types", "reqwest", "serde", @@ -3575,13 +3577,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.3", + "regex-automata 0.4.6", "regex-syntax 0.8.2", ] @@ -3596,9 +3598,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr",