feat: Implement Block Streamer Bitmap Operations #747

Merged
merged 11 commits into from
Jun 4, 2024
7 changes: 7 additions & 0 deletions block-streamer/Cargo.lock


1 change: 1 addition & 0 deletions block-streamer/Cargo.toml
@@ -33,6 +33,7 @@ tonic = "0.10.2"
wildmatch = "2.1.1"

registry-types = { path = "../registry/types" }
base64 = "0.22.1"
Collaborator

Can you place this with the other versioned imports, please?


[build-dependencies]
tonic-build = "0.10"
367 changes: 367 additions & 0 deletions block-streamer/src/bitmap.rs
@@ -0,0 +1,367 @@
use anyhow::anyhow;
use base64::{engine::general_purpose, Engine as _};

pub struct Base64Bitmap {
pub start_block_height: usize,
pub base64: String,
}

pub struct Bitmap {
pub start_block_height: usize,
pub bitmap: Vec<u8>,
}

struct EliasGammaDecoded {
pub value: usize,
pub last_bit_index: usize,
}

pub struct BitmapOperator {}

#[cfg_attr(test, mockall::automock)]
impl BitmapOperator {
pub fn new() -> Self {
Self {}
}

pub fn get_bit(&self, byte_array: &[u8], bit_index: usize) -> bool {
Collaborator

Suggested change
- pub fn get_bit(&self, byte_array: &[u8], bit_index: usize) -> bool {
+ pub fn get_bit(&self, bytes: &[u8], bit_index: usize) -> bool {

Nit: bytes seems sufficient here?

let byte_index: usize = bit_index / 8;
let bit_index_in_byte: usize = bit_index % 8;

(byte_array[byte_index] & (1u8 << (7 - bit_index_in_byte))) > 0
}

fn set_bit(&self, byte_array: &mut [u8], bit_index: usize, bit_value: bool, write_zero: bool) {
Collaborator

Why do we take both bit_value and write_zero? Would bit_value alone be sufficient?

Collaborator Author

@darunrs, Jun 4, 2024

This is necessary because we sometimes want to write a 0 over a 1, which we usually don't want to do. Specifically, when we replace one Elias gamma encoding with another, the new one might be shorter, leaving extra 1s that should be 0. Technically we don't need it in the current code, but I ported it over because it's exactly how we have it in the indexer logic.
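
To illustrate the distinction described in this reply, here is a minimal sketch rather than the PR's actual code. `set_bit` is rewritten as a free function for brevity, and `write_bits` is a hypothetical helper introduced only for this example. With `write_zero` set to false a write behaves like a bitwise OR, which suits merging; with `write_zero` set to true, stale 1 bits left by a longer previous encoding get cleared.

```rust
/// Sets the bit at `bit_index` (most significant bit first) when `bit_value`
/// is true. A false `bit_value` only clears the bit when `write_zero` is true.
fn set_bit(bytes: &mut [u8], bit_index: usize, bit_value: bool, write_zero: bool) {
    let mask = 1u8 << (7 - (bit_index % 8));
    if bit_value {
        bytes[bit_index / 8] |= mask;
    } else if write_zero {
        bytes[bit_index / 8] &= !mask;
    }
}

/// Hypothetical helper (not in the PR): writes `bits` starting at `start`.
fn write_bits(bytes: &mut [u8], start: usize, bits: &[bool], write_zero: bool) {
    for (offset, &bit) in bits.iter().enumerate() {
        set_bit(bytes, start + offset, bit, write_zero);
    }
}
```

Writing `[false, true, true]` over `0b10100000` keeps the leading 1 when `write_zero` is false and clears it when `write_zero` is true.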

if !bit_value && write_zero {
byte_array[bit_index / 8] &= !(1u8 << (7 - (bit_index % 8)));
} else if bit_value {
byte_array[bit_index / 8] |= 1u8 << (7 - (bit_index % 8));
}
}

fn get_number_between_bits(
Collaborator

What is number? Can we be more explicit?

Collaborator Author

@darunrs, Jun 4, 2024

Hmm, basically we encoded a number as binary and are simply reading the binary value from a particular stretch of bits. Perhaps I can rename this to read_integer_from_binary? Even though all our functions deal with binary, this name would state explicitly that the binary is used to build an integer.
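
As a rough sketch of what that reply describes, the function below reads an inclusive range of bits as a big-endian unsigned integer. It uses the name `read_integer_from_binary` proposed in the comment; writing it as a free function with a local `get_bit` is an assumption made to keep the example self-contained.

```rust
/// Returns the bit at `bit_index`, counting from the most significant bit of byte 0.
fn get_bit(bytes: &[u8], bit_index: usize) -> bool {
    (bytes[bit_index / 8] & (1u8 << (7 - (bit_index % 8)))) != 0
}

/// Interprets bits `start_bit_index..=end_bit_index` as an unsigned integer,
/// with the bit at `start_bit_index` as the most significant digit.
/// Ranges wider than 32 bits silently lose their high bits.
fn read_integer_from_binary(bytes: &[u8], start_bit_index: usize, end_bit_index: usize) -> u32 {
    let mut number = 0u32;
    for bit_index in start_bit_index..=end_bit_index {
        // Shift the accumulated value left and append the next bit.
        number = (number << 1) | u32::from(get_bit(bytes, bit_index));
    }
    number
}
```

For the bytes used in the PR's own test, bits 6 through 16 of `[0b11111110, 0b10010100, 0b10001101]` are `10100101001`, which is 1321.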

&self,
byte_array: &[u8],
start_bit_index: usize,
end_bit_index: usize,
) -> u32 {
let mut number: u32 = 0;
// Read bits from right to left
for curr_bit_index in (start_bit_index..=end_bit_index).rev() {
if self.get_bit(byte_array, curr_bit_index) {
number |= 1u32 << (end_bit_index - curr_bit_index);
}
}

number
}

fn index_of_first_bit(
Collaborator

Suggested change
- fn index_of_first_bit(
+ fn index_of_first_positive_bit(

Not sure if positive makes sense in this case, but something similar may be more clear?

Collaborator Author

Maybe I can go with index_of_first_set_bit instead? I can't think of a better way to refer to a bit with value 1 other than saying set bit or outright saying "bit with value 1". I think set bit is more clear than what I originally had.

&self,
byte_array: &[u8],
start_bit_index: usize,
) -> anyhow::Result<usize> {
Collaborator

Suggested change
- ) -> anyhow::Result<usize> {
+ ) -> Option<usize> {

Option seems more idiomatic here
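
Combining the two suggestions in this thread, a version named `index_of_first_set_bit` that returns `Option<usize>` might look like the sketch below. It is a free function that scans bit by bit, which is a simplification assumed for this example and not the byte-skipping loop in the diff.

```rust
/// Returns the index of the first bit with value 1 at or after
/// `start_bit_index`, or `None` when no such bit exists.
fn index_of_first_set_bit(bytes: &[u8], start_bit_index: usize) -> Option<usize> {
    (start_bit_index..bytes.len() * 8)
        .find(|&bit_index| (bytes[bit_index / 8] & (1u8 << (7 - (bit_index % 8)))) != 0)
}
```

A caller can then use `?`, `match`, or `unwrap_or_default` on the result without constructing an error value for the "not found" case.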

let mut first_bit_index: usize = start_bit_index % 8;
for byte_index in (start_bit_index / 8)..byte_array.len() {
if byte_array[byte_index] > 0 {
for bit_index in first_bit_index..=7 {
if byte_array[byte_index] & (1u8 << (7 - bit_index)) > 0 {
return Ok(byte_index * 8 + bit_index);
}
}
}
first_bit_index = 0;
}

Err(anyhow!("Failed to find a bit with value 1 in byte array"))
}

fn decode_elias_gamma_entry(
&self,
byte_array: &[u8],
start_bit_index: usize,
) -> EliasGammaDecoded {
if byte_array.len() == 0 {
return EliasGammaDecoded {
value: 0,
last_bit_index: 0,
};
Collaborator

Suggested change
- return EliasGammaDecoded {
- value: 0,
- last_bit_index: 0,
- };
+ return EliasGammaDecoded::default()

Could use Default here, but you'll need to derive it on EliasGammaDecoded

Collaborator Author

@darunrs, Jun 4, 2024

That's a good idea actually. It would definitely make the match look nicer too. It seems the default for usize is 0 anyway.
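
A small sketch of the suggestion, assuming `Default` is derived on the struct. `Debug` and `PartialEq` are added here only so the values can be compared in assertions, and `decoded_or_default` is a hypothetical helper that stands in for the early-return paths of `decode_elias_gamma_entry`.

```rust
/// Deriving `Default` yields `value: 0` and `last_bit_index: 0`,
/// because the default for `usize` is 0.
#[derive(Debug, Default, PartialEq)]
struct EliasGammaDecoded {
    pub value: usize,
    pub last_bit_index: usize,
}

/// Hypothetical helper: falls back to the all-zero entry when nothing was decoded.
fn decoded_or_default(decoded: Option<EliasGammaDecoded>) -> EliasGammaDecoded {
    decoded.unwrap_or_default()
}
```

With this in place, both the empty-input check and the "no set bit found" branch can return `EliasGammaDecoded::default()`.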

}
let first_bit_index = match self.index_of_first_bit(byte_array, start_bit_index) {
Ok(index) => index,
Err(_) => {
return EliasGammaDecoded {
value: 0,
last_bit_index: 0,
}
}
};
let zero_count: usize = first_bit_index - start_bit_index;
let remainder: usize = if zero_count == 0 {
0
} else {
self.get_number_between_bits(
byte_array,
first_bit_index + 1,
first_bit_index + zero_count,
)
.try_into()
.unwrap()
};

EliasGammaDecoded {
value: 2_usize.pow(zero_count.try_into().unwrap()) + remainder,
last_bit_index: first_bit_index + zero_count,
}
}

fn decompress_bitmap(&self, compressed_bitmap: &[u8]) -> Vec<u8> {
let compressed_bit_length: usize = compressed_bitmap.len() * 8;
let mut current_bit_value: bool = (compressed_bitmap[0] & 0b10000000) > 0;
let mut decompressed_byte_array: Vec<u8> = Vec::new();
Collaborator

Do we know the length of this upfront? Vec::with_capacity() would avoid unnecessary re-allocations.

Collaborator

If we knew capacity, maybe we could just use &[u8] 🤔

Collaborator Author

@darunrs, Jun 4, 2024

We don't know the size upfront. We need to decompress the EG to know how long the bit sequence is for each EG, and we can have many of them. We do know the upper bound, which is about 86,400 bits, since there is 1 bit per block and 86,400 seconds in a day. But I felt it was unnecessary to create byte arrays of roughly 11 KB every time, as we usually don't need that many.
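
The point about not knowing the length up front can be seen in a condensed sketch of the decoding loop. This is an illustrative rewrite under stated assumptions, not the code in the diff: the functions are free-standing, `decode_elias_gamma` returns an `Option` tuple in place of the `EliasGammaDecoded` struct, and a truncated code is treated as the end of input. Each run length only becomes known after its Elias gamma code is read, so the output grows as runs of 1s are written.

```rust
/// Returns the bit at `bit_index`, most significant bit first.
fn get_bit(bytes: &[u8], bit_index: usize) -> bool {
    (bytes[bit_index / 8] & (1u8 << (7 - (bit_index % 8)))) != 0
}

/// Decodes one Elias gamma value starting at `start`.
/// Returns `(value, index_of_last_bit_read)`, or `None` when no set bit
/// remains or the code is cut off (an assumption made for this sketch).
fn decode_elias_gamma(bytes: &[u8], start: usize) -> Option<(usize, usize)> {
    let total_bits = bytes.len() * 8;
    let first_set = (start..total_bits).find(|&i| get_bit(bytes, i))?;
    let zero_count = first_set - start;
    let last = first_set + zero_count;
    if last >= total_bits {
        return None;
    }
    // Start from the leading 1 and shift in `zero_count` remainder bits,
    // which gives 2^zero_count + remainder.
    let mut value = 1usize;
    for i in (first_set + 1)..=last {
        value = (value << 1) | usize::from(get_bit(bytes, i));
    }
    Some((value, last))
}

/// Expands alternating run lengths into a bitmap. The first bit of the
/// input gives the value of the first run.
fn decompress(compressed: &[u8]) -> Vec<u8> {
    let mut output: Vec<u8> = Vec::new();
    if compressed.is_empty() {
        return output;
    }
    let mut bit_value = get_bit(compressed, 0);
    let mut read_index = 1;
    let mut write_index = 0;
    while let Some((run_length, last)) = decode_elias_gamma(compressed, read_index) {
        if bit_value {
            // Grow only when a run of 1s actually needs the space.
            let needed_bytes = (write_index + run_length + 7) / 8;
            if output.len() < needed_bytes {
                output.resize(needed_bytes, 0);
            }
            for i in write_index..write_index + run_length {
                output[i / 8] |= 1u8 << (7 - (i % 8));
            }
        }
        write_index += run_length;
        bit_value = !bit_value;
        read_index = last + 1;
    }
    output
}
```

If profiling ever showed reallocation to be a cost, `Vec::with_capacity` with the daily upper bound would be a drop-in change to the first line of `decompress`.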


let mut compressed_bit_index = 1;
let mut decompressed_bit_index = 0;

while compressed_bit_index < compressed_bit_length {
let decoded_elias_gamma =
self.decode_elias_gamma_entry(compressed_bitmap, compressed_bit_index);
if decoded_elias_gamma.value == 0 {
break;
}

compressed_bit_index = decoded_elias_gamma.last_bit_index + 1;
let mut bit_index_offset: usize = 0;
while current_bit_value && (bit_index_offset < decoded_elias_gamma.value) {
while decompressed_bit_index + bit_index_offset
>= (decompressed_byte_array.len() * 8)
{
decompressed_byte_array.push(0b00000000);
}
self.set_bit(
&mut decompressed_byte_array,
decompressed_bit_index + bit_index_offset,
true,
true,
);
bit_index_offset = bit_index_offset + 1;
}

decompressed_bit_index += decoded_elias_gamma.value;
current_bit_value = !current_bit_value;
}

decompressed_byte_array
}

fn merge_compressed_bitmap_into_base_bitmap(
Collaborator

What is the difference between base_bitmap and compressed_bitmap? Maybe this would be more obvious if we just had a merge function, and called decompress from the outside?

Collaborator

merge could even be defined on the Bitmap struct instead for further clarity

Collaborator Author

@darunrs, Jun 4, 2024

I think the confusion is that it is doing two different things: decompression and merging. Before decompression, it matters which bitmap is the compressed one, as we want to ensure bits are written to the decompressed one. But if the bitmaps are both decompressed, this is no longer an issue.

I think the better way forward is to create a merge_bitmap function like you mentioned but keep it in BitmapOperator. Then we do a three-step sequence in the public get_merged_bitmap function: decode, decompress, merge. I think this would be clear while retaining BitmapOperator as a stateless utility class. I'm a little confused about how a Bitmap struct function would perform the merge. I imagine it would need to have a BitmapOperator internally, which might make it confusing who owns these data operator functions.

If there's a more clear way to structure this class, I'm happy to rework it when you're back!
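
One possible shape for the `merge_bitmap` step mentioned in this reply is sketched below. It assumes both bitmaps are already decompressed, uses a free function and a `String` error in place of `anyhow` to stay self-contained, and ORs the added bitmap into the base at the offset given by the difference in start block heights. The signature is an assumption for this example, not the one that was eventually merged.

```rust
pub struct Bitmap {
    pub start_block_height: usize,
    pub bitmap: Vec<u8>,
}

/// Hypothetical `merge_bitmap`: ORs a decompressed bitmap into `base`.
/// Fails when `to_add` starts below the base start block height.
fn merge_bitmap(base: &mut Bitmap, to_add: &Bitmap) -> Result<(), String> {
    let offset = to_add
        .start_block_height
        .checked_sub(base.start_block_height)
        .ok_or_else(|| "bitmap starts below the base start block height".to_string())?;

    // Extend the base so that every bit of `to_add` has a destination.
    let added_bits = to_add.bitmap.len() * 8;
    let needed_bytes = (offset + added_bits + 7) / 8;
    if added_bits > 0 && base.bitmap.len() < needed_bytes {
        base.bitmap.resize(needed_bytes, 0);
    }

    for bit_index in 0..added_bits {
        let is_set = (to_add.bitmap[bit_index / 8] & (1u8 << (7 - (bit_index % 8)))) != 0;
        if is_set {
            // Only 1 bits are written, so existing 1s in the base survive.
            let target = offset + bit_index;
            base.bitmap[target / 8] |= 1u8 << (7 - (target % 8));
        }
    }
    Ok(())
}
```

With this split, the public function would decode each base64 string, decompress it, and then call `merge_bitmap`, so the merge step never needs to know which input was compressed.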

&self,
base_bitmap: &mut Bitmap,
compressed_bitmap: &Bitmap,
) -> anyhow::Result<()> {
let decompressed_add_bitmap: Vec<u8> = self.decompress_bitmap(&compressed_bitmap.bitmap);
let start_bit_index: usize = match compressed_bitmap
.start_block_height
.checked_sub(base_bitmap.start_block_height)
{
Some(result) => result,
None => {
return Err(anyhow!(
"Start block height in bitmap was lower than provided lowest block height",
))
}
};

for bit_index_offset in 0..(decompressed_add_bitmap.len() * 8) {
let decompressed_bit_value = self.get_bit(&decompressed_add_bitmap, bit_index_offset);
while start_bit_index + bit_index_offset >= base_bitmap.bitmap.len() * 8 {
base_bitmap.bitmap.push(0b00000000);
}

self.set_bit(
&mut base_bitmap.bitmap,
start_bit_index + bit_index_offset,
decompressed_bit_value,
false,
);
}

Ok(())
}

pub fn get_merged_bitmap(
Collaborator

Suggested change
- pub fn get_merged_bitmap(
+ pub fn merge_bitmaps(

Nit: this seems more clear?

Collaborator Author

@darunrs, Jun 4, 2024

Sounds good! I was originally thinking of merge_compressed_bitmaps, but maybe it's not worth requiring someone to know they're compressed before calling the function? Especially since the argument type is Base64Bitmap, which should only really be received from the GraphQL query.

&self,
bitmaps_to_merge: &Vec<Base64Bitmap>,
smallest_start_block_height: usize,
) -> anyhow::Result<Bitmap> {
let mut merged_bitmap: Bitmap = Bitmap {
bitmap: Vec::new(),
start_block_height: smallest_start_block_height,
};

for compressed_base64_bitmap in bitmaps_to_merge {
let decoded_bitmap: Vec<u8> =
general_purpose::STANDARD.decode(compressed_base64_bitmap.base64.clone())?;
let compressed_bitmap: Bitmap = Bitmap {
bitmap: decoded_bitmap,
start_block_height: compressed_base64_bitmap.start_block_height,
};
self.merge_compressed_bitmap_into_base_bitmap(&mut merged_bitmap, &compressed_bitmap)?;
}

Ok(merged_bitmap)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_getting_bit_from_array() {
Collaborator

Suggested change
- fn test_getting_bit_from_array() {
+ fn getting_bit_from_array() {

Nit: test_ seems superfluous here

Collaborator Author

That's true. I'll reword the test names.

let operator: BitmapOperator = BitmapOperator::new();
let byte_array: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001];
let results: Vec<bool> = [7, 8, 9, 15, 19, 20, 22, 23]
.iter()
.map(|index| {
return operator.get_bit(byte_array, *index);
})
.collect();
assert_eq!(
results,
[true, false, false, false, false, true, false, true]
);
}

#[test]
fn test_setting_bit_in_array() {
let operator: BitmapOperator = BitmapOperator::new();
let correct_byte_array: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001];
let test_byte_array: &mut [u8; 3] = &mut [0b10000000, 0b10000000, 0b00001001];
operator.set_bit(test_byte_array, 0, false, true);
operator.set_bit(test_byte_array, 7, true, true);
operator.set_bit(test_byte_array, 8, false, true);
operator.set_bit(test_byte_array, 12, false, false);
assert_eq!(correct_byte_array, test_byte_array);
}

#[test]
fn test_getting_number_from_bits() {
let operator: BitmapOperator = BitmapOperator::new();
let byte_array: &[u8; 3] = &[0b11111110, 0b10010100, 0b10001101];
assert_eq!(operator.get_number_between_bits(byte_array, 6, 16), 1321);
}

#[test]
fn test_getting_index_of_first_bit() {
let operator: BitmapOperator = BitmapOperator::new();
let byte_array: &[u8; 3] = &[0b00000001, 0b10000000, 0b00000001];
assert_eq!(
operator.index_of_first_bit(byte_array, 4).unwrap(),
7,
"Should get index 7 when starting from 4",
);
assert_eq!(
operator.index_of_first_bit(byte_array, 7).unwrap(),
7,
"Should get index 7 when starting from 7",
);
assert_eq!(
operator.index_of_first_bit(byte_array, 8).unwrap(),
8,
"Should get index 8 when starting from 8",
);
assert_eq!(
operator.index_of_first_bit(byte_array, 17).unwrap(),
23,
"Should get index 23 when starting from 17",
);
}

#[test]
fn test_decoding_elias_gamma() {
let operator: BitmapOperator = BitmapOperator::new();
let byte_array: &[u8; 2] = &[0b00000000, 0b00110110];
let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(byte_array, 6);
assert_eq!(decoded_eg.value, 27);
assert_eq!(decoded_eg.last_bit_index, 14);
}

#[test]
fn test_decoding_compressed_bitmap() {
let operator: BitmapOperator = BitmapOperator::new();
assert_eq!(operator.decompress_bitmap(&[0b10100000]), &[0b11000000]);
assert_eq!(operator.decompress_bitmap(&[0b00100100]), &[0b00110000]);
assert_eq!(operator.decompress_bitmap(&[0b10010000]), &[0b11110000]);
assert_eq!(
operator.decompress_bitmap(&[0b10110010, 0b01000000]),
&[0b11100001]
);
assert_eq!(
operator.decompress_bitmap(&[0b01010001, 0b01010000]),
&[0b01100000, 0b11000000]
);
assert_eq!(
operator.decompress_bitmap(&[0b01111111, 0b11111111, 0b11111000]),
&[0b01010101, 0b01010101, 0b01010000]
);
assert_eq!(
operator.decompress_bitmap(&[0b11010101, 0b11010101, 0b11010100]),
&[0b10010001, 0b00100010, 0b01000000]
);
assert_eq!(
operator.decompress_bitmap(&[0b00000111, 0b11100000]),
&[0b00000000, 0b00000000, 0b00000000, 0b00000001]
);
assert_eq!(
operator.decompress_bitmap(&[0b11000001, 0b11011011]),
&[
0b10000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000,
0b00001110
]
);
}

#[test]
fn test_merge_compressed_bitmap_into_base_bitmap() {
let operator: BitmapOperator = BitmapOperator::new();
let mut base_bitmap: Bitmap = Bitmap {
bitmap: vec![0b11001010, 0b10001111],
start_block_height: 10,
};
let compressed_bitmap: Bitmap = Bitmap {
bitmap: vec![0b10110010, 0b01000000], // Decompresses to 11100001
start_block_height: 14,
};

assert!(operator
.merge_compressed_bitmap_into_base_bitmap(&mut base_bitmap, &compressed_bitmap)
.is_ok());
assert_eq!(base_bitmap.bitmap, vec![0b11001110, 0b10011111]);
}

#[test]
fn test_get_merged_bitmap() {
let operator: BitmapOperator = BitmapOperator::new();
let test_bitmaps_to_merge: Vec<Base64Bitmap> = vec![
Base64Bitmap {
base64: "oA==".to_string(), // Decompresses to 11000000
start_block_height: 10,
},
Base64Bitmap {
base64: "oA==".to_string(),
start_block_height: 14,
},
Base64Bitmap {
base64: "oA==".to_string(),
start_block_height: 18,
},
];

let merged_bitmap = operator
.get_merged_bitmap(&test_bitmaps_to_merge, 10)
.unwrap();
assert_eq!(merged_bitmap.bitmap, vec![0b11001100, 0b11000000]);
assert_eq!(merged_bitmap.start_block_height, 10);
}
}