From 3c3fbb8c73c36fac36162296f8366152772f6f7b Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 5 Jun 2024 12:20:47 -0700 Subject: [PATCH] Some starting code --- block-streamer/Cargo.lock | 1 + block-streamer/Cargo.toml | 1 + block-streamer/src/bitmap.rs | 25 ++++++++++++ block-streamer/src/block_height_stream.rs | 48 +++++++++++++++++++++++ block-streamer/src/graphql/client.rs | 4 +- block-streamer/src/main.rs | 1 + coordinator/Cargo.lock | 1 + 7 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 block-streamer/src/block_height_stream.rs diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index 1a68afaa5..19c2d2cae 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -964,6 +964,7 @@ version = "0.1.0" dependencies = [ "actix-web", "anyhow", + "async-stream", "async-trait", "aws-config", "aws-sdk-s3", diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index d44903786..6e1ee4c82 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -34,6 +34,7 @@ wildmatch = "2.1.1" registry-types = { path = "../registry/types" } base64 = "0.22.1" +async-stream = "0.3.5" [build-dependencies] tonic-build = "0.10" diff --git a/block-streamer/src/bitmap.rs b/block-streamer/src/bitmap.rs index a4e921974..bac3dd735 100644 --- a/block-streamer/src/bitmap.rs +++ b/block-streamer/src/bitmap.rs @@ -1,11 +1,16 @@ use anyhow::anyhow; use base64::{engine::general_purpose, Engine as _}; +use std::convert::TryFrom; +use crate::graphql::client::{get_bitmaps_exact, get_bitmaps_wildcard}; + +#[derive(Debug, Default, PartialEq)] pub struct Base64Bitmap { pub start_block_height: usize, pub base64: String, } +#[derive(Debug, Default, PartialEq)] pub struct Bitmap { pub start_block_height: usize, pub bitmap: Vec, @@ -19,6 +24,26 @@ struct EliasGammaDecoded { pub struct BitmapOperator {} +impl Base64Bitmap { + pub fn from_exact_query( + query_item: get_bitmaps_exact::GetBitmapsExactDarunrsNearBitmapV5ActionsIndex, + ) -> Self { + Self { + base64: query_item.bitmap, + start_block_height: usize::try_from(query_item.first_block_height).unwrap(), + } + } + + pub fn from_wildcard_query( + query_item: get_bitmaps_wildcard::GetBitmapsWildcardDarunrsNearBitmapV5ActionsIndex, + ) -> Self { + Self { + base64: query_item.bitmap, + start_block_height: usize::try_from(query_item.first_block_height).unwrap(), + } + } +} + #[cfg_attr(test, mockall::automock)] impl BitmapOperator { pub fn new() -> Self { diff --git a/block-streamer/src/block_height_stream.rs b/block-streamer/src/block_height_stream.rs new file mode 100644 index 000000000..edce47069 --- /dev/null +++ b/block-streamer/src/block_height_stream.rs @@ -0,0 +1,48 @@ +use crate::bitmap::{Base64Bitmap, BitmapOperator}; +use crate::graphql::client::GraphQLClient; +use async_stream::stream; +use futures::Stream; +use near_lake_framework::near_indexer_primitives; + +pub struct BlockHeightStream { + graphql_client: GraphQLClient, + bitmap_operator: BitmapOperator, +} + +#[cfg_attr(test, mockall::automock)] +impl BlockHeightStream { + pub fn new(graphql_endpoint: String) -> Self { + Self { + graphql_client: GraphQLClient::new(graphql_endpoint), + bitmap_operator: BitmapOperator::new(), + } + } + + fn parse_contract_pattern(&self, contract_pattern: &str) -> Vec< + + pub async fn list_matching_block_heights( + &self, + start_block_height: near_indexer_primitives::types::BlockHeight, + contract_pattern: &str, + ) -> impl Stream { + let start_date = self.get_nearest_block_date(start_block_height).await?; + + stream! { + for i in 0..3 { + yield i; + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const HASURA_ENDPOINT: &str = + "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app/v1/graphql"; + + fn collect_three_block_heights_from_one_bitmap() {} + + fn collect_three_block_heights_from_two_bitmaps() {} +} diff --git a/block-streamer/src/graphql/client.rs b/block-streamer/src/graphql/client.rs index eca7cee80..09c37e8dd 100644 --- a/block-streamer/src/graphql/client.rs +++ b/block-streamer/src/graphql/client.rs @@ -14,7 +14,7 @@ type Date = String; response_derives = "Debug", normalization = "rust" )] -struct GetBitmapsExact; +pub struct GetBitmapsExact; #[derive(GraphQLQuery)] #[graphql( @@ -23,7 +23,7 @@ struct GetBitmapsExact; response_derives = "Debug", normalization = "rust" )] -struct GetBitmapsWildcard; +pub struct GetBitmapsWildcard; pub struct GraphQLClient { client: reqwest::Client, diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index d8e0a3c05..b8d5c4ee9 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -1,6 +1,7 @@ use tracing_subscriber::prelude::*; mod bitmap; +mod block_height_stream; mod block_stream; mod delta_lake_client; mod graphql; diff --git a/coordinator/Cargo.lock b/coordinator/Cargo.lock index 032dae9cf..0a95983d9 100644 --- a/coordinator/Cargo.lock +++ b/coordinator/Cargo.lock @@ -926,6 +926,7 @@ version = "0.1.0" dependencies = [ "actix-web", "anyhow", + "async-stream", "async-trait", "aws-config", "aws-sdk-s3",