From 989c432d92aba2fcdbdf7944df5ffe9eb3ccfc73 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 4 Jun 2024 13:21:38 -0700 Subject: [PATCH] feat: Implement Block Streamer Bitmap Operations (#747) The bitmap indexer will return a list of bitmaps in the form of base 64 strings, and associated start block heights. We need a way to convert all that data into a single block height and an associated bitmap. This PR introduces a new BitmapOperator class which holds all the operations necessary to perform the function of returning a combined binary bitmap with the lowest start block height as index 0. --- block-streamer/Cargo.lock | 7 + block-streamer/Cargo.toml | 1 + block-streamer/src/bitmap.rs | 358 +++++++++++++++++++++++++++ block-streamer/src/graphql/client.rs | 1 - block-streamer/src/main.rs | 1 + coordinator/Cargo.lock | 7 + 6 files changed, 374 insertions(+), 1 deletion(-) create mode 100644 block-streamer/src/bitmap.rs diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index e0e82e1ee..1a68afaa5 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -904,6 +904,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64-simd" version = "0.8.0" @@ -963,6 +969,7 @@ dependencies = [ "aws-sdk-s3", "aws-smithy-runtime", "aws-smithy-types", + "base64 0.22.1", "borsh 0.10.3", "cached", "chrono", diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index c8bc5f499..d44903786 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -33,6 +33,7 @@ tonic = "0.10.2" wildmatch = "2.1.1" registry-types = { path = "../registry/types" } +base64 = "0.22.1" [build-dependencies] tonic-build = "0.10" diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs new file mode 100644 index 000000000..a4e921974 --- /dev/null +++ b/block-streamer/src/bitmap.rs @@ -0,0 +1,358 @@ +use anyhow::anyhow; +use base64::{engine::general_purpose, Engine as _}; + +pub struct Base64Bitmap { + pub start_block_height: usize, + pub base64: String, +} + +pub struct Bitmap { + pub start_block_height: usize, + pub bitmap: Vec, +} + +#[derive(Default)] +struct EliasGammaDecoded { + pub value: usize, + pub last_bit_index: usize, +} + +pub struct BitmapOperator {} + +#[cfg_attr(test, mockall::automock)] +impl BitmapOperator { + pub fn new() -> Self { + Self {} + } + + pub fn get_bit(&self, bytes: &[u8], bit_index: usize) -> bool { + let byte_index: usize = bit_index / 8; + let bit_index_in_byte: usize = bit_index % 8; + + (bytes[byte_index] & (1u8 << (7 - bit_index_in_byte))) > 0 + } + + fn set_bit(&self, bytes: &mut [u8], bit_index: usize, bit_value: bool, write_zero: bool) { + if !bit_value && write_zero { + bytes[bit_index / 8] &= !(1u8 << (7 - (bit_index % 8))); + } else if bit_value { + bytes[bit_index / 8] |= 1u8 << (7 - (bit_index % 8)); + } + } + + fn read_integer_from_binary( + &self, + bytes: &[u8], + start_bit_index: usize, + end_bit_index: usize, + ) -> u32 { + let mut number: u32 = 0; + // Read bits from right to left + for curr_bit_index in (start_bit_index..=end_bit_index).rev() { + if self.get_bit(bytes, curr_bit_index) { + number |= 1u32 << (end_bit_index - curr_bit_index); + } + } + + number + } + + fn index_of_first_set_bit(&self, bytes: &[u8], start_bit_index: usize) -> Option { + let mut first_bit_index: usize = start_bit_index % 8; + for (byte_index, byte) in bytes.iter().enumerate().skip(start_bit_index / 8) { + if *byte > 0 { + for bit_index in first_bit_index..=7 { + if *byte & (1u8 << (7 - bit_index)) > 0 { + return Some(byte_index * 8 + bit_index); + } + } + } + first_bit_index = 0; + } + + None + } + + fn decode_elias_gamma_entry(&self, bytes: &[u8], start_bit_index: usize) -> EliasGammaDecoded { + if bytes.is_empty() { + return EliasGammaDecoded::default(); + } + let first_bit_index = match self.index_of_first_set_bit(bytes, start_bit_index) { + Some(index) => index, + None => { + return EliasGammaDecoded::default(); + } + }; + let zero_count: usize = first_bit_index - start_bit_index; + let remainder: usize = if zero_count == 0 { + 0 + } else { + self.read_integer_from_binary(bytes, first_bit_index + 1, first_bit_index + zero_count) + .try_into() + .unwrap() + }; + + EliasGammaDecoded { + value: 2_usize.pow(zero_count.try_into().unwrap()) + remainder, + last_bit_index: first_bit_index + zero_count, + } + } + + fn decompress_bitmap(&self, compressed_bitmap: &[u8]) -> Vec { + let compressed_bit_length: usize = compressed_bitmap.len() * 8; + let mut current_bit_value: bool = (compressed_bitmap[0] & 0b10000000) > 0; + let mut decompressed_bytes: Vec = Vec::new(); + + let mut compressed_bit_index = 1; + let mut decompressed_bit_index = 0; + + while compressed_bit_index < compressed_bit_length { + let decoded_elias_gamma = + self.decode_elias_gamma_entry(compressed_bitmap, compressed_bit_index); + if decoded_elias_gamma.value == 0 { + break; + } + + compressed_bit_index = decoded_elias_gamma.last_bit_index + 1; + let mut bit_index_offset: usize = 0; + while current_bit_value && (bit_index_offset < decoded_elias_gamma.value) { + while decompressed_bit_index + bit_index_offset >= (decompressed_bytes.len() * 8) { + decompressed_bytes.push(0b00000000); + } + self.set_bit( + &mut decompressed_bytes, + decompressed_bit_index + bit_index_offset, + true, + true, + ); + bit_index_offset += 1; + } + + decompressed_bit_index += decoded_elias_gamma.value; + current_bit_value = !current_bit_value; + } + + decompressed_bytes + } + + fn merge_bitmap( + &self, + bitmap_to_update: &mut Bitmap, + bitmap_to_merge: &Bitmap, + ) -> anyhow::Result<()> { + let start_bit_index: usize = match bitmap_to_merge + .start_block_height + .checked_sub(bitmap_to_update.start_block_height) + { + Some(result) => result, + None => { + return Err(anyhow!( + "Start block height in bitmap was lower than provided lowest block height", + )) + } + }; + + for bit_index_offset in 0..(bitmap_to_merge.bitmap.len() * 8) { + let decompressed_bit_value = self.get_bit(&bitmap_to_merge.bitmap, bit_index_offset); + while start_bit_index + bit_index_offset >= bitmap_to_update.bitmap.len() * 8 { + bitmap_to_update.bitmap.push(0b00000000); + } + + self.set_bit( + &mut bitmap_to_update.bitmap, + start_bit_index + bit_index_offset, + decompressed_bit_value, + false, + ); + } + + Ok(()) + } + + pub fn merge_bitmaps( + &self, + bitmaps_to_merge: &Vec, + smallest_start_block_height: usize, + ) -> anyhow::Result { + let mut merged_bitmap: Bitmap = Bitmap { + bitmap: Vec::new(), + start_block_height: smallest_start_block_height, + }; + + for compressed_base64_bitmap in bitmaps_to_merge { + let decoded_bitmap: Vec = + general_purpose::STANDARD.decode(compressed_base64_bitmap.base64.clone())?; + let decompressed_bitmap: Bitmap = Bitmap { + bitmap: self.decompress_bitmap(&decoded_bitmap), + start_block_height: compressed_base64_bitmap.start_block_height, + }; + self.merge_bitmap(&mut merged_bitmap, &decompressed_bitmap)?; + } + + Ok(merged_bitmap) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn get_bit_from_bytes() { + let operator: BitmapOperator = BitmapOperator::new(); + let bytes: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001]; + let results: Vec = [7, 8, 9, 15, 19, 20, 22, 23] + .iter() + .map(|index| { + return operator.get_bit(bytes, *index); + }) + .collect(); + assert_eq!( + results, + [true, false, false, false, false, true, false, true] + ); + } + + #[test] + fn set_bit_in_bytes() { + let operator: BitmapOperator = BitmapOperator::new(); + let correct_bytes: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001]; + let test_bytes: &mut [u8; 3] = &mut [0b10000000, 0b10000000, 0b00001001]; + operator.set_bit(test_bytes, 0, false, true); + operator.set_bit(test_bytes, 7, true, true); + operator.set_bit(test_bytes, 8, false, true); + operator.set_bit(test_bytes, 12, false, false); + assert_eq!(correct_bytes, test_bytes); + } + + #[test] + fn get_unsigned_integer_from_binary_sequence() { + let operator: BitmapOperator = BitmapOperator::new(); + let bytes: &[u8; 3] = &[0b11111110, 0b10010100, 0b10001101]; + assert_eq!(operator.read_integer_from_binary(bytes, 6, 16), 1321); + } + + #[test] + fn get_index_of_first_set_bit() { + let operator: BitmapOperator = BitmapOperator::new(); + let bytes: &[u8; 4] = &[0b00000001, 0b10000000, 0b00000001, 0b00000000]; + assert_eq!( + operator.index_of_first_set_bit(bytes, 4).unwrap(), + 7, + "Should get index 7 when starting from 4", + ); + assert_eq!( + operator.index_of_first_set_bit(bytes, 7).unwrap(), + 7, + "Should get index 7 when starting from 7", + ); + assert_eq!( + operator.index_of_first_set_bit(bytes, 8).unwrap(), + 8, + "Should get index 8 when starting from 8", + ); + assert_eq!( + operator.index_of_first_set_bit(bytes, 17).unwrap(), + 23, + "Should get index 23 when starting from 17", + ); + assert!( + operator.index_of_first_set_bit(bytes, 25).is_none(), + "Should get None when starting from 25", + ); + } + + #[test] + fn decode_elias_gamma() { + let operator: BitmapOperator = BitmapOperator::new(); + let bytes: &[u8; 2] = &[0b00000000, 0b00110110]; + let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6); + assert_eq!(decoded_eg.value, 27); + assert_eq!(decoded_eg.last_bit_index, 14); + } + + #[test] + fn decode_empty_elias_gamma() { + let operator: BitmapOperator = BitmapOperator::new(); + let bytes: &[u8; 2] = &[0b00000000, 0b00000000]; + let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0); + assert_eq!(decoded_eg.value, 0); + assert_eq!(decoded_eg.last_bit_index, 0); + } + + #[test] + fn decode_compressed_bitmap() { + let operator: BitmapOperator = BitmapOperator::new(); + assert_eq!(operator.decompress_bitmap(&[0b10100000]), &[0b11000000]); + assert_eq!(operator.decompress_bitmap(&[0b00100100]), &[0b00110000]); + assert_eq!(operator.decompress_bitmap(&[0b10010000]), &[0b11110000]); + assert_eq!( + operator.decompress_bitmap(&[0b10110010, 0b01000000]), + &[0b11100001] + ); + assert_eq!( + operator.decompress_bitmap(&[0b01010001, 0b01010000]), + &[0b01100000, 0b11000000] + ); + assert_eq!( + operator.decompress_bitmap(&[0b01111111, 0b11111111, 0b11111000]), + &[0b01010101, 0b01010101, 0b01010000] + ); + assert_eq!( + operator.decompress_bitmap(&[0b11010101, 0b11010101, 0b11010100]), + &[0b10010001, 0b00100010, 0b01000000] + ); + assert_eq!( + operator.decompress_bitmap(&[0b00000111, 0b11100000]), + &[0b00000000, 0b00000000, 0b00000000, 0b00000001] + ); + assert_eq!( + operator.decompress_bitmap(&[0b11000001, 0b11011011]), + &[ + 0b10000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, + 0b00001110 + ] + ); + } + + #[test] + fn merge_two_decompressed_bitmaps() { + let operator: BitmapOperator = BitmapOperator::new(); + let mut base_bitmap: Bitmap = Bitmap { + bitmap: vec![0b11001010, 0b10001111], + start_block_height: 10, + }; + let compressed_bitmap: Bitmap = Bitmap { + bitmap: vec![0b11100001], // Decompresses to 11100001 + start_block_height: 14, + }; + + assert!(operator + .merge_bitmap(&mut base_bitmap, &compressed_bitmap) + .is_ok()); + assert_eq!(base_bitmap.bitmap, vec![0b11001110, 0b10011111]); + } + + #[test] + fn merge_multiple_bitmaps_together() { + let operator: BitmapOperator = BitmapOperator::new(); + let test_bitmaps_to_merge: Vec = vec![ + Base64Bitmap { + base64: "oA==".to_string(), // Decompresses to 11000000 + start_block_height: 10, + }, + Base64Bitmap { + base64: "oA==".to_string(), + start_block_height: 14, + }, + Base64Bitmap { + base64: "oA==".to_string(), + start_block_height: 18, + }, + ]; + + let merged_bitmap = operator.merge_bitmaps(&test_bitmaps_to_merge, 10).unwrap(); + assert_eq!(merged_bitmap.bitmap, vec![0b11001100, 0b11000000]); + assert_eq!(merged_bitmap.start_block_height, 10); + } +} diff --git a/block-streamer/src/graphql/client.rs b/block-streamer/src/graphql/client.rs index ec63b7f51..eca7cee80 100644 --- a/block-streamer/src/graphql/client.rs +++ b/block-streamer/src/graphql/client.rs @@ -1,6 +1,5 @@ use ::reqwest; use graphql_client::{GraphQLQuery, Response}; -use std::error::Error; // TODO: Use Dataplatform account const HASURA_ACCOUNT: &str = "darunrs_near"; diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index 29e915823..d8e0a3c05 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -1,5 +1,6 @@ use tracing_subscriber::prelude::*; +mod bitmap; mod block_stream; mod delta_lake_client; mod graphql; diff --git a/coordinator/Cargo.lock b/coordinator/Cargo.lock index 7e5f2258e..032dae9cf 100644 --- a/coordinator/Cargo.lock +++ b/coordinator/Cargo.lock @@ -866,6 +866,12 @@ version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64-simd" version = "0.8.0" @@ -923,6 +929,7 @@ dependencies = [ "async-trait", "aws-config", "aws-sdk-s3", + "base64 0.22.1", "borsh 0.10.3", "cached", "chrono",