Skip to content

Commit

Permalink
Working backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Jun 14, 2024
1 parent 1eb5dff commit 3f91e8e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
26 changes: 14 additions & 12 deletions block-streamer/src/bitmap_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl BitmapProcessor {
}
}

fn stream_matching_block_heights<'b, 'a: 'b>(
pub fn stream_matching_block_heights<'b, 'a: 'b>(
&'a self,
start_block_height: near_indexer_primitives::types::BlockHeight,
contract_pattern: String,
Expand All @@ -168,18 +168,20 @@ impl BitmapProcessor {
let mut current_date = start_date;
while current_date <= Utc::now() {
let base_64_bitmaps: Vec<Base64Bitmap> = self.query_base_64_bitmaps(&contract_pattern_type, &current_date).await?;
let compressed_bitmaps: Vec<CompressedBitmap> = base_64_bitmaps.iter().map(CompressedBitmap::try_from).collect()?;
let decompressed_bitmaps: Vec<DecompressedBitmap> = compressed_bitmaps.iter().map(CompressedBitmap::decompress).collect()?;

let starting_block_height: u64 = decompressed_bitmaps.iter().map(|item| item.start_block_height).min().unwrap_or(decompressed_bitmaps[0].start_block_height);
let mut bitmap_for_day = DecompressedBitmap::new(starting_block_height, None);
for bitmap in decompressed_bitmaps {
bitmap_for_day.merge(bitmap)?;
}
if !base_64_bitmaps.is_empty() {
let compressed_bitmaps: Vec<CompressedBitmap> = base_64_bitmaps.iter().map(CompressedBitmap::try_from).collect()?;
let decompressed_bitmaps: Vec<DecompressedBitmap> = compressed_bitmaps.iter().map(CompressedBitmap::decompress).collect()?;

let starting_block_height: u64 = decompressed_bitmaps.iter().map(|item| item.start_block_height).min().unwrap_or(decompressed_bitmaps[0].start_block_height);
let mut bitmap_for_day = DecompressedBitmap::new(starting_block_height, None);
for bitmap in decompressed_bitmaps {
bitmap_for_day.merge(bitmap)?;
}

let mut bitmap_iter = bitmap_for_day.iter();
while let Some(block_height) = bitmap_iter.next() {
yield block_height;
let mut bitmap_iter = bitmap_for_day.iter();
while let Some(block_height) = bitmap_iter.next() {
yield block_height;
}
}
current_date = self.next_day(current_date);
}
Expand Down
4 changes: 3 additions & 1 deletion block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ pub(crate) async fn start_block_stream(
redis_client.clone(),
indexer,
redis_stream.clone(),
);
)
.await
.context("Failed while fetching and streaming bitmap indexer blocks")?;

let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
Expand Down
6 changes: 3 additions & 3 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> anyhow::Result<()> {
}

let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set");
let graphql_url = std::env::var("GRAPHQL_URL").expect("GRAPHQL_URL is not set");
let graphql_url = "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app/v1/graphql";
let grpc_port = std::env::var("GRPC_PORT").expect("GRPC_PORT is not set");
let metrics_port = std::env::var("METRICS_PORT")
.expect("METRICS_PORT is not set")
Expand All @@ -52,11 +52,11 @@ async fn main() -> anyhow::Result<()> {
let s3_config = aws_sdk_s3::Config::from(&aws_config);
let s3_client = crate::s3_client::S3Client::new(s3_config.clone());

let graphql_client = graphql::client::GraphQLClient::new(graphql_url);
let graphql_client = graphql::client::GraphQLClient::new(graphql_url.to_string());

let bitmap_processor = std::sync::Arc::new(crate::bitmap_processor::BitmapProcessor::new(
graphql_client,
s3_client,
s3_client.clone(),
));
let delta_lake_client =
std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client));
Expand Down

0 comments on commit 3f91e8e

Please sign in to comment.