Skip to content

Commit

Permalink
fix: Publish blocks to correct redis stream
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jan 11, 2024
1 parent d7aa296 commit e373d4c
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ pub(crate) async fn start_block_stream(
if !matches.is_empty() {
redis_client
.xadd(
crate::redis::generate_historical_stream_key(&indexer.get_full_name()),
redis_stream.clone(),
&[("block_height".to_string(), block_height.to_owned())],
)
.await
Expand All @@ -254,10 +254,10 @@ pub(crate) async fn start_block_stream(
mod tests {
use super::*;

use mockall::predicate;

#[tokio::test]
async fn adds_matching_blocks_from_index_and_lake() {
let expected_matching_block_height_count = 3;

let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
mock_delta_lake_client
.expect_get_latest_block_metadata()
Expand All @@ -277,8 +277,12 @@ mod tests {
let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, _| Ok(()))
.times(expected_matching_block_height_count);
.with(predicate::eq("stream key".to_string()), predicate::always())
.returning(|_, fields| {
assert!(vec![107503702, 107503703, 107503705].contains(&fields[0].1));
Ok(())
})
.times(3);
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, _| Ok(()))
Expand Down

0 comments on commit e373d4c

Please sign in to comment.