diff --git a/block-streamer/src/bitmap_processor.rs b/block-streamer/src/bitmap_processor.rs index 79bf12597..27e5ae28b 100644 --- a/block-streamer/src/bitmap_processor.rs +++ b/block-streamer/src/bitmap_processor.rs @@ -157,7 +157,7 @@ impl BitmapProcessor { } } - fn stream_matching_block_heights<'b, 'a: 'b>( + pub fn stream_matching_block_heights<'b, 'a: 'b>( &'a self, start_block_height: near_indexer_primitives::types::BlockHeight, contract_pattern: String, @@ -168,18 +168,20 @@ impl BitmapProcessor { let mut current_date = start_date; while current_date <= Utc::now() { let base_64_bitmaps: Vec = self.query_base_64_bitmaps(&contract_pattern_type, ¤t_date).await?; - let compressed_bitmaps: Vec = base_64_bitmaps.iter().map(CompressedBitmap::try_from).collect()?; - let decompressed_bitmaps: Vec = compressed_bitmaps.iter().map(CompressedBitmap::decompress).collect()?; - - let starting_block_height: u64 = decompressed_bitmaps.iter().map(|item| item.start_block_height).min().unwrap_or(decompressed_bitmaps[0].start_block_height); - let mut bitmap_for_day = DecompressedBitmap::new(starting_block_height, None); - for bitmap in decompressed_bitmaps { - bitmap_for_day.merge(bitmap)?; - } + if !base_64_bitmaps.is_empty() { + let compressed_bitmaps: Vec = base_64_bitmaps.iter().map(CompressedBitmap::try_from).collect()?; + let decompressed_bitmaps: Vec = compressed_bitmaps.iter().map(CompressedBitmap::decompress).collect()?; + + let starting_block_height: u64 = decompressed_bitmaps.iter().map(|item| item.start_block_height).min().unwrap_or(decompressed_bitmaps[0].start_block_height); + let mut bitmap_for_day = DecompressedBitmap::new(starting_block_height, None); + for bitmap in decompressed_bitmaps { + bitmap_for_day.merge(bitmap)?; + } - let mut bitmap_iter = bitmap_for_day.iter(); - while let Some(block_height) = bitmap_iter.next() { - yield block_height; + let mut bitmap_iter = bitmap_for_day.iter(); + while let Some(block_height) = bitmap_iter.next() { + yield block_height; + } } current_date = self.next_day(current_date); } diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 82d56cc0a..5799a0e07 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -152,7 +152,9 @@ pub(crate) async fn start_block_stream( redis_client.clone(), indexer, redis_stream.clone(), - ); + ) + .await + .context("Failed while fetching and streaming bitmap indexer blocks")?; let last_indexed_delta_lake_block = process_delta_lake_blocks( start_block_height, diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index 5aca04317..a2cb09443 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -32,7 +32,7 @@ async fn main() -> anyhow::Result<()> { } let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set"); - let graphql_url = std::env::var("GRAPHQL_URL").expect("GRAPHQL_URL is not set"); + let graphql_url = "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app/v1/graphql"; let grpc_port = std::env::var("GRPC_PORT").expect("GRPC_PORT is not set"); let metrics_port = std::env::var("METRICS_PORT") .expect("METRICS_PORT is not set") @@ -52,11 +52,11 @@ async fn main() -> anyhow::Result<()> { let s3_config = aws_sdk_s3::Config::from(&aws_config); let s3_client = crate::s3_client::S3Client::new(s3_config.clone()); - let graphql_client = graphql::client::GraphQLClient::new(graphql_url); + let graphql_client = graphql::client::GraphQLClient::new(graphql_url.to_string()); let bitmap_processor = std::sync::Arc::new(crate::bitmap_processor::BitmapProcessor::new( graphql_client, - s3_client, + s3_client.clone(), )); let delta_lake_client = std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client));