diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 45151736a..fd98e03cd 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -230,7 +230,7 @@ pub(crate) async fn start_block_stream( if !matches.is_empty() { redis_client .xadd( - crate::redis::generate_historical_stream_key(&indexer.get_full_name()), + redis_stream.clone(), &[("block_height".to_string(), block_height.to_owned())], ) .await @@ -254,10 +254,10 @@ pub(crate) async fn start_block_stream( mod tests { use super::*; + use mockall::predicate; + #[tokio::test] async fn adds_matching_blocks_from_index_and_lake() { - let expected_matching_block_height_count = 3; - let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default(); mock_delta_lake_client .expect_get_latest_block_metadata() @@ -277,8 +277,12 @@ mod tests { let mut mock_redis_client = crate::redis::RedisClient::default(); mock_redis_client .expect_xadd::() - .returning(|_, _| Ok(())) - .times(expected_matching_block_height_count); + .with(predicate::eq("stream key".to_string()), predicate::always()) + .returning(|_, fields| { + assert!(vec![107503702, 107503703, 107503705].contains(&fields[0].1)); + Ok(()) + }) + .times(3); mock_redis_client .expect_set::() .returning(|_, _| Ok(()))