From 1cef18ebf88a77420ec0dad4804f85f00b33e544 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 5 Jun 2024 12:20:47 -0700 Subject: [PATCH 01/20] Some starting code --- block-streamer/Cargo.lock | 1 + block-streamer/Cargo.toml | 1 + block-streamer/src/bitmap.rs | 25 ++++++++++++ block-streamer/src/block_height_stream.rs | 48 +++++++++++++++++++++++ block-streamer/src/graphql/client.rs | 4 +- block-streamer/src/main.rs | 1 + coordinator/Cargo.lock | 1 + 7 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 block-streamer/src/block_height_stream.rs diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index 1a68afaa5..19c2d2cae 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -964,6 +964,7 @@ version = "0.1.0" dependencies = [ "actix-web", "anyhow", + "async-stream", "async-trait", "aws-config", "aws-sdk-s3", diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index d44903786..6e1ee4c82 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -34,6 +34,7 @@ wildmatch = "2.1.1" registry-types = { path = "../registry/types" } base64 = "0.22.1" +async-stream = "0.3.5" [build-dependencies] tonic-build = "0.10" diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index a4e921974..bac3dd735 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -1,11 +1,16 @@ use anyhow::anyhow; use base64::{engine::general_purpose, Engine as _}; +use std::convert::TryFrom; +use crate::graphql::client::{get_bitmaps_exact, get_bitmaps_wildcard}; + +#[derive(Debug, Default, PartialEq)] pub struct Base64Bitmap { pub start_block_height: usize, pub base64: String, } +#[derive(Debug, Default, PartialEq)] pub struct Bitmap { pub start_block_height: usize, pub bitmap: Vec, @@ -19,6 +24,26 @@ struct EliasGammaDecoded { pub struct BitmapOperator {} +impl Base64Bitmap { + pub fn from_exact_query( + query_item: get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, + ) -> Self { + Self { + base64: query_item.bitmap, + start_block_height: usize::try_from(query_item.first_block_height).unwrap(), + } + } + + pub fn from_wildcard_query( + query_item: get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, + ) -> Self { + Self { + base64: query_item.bitmap, + start_block_height: usize::try_from(query_item.first_block_height).unwrap(), + } + } +} + #[cfg_attr(test, mockall::automock)] impl BitmapOperator { pub fn new() -> Self { diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs new file mode 100644 index 000000000..edce47069 --- /dev/null +++ b/block-streamer/src/block_height_stream.rs @@ -0,0 +1,48 @@ +use crate::bitmap::{Base64Bitmap, BitmapOperator}; +use crate::graphql::client::GraphQLClient; +use async_stream::stream; +use futures::Stream; +use near_lake_framework::near_indexer_primitives; + +pub struct BlockHeightStream { + graphql_client: GraphQLClient, + bitmap_operator: BitmapOperator, +} + +#[cfg_attr(test, mockall::automock)] +impl BlockHeightStream { + pub fn new(graphql_endpoint: String) -> Self { + Self { + graphql_client: GraphQLClient::new(graphql_endpoint), + bitmap_operator: BitmapOperator::new(), + } + } + + fn parse_contract_pattern(&self, contract_pattern: &str) -> Vec< + + pub async fn list_matching_block_heights( + &self, + start_block_height: near_indexer_primitives::types::BlockHeight, + contract_pattern: &str, + ) -> impl Stream { + let start_date = self.get_nearest_block_date(start_block_height).await?; + + stream! { + for i in 0..3 { + yield i; + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const HASURA_ENDPOINT: &str = + "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app/v1/graphql"; + + fn collect_three_block_heights_from_one_bitmap() {} + + fn collect_three_block_heights_from_two_bitmaps() {} +} diff --git a/block-streamer/src/graphql/client.rs b/block-streamer/src/graphql/client.rs index eca7cee80..09c37e8dd 100644 --- a/block-streamer/src/graphql/client.rs +++ b/block-streamer/src/graphql/client.rs @@ -14,7 +14,7 @@ type Date = String; response_derives = "Debug", normalization = "rust" )] -struct GetBitmapsExact; +pub struct GetBitmapsExact; #[derive(GraphQLQuery)] #[graphql( @@ -23,7 +23,7 @@ struct GetBitmapsExact; response_derives = "Debug", normalization = "rust" )] -struct GetBitmapsWildcard; +pub struct GetBitmapsWildcard; pub struct GraphQLClient { client: reqwest::Client, diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index d8e0a3c05..b8d5c4ee9 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -1,6 +1,7 @@ use tracing_subscriber::prelude::*; mod bitmap; +mod block_height_stream; mod block_stream; mod delta_lake_client; mod graphql; diff --git a/coordinator/Cargo.lock b/coordinator/Cargo.lock index 032dae9cf..0a95983d9 100644 --- a/coordinator/Cargo.lock +++ b/coordinator/Cargo.lock @@ -926,6 +926,7 @@ version = "0.1.0" dependencies = [ "actix-web", "anyhow", + "async-stream", "async-trait", "aws-config", "aws-sdk-s3", From 31ec45548515f5e1200ed9bc057bdea6a4ea2808 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 5 Jun 2024 18:32:54 -0700 Subject: [PATCH 02/20] Contract filter parsing and block date fetch --- block-streamer/src/block_height_stream.rs | 167 ++++++++++++++++++++-- 1 file changed, 157 insertions(+), 10 deletions(-) diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index edce47069..9d53ff72b 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -1,38 +1,128 @@ use crate::bitmap::{Base64Bitmap, BitmapOperator}; use crate::graphql::client::GraphQLClient; +use crate::rules::types::ChainId; +use anyhow::Context; use async_stream::stream; -use futures::Stream; +use chrono::{DateTime, Duration, TimeZone, Utc}; +use futures::stream::{BoxStream, Stream}; +use futures::StreamExt; use near_lake_framework::near_indexer_primitives; +const MAX_S3_RETRY_COUNT: u8 = 20; + +#[derive(Debug, Eq, PartialEq)] +enum ContractPatternType { + Exact(Vec), + Wildcard(String), +} + pub struct BlockHeightStream { graphql_client: GraphQLClient, bitmap_operator: BitmapOperator, + s3_client: crate::s3_client::S3Client, + chain_id: ChainId, } #[cfg_attr(test, mockall::automock)] impl BlockHeightStream { - pub fn new(graphql_endpoint: String) -> Self { + pub fn new(graphql_endpoint: String, s3_client: crate::s3_client::S3Client) -> Self { Self { graphql_client: GraphQLClient::new(graphql_endpoint), bitmap_operator: BitmapOperator::new(), + s3_client, + chain_id: ChainId::Mainnet, // Hardcoded mainnet for now } } - fn parse_contract_pattern(&self, contract_pattern: &str) -> Vec< + fn get_lake_bucket(&self) -> String { + match self.chain_id { + ChainId::Mainnet => "near-lake-data-mainnet".to_string(), + ChainId::Testnet => "near-lake-data-testnet".to_string(), + } + } - pub async fn list_matching_block_heights( - &self, - start_block_height: near_indexer_primitives::types::BlockHeight, - contract_pattern: &str, - ) -> impl Stream { - let start_date = self.get_nearest_block_date(start_block_height).await?; - + pub async fn get_nearest_block_date(&self, block_height: u64) -> anyhow::Result> { + let mut current_block_height = block_height; + let mut retry_count = 1; + loop { + let block_key = format!("{:0>12}/block.json", current_block_height); + match self + .s3_client + .get_text_file(&self.get_lake_bucket(), &block_key) + .await + { + Ok(text) => { + let block: near_indexer_primitives::views::BlockView = + serde_json::from_str(&text)?; + return Ok(Utc.timestamp_nanos(block.header.timestamp_nanosec as i64)); + } + + Err(e) => { + if e.root_cause() + .downcast_ref::() + .is_some() + { + retry_count += 1; + if retry_count > MAX_S3_RETRY_COUNT { + anyhow::bail!("Exceeded maximum retries to fetch block from S3"); + } + + tracing::debug!( + "Block {} not found on S3, attempting to fetch next block", + current_block_height + ); + current_block_height += 1; + continue; + } + + return Err(e).context("Failed to fetch block from S3"); + } + } + } + } + + fn next_day(date: DateTime) -> DateTime { + date + Duration::days(1) + } + + fn parse_contract_pattern(&self, contract_pattern: &str) -> ContractPatternType { + let trimmed_contract_pattern: String = contract_pattern + .chars() + .filter(|c| !c.is_whitespace()) + .collect(); + if contract_pattern.chars().any(|c| c == '*') { + let wildcard_pattern = trimmed_contract_pattern + .replace(',', "|") + .replace('.', "\\.") + .replace('*', ".*"); + return ContractPatternType::Wildcard(wildcard_pattern); + } + + let exact_pattern = trimmed_contract_pattern + .split(',') + .map(str::to_string) + .collect(); + ContractPatternType::Exact(exact_pattern) + } + + fn generate_block_height_stream(&self) -> impl Stream { stream! { for i in 0..3 { yield i; } } } + + pub async fn list_matching_block_heights( + &self, + start_block_height: near_indexer_primitives::types::BlockHeight, + contract_pattern: &str, + ) -> anyhow::Result> { + let start_date = self.get_nearest_block_date(start_block_height).await?; + let contract_pattern_type = self.parse_contract_pattern(contract_pattern); + + Ok(self.generate_block_height_stream().boxed()) + } } #[cfg(test)] @@ -42,7 +132,64 @@ mod tests { const HASURA_ENDPOINT: &str = "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app/v1/graphql"; + #[test] + fn parse_exact_contract_patterns() { + let mock_s3_client = crate::s3_client::S3Client::default(); + let block_height_stream = + BlockHeightStream::new(HASURA_ENDPOINT.to_owned(), mock_s3_client); + let sample_patterns = vec![ + "near", + "near, someone.tg", + "a.near, b.near, a.b, a.b.c.near", + ]; + + assert_eq!( + block_height_stream.parse_contract_pattern(sample_patterns[0]), + ContractPatternType::Exact(vec!["near".to_string()]) + ); + assert_eq!( + block_height_stream.parse_contract_pattern(sample_patterns[1]), + ContractPatternType::Exact(vec!["near".to_string(), "someone.tg".to_string()],) + ); + assert_eq!( + block_height_stream.parse_contract_pattern(sample_patterns[2]), + ContractPatternType::Exact(vec![ + "a.near".to_string(), + "b.near".to_string(), + "a.b".to_string(), + "a.b.c.near".to_string(), + ]) + ); + } + + #[test] + fn parse_wildcard_contract_patterns() { + let mock_s3_client = crate::s3_client::S3Client::default(); + let block_height_stream = + BlockHeightStream::new(HASURA_ENDPOINT.to_owned(), mock_s3_client); + let sample_patterns = vec![ + "*.near", + "near, someone.*.tg", + "a.near, b.*, *.b, a.*.c.near", + ]; + + assert_eq!( + block_height_stream.parse_contract_pattern(sample_patterns[0]), + ContractPatternType::Wildcard(".*\\.near".to_string()) + ); + assert_eq!( + block_height_stream.parse_contract_pattern(sample_patterns[1]), + ContractPatternType::Wildcard("near|someone\\..*\\.tg".to_string()) + ); + assert_eq!( + block_height_stream.parse_contract_pattern(sample_patterns[2]), + ContractPatternType::Wildcard("a\\.near|b\\..*|.*\\.b|a\\..*\\.c\\.near".to_string()) + ); + } + + #[test] fn collect_three_block_heights_from_one_bitmap() {} + #[test] fn collect_three_block_heights_from_two_bitmaps() {} } From 0fa8afe415c6e161c355cb2b1d1192f466e3fb2a Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 5 Jun 2024 20:04:26 -0700 Subject: [PATCH 03/20] Naive implementation of stream --- block-streamer/src/bitmap.rs | 8 ++--- block-streamer/src/block_height_stream.rs | 39 +++++++++++++++++++---- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index bac3dd735..6b90f5f18 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -26,19 +26,19 @@ pub struct BitmapOperator {} impl Base64Bitmap { pub fn from_exact_query( - query_item: get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, + query_item: &get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, ) -> Self { Self { - base64: query_item.bitmap, + base64: query_item.bitmap.clone(), start_block_height: usize::try_from(query_item.first_block_height).unwrap(), } } pub fn from_wildcard_query( - query_item: get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, + query_item: &get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, ) -> Self { Self { - base64: query_item.bitmap, + base64: query_item.bitmap.clone(), start_block_height: usize::try_from(query_item.first_block_height).unwrap(), } } diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index 9d53ff72b..94988f1be 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -81,7 +81,7 @@ impl BlockHeightStream { } } - fn next_day(date: DateTime) -> DateTime { + fn next_day(&self, date: DateTime) -> DateTime { date + Duration::days(1) } @@ -105,23 +105,50 @@ impl BlockHeightStream { ContractPatternType::Exact(exact_pattern) } - fn generate_block_height_stream(&self) -> impl Stream { + fn generate_block_height_stream( + &'static self, + start_date: DateTime, + contract_pattern_type: ContractPatternType, + ) -> impl Stream { stream! { - for i in 0..3 { - yield i; + let mut current_date = start_date; + while current_date <= Utc::now() { + let current_date_string = current_date.format("%m-%d-%Y").to_string(); + let bitmaps_from_query: Vec = match contract_pattern_type { + // TODO: Implement pagination of query + ContractPatternType::Exact(ref pattern) => { + let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string, 1000, 0).await.unwrap(); + query_result.iter().map(|result_item| Base64Bitmap::from_exact_query(result_item)).collect() + }, + ContractPatternType::Wildcard(ref pattern) => { + let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string, 1000, 0).await.unwrap(); + query_result.iter().map(|result_item| Base64Bitmap::from_wildcard_query(result_item)).collect() + + }, + }; + let starting_block_height = bitmaps_from_query.iter().map(|item| item.start_block_height).min().unwrap(); + let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap(); + for index in 0..bitmap_for_day.bitmap.len() { + if self.bitmap_operator.get_bit(&bitmap_for_day.bitmap, index) { + yield starting_block_height + index; + } + } + current_date = self.next_day(current_date); } } } pub async fn list_matching_block_heights( - &self, + &'static self, start_block_height: near_indexer_primitives::types::BlockHeight, contract_pattern: &str, ) -> anyhow::Result> { let start_date = self.get_nearest_block_date(start_block_height).await?; let contract_pattern_type = self.parse_contract_pattern(contract_pattern); - Ok(self.generate_block_height_stream().boxed()) + Ok(self + .generate_block_height_stream(start_date, contract_pattern_type) + .boxed()) } } From 7cb10fd9ecbdc3e9b7aeda9c137dedd4a040bb25 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 6 Jun 2024 19:00:22 -0700 Subject: [PATCH 04/20] Ran successfully with actual query --- block-streamer/src/block_height_stream.rs | 109 +++++++++++++++++----- 1 file changed, 88 insertions(+), 21 deletions(-) diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index 94988f1be..7d8978c93 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -105,50 +105,51 @@ impl BlockHeightStream { ContractPatternType::Exact(exact_pattern) } - fn generate_block_height_stream( - &'static self, + fn generate_block_height_stream<'a>( + &'a self, start_date: DateTime, contract_pattern_type: ContractPatternType, - ) -> impl Stream { - stream! { + ) -> BoxStream<'a, usize> { + Box::pin(stream! { let mut current_date = start_date; while current_date <= Utc::now() { - let current_date_string = current_date.format("%m-%d-%Y").to_string(); + let current_date_string = current_date.format("%Y-%m-%d").to_string(); let bitmaps_from_query: Vec = match contract_pattern_type { // TODO: Implement pagination of query ContractPatternType::Exact(ref pattern) => { - let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string, 1000, 0).await.unwrap(); + let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string.clone(), 100, 0).await.unwrap(); query_result.iter().map(|result_item| Base64Bitmap::from_exact_query(result_item)).collect() }, ContractPatternType::Wildcard(ref pattern) => { - let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string, 1000, 0).await.unwrap(); + let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string.clone(), 100, 0).await.unwrap(); query_result.iter().map(|result_item| Base64Bitmap::from_wildcard_query(result_item)).collect() }, }; - let starting_block_height = bitmaps_from_query.iter().map(|item| item.start_block_height).min().unwrap(); - let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap(); - for index in 0..bitmap_for_day.bitmap.len() { - if self.bitmap_operator.get_bit(&bitmap_for_day.bitmap, index) { - yield starting_block_height + index; + println!("date: {}, num of matched receivers: {}", ¤t_date_string, bitmaps_from_query.len()); + if !bitmaps_from_query.is_empty() { + let starting_block_height = bitmaps_from_query.iter().map(|item| item.start_block_height).min().unwrap(); + let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap(); + for index in 0..(bitmap_for_day.bitmap.len() * 8) { + if self.bitmap_operator.get_bit(&bitmap_for_day.bitmap, index) { + yield starting_block_height + index; + } } } current_date = self.next_day(current_date); } - } + }) } - pub async fn list_matching_block_heights( - &'static self, + pub async fn list_matching_block_heights<'a>( + &'a self, start_block_height: near_indexer_primitives::types::BlockHeight, contract_pattern: &str, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let start_date = self.get_nearest_block_date(start_block_height).await?; let contract_pattern_type = self.parse_contract_pattern(contract_pattern); - Ok(self - .generate_block_height_stream(start_date, contract_pattern_type) - .boxed()) + Ok(self.generate_block_height_stream(start_date, contract_pattern_type)) } } @@ -159,6 +160,57 @@ mod tests { const HASURA_ENDPOINT: &str = "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app/v1/graphql"; + fn generate_block_with_timestamp(date: &str) -> String { + let naive_date = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + + let date_time_utc = chrono::Utc.from_utc_datetime(&naive_date).timestamp() * 1_000_000_000; + + format!( + r#"{{ + "author": "someone", + "header": {{ + "approvals": [], + "block_merkle_root": "ERiC7AJ2zbVz1HJHThR5NWDDN9vByhwdjcVfivmpY5B", + "block_ordinal": 92102682, + "challenges_result": [], + "challenges_root": "11111111111111111111111111111111", + "chunk_headers_root": "MDiJxDyvUQaZRKmUwa5jgQuV6XjwVvnm4tDrajCxwvz", + "chunk_mask": [], + "chunk_receipts_root": "n84wEo7kTKTCJsyqBZ2jndhjrAMeJAXMwKvnJR7vCuy", + "chunk_tx_root": "D8j64GMKBMvUfvnuHtWUyDtMHM5mJ2pA4G5VmYYJvo5G", + "chunks_included": 4, + "epoch_id": "2RMQiomr6CSSwUWpmB62YohxHbfadrHfcsaa3FVb4J9x", + "epoch_sync_data_hash": null, + "gas_price": "100000000", + "hash": "FA1z9RVm9fX3g3mgP3NToZGwWeeXYn8bvZs4nwwTgCpD", + "height": 102162333, + "last_ds_final_block": "Ax2a3MSYuv2hgybnCbpNJMdYmPrHDHdA2hHTUrBkD915", + "last_final_block": "8xkwjn6Lb6UhMBhxcbVQBf3318GafkdaXoHA8Jako1nn", + "latest_protocol_version": 62, + "next_bp_hash": "dmW84aEj2iVJMLwJodJwTfAyeA1LJaHEthvnoAsvTPt", + "next_epoch_id": "C9TDDYthANoduoTBZS7WYDsBSe9XCm4M2F9hRoVXVXWY", + "outcome_root": "6WxzWLVp4b4bFbxHzu18apVfXLvHGKY7CHoqD2Eq3TFJ", + "prev_hash": "Ax2a3MSYuv2hgybnCbpNJMdYmPrHDHdA2hHTUrBkD915", + "prev_height": 102162332, + "prev_state_root": "Aq2ndkyDiwroUWN69Ema9hHtnr6dPHoEBRNyfmd8v4gB", + "random_value": "7ruuMyDhGtTkYaCGYMy7PirPiM79DXa8GhVzQW1pHRoz", + "rent_paid": "0", + "signature": "ed25519:5gYYaWHkAEK5etB8tDpw7fmehkoYSprUxKPygaNqmhVDFCMkA1n379AtL1BBkQswLAPxWs1BZvypFnnLvBtHRknm", + "timestamp": 1695921400989555700, + "timestamp_nanosec": "{}", + "total_supply": "1155783047679681223245725102954966", + "validator_proposals": [], + "validator_reward": "0" + }}, + "chunks": [] + }}"#, + date_time_utc + ) + } + #[test] fn parse_exact_contract_patterns() { let mock_s3_client = crate::s3_client::S3Client::default(); @@ -214,8 +266,23 @@ mod tests { ); } - #[test] - fn collect_three_block_heights_from_one_bitmap() {} + #[tokio::test] + async fn collect_three_block_heights_from_one_bitmap() { + let mut mock_s3_client = crate::s3_client::S3Client::default(); + mock_s3_client + .expect_get_text_file() + .returning(|_, _| Ok(generate_block_with_timestamp("2024-06-07"))); + let block_height_stream = + BlockHeightStream::new(HASURA_ENDPOINT.to_string(), mock_s3_client); + + let mut stream = block_height_stream + .list_matching_block_heights(120200447, "*.paras.near") + .await + .unwrap(); + while let Some(height) = stream.next().await { + println!("Block Height: {}", height); + } + } #[test] fn collect_three_block_heights_from_two_bitmaps() {} From 7e5d61a66ad861d510897a030564e12e051a5484 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 7 Jun 2024 13:12:02 -0700 Subject: [PATCH 05/20] Add automocking for dependent classes --- block-streamer/src/bitmap.rs | 53 +++++++++++++---------- block-streamer/src/block_height_stream.rs | 33 ++++++++++---- block-streamer/src/graphql/client.rs | 13 ++++-- 3 files changed, 62 insertions(+), 37 deletions(-) diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index 6b90f5f18..b7c90b6d8 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -10,20 +10,6 @@ pub struct Base64Bitmap { pub base64: String, } -#[derive(Debug, Default, PartialEq)] -pub struct Bitmap { - pub start_block_height: usize, - pub bitmap: Vec, -} - -#[derive(Default)] -struct EliasGammaDecoded { - pub value: usize, - pub last_bit_index: usize, -} - -pub struct BitmapOperator {} - impl Base64Bitmap { pub fn from_exact_query( query_item: &get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, @@ -44,8 +30,27 @@ impl Base64Bitmap { } } +#[derive(Debug, Default, PartialEq)] +pub struct Bitmap { + pub start_block_height: usize, + pub bitmap: Vec, +} + +#[derive(Default)] +struct EliasGammaDecoded { + pub value: usize, + pub last_bit_index: usize, +} + +#[cfg(not(test))] +pub use BitmapOperatorImpl as BitmapOperator; +#[cfg(test)] +pub use MockBitmapOperatorImpl as BitmapOperator; + +pub struct BitmapOperatorImpl {} + #[cfg_attr(test, mockall::automock)] -impl BitmapOperator { +impl BitmapOperatorImpl { pub fn new() -> Self { Self {} } @@ -224,7 +229,7 @@ mod tests { #[test] fn get_bit_from_bytes() { - let operator: BitmapOperator = BitmapOperator::new(); + let operator = BitmapOperatorImpl::new(); let bytes: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001]; let results: Vec = [7, 8, 9, 15, 19, 20, 22, 23] .iter() @@ -240,7 +245,7 @@ mod tests { #[test] fn set_bit_in_bytes() { - let operator: BitmapOperator = BitmapOperator::new(); + let operator = BitmapOperatorImpl::new(); let correct_bytes: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001]; let test_bytes: &mut [u8; 3] = &mut [0b10000000, 0b10000000, 0b00001001]; operator.set_bit(test_bytes, 0, false, true); @@ -252,14 +257,14 @@ mod tests { #[test] fn get_unsigned_integer_from_binary_sequence() { - let operator: BitmapOperator = BitmapOperator::new(); + let operator = BitmapOperatorImpl::new(); let bytes: &[u8; 3] = &[0b11111110, 0b10010100, 0b10001101]; assert_eq!(operator.read_integer_from_binary(bytes, 6, 16), 1321); } #[test] fn get_index_of_first_set_bit() { - let operator: BitmapOperator = BitmapOperator::new(); + let operator = BitmapOperatorImpl::new(); let bytes: &[u8; 4] = &[0b00000001, 0b10000000, 0b00000001, 0b00000000]; assert_eq!( operator.index_of_first_set_bit(bytes, 4).unwrap(), @@ -289,7 +294,7 @@ mod tests { #[test] fn decode_elias_gamma() { - let operator: BitmapOperator = BitmapOperator::new(); + let operator = BitmapOperatorImpl::new(); let bytes: &[u8; 2] = &[0b00000000, 0b00110110]; let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6); assert_eq!(decoded_eg.value, 27); @@ -298,7 +303,7 @@ mod tests { #[test] fn decode_empty_elias_gamma() { - let operator: BitmapOperator = BitmapOperator::new(); + let operator = BitmapOperatorImpl::new(); let bytes: &[u8; 2] = &[0b00000000, 0b00000000]; let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0); assert_eq!(decoded_eg.value, 0); @@ -307,7 +312,7 @@ mod tests { #[test] fn decode_compressed_bitmap() { - let operator: BitmapOperator = BitmapOperator::new(); + let operator = BitmapOperatorImpl::new(); assert_eq!(operator.decompress_bitmap(&[0b10100000]), &[0b11000000]); assert_eq!(operator.decompress_bitmap(&[0b00100100]), &[0b00110000]); assert_eq!(operator.decompress_bitmap(&[0b10010000]), &[0b11110000]); @@ -342,7 +347,7 @@ mod tests { #[test] fn merge_two_decompressed_bitmaps() { - let operator: BitmapOperator = BitmapOperator::new(); + let operator = BitmapOperatorImpl::new(); let mut base_bitmap: Bitmap = Bitmap { bitmap: vec![0b11001010, 0b10001111], start_block_height: 10, @@ -360,7 +365,7 @@ mod tests { #[test] fn merge_multiple_bitmaps_together() { - let operator: BitmapOperator = BitmapOperator::new(); + let operator = BitmapOperatorImpl::new(); let test_bitmaps_to_merge: Vec = vec![ Base64Bitmap { base64: "oA==".to_string(), // Decompresses to 11000000 diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index 7d8978c93..a98366d6d 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -16,7 +16,12 @@ enum ContractPatternType { Wildcard(String), } -pub struct BlockHeightStream { +#[cfg(not(test))] +pub use BlockHeightStreamImpl as BlockHeightStream; +#[cfg(test)] +pub use MockBlockHeightStreamImpl as BlockHeightStream; + +pub struct BlockHeightStreamImpl { graphql_client: GraphQLClient, bitmap_operator: BitmapOperator, s3_client: crate::s3_client::S3Client, @@ -24,13 +29,17 @@ pub struct BlockHeightStream { } #[cfg_attr(test, mockall::automock)] -impl BlockHeightStream { - pub fn new(graphql_endpoint: String, s3_client: crate::s3_client::S3Client) -> Self { +impl BlockHeightStreamImpl { + pub fn new( + graphql_client: GraphQLClient, + bitmap_operator: BitmapOperator, + s3_client: crate::s3_client::S3Client, + ) -> Self { Self { - graphql_client: GraphQLClient::new(graphql_endpoint), - bitmap_operator: BitmapOperator::new(), + graphql_client, + bitmap_operator, s3_client, - chain_id: ChainId::Mainnet, // Hardcoded mainnet for now + chain_id: ChainId::Mainnet, } } @@ -214,8 +223,10 @@ mod tests { #[test] fn parse_exact_contract_patterns() { let mock_s3_client = crate::s3_client::S3Client::default(); + let mock_graphql_client = crate::graphql::client::GraphQLClient::default(); + let mock_bitmap_operator = crate::bitmap::BitmapOperator::default(); let block_height_stream = - BlockHeightStream::new(HASURA_ENDPOINT.to_owned(), mock_s3_client); + BlockHeightStreamImpl::new(mock_graphql_client, mock_bitmap_operator, mock_s3_client); let sample_patterns = vec![ "near", "near, someone.tg", @@ -244,8 +255,10 @@ mod tests { #[test] fn parse_wildcard_contract_patterns() { let mock_s3_client = crate::s3_client::S3Client::default(); + let mock_graphql_client = crate::graphql::client::GraphQLClient::default(); + let mock_bitmap_operator = crate::bitmap::BitmapOperator::default(); let block_height_stream = - BlockHeightStream::new(HASURA_ENDPOINT.to_owned(), mock_s3_client); + BlockHeightStreamImpl::new(mock_graphql_client, mock_bitmap_operator, mock_s3_client); let sample_patterns = vec![ "*.near", "near, someone.*.tg", @@ -272,8 +285,10 @@ mod tests { mock_s3_client .expect_get_text_file() .returning(|_, _| Ok(generate_block_with_timestamp("2024-06-07"))); + let mock_graphql_client = crate::graphql::client::GraphQLClient::default(); + let mock_bitmap_operator = crate::bitmap::BitmapOperator::default(); let block_height_stream = - BlockHeightStream::new(HASURA_ENDPOINT.to_string(), mock_s3_client); + BlockHeightStreamImpl::new(mock_graphql_client, mock_bitmap_operator, mock_s3_client); let mut stream = block_height_stream .list_matching_block_heights(120200447, "*.paras.near") diff --git a/block-streamer/src/graphql/client.rs b/block-streamer/src/graphql/client.rs index 09c37e8dd..518cf3ed6 100644 --- a/block-streamer/src/graphql/client.rs +++ b/block-streamer/src/graphql/client.rs @@ -25,13 +25,18 @@ pub struct GetBitmapsExact; )] pub struct GetBitmapsWildcard; -pub struct GraphQLClient { +#[cfg(not(test))] +pub use GraphQLClientImpl as GraphQLClient; +#[cfg(test)] +pub use MockGraphQLClientImpl as GraphQLClient; + +pub struct GraphQLClientImpl { client: reqwest::Client, graphql_endpoint: String, } #[cfg_attr(test, mockall::automock)] -impl GraphQLClient { +impl GraphQLClientImpl { pub fn new(graphql_endpoint: String) -> Self { Self { client: reqwest::Client::new(), @@ -106,7 +111,7 @@ mod tests { #[tokio::test] async fn test_get_bitmaps_exact() { - let client = GraphQLClient::new(HASURA_ENDPOINT.to_string()); + let client = GraphQLClientImpl::new(HASURA_ENDPOINT.to_string()); let receiver_ids = vec!["app.nearcrowd.near".to_string()]; let block_date = "2024-03-21".to_string(); let limit = 10; @@ -122,7 +127,7 @@ mod tests { #[ignore] #[tokio::test] async fn test_get_bitmaps_wildcard() { - let client = GraphQLClient::new(HASURA_ENDPOINT.to_string()); + let client = GraphQLClientImpl::new(HASURA_ENDPOINT.to_string()); let receiver_ids = "app.nearcrowd.near".to_string(); let block_date = "2024-03-21".to_string(); let limit = 10; From a52c845cdef8834ab3c9cff7e2a961d1364e9449 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 7 Jun 2024 17:33:24 -0700 Subject: [PATCH 06/20] Finish basic unit tests of block height stream --- block-streamer/src/bitmap.rs | 28 ++-- block-streamer/src/block_height_stream.rs | 153 ++++++++++++++++++++-- block-streamer/src/graphql/client.rs | 4 +- 3 files changed, 152 insertions(+), 33 deletions(-) diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index b7c90b6d8..17bc0bfdb 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -42,15 +42,9 @@ struct EliasGammaDecoded { pub last_bit_index: usize, } -#[cfg(not(test))] -pub use BitmapOperatorImpl as BitmapOperator; -#[cfg(test)] -pub use MockBitmapOperatorImpl as BitmapOperator; - -pub struct BitmapOperatorImpl {} +pub struct BitmapOperator {} -#[cfg_attr(test, mockall::automock)] -impl BitmapOperatorImpl { +impl BitmapOperator { pub fn new() -> Self { Self {} } @@ -229,7 +223,7 @@ mod tests { #[test] fn get_bit_from_bytes() { - let operator = BitmapOperatorImpl::new(); + let operator = BitmapOperator::new(); let bytes: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001]; let results: Vec = [7, 8, 9, 15, 19, 20, 22, 23] .iter() @@ -245,7 +239,7 @@ mod tests { #[test] fn set_bit_in_bytes() { - let operator = BitmapOperatorImpl::new(); + let operator = BitmapOperator::new(); let correct_bytes: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001]; let test_bytes: &mut [u8; 3] = &mut [0b10000000, 0b10000000, 0b00001001]; operator.set_bit(test_bytes, 0, false, true); @@ -257,14 +251,14 @@ mod tests { #[test] fn get_unsigned_integer_from_binary_sequence() { - let operator = BitmapOperatorImpl::new(); + let operator = BitmapOperator::new(); let bytes: &[u8; 3] = &[0b11111110, 0b10010100, 0b10001101]; assert_eq!(operator.read_integer_from_binary(bytes, 6, 16), 1321); } #[test] fn get_index_of_first_set_bit() { - let operator = BitmapOperatorImpl::new(); + let operator = BitmapOperator::new(); let bytes: &[u8; 4] = &[0b00000001, 0b10000000, 0b00000001, 0b00000000]; assert_eq!( operator.index_of_first_set_bit(bytes, 4).unwrap(), @@ -294,7 +288,7 @@ mod tests { #[test] fn decode_elias_gamma() { - let operator = BitmapOperatorImpl::new(); + let operator = BitmapOperator::new(); let bytes: &[u8; 2] = &[0b00000000, 0b00110110]; let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6); assert_eq!(decoded_eg.value, 27); @@ -303,7 +297,7 @@ mod tests { #[test] fn decode_empty_elias_gamma() { - let operator = BitmapOperatorImpl::new(); + let operator = BitmapOperator::new(); let bytes: &[u8; 2] = &[0b00000000, 0b00000000]; let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0); assert_eq!(decoded_eg.value, 0); @@ -312,7 +306,7 @@ mod tests { #[test] fn decode_compressed_bitmap() { - let operator = BitmapOperatorImpl::new(); + let operator = BitmapOperator::new(); assert_eq!(operator.decompress_bitmap(&[0b10100000]), &[0b11000000]); assert_eq!(operator.decompress_bitmap(&[0b00100100]), &[0b00110000]); assert_eq!(operator.decompress_bitmap(&[0b10010000]), &[0b11110000]); @@ -347,7 +341,7 @@ mod tests { #[test] fn merge_two_decompressed_bitmaps() { - let operator = BitmapOperatorImpl::new(); + let operator = BitmapOperator::new(); let mut base_bitmap: Bitmap = Bitmap { bitmap: vec![0b11001010, 0b10001111], start_block_height: 10, @@ -365,7 +359,7 @@ mod tests { #[test] fn merge_multiple_bitmaps_together() { - let operator = BitmapOperatorImpl::new(); + let operator = BitmapOperator::new(); let test_bitmaps_to_merge: Vec = vec![ Base64Bitmap { base64: "oA==".to_string(), // Decompresses to 11000000 diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index a98366d6d..5cf3579fe 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -165,10 +165,32 @@ impl BlockHeightStreamImpl { #[cfg(test)] mod tests { use super::*; + use mockall::predicate; const HASURA_ENDPOINT: &str = "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app/v1/graphql"; + fn exact_query_result( + first_block_height: i64, + bitmap: &str, + ) -> crate::graphql::client::get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex + { + crate::graphql::client::get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex { + first_block_height, + bitmap: bitmap.to_string(), + } + } + + fn wildcard_query_result( + first_block_height: i64, + bitmap: &str + ) -> crate::graphql::client::get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex{ + crate::graphql::client::get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex { + first_block_height, + bitmap: bitmap.to_string(), + } + } + fn generate_block_with_timestamp(date: &str) -> String { let naive_date = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") .unwrap() @@ -224,9 +246,9 @@ mod tests { fn parse_exact_contract_patterns() { let mock_s3_client = crate::s3_client::S3Client::default(); let mock_graphql_client = crate::graphql::client::GraphQLClient::default(); - let mock_bitmap_operator = crate::bitmap::BitmapOperator::default(); + let bitmap_operator = crate::bitmap::BitmapOperator::new(); let block_height_stream = - BlockHeightStreamImpl::new(mock_graphql_client, mock_bitmap_operator, mock_s3_client); + BlockHeightStreamImpl::new(mock_graphql_client, bitmap_operator, mock_s3_client); let sample_patterns = vec![ "near", "near, someone.tg", @@ -256,9 +278,9 @@ mod tests { fn parse_wildcard_contract_patterns() { let mock_s3_client = crate::s3_client::S3Client::default(); let mock_graphql_client = crate::graphql::client::GraphQLClient::default(); - let mock_bitmap_operator = crate::bitmap::BitmapOperator::default(); + let bitmap_operator = crate::bitmap::BitmapOperator::new(); let block_height_stream = - BlockHeightStreamImpl::new(mock_graphql_client, mock_bitmap_operator, mock_s3_client); + BlockHeightStreamImpl::new(mock_graphql_client, bitmap_operator, mock_s3_client); let sample_patterns = vec![ "*.near", "near, someone.*.tg", @@ -280,25 +302,128 @@ mod tests { } #[tokio::test] - async fn collect_three_block_heights_from_one_bitmap() { + async fn collect_block_heights_from_one_day() { let mut mock_s3_client = crate::s3_client::S3Client::default(); mock_s3_client .expect_get_text_file() - .returning(|_, _| Ok(generate_block_with_timestamp("2024-06-07"))); - let mock_graphql_client = crate::graphql::client::GraphQLClient::default(); - let mock_bitmap_operator = crate::bitmap::BitmapOperator::default(); - let block_height_stream = - BlockHeightStreamImpl::new(mock_graphql_client, mock_bitmap_operator, mock_s3_client); + .returning(move |_, _| { + Ok(generate_block_with_timestamp( + &Utc::now().format("%Y-%m-%d").to_string(), + )) + }); + + let mut mock_graphql_client = crate::graphql::client::GraphQLClient::default(); + let mock_query_result_item = exact_query_result(1, "wA=="); + let mock_query_result = vec![mock_query_result_item]; + mock_graphql_client + .expect_get_bitmaps_exact() + .with( + predicate::eq(vec!["someone.near".to_string()]), + predicate::eq(Utc::now().format("%Y-%m-%d").to_string()), + predicate::eq(100), + predicate::eq(0), + ) + .times(1) + .returning(move |_, _, _, _| Ok(mock_query_result.clone())); + + let block_height_stream = BlockHeightStreamImpl::new( + mock_graphql_client, + crate::bitmap::BitmapOperator::new(), + mock_s3_client, + ); let mut stream = block_height_stream - .list_matching_block_heights(120200447, "*.paras.near") + .list_matching_block_heights(0, "someone.near") .await .unwrap(); + let mut result_heights = vec![]; while let Some(height) = stream.next().await { - println!("Block Height: {}", height); + result_heights.push(height); } + assert_eq!(result_heights, vec![1]); } - #[test] - fn collect_three_block_heights_from_two_bitmaps() {} + #[tokio::test] + async fn collect_block_heights_from_past_three_days() { + let mut mock_s3_client = crate::s3_client::S3Client::default(); + mock_s3_client + .expect_get_text_file() + .returning(move |_, _| { + Ok(generate_block_with_timestamp( + &(Utc::now() - Duration::days(2)) + .format("%Y-%m-%d") + .to_string(), + )) + }); + + let mut mock_graphql_client = crate::graphql::client::GraphQLClient::default(); + mock_graphql_client + .expect_get_bitmaps_wildcard() + .with( + predicate::eq(".*\\.someone\\.near".to_string()), + predicate::eq( + (Utc::now() - Duration::days(2)) + .format("%Y-%m-%d") + .to_string(), + ), + predicate::eq(100), + predicate::eq(0), + ) + .times(1) + .returning(move |_, _, _, _| { + Ok(vec![ + wildcard_query_result(1, "wA=="), + wildcard_query_result(5, "wA=="), + ]) + }); + mock_graphql_client + .expect_get_bitmaps_wildcard() + .with( + predicate::eq(".*\\.someone\\.near".to_string()), + predicate::eq( + (Utc::now() - Duration::days(1)) + .format("%Y-%m-%d") + .to_string(), + ), + predicate::eq(100), + predicate::eq(0), + ) + .times(1) + .returning(move |_, _, _, _| { + Ok(vec![ + wildcard_query_result(10, "wA=="), + wildcard_query_result(15, "wA=="), + ]) + }); + mock_graphql_client + .expect_get_bitmaps_wildcard() + .with( + predicate::eq(".*\\.someone\\.near".to_string()), + predicate::eq(Utc::now().format("%Y-%m-%d").to_string()), + predicate::eq(100), + predicate::eq(0), + ) + .times(1) + .returning(move |_, _, _, _| { + Ok(vec![ + wildcard_query_result(100, "wA=="), + wildcard_query_result(105, "wA=="), + ]) + }); + let block_height_stream = BlockHeightStreamImpl::new( + mock_graphql_client, + crate::bitmap::BitmapOperator::new(), + mock_s3_client, + ); + + let mut stream = block_height_stream + .list_matching_block_heights(0, "*.someone.near") + .await + .unwrap(); + let mut result_heights = vec![]; + while let Some(height) = stream.next().await { + result_heights.push(height); + } + assert_eq!(result_heights, vec![1, 5, 10, 15, 100, 105]); + } } diff --git a/block-streamer/src/graphql/client.rs b/block-streamer/src/graphql/client.rs index 518cf3ed6..c02d9b756 100644 --- a/block-streamer/src/graphql/client.rs +++ b/block-streamer/src/graphql/client.rs @@ -11,7 +11,7 @@ type Date = String; #[graphql( schema_path = "graphql/darunrs_near/schema.graphql", query_path = "graphql/darunrs_near/get_bitmaps_exact.graphql", - response_derives = "Debug", + response_derives = "Debug,Clone", normalization = "rust" )] pub struct GetBitmapsExact; @@ -20,7 +20,7 @@ pub struct GetBitmapsExact; #[graphql( schema_path = "graphql/darunrs_near/schema.graphql", query_path = "graphql/darunrs_near/get_bitmaps_wildcard.graphql", - response_derives = "Debug", + response_derives = "Debug,Clone", normalization = "rust" )] pub struct GetBitmapsWildcard; From 722d26a23640ea5697d25864c51c0a9d0096b08c Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 7 Jun 2024 18:01:17 -0700 Subject: [PATCH 07/20] Paginate graphQL queries --- block-streamer/src/block_height_stream.rs | 21 ++---- block-streamer/src/graphql/client.rs | 83 +++++++++++++++-------- 2 files changed, 59 insertions(+), 45 deletions(-) diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index 5cf3579fe..0051f8ab8 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -124,13 +124,12 @@ impl BlockHeightStreamImpl { while current_date <= Utc::now() { let current_date_string = current_date.format("%Y-%m-%d").to_string(); let bitmaps_from_query: Vec = match contract_pattern_type { - // TODO: Implement pagination of query ContractPatternType::Exact(ref pattern) => { - let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string.clone(), 100, 0).await.unwrap(); + let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string.clone()).await.unwrap(); query_result.iter().map(|result_item| Base64Bitmap::from_exact_query(result_item)).collect() }, ContractPatternType::Wildcard(ref pattern) => { - let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string.clone(), 100, 0).await.unwrap(); + let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string.clone()).await.unwrap(); query_result.iter().map(|result_item| Base64Bitmap::from_wildcard_query(result_item)).collect() }, @@ -320,11 +319,9 @@ mod tests { .with( predicate::eq(vec!["someone.near".to_string()]), predicate::eq(Utc::now().format("%Y-%m-%d").to_string()), - predicate::eq(100), - predicate::eq(0), ) .times(1) - .returning(move |_, _, _, _| Ok(mock_query_result.clone())); + .returning(move |_, _| Ok(mock_query_result.clone())); let block_height_stream = BlockHeightStreamImpl::new( mock_graphql_client, @@ -366,11 +363,9 @@ mod tests { .format("%Y-%m-%d") .to_string(), ), - predicate::eq(100), - predicate::eq(0), ) .times(1) - .returning(move |_, _, _, _| { + .returning(move |_, _| { Ok(vec![ wildcard_query_result(1, "wA=="), wildcard_query_result(5, "wA=="), @@ -385,11 +380,9 @@ mod tests { .format("%Y-%m-%d") .to_string(), ), - predicate::eq(100), - predicate::eq(0), ) .times(1) - .returning(move |_, _, _, _| { + .returning(move |_, _| { Ok(vec![ wildcard_query_result(10, "wA=="), wildcard_query_result(15, "wA=="), @@ -400,11 +393,9 @@ mod tests { .with( predicate::eq(".*\\.someone\\.near".to_string()), predicate::eq(Utc::now().format("%Y-%m-%d").to_string()), - predicate::eq(100), - predicate::eq(0), ) .times(1) - .returning(move |_, _, _, _| { + .returning(move |_, _| { Ok(vec![ wildcard_query_result(100, "wA=="), wildcard_query_result(105, "wA=="), diff --git a/block-streamer/src/graphql/client.rs b/block-streamer/src/graphql/client.rs index c02d9b756..4cd853c69 100644 --- a/block-streamer/src/graphql/client.rs +++ b/block-streamer/src/graphql/client.rs @@ -3,6 +3,7 @@ use graphql_client::{GraphQLQuery, Response}; // TODO: Use Dataplatform account const HASURA_ACCOUNT: &str = "darunrs_near"; +const QUERY_LIMIT: i64 = 1000; #[allow(clippy::upper_case_acronyms)] type Date = String; @@ -64,40 +65,66 @@ impl GraphQLClientImpl { &self, receiver_ids: Vec, block_date: String, - limit: i64, - offset: i64, ) -> anyhow::Result> { - self.post_graphql::(get_bitmaps_exact::Variables { - receiver_ids: Some(receiver_ids), - block_date: Some(block_date), - limit: Some(limit), - offset: Some(offset), - }) - .await? - .data - .ok_or(anyhow::anyhow!("No bitmaps were returned")) - .map(|data| data.darunrs_near_bitmap_v5_actions_index) + let mut all_query_results: Vec< + get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, + > = vec![]; + let mut offset = 0; + let mut has_more = true; + while has_more { + let mut query_result = self + .post_graphql::(get_bitmaps_exact::Variables { + receiver_ids: Some(receiver_ids.clone()), + block_date: Some(block_date.clone()), + limit: Some(QUERY_LIMIT), + offset: Some(offset), + }) + .await? + .data + .ok_or(anyhow::anyhow!("No bitmaps were returned")) + .map(|data| data.darunrs_near_bitmap_v5_actions_index)?; + + has_more = query_result.len() >= QUERY_LIMIT as usize; + offset += QUERY_LIMIT; + + all_query_results.append(&mut query_result); + } + + Ok(all_query_results) } pub async fn get_bitmaps_wildcard( &self, receiver_ids: String, block_date: String, - limit: i64, - offset: i64, ) -> anyhow::Result> { - self.post_graphql::(get_bitmaps_wildcard::Variables { - receiver_ids: Some(receiver_ids), - block_date: Some(block_date), - limit: Some(limit), - offset: Some(offset), - }) - .await? - .data - .ok_or(anyhow::anyhow!("No bitmaps were returned")) - .map(|data| data.darunrs_near_bitmap_v5_actions_index) + let mut all_query_results: Vec< + get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, + > = vec![]; + let mut offset = 0; + let mut has_more = true; + while has_more { + let mut query_result = self + .post_graphql::(get_bitmaps_wildcard::Variables { + receiver_ids: Some(receiver_ids.clone()), + block_date: Some(block_date.clone()), + limit: Some(QUERY_LIMIT), + offset: Some(offset), + }) + .await? + .data + .ok_or(anyhow::anyhow!("No bitmaps were returned")) + .map(|data| data.darunrs_near_bitmap_v5_actions_index)?; + + has_more = query_result.len() >= QUERY_LIMIT as usize; + offset += QUERY_LIMIT; + + all_query_results.append(&mut query_result); + } + + Ok(all_query_results) } } @@ -114,10 +141,8 @@ mod tests { let client = GraphQLClientImpl::new(HASURA_ENDPOINT.to_string()); let receiver_ids = vec!["app.nearcrowd.near".to_string()]; let block_date = "2024-03-21".to_string(); - let limit = 10; - let offset = 0; let response = client - .get_bitmaps_exact(receiver_ids, block_date, limit, offset) + .get_bitmaps_exact(receiver_ids, block_date) .await .unwrap(); assert_eq!(response[0].first_block_height, 115130287); @@ -130,10 +155,8 @@ mod tests { let client = GraphQLClientImpl::new(HASURA_ENDPOINT.to_string()); let receiver_ids = "app.nearcrowd.near".to_string(); let block_date = "2024-03-21".to_string(); - let limit = 10; - let offset = 0; let response = client - .get_bitmaps_wildcard(receiver_ids, block_date, limit, offset) + .get_bitmaps_wildcard(receiver_ids, block_date) .await .unwrap(); assert_eq!(response[0].first_block_height, 115130287); From bc3e985d40dfd91b54b503bab60b161dfe8f1583 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 7 Jun 2024 18:51:55 -0700 Subject: [PATCH 08/20] Strip wildcards from root accounts --- block-streamer/Cargo.lock | 1 + block-streamer/Cargo.toml | 1 + block-streamer/src/block_height_stream.rs | 55 ++++++++++++++++++----- coordinator/Cargo.lock | 11 ++--- 4 files changed, 52 insertions(+), 16 deletions(-) diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index 19c2d2cae..ce9cb859d 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -983,6 +983,7 @@ dependencies = [ "prometheus", "prost 0.12.4", "redis", + "regex", "registry-types", "reqwest", "serde", diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 6e1ee4c82..58fee0c53 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -35,6 +35,7 @@ wildmatch = "2.1.1" registry-types = { path = "../registry/types" } base64 = "0.22.1" async-stream = "0.3.5" +regex = "1.10.4" [build-dependencies] tonic-build = "0.10" diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index 0051f8ab8..37b8bca31 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -7,6 +7,7 @@ use chrono::{DateTime, Duration, TimeZone, Utc}; use futures::stream::{BoxStream, Stream}; use futures::StreamExt; use near_lake_framework::near_indexer_primitives; +use regex::Regex; const MAX_S3_RETRY_COUNT: u8 = 20; @@ -94,20 +95,38 @@ impl BlockHeightStreamImpl { date + Duration::days(1) } + fn strip_wildcard_if_root_account(&self, receiver_id: String) -> String { + let wildcard_root_account_regex = Regex::new(r"^\*\.([a-zA-Z0-9]+)$").unwrap(); + if wildcard_root_account_regex.is_match(&receiver_id) { + return receiver_id + .split('.') + .nth(1) + .unwrap_or(&receiver_id) + .to_string(); + } + receiver_id + } + fn parse_contract_pattern(&self, contract_pattern: &str) -> ContractPatternType { - let trimmed_contract_pattern: String = contract_pattern - .chars() - .filter(|c| !c.is_whitespace()) - .collect(); - if contract_pattern.chars().any(|c| c == '*') { - let wildcard_pattern = trimmed_contract_pattern + // If receiver_id is of pattern *.SOME_ROOT_ACCOUNT such as *.near, we can reduce this to + // "near" as we store bitmaps for root accounts like near ,tg, and so on. + let cleaned_contract_pattern: String = contract_pattern + .split(',') + .map(|receiver| receiver.trim()) + .map(str::to_string) + .map(|receiver| self.strip_wildcard_if_root_account(receiver)) + .collect::>() + .join(","); + + if cleaned_contract_pattern.chars().any(|c| c == '*') { + let wildcard_pattern = cleaned_contract_pattern .replace(',', "|") .replace('.', "\\.") .replace('*', ".*"); return ContractPatternType::Wildcard(wildcard_pattern); } - let exact_pattern = trimmed_contract_pattern + let exact_pattern = cleaned_contract_pattern .split(',') .map(str::to_string) .collect(); @@ -250,7 +269,9 @@ mod tests { BlockHeightStreamImpl::new(mock_graphql_client, bitmap_operator, mock_s3_client); let sample_patterns = vec![ "near", + "*.near", "near, someone.tg", + "*.near, someone.tg, *.tg", "a.near, b.near, a.b, a.b.c.near", ]; @@ -260,10 +281,22 @@ mod tests { ); assert_eq!( block_height_stream.parse_contract_pattern(sample_patterns[1]), - ContractPatternType::Exact(vec!["near".to_string(), "someone.tg".to_string()],) + ContractPatternType::Exact(vec!["near".to_string()]) ); assert_eq!( block_height_stream.parse_contract_pattern(sample_patterns[2]), + ContractPatternType::Exact(vec!["near".to_string(), "someone.tg".to_string()],) + ); + assert_eq!( + block_height_stream.parse_contract_pattern(sample_patterns[3]), + ContractPatternType::Exact(vec![ + "near".to_string(), + "someone.tg".to_string(), + "tg".to_string() + ],) + ); + assert_eq!( + block_height_stream.parse_contract_pattern(sample_patterns[4]), ContractPatternType::Exact(vec![ "a.near".to_string(), "b.near".to_string(), @@ -281,14 +314,14 @@ mod tests { let block_height_stream = BlockHeightStreamImpl::new(mock_graphql_client, bitmap_operator, mock_s3_client); let sample_patterns = vec![ - "*.near", + "*.someone.near", "near, someone.*.tg", "a.near, b.*, *.b, a.*.c.near", ]; assert_eq!( block_height_stream.parse_contract_pattern(sample_patterns[0]), - ContractPatternType::Wildcard(".*\\.near".to_string()) + ContractPatternType::Wildcard(".*\\.someone\\.near".to_string()) ); assert_eq!( block_height_stream.parse_contract_pattern(sample_patterns[1]), @@ -296,7 +329,7 @@ mod tests { ); assert_eq!( block_height_stream.parse_contract_pattern(sample_patterns[2]), - ContractPatternType::Wildcard("a\\.near|b\\..*|.*\\.b|a\\..*\\.c\\.near".to_string()) + ContractPatternType::Wildcard("a\\.near|b\\..*|b|a\\..*\\.c\\.near".to_string()) ); } diff --git a/coordinator/Cargo.lock b/coordinator/Cargo.lock index 0a95983d9..5b118095c 100644 --- a/coordinator/Cargo.lock +++ b/coordinator/Cargo.lock @@ -942,6 +942,7 @@ dependencies = [ "prometheus", "prost 0.12.3", "redis 0.21.7", + "regex", "registry-types", "reqwest", "serde", @@ -3576,13 +3577,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.3", + "regex-automata 0.4.6", "regex-syntax 0.8.2", ] @@ -3597,9 +3598,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", From 75409b8df1fb99ea452e3aed7bb89a2e3263e351 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 7 Jun 2024 18:53:01 -0700 Subject: [PATCH 09/20] Reorder added dependencies --- block-streamer/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 58fee0c53..2d529d480 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -6,9 +6,11 @@ edition = "2021" [dependencies] actix-web = "4.5.1" anyhow = "1.0.57" +async-stream = "0.3.5" async-trait = "0.1.74" aws-config = { version = "1.1.3", features = ["behavior-version-latest"] } aws-sdk-s3 = "1.13.0" +base64 = "0.22.1" borsh = "0.10.2" cached = "0.49.3" chrono = "0.4.25" @@ -20,6 +22,7 @@ near-lake-framework = "0.7.8" prometheus = "0.13.3" prost = "0.12.3" redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] } +regex = "1.10.4" reqwest = { version = "^0.11.0", features = ["json"] } serde = { version = "1", features = ["derive"] } serde_json = "1.0.55" @@ -33,9 +36,6 @@ tonic = "0.10.2" wildmatch = "2.1.1" registry-types = { path = "../registry/types" } -base64 = "0.22.1" -async-stream = "0.3.5" -regex = "1.10.4" [build-dependencies] tonic-build = "0.10" From b778236fca31c958703a63b604ff32426e96ea38 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 7 Jun 2024 19:02:08 -0700 Subject: [PATCH 10/20] Improve error message --- block-streamer/src/graphql/client.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/block-streamer/src/graphql/client.rs b/block-streamer/src/graphql/client.rs index 4cd853c69..6b56f3975 100644 --- a/block-streamer/src/graphql/client.rs +++ b/block-streamer/src/graphql/client.rs @@ -82,7 +82,9 @@ impl GraphQLClientImpl { }) .await? .data - .ok_or(anyhow::anyhow!("No bitmaps were returned")) + .ok_or(anyhow::anyhow!( + "Query response is malformed. Missing data field." + )) .map(|data| data.darunrs_near_bitmap_v5_actions_index)?; has_more = query_result.len() >= QUERY_LIMIT as usize; @@ -115,7 +117,9 @@ impl GraphQLClientImpl { }) .await? .data - .ok_or(anyhow::anyhow!("No bitmaps were returned")) + .ok_or(anyhow::anyhow!( + "Query response is malformed. Missing data field." + )) .map(|data| data.darunrs_near_bitmap_v5_actions_index)?; has_more = query_result.len() >= QUERY_LIMIT as usize; From 71d6c59a64051fe7bdf9fec43d44f46849d1df35 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Mon, 10 Jun 2024 16:54:51 -0700 Subject: [PATCH 11/20] Use From impl instead of custom name --- block-streamer/src/bitmap.rs | 10 +++++++--- block-streamer/src/block_height_stream.rs | 6 ++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index 17bc0bfdb..441de5e06 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -10,8 +10,8 @@ pub struct Base64Bitmap { pub base64: String, } -impl Base64Bitmap { - pub fn from_exact_query( +impl From<&get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex> for Base64Bitmap { + fn from( query_item: &get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, ) -> Self { Self { @@ -19,8 +19,12 @@ impl Base64Bitmap { start_block_height: usize::try_from(query_item.first_block_height).unwrap(), } } +} - pub fn from_wildcard_query( +impl From<&get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex> + for Base64Bitmap +{ + fn from( query_item: &get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, ) -> Self { Self { diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index 37b8bca31..fcc66bbf9 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -145,15 +145,13 @@ impl BlockHeightStreamImpl { let bitmaps_from_query: Vec = match contract_pattern_type { ContractPatternType::Exact(ref pattern) => { let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string.clone()).await.unwrap(); - query_result.iter().map(|result_item| Base64Bitmap::from_exact_query(result_item)).collect() + query_result.iter().map(|result_item| Base64Bitmap::from(result_item)).collect() }, ContractPatternType::Wildcard(ref pattern) => { let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string.clone()).await.unwrap(); - query_result.iter().map(|result_item| Base64Bitmap::from_wildcard_query(result_item)).collect() - + query_result.iter().map(|result_item| Base64Bitmap::from(result_item)).collect() }, }; - println!("date: {}, num of matched receivers: {}", ¤t_date_string, bitmaps_from_query.len()); if !bitmaps_from_query.is_empty() { let starting_block_height = bitmaps_from_query.iter().map(|item| item.start_block_height).min().unwrap(); let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap(); From 8e0ae5ab63fa71a578232b634ce4246c97127aa4 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 12 Jun 2024 12:05:00 -0700 Subject: [PATCH 12/20] Use u64 for block height and try stream --- block-streamer/src/bitmap.rs | 77 ++++++++++++----------- block-streamer/src/block_height_stream.rs | 18 +++--- 2 files changed, 51 insertions(+), 44 deletions(-) diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index 441de5e06..6f741aad6 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -6,43 +6,45 @@ use crate::graphql::client::{get_bitmaps_exact, get_bitmaps_wildcard}; #[derive(Debug, Default, PartialEq)] pub struct Base64Bitmap { - pub start_block_height: usize, + pub start_block_height: u64, pub base64: String, } -impl From<&get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex> for Base64Bitmap { - fn from( +impl TryFrom<&get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex> for Base64Bitmap { + type Error = anyhow::Error; + fn try_from( query_item: &get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, - ) -> Self { - Self { + ) -> anyhow::Result { + Ok(Self { base64: query_item.bitmap.clone(), - start_block_height: usize::try_from(query_item.first_block_height).unwrap(), - } + start_block_height: u64::try_from(query_item.first_block_height)?, + }) } } -impl From<&get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex> +impl TryFrom<&get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex> for Base64Bitmap { - fn from( + type Error = anyhow::Error; + fn try_from( query_item: &get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, - ) -> Self { - Self { + ) -> anyhow::Result { + Ok(Self { base64: query_item.bitmap.clone(), - start_block_height: usize::try_from(query_item.first_block_height).unwrap(), - } + start_block_height: u64::try_from(query_item.first_block_height)?, + }) } } #[derive(Debug, Default, PartialEq)] pub struct Bitmap { - pub start_block_height: usize, + pub start_block_height: u64, pub bitmap: Vec, } #[derive(Default)] struct EliasGammaDecoded { - pub value: usize, + pub value: u64, pub last_bit_index: usize, } @@ -73,12 +75,12 @@ impl BitmapOperator { bytes: &[u8], start_bit_index: usize, end_bit_index: usize, - ) -> u32 { - let mut number: u32 = 0; + ) -> u64 { + let mut number: u64 = 0; // Read bits from right to left for curr_bit_index in (start_bit_index..=end_bit_index).rev() { if self.get_bit(bytes, curr_bit_index) { - number |= 1u32 << (end_bit_index - curr_bit_index); + number |= 1u64 << (end_bit_index - curr_bit_index); } } @@ -101,29 +103,31 @@ impl BitmapOperator { None } - fn decode_elias_gamma_entry(&self, bytes: &[u8], start_bit_index: usize) -> EliasGammaDecoded { + fn decode_elias_gamma_entry( + &self, + bytes: &[u8], + start_bit_index: usize, + ) -> anyhow::Result { if bytes.is_empty() { - return EliasGammaDecoded::default(); + return Ok(EliasGammaDecoded::default()); } let first_bit_index = match self.index_of_first_set_bit(bytes, start_bit_index) { Some(index) => index, None => { - return EliasGammaDecoded::default(); + return Ok(EliasGammaDecoded::default()); } }; let zero_count: usize = first_bit_index - start_bit_index; - let remainder: usize = if zero_count == 0 { + let remainder: u64 = if zero_count == 0 { 0 } else { self.read_integer_from_binary(bytes, first_bit_index + 1, first_bit_index + zero_count) - .try_into() - .unwrap() }; - EliasGammaDecoded { - value: 2_usize.pow(zero_count.try_into().unwrap()) + remainder, + Ok(EliasGammaDecoded { + value: 2_u64.pow(zero_count.try_into().unwrap()) + remainder, last_bit_index: first_bit_index + zero_count, - } + }) } fn decompress_bitmap(&self, compressed_bitmap: &[u8]) -> Vec { @@ -135,15 +139,18 @@ impl BitmapOperator { let mut decompressed_bit_index = 0; while compressed_bit_index < compressed_bit_length { - let decoded_elias_gamma = - self.decode_elias_gamma_entry(compressed_bitmap, compressed_bit_index); + let decoded_elias_gamma = self + .decode_elias_gamma_entry(compressed_bitmap, compressed_bit_index) + .unwrap(); if decoded_elias_gamma.value == 0 { break; } compressed_bit_index = decoded_elias_gamma.last_bit_index + 1; let mut bit_index_offset: usize = 0; - while current_bit_value && (bit_index_offset < decoded_elias_gamma.value) { + while current_bit_value + && (bit_index_offset < usize::try_from(decoded_elias_gamma.value).unwrap()) + { while decompressed_bit_index + bit_index_offset >= (decompressed_bytes.len() * 8) { decompressed_bytes.push(0b00000000); } @@ -156,7 +163,7 @@ impl BitmapOperator { bit_index_offset += 1; } - decompressed_bit_index += decoded_elias_gamma.value; + decompressed_bit_index += usize::try_from(decoded_elias_gamma.value).unwrap(); current_bit_value = !current_bit_value; } @@ -172,7 +179,7 @@ impl BitmapOperator { .start_block_height .checked_sub(bitmap_to_update.start_block_height) { - Some(result) => result, + Some(result) => usize::try_from(result)?, None => { return Err(anyhow!( "Start block height in bitmap was lower than provided lowest block height", @@ -200,7 +207,7 @@ impl BitmapOperator { pub fn merge_bitmaps( &self, bitmaps_to_merge: &Vec, - smallest_start_block_height: usize, + smallest_start_block_height: u64, ) -> anyhow::Result { let mut merged_bitmap: Bitmap = Bitmap { bitmap: Vec::new(), @@ -294,7 +301,7 @@ mod tests { fn decode_elias_gamma() { let operator = BitmapOperator::new(); let bytes: &[u8; 2] = &[0b00000000, 0b00110110]; - let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6); + let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6).unwrap(); assert_eq!(decoded_eg.value, 27); assert_eq!(decoded_eg.last_bit_index, 14); } @@ -303,7 +310,7 @@ mod tests { fn decode_empty_elias_gamma() { let operator = BitmapOperator::new(); let bytes: &[u8; 2] = &[0b00000000, 0b00000000]; - let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0); + let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0).unwrap(); assert_eq!(decoded_eg.value, 0); assert_eq!(decoded_eg.last_bit_index, 0); } diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index fcc66bbf9..4e8562ebe 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -2,7 +2,7 @@ use crate::bitmap::{Base64Bitmap, BitmapOperator}; use crate::graphql::client::GraphQLClient; use crate::rules::types::ChainId; use anyhow::Context; -use async_stream::stream; +use async_stream::try_stream; use chrono::{DateTime, Duration, TimeZone, Utc}; use futures::stream::{BoxStream, Stream}; use futures::StreamExt; @@ -137,19 +137,19 @@ impl BlockHeightStreamImpl { &'a self, start_date: DateTime, contract_pattern_type: ContractPatternType, - ) -> BoxStream<'a, usize> { - Box::pin(stream! { + ) -> BoxStream<'a, anyhow::Result> { + Box::pin(try_stream! { let mut current_date = start_date; while current_date <= Utc::now() { let current_date_string = current_date.format("%Y-%m-%d").to_string(); let bitmaps_from_query: Vec = match contract_pattern_type { ContractPatternType::Exact(ref pattern) => { let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string.clone()).await.unwrap(); - query_result.iter().map(|result_item| Base64Bitmap::from(result_item)).collect() + query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect() }, ContractPatternType::Wildcard(ref pattern) => { let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string.clone()).await.unwrap(); - query_result.iter().map(|result_item| Base64Bitmap::from(result_item)).collect() + query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect() }, }; if !bitmaps_from_query.is_empty() { @@ -157,7 +157,7 @@ impl BlockHeightStreamImpl { let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap(); for index in 0..(bitmap_for_day.bitmap.len() * 8) { if self.bitmap_operator.get_bit(&bitmap_for_day.bitmap, index) { - yield starting_block_height + index; + yield starting_block_height + u64::try_from(index)?; } } } @@ -170,7 +170,7 @@ impl BlockHeightStreamImpl { &'a self, start_block_height: near_indexer_primitives::types::BlockHeight, contract_pattern: &str, - ) -> anyhow::Result> { + ) -> anyhow::Result>> { let start_date = self.get_nearest_block_date(start_block_height).await?; let contract_pattern_type = self.parse_contract_pattern(contract_pattern); @@ -365,7 +365,7 @@ mod tests { .await .unwrap(); let mut result_heights = vec![]; - while let Some(height) = stream.next().await { + while let Some(Ok(height)) = stream.next().await { result_heights.push(height); } assert_eq!(result_heights, vec![1]); @@ -443,7 +443,7 @@ mod tests { .await .unwrap(); let mut result_heights = vec![]; - while let Some(height) = stream.next().await { + while let Some(Ok(height)) = stream.next().await { result_heights.push(height); } assert_eq!(result_heights, vec![1, 5, 10, 15, 100, 105]); From 294e172449383c33223938a8ab0262358de19523 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 12 Jun 2024 13:40:33 -0700 Subject: [PATCH 13/20] Graphql client now takes DateTime arg --- block-streamer/src/block_height_stream.rs | 29 +++++++++++------------ block-streamer/src/graphql/client.rs | 27 ++++++++++++++------- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index 4e8562ebe..e7c9ccaed 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -141,14 +141,13 @@ impl BlockHeightStreamImpl { Box::pin(try_stream! { let mut current_date = start_date; while current_date <= Utc::now() { - let current_date_string = current_date.format("%Y-%m-%d").to_string(); let bitmaps_from_query: Vec = match contract_pattern_type { ContractPatternType::Exact(ref pattern) => { - let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), current_date_string.clone()).await.unwrap(); + let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), ¤t_date).await.unwrap(); query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect() }, ContractPatternType::Wildcard(ref pattern) => { - let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), current_date_string.clone()).await.unwrap(); + let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), ¤t_date).await.unwrap(); query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect() }, }; @@ -349,7 +348,9 @@ mod tests { .expect_get_bitmaps_exact() .with( predicate::eq(vec!["someone.near".to_string()]), - predicate::eq(Utc::now().format("%Y-%m-%d").to_string()), + predicate::function(|date: &DateTime| { + date.date_naive() == Utc::now().date_naive() + }), ) .times(1) .returning(move |_, _| Ok(mock_query_result.clone())); @@ -389,11 +390,9 @@ mod tests { .expect_get_bitmaps_wildcard() .with( predicate::eq(".*\\.someone\\.near".to_string()), - predicate::eq( - (Utc::now() - Duration::days(2)) - .format("%Y-%m-%d") - .to_string(), - ), + predicate::function(|date: &DateTime| { + date.date_naive() == (Utc::now() - Duration::days(2)).date_naive() + }), ) .times(1) .returning(move |_, _| { @@ -406,11 +405,9 @@ mod tests { .expect_get_bitmaps_wildcard() .with( predicate::eq(".*\\.someone\\.near".to_string()), - predicate::eq( - (Utc::now() - Duration::days(1)) - .format("%Y-%m-%d") - .to_string(), - ), + predicate::function(|date: &DateTime| { + date.date_naive() == (Utc::now() - Duration::days(1)).date_naive() + }), ) .times(1) .returning(move |_, _| { @@ -423,7 +420,9 @@ mod tests { .expect_get_bitmaps_wildcard() .with( predicate::eq(".*\\.someone\\.near".to_string()), - predicate::eq(Utc::now().format("%Y-%m-%d").to_string()), + predicate::function(|date: &DateTime| { + date.date_naive() == Utc::now().date_naive() + }), ) .times(1) .returning(move |_, _| { diff --git a/block-streamer/src/graphql/client.rs b/block-streamer/src/graphql/client.rs index 6b56f3975..6f109603f 100644 --- a/block-streamer/src/graphql/client.rs +++ b/block-streamer/src/graphql/client.rs @@ -1,4 +1,5 @@ use ::reqwest; +use chrono::{DateTime, Utc}; use graphql_client::{GraphQLQuery, Response}; // TODO: Use Dataplatform account @@ -64,7 +65,7 @@ impl GraphQLClientImpl { pub async fn get_bitmaps_exact( &self, receiver_ids: Vec, - block_date: String, + block_date: &DateTime, ) -> anyhow::Result> { let mut all_query_results: Vec< @@ -76,7 +77,7 @@ impl GraphQLClientImpl { let mut query_result = self .post_graphql::(get_bitmaps_exact::Variables { receiver_ids: Some(receiver_ids.clone()), - block_date: Some(block_date.clone()), + block_date: Some(block_date.format("%Y-%m-%d").to_string()), limit: Some(QUERY_LIMIT), offset: Some(offset), }) @@ -99,7 +100,7 @@ impl GraphQLClientImpl { pub async fn get_bitmaps_wildcard( &self, receiver_ids: String, - block_date: String, + block_date: &DateTime, ) -> anyhow::Result> { let mut all_query_results: Vec< @@ -111,7 +112,7 @@ impl GraphQLClientImpl { let mut query_result = self .post_graphql::(get_bitmaps_wildcard::Variables { receiver_ids: Some(receiver_ids.clone()), - block_date: Some(block_date.clone()), + block_date: Some(block_date.format("%Y-%m-%d").to_string()), limit: Some(QUERY_LIMIT), offset: Some(offset), }) @@ -135,18 +136,28 @@ impl GraphQLClientImpl { // TODO: Remove Unit tests after bitmap query is integrated into the main application #[cfg(test)] mod tests { + use chrono::{NaiveDateTime, TimeZone}; + use super::*; const HASURA_ENDPOINT: &str = "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app/v1/graphql"; + fn utc_date_time_from_date_string(date: &str) -> DateTime { + let naive_date_time: NaiveDateTime = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + TimeZone::from_utc_datetime(&chrono::Utc, &naive_date_time) + } + #[tokio::test] async fn test_get_bitmaps_exact() { let client = GraphQLClientImpl::new(HASURA_ENDPOINT.to_string()); let receiver_ids = vec!["app.nearcrowd.near".to_string()]; - let block_date = "2024-03-21".to_string(); + let block_date: DateTime = utc_date_time_from_date_string("2024-03-21"); let response = client - .get_bitmaps_exact(receiver_ids, block_date) + .get_bitmaps_exact(receiver_ids, &block_date) .await .unwrap(); assert_eq!(response[0].first_block_height, 115130287); @@ -158,9 +169,9 @@ mod tests { async fn test_get_bitmaps_wildcard() { let client = GraphQLClientImpl::new(HASURA_ENDPOINT.to_string()); let receiver_ids = "app.nearcrowd.near".to_string(); - let block_date = "2024-03-21".to_string(); + let block_date: DateTime = utc_date_time_from_date_string("2024-03-21"); let response = client - .get_bitmaps_wildcard(receiver_ids, block_date) + .get_bitmaps_wildcard(receiver_ids, &block_date) .await .unwrap(); assert_eq!(response[0].first_block_height, 115130287); From 6e7afdc082376aa72f62f3b7aa380ee51b1aefa0 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 13 Jun 2024 09:50:11 -0700 Subject: [PATCH 14/20] Define lifetimes for stream and remove explicit pin before return --- block-streamer/src/block_height_stream.rs | 47 +++++++++-------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index e7c9ccaed..a8cd642b2 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -17,10 +17,7 @@ enum ContractPatternType { Wildcard(String), } -#[cfg(not(test))] pub use BlockHeightStreamImpl as BlockHeightStream; -#[cfg(test)] -pub use MockBlockHeightStreamImpl as BlockHeightStream; pub struct BlockHeightStreamImpl { graphql_client: GraphQLClient, @@ -29,7 +26,6 @@ pub struct BlockHeightStreamImpl { chain_id: ChainId, } -#[cfg_attr(test, mockall::automock)] impl BlockHeightStreamImpl { pub fn new( graphql_client: GraphQLClient, @@ -133,12 +129,14 @@ impl BlockHeightStreamImpl { ContractPatternType::Exact(exact_pattern) } - fn generate_block_height_stream<'a>( + fn stream_matching_block_heights<'b, 'a: 'b>( &'a self, - start_date: DateTime, - contract_pattern_type: ContractPatternType, - ) -> BoxStream<'a, anyhow::Result> { - Box::pin(try_stream! { + start_block_height: near_indexer_primitives::types::BlockHeight, + contract_pattern: String, + ) -> impl futures::Stream> + 'b { + try_stream! { + let start_date = self.get_nearest_block_date(start_block_height).await?; + let contract_pattern_type = self.parse_contract_pattern(&contract_pattern); let mut current_date = start_date; while current_date <= Utc::now() { let bitmaps_from_query: Vec = match contract_pattern_type { @@ -151,6 +149,10 @@ impl BlockHeightStreamImpl { query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect() }, }; + // convert to base64 + // convert to compressed + // convert to decompressed + // merge if !bitmaps_from_query.is_empty() { let starting_block_height = bitmaps_from_query.iter().map(|item| item.start_block_height).min().unwrap(); let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap(); @@ -162,18 +164,7 @@ impl BlockHeightStreamImpl { } current_date = self.next_day(current_date); } - }) - } - - pub async fn list_matching_block_heights<'a>( - &'a self, - start_block_height: near_indexer_primitives::types::BlockHeight, - contract_pattern: &str, - ) -> anyhow::Result>> { - let start_date = self.get_nearest_block_date(start_block_height).await?; - let contract_pattern_type = self.parse_contract_pattern(contract_pattern); - - Ok(self.generate_block_height_stream(start_date, contract_pattern_type)) + } } } @@ -361,10 +352,9 @@ mod tests { mock_s3_client, ); - let mut stream = block_height_stream - .list_matching_block_heights(0, "someone.near") - .await - .unwrap(); + let stream = + block_height_stream.stream_matching_block_heights(0, "someone.near".to_owned()); + tokio::pin!(stream); let mut result_heights = vec![]; while let Some(Ok(height)) = stream.next().await { result_heights.push(height); @@ -437,10 +427,9 @@ mod tests { mock_s3_client, ); - let mut stream = block_height_stream - .list_matching_block_heights(0, "*.someone.near") - .await - .unwrap(); + let stream = + block_height_stream.stream_matching_block_heights(0, "*.someone.near".to_string()); + tokio::pin!(stream); let mut result_heights = vec![]; while let Some(Ok(height)) = stream.next().await { result_heights.push(height); From 6db09108ace5380cc4cf8f2daef364e29e2df2c8 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 13 Jun 2024 15:55:10 -0700 Subject: [PATCH 15/20] Refactored Bitmap trait into struct implementations --- block-streamer/src/bitmap.rs | 417 +++++++++++++--------- block-streamer/src/block_height_stream.rs | 61 ++-- 2 files changed, 277 insertions(+), 201 deletions(-) diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index 6f741aad6..54d43ed91 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -36,50 +36,56 @@ impl TryFrom<&get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5Actions } } -#[derive(Debug, Default, PartialEq)] -pub struct Bitmap { - pub start_block_height: u64, - pub bitmap: Vec, -} - #[derive(Default)] struct EliasGammaDecoded { pub value: u64, pub last_bit_index: usize, } -pub struct BitmapOperator {} +#[derive(Debug, Default, PartialEq)] +pub struct CompressedBitmap { + pub start_block_height: u64, + pub bitmap: Vec, +} + +impl TryFrom<&Base64Bitmap> for CompressedBitmap { + type Error = anyhow::Error; + fn try_from(value: &Base64Bitmap) -> Result { + Ok(Self { + bitmap: general_purpose::STANDARD.decode(value.base64.clone())?, + start_block_height: value.start_block_height, + }) + } +} -impl BitmapOperator { - pub fn new() -> Self { - Self {} +impl CompressedBitmap { + pub fn new(start_block_height: u64, bitmap: Vec) -> Self { + Self { + start_block_height, + bitmap, + } } - pub fn get_bit(&self, bytes: &[u8], bit_index: usize) -> bool { + pub fn get_bit(&self, bit_index: usize) -> bool { let byte_index: usize = bit_index / 8; let bit_index_in_byte: usize = bit_index % 8; - (bytes[byte_index] & (1u8 << (7 - bit_index_in_byte))) > 0 + (self.bitmap[byte_index] & (1u8 << (7 - bit_index_in_byte))) > 0 } - fn set_bit(&self, bytes: &mut [u8], bit_index: usize, bit_value: bool, write_zero: bool) { + fn set_bit(&mut self, bit_index: usize, bit_value: bool, write_zero: bool) { if !bit_value && write_zero { - bytes[bit_index / 8] &= !(1u8 << (7 - (bit_index % 8))); + self.bitmap[bit_index / 8] &= !(1u8 << (7 - (bit_index % 8))); } else if bit_value { - bytes[bit_index / 8] |= 1u8 << (7 - (bit_index % 8)); + self.bitmap[bit_index / 8] |= 1u8 << (7 - (bit_index % 8)); } } - fn read_integer_from_binary( - &self, - bytes: &[u8], - start_bit_index: usize, - end_bit_index: usize, - ) -> u64 { + fn read_integer_from_binary(&self, start_bit_index: usize, end_bit_index: usize) -> u64 { let mut number: u64 = 0; // Read bits from right to left for curr_bit_index in (start_bit_index..=end_bit_index).rev() { - if self.get_bit(bytes, curr_bit_index) { + if self.get_bit(curr_bit_index) { number |= 1u64 << (end_bit_index - curr_bit_index); } } @@ -87,9 +93,9 @@ impl BitmapOperator { number } - fn index_of_first_set_bit(&self, bytes: &[u8], start_bit_index: usize) -> Option { + fn index_of_first_set_bit(&self, start_bit_index: usize) -> Option { let mut first_bit_index: usize = start_bit_index % 8; - for (byte_index, byte) in bytes.iter().enumerate().skip(start_bit_index / 8) { + for (byte_index, byte) in self.bitmap.iter().enumerate().skip(start_bit_index / 8) { if *byte > 0 { for bit_index in first_bit_index..=7 { if *byte & (1u8 << (7 - bit_index)) > 0 { @@ -105,13 +111,12 @@ impl BitmapOperator { fn decode_elias_gamma_entry( &self, - bytes: &[u8], start_bit_index: usize, ) -> anyhow::Result { - if bytes.is_empty() { + if self.bitmap.is_empty() { return Ok(EliasGammaDecoded::default()); } - let first_bit_index = match self.index_of_first_set_bit(bytes, start_bit_index) { + let first_bit_index = match self.index_of_first_set_bit(start_bit_index) { Some(index) => index, None => { return Ok(EliasGammaDecoded::default()); @@ -121,7 +126,7 @@ impl BitmapOperator { let remainder: u64 = if zero_count == 0 { 0 } else { - self.read_integer_from_binary(bytes, first_bit_index + 1, first_bit_index + zero_count) + self.read_integer_from_binary(first_bit_index + 1, first_bit_index + zero_count) }; Ok(EliasGammaDecoded { @@ -130,18 +135,17 @@ impl BitmapOperator { }) } - fn decompress_bitmap(&self, compressed_bitmap: &[u8]) -> Vec { - let compressed_bit_length: usize = compressed_bitmap.len() * 8; - let mut current_bit_value: bool = (compressed_bitmap[0] & 0b10000000) > 0; - let mut decompressed_bytes: Vec = Vec::new(); + pub fn decompress(&self) -> DecompressedBitmap { + let compressed_bit_length: usize = self.bitmap.len() * 8; + let mut current_bit_value: bool = (self.bitmap[0] & 0b10000000) > 0; + let mut decompressed: DecompressedBitmap = + DecompressedBitmap::new(self.start_block_height, None); let mut compressed_bit_index = 1; let mut decompressed_bit_index = 0; while compressed_bit_index < compressed_bit_length { - let decoded_elias_gamma = self - .decode_elias_gamma_entry(compressed_bitmap, compressed_bit_index) - .unwrap(); + let decoded_elias_gamma = self.decode_elias_gamma_entry(compressed_bit_index).unwrap(); if decoded_elias_gamma.value == 0 { break; } @@ -151,15 +155,10 @@ impl BitmapOperator { while current_bit_value && (bit_index_offset < usize::try_from(decoded_elias_gamma.value).unwrap()) { - while decompressed_bit_index + bit_index_offset >= (decompressed_bytes.len() * 8) { - decompressed_bytes.push(0b00000000); + while decompressed_bit_index + bit_index_offset >= (decompressed.bitmap.len() * 8) { + decompressed.bitmap.push(0b00000000); } - self.set_bit( - &mut decompressed_bytes, - decompressed_bit_index + bit_index_offset, - true, - true, - ); + decompressed.set_bit(decompressed_bit_index + bit_index_offset, true, true); bit_index_offset += 1; } @@ -167,64 +166,95 @@ impl BitmapOperator { current_bit_value = !current_bit_value; } - decompressed_bytes + decompressed } +} - fn merge_bitmap( - &self, - bitmap_to_update: &mut Bitmap, - bitmap_to_merge: &Bitmap, - ) -> anyhow::Result<()> { - let start_bit_index: usize = match bitmap_to_merge - .start_block_height - .checked_sub(bitmap_to_update.start_block_height) - { - Some(result) => usize::try_from(result)?, - None => { - return Err(anyhow!( - "Start block height in bitmap was lower than provided lowest block height", - )) - } - }; +#[derive(Debug, Default, PartialEq)] +pub struct DecompressedBitmap { + pub start_block_height: u64, + pub bitmap: Vec, +} - for bit_index_offset in 0..(bitmap_to_merge.bitmap.len() * 8) { - let decompressed_bit_value = self.get_bit(&bitmap_to_merge.bitmap, bit_index_offset); - while start_bit_index + bit_index_offset >= bitmap_to_update.bitmap.len() * 8 { - bitmap_to_update.bitmap.push(0b00000000); - } +impl DecompressedBitmap { + pub fn new(start_block_height: u64, bitmap: Option>) -> Self { + Self { + start_block_height, + bitmap: bitmap.unwrap_or(vec![]), + } + } + + pub fn iter(&self) -> DecompressedBitmapIter { + DecompressedBitmapIter::new(self) + } - self.set_bit( - &mut bitmap_to_update.bitmap, - start_bit_index + bit_index_offset, - decompressed_bit_value, - false, + pub fn get_bit(&self, bit_index: usize) -> bool { + let byte_index: usize = bit_index / 8; + let bit_index_in_byte: usize = bit_index % 8; + + (self.bitmap[byte_index] & (1u8 << (7 - bit_index_in_byte))) > 0 + } + + fn set_bit(&mut self, bit_index: usize, bit_value: bool, write_zero: bool) { + if !bit_value && write_zero { + self.bitmap[bit_index / 8] &= !(1u8 << (7 - (bit_index % 8))); + } else if bit_value { + self.bitmap[bit_index / 8] |= 1u8 << (7 - (bit_index % 8)); + } + } + + pub fn merge(&mut self, to_merge: &mut DecompressedBitmap) -> anyhow::Result<&mut Self> { + if to_merge.start_block_height < self.start_block_height { + std::mem::swap(&mut self.bitmap, &mut to_merge.bitmap); + std::mem::swap( + &mut self.start_block_height, + &mut to_merge.start_block_height, ); } + let block_height_difference = to_merge + .start_block_height + .checked_sub(self.start_block_height) + .ok_or_else(|| anyhow!("Caller of merge should have smaller start block height",))?; + let start_bit_index: usize = usize::try_from(block_height_difference)?; + + for bit_index_offset in 0..(to_merge.bitmap.len() * 8) { + let bit_value = to_merge.get_bit(bit_index_offset); + while start_bit_index + bit_index_offset >= self.bitmap.len() * 8 { + self.bitmap.push(0b00000000); + } - Ok(()) + self.set_bit(start_bit_index + bit_index_offset, bit_value, false); + } + + Ok(self) } +} - pub fn merge_bitmaps( - &self, - bitmaps_to_merge: &Vec, - smallest_start_block_height: u64, - ) -> anyhow::Result { - let mut merged_bitmap: Bitmap = Bitmap { - bitmap: Vec::new(), - start_block_height: smallest_start_block_height, - }; +pub struct DecompressedBitmapIter<'a> { + data: &'a DecompressedBitmap, + bit_index: usize, +} - for compressed_base64_bitmap in bitmaps_to_merge { - let decoded_bitmap: Vec = - general_purpose::STANDARD.decode(compressed_base64_bitmap.base64.clone())?; - let decompressed_bitmap: Bitmap = Bitmap { - bitmap: self.decompress_bitmap(&decoded_bitmap), - start_block_height: compressed_base64_bitmap.start_block_height, - }; - self.merge_bitmap(&mut merged_bitmap, &decompressed_bitmap)?; - } +impl<'a> DecompressedBitmapIter<'a> { + fn new(data: &'a DecompressedBitmap) -> Self { + Self { data, bit_index: 0 } + } +} + +impl Iterator for DecompressedBitmapIter<'_> { + type Item = u64; - Ok(merged_bitmap) + fn next(&mut self) -> Option { + while self.bit_index < self.data.bitmap.len() * 8 { + if self.data.get_bit(self.bit_index) { + self.bit_index += 1; + return Some( + self.data.start_block_height + u64::try_from(self.bit_index - 1).ok()?, + ); + } + self.bit_index += 1; + } + None } } @@ -233,13 +263,26 @@ mod tests { use super::*; #[test] - fn get_bit_from_bytes() { - let operator = BitmapOperator::new(); - let bytes: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001]; + fn decode_base_64() { + let base64 = Base64Bitmap { + base64: "wA==".to_string(), + start_block_height: 10, + }; + + assert_eq!( + CompressedBitmap::try_from(&base64).unwrap().bitmap, + vec![0b11000000] + ); + } + + #[test] + fn get_bit() { + let bytes = vec![0b00000001, 0b00000000, 0b00001001]; + let bitmap = DecompressedBitmap::new(0, Some(bytes)); let results: Vec = [7, 8, 9, 15, 19, 20, 22, 23] .iter() .map(|index| { - return operator.get_bit(bytes, *index); + return bitmap.get_bit(*index); }) .collect(); assert_eq!( @@ -249,101 +292,135 @@ mod tests { } #[test] - fn set_bit_in_bytes() { - let operator = BitmapOperator::new(); - let correct_bytes: &[u8; 3] = &[0b00000001, 0b00000000, 0b00001001]; - let test_bytes: &mut [u8; 3] = &mut [0b10000000, 0b10000000, 0b00001001]; - operator.set_bit(test_bytes, 0, false, true); - operator.set_bit(test_bytes, 7, true, true); - operator.set_bit(test_bytes, 8, false, true); - operator.set_bit(test_bytes, 12, false, false); - assert_eq!(correct_bytes, test_bytes); + fn decompressed_iterator() { + let bytes = vec![0b00000001, 0b00000000, 0b00001001]; + let bitmap = DecompressedBitmap::new(0, Some(bytes)); + let results: Vec = bitmap.iter().collect(); + assert_eq!(results, [7, 20, 23]); + } + + #[test] + fn set_bit() { + let test_bytes = vec![0b10000000, 0b10000000, 0b00001001]; + let mut bitmap = DecompressedBitmap::new(0, Some(test_bytes.clone())); + let correct_bytes = vec![0b00000001, 0b00000000, 0b00001001]; + bitmap.set_bit(0, false, true); + bitmap.set_bit(7, true, true); + bitmap.set_bit(8, false, true); + bitmap.set_bit(12, false, false); + assert_eq!(correct_bytes, bitmap.bitmap); } #[test] fn get_unsigned_integer_from_binary_sequence() { - let operator = BitmapOperator::new(); - let bytes: &[u8; 3] = &[0b11111110, 0b10010100, 0b10001101]; - assert_eq!(operator.read_integer_from_binary(bytes, 6, 16), 1321); + let bytes = vec![0b11111110, 0b10010100, 0b10001101]; + let bitmap = CompressedBitmap::new(0, bytes); + assert_eq!(bitmap.read_integer_from_binary(6, 16), 1321); } #[test] fn get_index_of_first_set_bit() { - let operator = BitmapOperator::new(); - let bytes: &[u8; 4] = &[0b00000001, 0b10000000, 0b00000001, 0b00000000]; + let bytes = vec![0b00000001, 0b10000000, 0b00000001, 0b00000000]; + let bitmap = CompressedBitmap::new(0, bytes); assert_eq!( - operator.index_of_first_set_bit(bytes, 4).unwrap(), + bitmap.index_of_first_set_bit(4).unwrap(), 7, "Should get index 7 when starting from 4", ); assert_eq!( - operator.index_of_first_set_bit(bytes, 7).unwrap(), + bitmap.index_of_first_set_bit(7).unwrap(), 7, "Should get index 7 when starting from 7", ); assert_eq!( - operator.index_of_first_set_bit(bytes, 8).unwrap(), + bitmap.index_of_first_set_bit(8).unwrap(), 8, "Should get index 8 when starting from 8", ); assert_eq!( - operator.index_of_first_set_bit(bytes, 17).unwrap(), + bitmap.index_of_first_set_bit(17).unwrap(), 23, "Should get index 23 when starting from 17", ); assert!( - operator.index_of_first_set_bit(bytes, 25).is_none(), + bitmap.index_of_first_set_bit(25).is_none(), "Should get None when starting from 25", ); } #[test] fn decode_elias_gamma() { - let operator = BitmapOperator::new(); - let bytes: &[u8; 2] = &[0b00000000, 0b00110110]; - let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 6).unwrap(); + let bytes = vec![0b00000000, 0b00110110]; + let bitmap = CompressedBitmap::new(0, bytes); + let decoded_eg: EliasGammaDecoded = bitmap.decode_elias_gamma_entry(6).unwrap(); assert_eq!(decoded_eg.value, 27); assert_eq!(decoded_eg.last_bit_index, 14); } #[test] fn decode_empty_elias_gamma() { - let operator = BitmapOperator::new(); - let bytes: &[u8; 2] = &[0b00000000, 0b00000000]; - let decoded_eg: EliasGammaDecoded = operator.decode_elias_gamma_entry(bytes, 0).unwrap(); + let bytes = vec![0b00000000, 0b00000000]; + let bitmap = CompressedBitmap::new(0, bytes); + let decoded_eg: EliasGammaDecoded = bitmap.decode_elias_gamma_entry(0).unwrap(); assert_eq!(decoded_eg.value, 0); assert_eq!(decoded_eg.last_bit_index, 0); } #[test] fn decode_compressed_bitmap() { - let operator = BitmapOperator::new(); - assert_eq!(operator.decompress_bitmap(&[0b10100000]), &[0b11000000]); - assert_eq!(operator.decompress_bitmap(&[0b00100100]), &[0b00110000]); - assert_eq!(operator.decompress_bitmap(&[0b10010000]), &[0b11110000]); assert_eq!( - operator.decompress_bitmap(&[0b10110010, 0b01000000]), - &[0b11100001] + CompressedBitmap::new(0, vec![0b10100000]) + .decompress() + .bitmap, + vec![0b11000000] + ); + assert_eq!( + CompressedBitmap::new(0, vec![0b00100100]) + .decompress() + .bitmap, + vec![0b00110000] + ); + assert_eq!( + CompressedBitmap::new(0, vec![0b10010000]) + .decompress() + .bitmap, + vec![0b11110000] + ); + assert_eq!( + CompressedBitmap::new(0, vec![0b10110010, 0b01000000]) + .decompress() + .bitmap, + vec![0b11100001] ); assert_eq!( - operator.decompress_bitmap(&[0b01010001, 0b01010000]), - &[0b01100000, 0b11000000] + CompressedBitmap::new(0, vec![0b01010001, 0b01010000]) + .decompress() + .bitmap, + vec![0b01100000, 0b11000000] ); assert_eq!( - operator.decompress_bitmap(&[0b01111111, 0b11111111, 0b11111000]), - &[0b01010101, 0b01010101, 0b01010000] + CompressedBitmap::new(0, vec![0b01111111, 0b11111111, 0b11111000]) + .decompress() + .bitmap, + vec![0b01010101, 0b01010101, 0b01010000] ); assert_eq!( - operator.decompress_bitmap(&[0b11010101, 0b11010101, 0b11010100]), - &[0b10010001, 0b00100010, 0b01000000] + CompressedBitmap::new(0, vec![0b11010101, 0b11010101, 0b11010100]) + .decompress() + .bitmap, + vec![0b10010001, 0b00100010, 0b01000000] ); assert_eq!( - operator.decompress_bitmap(&[0b00000111, 0b11100000]), - &[0b00000000, 0b00000000, 0b00000000, 0b00000001] + CompressedBitmap::new(0, vec![0b00000111, 0b11100000]) + .decompress() + .bitmap, + vec![0b00000000, 0b00000000, 0b00000000, 0b00000001] ); assert_eq!( - operator.decompress_bitmap(&[0b11000001, 0b11011011]), - &[ + CompressedBitmap::new(0, vec![0b11000001, 0b11011011]) + .decompress() + .bitmap, + vec![ 0b10000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00001110 ] @@ -351,43 +428,59 @@ mod tests { } #[test] - fn merge_two_decompressed_bitmaps() { - let operator = BitmapOperator::new(); - let mut base_bitmap: Bitmap = Bitmap { + fn merge_two_bitmaps() { + let mut base_bitmap: DecompressedBitmap = DecompressedBitmap { bitmap: vec![0b11001010, 0b10001111], start_block_height: 10, }; - let compressed_bitmap: Bitmap = Bitmap { - bitmap: vec![0b11100001], // Decompresses to 11100001 + let mut to_merge: DecompressedBitmap = DecompressedBitmap { + bitmap: vec![0b11100001], start_block_height: 14, }; - assert!(operator - .merge_bitmap(&mut base_bitmap, &compressed_bitmap) - .is_ok()); + assert!(base_bitmap.merge(&mut to_merge).is_ok()); + assert_eq!(base_bitmap.bitmap, vec![0b11001110, 0b10011111]); + } + + #[test] + fn merge_two_bitmaps_with_swap() { + let mut to_merge: DecompressedBitmap = DecompressedBitmap { + bitmap: vec![0b11001010, 0b10001111], + start_block_height: 10, + }; + let mut base_bitmap: DecompressedBitmap = DecompressedBitmap { + bitmap: vec![0b11100001], + start_block_height: 14, + }; + + assert!(base_bitmap.merge(&mut to_merge).is_ok()); assert_eq!(base_bitmap.bitmap, vec![0b11001110, 0b10011111]); } #[test] fn merge_multiple_bitmaps_together() { - let operator = BitmapOperator::new(); - let test_bitmaps_to_merge: Vec = vec![ - Base64Bitmap { - base64: "oA==".to_string(), // Decompresses to 11000000 - start_block_height: 10, - }, - Base64Bitmap { - base64: "oA==".to_string(), - start_block_height: 14, - }, - Base64Bitmap { - base64: "oA==".to_string(), - start_block_height: 18, - }, - ]; - - let merged_bitmap = operator.merge_bitmaps(&test_bitmaps_to_merge, 10).unwrap(); - assert_eq!(merged_bitmap.bitmap, vec![0b11001100, 0b11000000]); - assert_eq!(merged_bitmap.start_block_height, 10); + let mut base_bitmap = DecompressedBitmap::new(200, None); + let mut bitmap_a = DecompressedBitmap { + bitmap: vec![0b11000000], + start_block_height: 18, + }; + let mut bitmap_b = DecompressedBitmap { + bitmap: vec![0b11000000], + start_block_height: 10, + }; + let mut bitmap_c = DecompressedBitmap { + bitmap: vec![0b11000000], + start_block_height: 14, + }; + + base_bitmap + .merge(&mut bitmap_a) + .unwrap() + .merge(&mut bitmap_b) + .unwrap() + .merge(&mut bitmap_c) + .unwrap(); + assert_eq!(base_bitmap.bitmap, vec![0b11001100, 0b11000000]); + assert_eq!(base_bitmap.start_block_height, 10); } } diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index a8cd642b2..f01f4cc68 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -1,4 +1,4 @@ -use crate::bitmap::{Base64Bitmap, BitmapOperator}; +use crate::bitmap::{Base64Bitmap, CompressedBitmap, DecompressedBitmap}; use crate::graphql::client::GraphQLClient; use crate::rules::types::ChainId; use anyhow::Context; @@ -21,20 +21,14 @@ pub use BlockHeightStreamImpl as BlockHeightStream; pub struct BlockHeightStreamImpl { graphql_client: GraphQLClient, - bitmap_operator: BitmapOperator, s3_client: crate::s3_client::S3Client, chain_id: ChainId, } impl BlockHeightStreamImpl { - pub fn new( - graphql_client: GraphQLClient, - bitmap_operator: BitmapOperator, - s3_client: crate::s3_client::S3Client, - ) -> Self { + pub fn new(graphql_client: GraphQLClient, s3_client: crate::s3_client::S3Client) -> Self { Self { graphql_client, - bitmap_operator, s3_client, chain_id: ChainId::Mainnet, } @@ -139,28 +133,29 @@ impl BlockHeightStreamImpl { let contract_pattern_type = self.parse_contract_pattern(&contract_pattern); let mut current_date = start_date; while current_date <= Utc::now() { - let bitmaps_from_query: Vec = match contract_pattern_type { + let base_64_bitmaps: Vec = match contract_pattern_type { ContractPatternType::Exact(ref pattern) => { let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), ¤t_date).await.unwrap(); - query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect() + query_result.iter().map(Base64Bitmap::try_from).collect()? }, ContractPatternType::Wildcard(ref pattern) => { let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), ¤t_date).await.unwrap(); - query_result.iter().map(|result_item| Base64Bitmap::try_from(result_item).unwrap()).collect() + query_result.iter().map(Base64Bitmap::try_from).collect()? }, }; - // convert to base64 - // convert to compressed - // convert to decompressed - // merge - if !bitmaps_from_query.is_empty() { - let starting_block_height = bitmaps_from_query.iter().map(|item| item.start_block_height).min().unwrap(); - let bitmap_for_day = self.bitmap_operator.merge_bitmaps(&bitmaps_from_query, starting_block_height).unwrap(); - for index in 0..(bitmap_for_day.bitmap.len() * 8) { - if self.bitmap_operator.get_bit(&bitmap_for_day.bitmap, index) { - yield starting_block_height + u64::try_from(index)?; - } - } + + let compressed_bitmaps: Vec = base_64_bitmaps.iter().map(CompressedBitmap::try_from).collect()?; + let decompressed_bitmaps: Vec = compressed_bitmaps.iter().map(CompressedBitmap::decompress).collect(); + + let starting_block_height = decompressed_bitmaps.iter().map(|item| item.start_block_height).min().unwrap(); + let mut bitmap_for_day = DecompressedBitmap::new(starting_block_height, None); + for mut bitmap in decompressed_bitmaps { + let _ = bitmap_for_day.merge(&mut bitmap); + } + + let mut bitmap_iter = bitmap_for_day.iter(); + while let Some(block_height) = bitmap_iter.next() { + yield block_height; } current_date = self.next_day(current_date); } @@ -252,9 +247,7 @@ mod tests { fn parse_exact_contract_patterns() { let mock_s3_client = crate::s3_client::S3Client::default(); let mock_graphql_client = crate::graphql::client::GraphQLClient::default(); - let bitmap_operator = crate::bitmap::BitmapOperator::new(); - let block_height_stream = - BlockHeightStreamImpl::new(mock_graphql_client, bitmap_operator, mock_s3_client); + let block_height_stream = BlockHeightStreamImpl::new(mock_graphql_client, mock_s3_client); let sample_patterns = vec![ "near", "*.near", @@ -298,9 +291,7 @@ mod tests { fn parse_wildcard_contract_patterns() { let mock_s3_client = crate::s3_client::S3Client::default(); let mock_graphql_client = crate::graphql::client::GraphQLClient::default(); - let bitmap_operator = crate::bitmap::BitmapOperator::new(); - let block_height_stream = - BlockHeightStreamImpl::new(mock_graphql_client, bitmap_operator, mock_s3_client); + let block_height_stream = BlockHeightStreamImpl::new(mock_graphql_client, mock_s3_client); let sample_patterns = vec![ "*.someone.near", "near, someone.*.tg", @@ -346,11 +337,7 @@ mod tests { .times(1) .returning(move |_, _| Ok(mock_query_result.clone())); - let block_height_stream = BlockHeightStreamImpl::new( - mock_graphql_client, - crate::bitmap::BitmapOperator::new(), - mock_s3_client, - ); + let block_height_stream = BlockHeightStreamImpl::new(mock_graphql_client, mock_s3_client); let stream = block_height_stream.stream_matching_block_heights(0, "someone.near".to_owned()); @@ -421,11 +408,7 @@ mod tests { wildcard_query_result(105, "wA=="), ]) }); - let block_height_stream = BlockHeightStreamImpl::new( - mock_graphql_client, - crate::bitmap::BitmapOperator::new(), - mock_s3_client, - ); + let block_height_stream = BlockHeightStreamImpl::new(mock_graphql_client, mock_s3_client); let stream = block_height_stream.stream_matching_block_heights(0, "*.someone.near".to_string()); From fd7750bc75ffb53ce84e28145d93fda1b9b0e588 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 13 Jun 2024 16:27:41 -0700 Subject: [PATCH 16/20] Remove unwrap calls and implement contract type parsing as From --- block-streamer/src/bitmap.rs | 21 ++-- block-streamer/src/block_height_stream.rs | 126 +++++++++++----------- 2 files changed, 76 insertions(+), 71 deletions(-) diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index 54d43ed91..83966f0b3 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -130,12 +130,12 @@ impl CompressedBitmap { }; Ok(EliasGammaDecoded { - value: 2_u64.pow(zero_count.try_into().unwrap()) + remainder, + value: 2_u64.pow(zero_count.try_into()?) + remainder, last_bit_index: first_bit_index + zero_count, }) } - pub fn decompress(&self) -> DecompressedBitmap { + pub fn decompress(&self) -> anyhow::Result { let compressed_bit_length: usize = self.bitmap.len() * 8; let mut current_bit_value: bool = (self.bitmap[0] & 0b10000000) > 0; let mut decompressed: DecompressedBitmap = @@ -145,7 +145,7 @@ impl CompressedBitmap { let mut decompressed_bit_index = 0; while compressed_bit_index < compressed_bit_length { - let decoded_elias_gamma = self.decode_elias_gamma_entry(compressed_bit_index).unwrap(); + let decoded_elias_gamma = self.decode_elias_gamma_entry(compressed_bit_index)?; if decoded_elias_gamma.value == 0 { break; } @@ -153,7 +153,7 @@ impl CompressedBitmap { compressed_bit_index = decoded_elias_gamma.last_bit_index + 1; let mut bit_index_offset: usize = 0; while current_bit_value - && (bit_index_offset < usize::try_from(decoded_elias_gamma.value).unwrap()) + && (bit_index_offset < usize::try_from(decoded_elias_gamma.value)?) { while decompressed_bit_index + bit_index_offset >= (decompressed.bitmap.len() * 8) { decompressed.bitmap.push(0b00000000); @@ -162,11 +162,11 @@ impl CompressedBitmap { bit_index_offset += 1; } - decompressed_bit_index += usize::try_from(decoded_elias_gamma.value).unwrap(); + decompressed_bit_index += usize::try_from(decoded_elias_gamma.value)?; current_bit_value = !current_bit_value; } - decompressed + Ok(decompressed) } } @@ -371,54 +371,63 @@ mod tests { assert_eq!( CompressedBitmap::new(0, vec![0b10100000]) .decompress() + .unwrap() .bitmap, vec![0b11000000] ); assert_eq!( CompressedBitmap::new(0, vec![0b00100100]) .decompress() + .unwrap() .bitmap, vec![0b00110000] ); assert_eq!( CompressedBitmap::new(0, vec![0b10010000]) .decompress() + .unwrap() .bitmap, vec![0b11110000] ); assert_eq!( CompressedBitmap::new(0, vec![0b10110010, 0b01000000]) .decompress() + .unwrap() .bitmap, vec![0b11100001] ); assert_eq!( CompressedBitmap::new(0, vec![0b01010001, 0b01010000]) .decompress() + .unwrap() .bitmap, vec![0b01100000, 0b11000000] ); assert_eq!( CompressedBitmap::new(0, vec![0b01111111, 0b11111111, 0b11111000]) .decompress() + .unwrap() .bitmap, vec![0b01010101, 0b01010101, 0b01010000] ); assert_eq!( CompressedBitmap::new(0, vec![0b11010101, 0b11010101, 0b11010100]) .decompress() + .unwrap() .bitmap, vec![0b10010001, 0b00100010, 0b01000000] ); assert_eq!( CompressedBitmap::new(0, vec![0b00000111, 0b11100000]) .decompress() + .unwrap() .bitmap, vec![0b00000000, 0b00000000, 0b00000000, 0b00000001] ); assert_eq!( CompressedBitmap::new(0, vec![0b11000001, 0b11011011]) .decompress() + .unwrap() .bitmap, vec![ 0b10000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs index f01f4cc68..97f357c96 100644 --- a/block-streamer/src/block_height_stream.rs +++ b/block-streamer/src/block_height_stream.rs @@ -17,15 +17,58 @@ enum ContractPatternType { Wildcard(String), } -pub use BlockHeightStreamImpl as BlockHeightStream; +impl ContractPatternType { + fn strip_wildcard_if_root_account(receiver_id: String) -> anyhow::Result { + let wildcard_root_account_regex = Regex::new(r"^\*\.([a-zA-Z0-9]+)$")?; + if wildcard_root_account_regex.is_match(&receiver_id) { + return Ok(receiver_id + .split('.') + .nth(1) + .unwrap_or(&receiver_id) + .to_string()); + } + Ok(receiver_id) + } +} + +impl From<&str> for ContractPatternType { + fn from(contract_pattern: &str) -> Self { + // If receiver_id is of pattern *.SOME_ROOT_ACCOUNT such as *.near, we can reduce this to + // "near" as we store bitmaps for root accounts like near ,tg, and so on. + let cleaned_contract_pattern: String = contract_pattern + .split(',') + .map(|receiver| receiver.trim()) + .map(str::to_string) + .map(|receiver| { + ContractPatternType::strip_wildcard_if_root_account(receiver.clone()) + .unwrap_or(receiver) + }) + .collect::>() + .join(","); + + if cleaned_contract_pattern.chars().any(|c| c == '*') { + let wildcard_pattern = cleaned_contract_pattern + .replace(',', "|") + .replace('.', "\\.") + .replace('*', ".*"); + return ContractPatternType::Wildcard(wildcard_pattern); + } + + let exact_pattern = cleaned_contract_pattern + .split(',') + .map(str::to_string) + .collect(); + ContractPatternType::Exact(exact_pattern) + } +} -pub struct BlockHeightStreamImpl { +pub struct BitmapProcessor { graphql_client: GraphQLClient, s3_client: crate::s3_client::S3Client, chain_id: ChainId, } -impl BlockHeightStreamImpl { +impl BitmapProcessor { pub fn new(graphql_client: GraphQLClient, s3_client: crate::s3_client::S3Client) -> Self { Self { graphql_client, @@ -85,44 +128,6 @@ impl BlockHeightStreamImpl { date + Duration::days(1) } - fn strip_wildcard_if_root_account(&self, receiver_id: String) -> String { - let wildcard_root_account_regex = Regex::new(r"^\*\.([a-zA-Z0-9]+)$").unwrap(); - if wildcard_root_account_regex.is_match(&receiver_id) { - return receiver_id - .split('.') - .nth(1) - .unwrap_or(&receiver_id) - .to_string(); - } - receiver_id - } - - fn parse_contract_pattern(&self, contract_pattern: &str) -> ContractPatternType { - // If receiver_id is of pattern *.SOME_ROOT_ACCOUNT such as *.near, we can reduce this to - // "near" as we store bitmaps for root accounts like near ,tg, and so on. - let cleaned_contract_pattern: String = contract_pattern - .split(',') - .map(|receiver| receiver.trim()) - .map(str::to_string) - .map(|receiver| self.strip_wildcard_if_root_account(receiver)) - .collect::>() - .join(","); - - if cleaned_contract_pattern.chars().any(|c| c == '*') { - let wildcard_pattern = cleaned_contract_pattern - .replace(',', "|") - .replace('.', "\\.") - .replace('*', ".*"); - return ContractPatternType::Wildcard(wildcard_pattern); - } - - let exact_pattern = cleaned_contract_pattern - .split(',') - .map(str::to_string) - .collect(); - ContractPatternType::Exact(exact_pattern) - } - fn stream_matching_block_heights<'b, 'a: 'b>( &'a self, start_block_height: near_indexer_primitives::types::BlockHeight, @@ -130,24 +135,24 @@ impl BlockHeightStreamImpl { ) -> impl futures::Stream> + 'b { try_stream! { let start_date = self.get_nearest_block_date(start_block_height).await?; - let contract_pattern_type = self.parse_contract_pattern(&contract_pattern); + let contract_pattern_type = ContractPatternType::from(contract_pattern.as_str()); let mut current_date = start_date; while current_date <= Utc::now() { let base_64_bitmaps: Vec = match contract_pattern_type { ContractPatternType::Exact(ref pattern) => { - let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), ¤t_date).await.unwrap(); + let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), ¤t_date).await?; query_result.iter().map(Base64Bitmap::try_from).collect()? }, ContractPatternType::Wildcard(ref pattern) => { - let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), ¤t_date).await.unwrap(); + let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), ¤t_date).await?; query_result.iter().map(Base64Bitmap::try_from).collect()? }, }; let compressed_bitmaps: Vec = base_64_bitmaps.iter().map(CompressedBitmap::try_from).collect()?; - let decompressed_bitmaps: Vec = compressed_bitmaps.iter().map(CompressedBitmap::decompress).collect(); + let decompressed_bitmaps: Vec = compressed_bitmaps.iter().map(CompressedBitmap::decompress).collect()?; - let starting_block_height = decompressed_bitmaps.iter().map(|item| item.start_block_height).min().unwrap(); + let starting_block_height: u64 = decompressed_bitmaps.iter().map(|item| item.start_block_height).min().unwrap_or(decompressed_bitmaps[0].start_block_height); let mut bitmap_for_day = DecompressedBitmap::new(starting_block_height, None); for mut bitmap in decompressed_bitmaps { let _ = bitmap_for_day.merge(&mut bitmap); @@ -168,9 +173,6 @@ mod tests { use super::*; use mockall::predicate; - const HASURA_ENDPOINT: &str = - "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app/v1/graphql"; - fn exact_query_result( first_block_height: i64, bitmap: &str, @@ -245,9 +247,6 @@ mod tests { #[test] fn parse_exact_contract_patterns() { - let mock_s3_client = crate::s3_client::S3Client::default(); - let mock_graphql_client = crate::graphql::client::GraphQLClient::default(); - let block_height_stream = BlockHeightStreamImpl::new(mock_graphql_client, mock_s3_client); let sample_patterns = vec![ "near", "*.near", @@ -257,19 +256,19 @@ mod tests { ]; assert_eq!( - block_height_stream.parse_contract_pattern(sample_patterns[0]), + ContractPatternType::from(sample_patterns[0]), ContractPatternType::Exact(vec!["near".to_string()]) ); assert_eq!( - block_height_stream.parse_contract_pattern(sample_patterns[1]), + ContractPatternType::from(sample_patterns[1]), ContractPatternType::Exact(vec!["near".to_string()]) ); assert_eq!( - block_height_stream.parse_contract_pattern(sample_patterns[2]), + ContractPatternType::from(sample_patterns[2]), ContractPatternType::Exact(vec!["near".to_string(), "someone.tg".to_string()],) ); assert_eq!( - block_height_stream.parse_contract_pattern(sample_patterns[3]), + ContractPatternType::from(sample_patterns[3]), ContractPatternType::Exact(vec![ "near".to_string(), "someone.tg".to_string(), @@ -277,7 +276,7 @@ mod tests { ],) ); assert_eq!( - block_height_stream.parse_contract_pattern(sample_patterns[4]), + ContractPatternType::from(sample_patterns[4]), ContractPatternType::Exact(vec![ "a.near".to_string(), "b.near".to_string(), @@ -289,9 +288,6 @@ mod tests { #[test] fn parse_wildcard_contract_patterns() { - let mock_s3_client = crate::s3_client::S3Client::default(); - let mock_graphql_client = crate::graphql::client::GraphQLClient::default(); - let block_height_stream = BlockHeightStreamImpl::new(mock_graphql_client, mock_s3_client); let sample_patterns = vec![ "*.someone.near", "near, someone.*.tg", @@ -299,15 +295,15 @@ mod tests { ]; assert_eq!( - block_height_stream.parse_contract_pattern(sample_patterns[0]), + ContractPatternType::from(sample_patterns[0]), ContractPatternType::Wildcard(".*\\.someone\\.near".to_string()) ); assert_eq!( - block_height_stream.parse_contract_pattern(sample_patterns[1]), + ContractPatternType::from(sample_patterns[1]), ContractPatternType::Wildcard("near|someone\\..*\\.tg".to_string()) ); assert_eq!( - block_height_stream.parse_contract_pattern(sample_patterns[2]), + ContractPatternType::from(sample_patterns[2]), ContractPatternType::Wildcard("a\\.near|b\\..*|b|a\\..*\\.c\\.near".to_string()) ); } @@ -337,7 +333,7 @@ mod tests { .times(1) .returning(move |_, _| Ok(mock_query_result.clone())); - let block_height_stream = BlockHeightStreamImpl::new(mock_graphql_client, mock_s3_client); + let block_height_stream = BitmapProcessor::new(mock_graphql_client, mock_s3_client); let stream = block_height_stream.stream_matching_block_heights(0, "someone.near".to_owned()); @@ -408,7 +404,7 @@ mod tests { wildcard_query_result(105, "wA=="), ]) }); - let block_height_stream = BlockHeightStreamImpl::new(mock_graphql_client, mock_s3_client); + let block_height_stream = BitmapProcessor::new(mock_graphql_client, mock_s3_client); let stream = block_height_stream.stream_matching_block_heights(0, "*.someone.near".to_string()); From cbc82f91e61c83177f713c7b020f0e13f2e412fc Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 13 Jun 2024 16:30:58 -0700 Subject: [PATCH 17/20] Rename some tests --- block-streamer/src/bitmap.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index 83966f0b3..c29477f06 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -292,7 +292,7 @@ mod tests { } #[test] - fn decompressed_iterator() { + fn iterate_decompressed_bitmap() { let bytes = vec![0b00000001, 0b00000000, 0b00001001]; let bitmap = DecompressedBitmap::new(0, Some(bytes)); let results: Vec = bitmap.iter().collect(); @@ -367,7 +367,7 @@ mod tests { } #[test] - fn decode_compressed_bitmap() { + fn decompress_many_compressed_bitmaps() { assert_eq!( CompressedBitmap::new(0, vec![0b10100000]) .decompress() From 2d33af4b8b6eed13f19d6e0da1022fa1a8f8ceae Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 13 Jun 2024 17:47:34 -0700 Subject: [PATCH 18/20] Rename files --- .../src/{block_height_stream.rs => bitmap_processor.rs} | 0 block-streamer/src/main.rs | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename block-streamer/src/{block_height_stream.rs => bitmap_processor.rs} (100%) diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/bitmap_processor.rs similarity index 100% rename from block-streamer/src/block_height_stream.rs rename to block-streamer/src/bitmap_processor.rs diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index b8d5c4ee9..f14c65706 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -1,7 +1,7 @@ use tracing_subscriber::prelude::*; mod bitmap; -mod block_height_stream; +mod bitmap_processor; mod block_stream; mod delta_lake_client; mod graphql; From c92fd8132a14dafeb7abd367cde1191d1deaa7c8 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 14 Jun 2024 07:58:49 -0700 Subject: [PATCH 19/20] Resolve 2nd round comments --- block-streamer/src/bitmap.rs | 23 ++++++------- block-streamer/src/bitmap_processor.rs | 45 ++++++++++++++++++-------- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index c29477f06..55dbc95f6 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -50,7 +50,7 @@ pub struct CompressedBitmap { impl TryFrom<&Base64Bitmap> for CompressedBitmap { type Error = anyhow::Error; - fn try_from(value: &Base64Bitmap) -> Result { + fn try_from(value: &Base64Bitmap) -> anyhow::Result { Ok(Self { bitmap: general_purpose::STANDARD.decode(value.base64.clone())?, start_block_height: value.start_block_height, @@ -203,7 +203,7 @@ impl DecompressedBitmap { } } - pub fn merge(&mut self, to_merge: &mut DecompressedBitmap) -> anyhow::Result<&mut Self> { + pub fn merge(&mut self, mut to_merge: DecompressedBitmap) -> anyhow::Result<&mut Self> { if to_merge.start_block_height < self.start_block_height { std::mem::swap(&mut self.bitmap, &mut to_merge.bitmap); std::mem::swap( @@ -211,10 +211,7 @@ impl DecompressedBitmap { &mut to_merge.start_block_height, ); } - let block_height_difference = to_merge - .start_block_height - .checked_sub(self.start_block_height) - .ok_or_else(|| anyhow!("Caller of merge should have smaller start block height",))?; + let block_height_difference = to_merge.start_block_height - self.start_block_height; let start_bit_index: usize = usize::try_from(block_height_difference)?; for bit_index_offset in 0..(to_merge.bitmap.len() * 8) { @@ -248,9 +245,7 @@ impl Iterator for DecompressedBitmapIter<'_> { while self.bit_index < self.data.bitmap.len() * 8 { if self.data.get_bit(self.bit_index) { self.bit_index += 1; - return Some( - self.data.start_block_height + u64::try_from(self.bit_index - 1).ok()?, - ); + return Some(self.data.start_block_height + (self.bit_index as u64) - 1); } self.bit_index += 1; } @@ -447,7 +442,7 @@ mod tests { start_block_height: 14, }; - assert!(base_bitmap.merge(&mut to_merge).is_ok()); + assert!(base_bitmap.merge(to_merge).is_ok()); assert_eq!(base_bitmap.bitmap, vec![0b11001110, 0b10011111]); } @@ -462,7 +457,7 @@ mod tests { start_block_height: 14, }; - assert!(base_bitmap.merge(&mut to_merge).is_ok()); + assert!(base_bitmap.merge(to_merge).is_ok()); assert_eq!(base_bitmap.bitmap, vec![0b11001110, 0b10011111]); } @@ -483,11 +478,11 @@ mod tests { }; base_bitmap - .merge(&mut bitmap_a) + .merge(bitmap_a) .unwrap() - .merge(&mut bitmap_b) + .merge(bitmap_b) .unwrap() - .merge(&mut bitmap_c) + .merge(bitmap_c) .unwrap(); assert_eq!(base_bitmap.bitmap, vec![0b11001100, 0b11000000]); assert_eq!(base_bitmap.start_block_height, 10); diff --git a/block-streamer/src/bitmap_processor.rs b/block-streamer/src/bitmap_processor.rs index 97f357c96..22e888ba6 100644 --- a/block-streamer/src/bitmap_processor.rs +++ b/block-streamer/src/bitmap_processor.rs @@ -128,6 +128,35 @@ impl BitmapProcessor { date + Duration::days(1) } + async fn query_base_64_bitmaps( + &self, + contract_pattern_type: &ContractPatternType, + current_date: &DateTime, + ) -> anyhow::Result> { + match contract_pattern_type { + ContractPatternType::Exact(ref pattern) => { + let query_result: Vec<_> = self + .graphql_client + .get_bitmaps_exact(pattern.clone(), ¤t_date) + .await?; + Ok(query_result + .iter() + .map(Base64Bitmap::try_from) + .collect::>>()?) + } + ContractPatternType::Wildcard(ref pattern) => { + let query_result: Vec<_> = self + .graphql_client + .get_bitmaps_wildcard(pattern.clone(), ¤t_date) + .await?; + Ok(query_result + .iter() + .map(Base64Bitmap::try_from) + .collect::>>()?) + } + } + } + fn stream_matching_block_heights<'b, 'a: 'b>( &'a self, start_block_height: near_indexer_primitives::types::BlockHeight, @@ -138,24 +167,14 @@ impl BitmapProcessor { let contract_pattern_type = ContractPatternType::from(contract_pattern.as_str()); let mut current_date = start_date; while current_date <= Utc::now() { - let base_64_bitmaps: Vec = match contract_pattern_type { - ContractPatternType::Exact(ref pattern) => { - let query_result: Vec<_> = self.graphql_client.get_bitmaps_exact(pattern.clone(), ¤t_date).await?; - query_result.iter().map(Base64Bitmap::try_from).collect()? - }, - ContractPatternType::Wildcard(ref pattern) => { - let query_result: Vec<_> = self.graphql_client.get_bitmaps_wildcard(pattern.clone(), ¤t_date).await?; - query_result.iter().map(Base64Bitmap::try_from).collect()? - }, - }; - + let base_64_bitmaps: Vec = self.query_base_64_bitmaps(&contract_pattern_type, ¤t_date).await?; let compressed_bitmaps: Vec = base_64_bitmaps.iter().map(CompressedBitmap::try_from).collect()?; let decompressed_bitmaps: Vec = compressed_bitmaps.iter().map(CompressedBitmap::decompress).collect()?; let starting_block_height: u64 = decompressed_bitmaps.iter().map(|item| item.start_block_height).min().unwrap_or(decompressed_bitmaps[0].start_block_height); let mut bitmap_for_day = DecompressedBitmap::new(starting_block_height, None); - for mut bitmap in decompressed_bitmaps { - let _ = bitmap_for_day.merge(&mut bitmap); + for bitmap in decompressed_bitmaps { + bitmap_for_day.merge(bitmap)?; } let mut bitmap_iter = bitmap_for_day.iter(); From be06911e3a564395706f607c45972dc770108381 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 14 Jun 2024 08:06:20 -0700 Subject: [PATCH 20/20] rename test variables --- block-streamer/src/bitmap_processor.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/block-streamer/src/bitmap_processor.rs b/block-streamer/src/bitmap_processor.rs index 22e888ba6..79bf12597 100644 --- a/block-streamer/src/bitmap_processor.rs +++ b/block-streamer/src/bitmap_processor.rs @@ -352,10 +352,9 @@ mod tests { .times(1) .returning(move |_, _| Ok(mock_query_result.clone())); - let block_height_stream = BitmapProcessor::new(mock_graphql_client, mock_s3_client); + let bitmap_processor = BitmapProcessor::new(mock_graphql_client, mock_s3_client); - let stream = - block_height_stream.stream_matching_block_heights(0, "someone.near".to_owned()); + let stream = bitmap_processor.stream_matching_block_heights(0, "someone.near".to_owned()); tokio::pin!(stream); let mut result_heights = vec![]; while let Some(Ok(height)) = stream.next().await { @@ -423,10 +422,10 @@ mod tests { wildcard_query_result(105, "wA=="), ]) }); - let block_height_stream = BitmapProcessor::new(mock_graphql_client, mock_s3_client); + let bitmap_processor = BitmapProcessor::new(mock_graphql_client, mock_s3_client); let stream = - block_height_stream.stream_matching_block_heights(0, "*.someone.near".to_string()); + bitmap_processor.stream_matching_block_heights(0, "*.someone.near".to_string()); tokio::pin!(stream); let mut result_heights = vec![]; while let Some(Ok(height)) = stream.next().await {