diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index ee403a235..82d56cc0a 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -3,8 +3,9 @@ use near_lake_framework::near_indexer_primitives; use tokio::task::JoinHandle; use crate::indexer_config::IndexerConfig; -use crate::metrics; use crate::rules::types::ChainId; +use crate::{bitmap_processor, metrics}; +use futures::StreamExt; use registry_types::Rule; /// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but @@ -46,6 +47,7 @@ impl BlockStream { &mut self, start_block_height: near_indexer_primitives::types::BlockHeight, redis_client: std::sync::Arc, + bitmap_processor: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, ) -> anyhow::Result<()> { @@ -75,6 +77,7 @@ impl BlockStream { start_block_height, &indexer_config, redis_client, + bitmap_processor, delta_lake_client, lake_s3_client, &chain_id, @@ -130,6 +133,7 @@ pub(crate) async fn start_block_stream( start_block_height: near_indexer_primitives::types::BlockHeight, indexer: &IndexerConfig, redis_client: std::sync::Arc, + bitmap_processor: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, chain_id: &ChainId, @@ -142,6 +146,14 @@ pub(crate) async fn start_block_stream( .with_label_values(&[&indexer.get_full_name()]) .reset(); + process_bitmap_indexer_blocks( + start_block_height, + bitmap_processor, + redis_client.clone(), + indexer, + redis_stream.clone(), + ); + let last_indexed_delta_lake_block = process_delta_lake_blocks( start_block_height, delta_lake_client, @@ -172,6 +184,51 @@ pub(crate) async fn start_block_stream( Ok(()) } +async fn process_bitmap_indexer_blocks( + start_block_height: near_indexer_primitives::types::BlockHeight, + bitmap_processor: std::sync::Arc, + redis_client: std::sync::Arc, + indexer: &IndexerConfig, + redis_stream: String, +) -> anyhow::Result { + let mut last_block_height: u64 = start_block_height; + + let contract_pattern: String = match &indexer.rule { + Rule::ActionAny { + affected_account_id, + .. + } => { + tracing::debug!( + "Fetching block heights starting from {} from Bitmap Indexer", + start_block_height, + ); + + anyhow::Ok(affected_account_id.to_owned()) + } + Rule::ActionFunctionCall { .. } => { + tracing::error!("ActionFunctionCall matching rule not yet supported for delta lake processing, function: {:?} {:?}", indexer.account_id, indexer.function_name); + Ok("".to_string()) + } + Rule::Event { .. } => { + tracing::error!("Event matching rule not yet supported for delta lake processing, function {:?} {:?}", indexer.account_id, indexer.function_name); + Ok("".to_string()) + } + }?; + + if contract_pattern == "".to_string() { + return Ok(start_block_height); + } + + let matching_block_heights = + bitmap_processor.stream_matching_block_heights(start_block_height, contract_pattern); + tokio::pin!(matching_block_heights); + while let Some(Ok(block_height)) = matching_block_heights.next().await { + last_block_height = block_height; + println!("{}", block_height); + } + Ok(last_block_height) +} + async fn process_delta_lake_blocks( start_block_height: near_indexer_primitives::types::BlockHeight, delta_lake_client: std::sync::Arc, diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index f14c65706..5aca04317 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -32,6 +32,7 @@ async fn main() -> anyhow::Result<()> { } let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set"); + let graphql_url = std::env::var("GRAPHQL_URL").expect("GRAPHQL_URL is not set"); let grpc_port = std::env::var("GRPC_PORT").expect("GRPC_PORT is not set"); let metrics_port = std::env::var("METRICS_PORT") .expect("METRICS_PORT is not set") @@ -51,6 +52,12 @@ async fn main() -> anyhow::Result<()> { let s3_config = aws_sdk_s3::Config::from(&aws_config); let s3_client = crate::s3_client::S3Client::new(s3_config.clone()); + let graphql_client = graphql::client::GraphQLClient::new(graphql_url); + + let bitmap_processor = std::sync::Arc::new(crate::bitmap_processor::BitmapProcessor::new( + graphql_client, + s3_client, + )); let delta_lake_client = std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client)); @@ -58,7 +65,14 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server")); - server::init(&grpc_port, redis_client, delta_lake_client, lake_s3_client).await?; + server::init( + &grpc_port, + redis_client, + bitmap_processor, + delta_lake_client, + lake_s3_client, + ) + .await?; Ok(()) } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 7c2a6d45a..a0f3e1ef9 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -7,13 +7,14 @@ use tonic::{Request, Response, Status}; use crate::indexer_config::IndexerConfig; use crate::rules::types::ChainId; -use crate::block_stream; use crate::server::blockstreamer; +use crate::{bitmap_processor, block_stream}; use blockstreamer::*; pub struct BlockStreamerService { redis_client: std::sync::Arc, + bitmap_processor: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, chain_id: ChainId, @@ -23,12 +24,14 @@ pub struct BlockStreamerService { impl BlockStreamerService { pub fn new( redis_client: std::sync::Arc, + bitmap_processor: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, ) -> Self { Self { redis_client, delta_lake_client, + bitmap_processor, lake_s3_client, chain_id: ChainId::Mainnet, block_streams: Mutex::new(HashMap::new()), @@ -114,6 +117,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic .start( request.start_block_height, self.redis_client.clone(), + self.bitmap_processor.clone(), self.delta_lake_client.clone(), self.lake_s3_client.clone(), ) diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs index fcdb77068..ffc711e28 100644 --- a/block-streamer/src/server/mod.rs +++ b/block-streamer/src/server/mod.rs @@ -1,3 +1,5 @@ +use crate::bitmap_processor; + mod block_streamer_service; pub mod blockstreamer { @@ -7,6 +9,7 @@ pub mod blockstreamer { pub async fn init( port: &str, redis_client: std::sync::Arc, + bitmap_processor: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, ) -> anyhow::Result<()> { @@ -16,6 +19,7 @@ pub async fn init( let block_streamer_service = block_streamer_service::BlockStreamerService::new( redis_client, + bitmap_processor, delta_lake_client, lake_s3_client, );