Skip to content

Commit

Permalink
Fix lifetime issues for try stream and prepare match statement
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Feb 26, 2024
1 parent d8961b6 commit 8816cd3
Show file tree
Hide file tree
Showing 4 changed files with 560 additions and 552 deletions.
1 change: 1 addition & 0 deletions block-streamer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
actix-web = "4.5.1"
anyhow = "1.0.57"
async-stream = "0.3.5"
async-trait = "0.1.74"
aws-config = { version = "1.0.0", features = ["behavior-version-latest"]}
aws-sdk-s3 = "0.39.1"
Expand Down
26 changes: 13 additions & 13 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,31 +207,31 @@ async fn process_delta_lake_blocks(

delta_lake_client
.get_matching_block_heights_stream(start_block_height, affected_account_id)
.await
}
Rule::ActionFunctionCall { .. } => {
tracing::error!("ActionFunctionCall matching rule not yet supported for delta lake processing, function: {:?} {:?}", indexer.account_id, indexer.function_name);
Ok(vec![])

}
Rule::Event { .. } => {
tracing::error!("Event matching rule not yet supported for delta lake processing, function {:?} {:?}", indexer.account_id, indexer.function_name);
Ok(vec![])

}
}?;
};

tracing::debug!(
"Flushing {} block heights from index files to Redis Stream",
blocks_from_index.len(),
);

for block_height in &blocks_from_index {
let block_height = block_height.to_owned();
redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
.await?;
redis_client
.set_last_processed_block(indexer, block_height)
.await?;
while let Some(blocks_for day) = blocks_from_index.next().await {
for block_height in &blocks_from_index {
let block_height = block_height.to_owned();
redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
.await?;
redis_client
.set_last_processed_block(indexer, block_height)
.await?;
}
}

let last_indexed_block =
Expand Down
Loading

0 comments on commit 8816cd3

Please sign in to comment.