Skip to content

Commit

Permalink
introduce process bitmap blocks function
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Jun 14, 2024
1 parent 609b7df commit 1eb5dff
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 3 deletions.
59 changes: 58 additions & 1 deletion block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use near_lake_framework::near_indexer_primitives;
use tokio::task::JoinHandle;

use crate::indexer_config::IndexerConfig;
use crate::metrics;
use crate::rules::types::ChainId;
use crate::{bitmap_processor, metrics};
use futures::StreamExt;
use registry_types::Rule;

/// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but
Expand Down Expand Up @@ -46,6 +47,7 @@ impl BlockStream {
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
bitmap_processor: std::sync::Arc<crate::bitmap_processor::BitmapProcessor>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -75,6 +77,7 @@ impl BlockStream {
start_block_height,
&indexer_config,
redis_client,
bitmap_processor,
delta_lake_client,
lake_s3_client,
&chain_id,
Expand Down Expand Up @@ -130,6 +133,7 @@ pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: &IndexerConfig,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
bitmap_processor: std::sync::Arc<crate::bitmap_processor::BitmapProcessor>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
chain_id: &ChainId,
Expand All @@ -142,6 +146,14 @@ pub(crate) async fn start_block_stream(
.with_label_values(&[&indexer.get_full_name()])
.reset();

process_bitmap_indexer_blocks(
start_block_height,
bitmap_processor,
redis_client.clone(),
indexer,
redis_stream.clone(),
);

let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
delta_lake_client,
Expand Down Expand Up @@ -172,6 +184,51 @@ pub(crate) async fn start_block_stream(
Ok(())
}

async fn process_bitmap_indexer_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
bitmap_processor: std::sync::Arc<crate::bitmap_processor::BitmapProcessor>,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
) -> anyhow::Result<u64> {
let mut last_block_height: u64 = start_block_height;

let contract_pattern: String = match &indexer.rule {
Rule::ActionAny {
affected_account_id,
..
} => {
tracing::debug!(
"Fetching block heights starting from {} from Bitmap Indexer",
start_block_height,
);

anyhow::Ok(affected_account_id.to_owned())
}
Rule::ActionFunctionCall { .. } => {
tracing::error!("ActionFunctionCall matching rule not yet supported for delta lake processing, function: {:?} {:?}", indexer.account_id, indexer.function_name);
Ok("".to_string())
}
Rule::Event { .. } => {
tracing::error!("Event matching rule not yet supported for delta lake processing, function {:?} {:?}", indexer.account_id, indexer.function_name);
Ok("".to_string())
}
}?;

if contract_pattern == "".to_string() {
return Ok(start_block_height);
}

let matching_block_heights =
bitmap_processor.stream_matching_block_heights(start_block_height, contract_pattern);
tokio::pin!(matching_block_heights);
while let Some(Ok(block_height)) = matching_block_heights.next().await {
last_block_height = block_height;
println!("{}", block_height);
}
Ok(last_block_height)
}

async fn process_delta_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
Expand Down
16 changes: 15 additions & 1 deletion block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async fn main() -> anyhow::Result<()> {
}

let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set");
let graphql_url = std::env::var("GRAPHQL_URL").expect("GRAPHQL_URL is not set");
let grpc_port = std::env::var("GRPC_PORT").expect("GRPC_PORT is not set");
let metrics_port = std::env::var("METRICS_PORT")
.expect("METRICS_PORT is not set")
Expand All @@ -51,14 +52,27 @@ async fn main() -> anyhow::Result<()> {
let s3_config = aws_sdk_s3::Config::from(&aws_config);
let s3_client = crate::s3_client::S3Client::new(s3_config.clone());

let graphql_client = graphql::client::GraphQLClient::new(graphql_url);

let bitmap_processor = std::sync::Arc::new(crate::bitmap_processor::BitmapProcessor::new(
graphql_client,
s3_client,
));
let delta_lake_client =
std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client));

let lake_s3_client = crate::lake_s3_client::SharedLakeS3Client::from_conf(s3_config);

tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server"));

server::init(&grpc_port, redis_client, delta_lake_client, lake_s3_client).await?;
server::init(
&grpc_port,
redis_client,
bitmap_processor,
delta_lake_client,
lake_s3_client,
)
.await?;

Ok(())
}
6 changes: 5 additions & 1 deletion block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use tonic::{Request, Response, Status};
use crate::indexer_config::IndexerConfig;
use crate::rules::types::ChainId;

use crate::block_stream;
use crate::server::blockstreamer;
use crate::{bitmap_processor, block_stream};

use blockstreamer::*;

pub struct BlockStreamerService {
redis_client: std::sync::Arc<crate::redis::RedisClient>,
bitmap_processor: std::sync::Arc<crate::bitmap_processor::BitmapProcessor>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
chain_id: ChainId,
Expand All @@ -23,12 +24,14 @@ pub struct BlockStreamerService {
impl BlockStreamerService {
pub fn new(
redis_client: std::sync::Arc<crate::redis::RedisClient>,
bitmap_processor: std::sync::Arc<crate::bitmap_processor::BitmapProcessor>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
) -> Self {
Self {
redis_client,
delta_lake_client,
bitmap_processor,
lake_s3_client,
chain_id: ChainId::Mainnet,
block_streams: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -114,6 +117,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
.start(
request.start_block_height,
self.redis_client.clone(),
self.bitmap_processor.clone(),
self.delta_lake_client.clone(),
self.lake_s3_client.clone(),
)
Expand Down
4 changes: 4 additions & 0 deletions block-streamer/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::bitmap_processor;

mod block_streamer_service;

pub mod blockstreamer {
Expand All @@ -7,6 +9,7 @@ pub mod blockstreamer {
pub async fn init(
port: &str,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
bitmap_processor: std::sync::Arc<crate::bitmap_processor::BitmapProcessor>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
) -> anyhow::Result<()> {
Expand All @@ -16,6 +19,7 @@ pub async fn init(

let block_streamer_service = block_streamer_service::BlockStreamerService::new(
redis_client,
bitmap_processor,
delta_lake_client,
lake_s3_client,
);
Expand Down

0 comments on commit 1eb5dff

Please sign in to comment.