Skip to content

Commit

Permalink
test: Add tests for block_streamer_service
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Dec 11, 2023
1 parent a04e4f9 commit 8ae3bd3
Showing 1 changed file with 123 additions and 0 deletions.
123 changes: 123 additions & 0 deletions block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,126 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
Ok(Response::new(response))
}
}

#[cfg(test)]
mod tests {
use super::*;

use blockstreamer::block_streamer_server::BlockStreamer;

fn create_block_streamer_service() -> BlockStreamerService {
let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
mock_delta_lake_client
.expect_get_latest_block_metadata()
.returning(|| {
Ok(crate::delta_lake_client::LatestBlockMetadata {
last_indexed_block: "107503703".to_string(),
processed_at_utc: "".to_string(),
first_indexed_block: "".to_string(),
last_indexed_block_date: "".to_string(),
first_indexed_block_date: "".to_string(),
})
});
mock_delta_lake_client
.expect_list_matching_block_heights()
.returning(|_, _| Ok(vec![]));

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, _| Ok(()));

let lake_s3_config = crate::test_utils::create_mock_lake_s3_config(&[107503704]);

BlockStreamerService::new(
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
lake_s3_config,
)
}

#[tokio::test]
async fn starts_a_block_stream() {
let block_streamer_service = create_block_streamer_service();

{
let lock = block_streamer_service.get_block_streams_lock().unwrap();
assert_eq!(lock.len(), 0);
}

block_streamer_service
.start_stream(Request::new(StartStreamRequest {
start_block_height: 0,
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: 0,
})),
}))
.await
.unwrap();

let lock = block_streamer_service.get_block_streams_lock().unwrap();
assert_eq!(lock.len(), 1);
}

#[tokio::test]
async fn stops_a_block_stream() {
let block_streamer_service = create_block_streamer_service();

assert_eq!(
block_streamer_service
.list_streams(Request::new(ListStreamsRequest {}))
.await
.unwrap()
.into_inner()
.streams
.len(),
0
);

block_streamer_service
.start_stream(Request::new(StartStreamRequest {
start_block_height: 0,
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: 0,
})),
}))
.await
.unwrap();

assert_eq!(
block_streamer_service
.list_streams(Request::new(ListStreamsRequest {}))
.await
.unwrap()
.into_inner()
.streams
.len(),
1
);

block_streamer_service
.stop_stream(Request::new(StopStreamRequest {
// ID for indexer morgs.near/test
stream_id: "16210176318434468568".to_string(),
}))
.await
.unwrap();

assert_eq!(
block_streamer_service
.list_streams(Request::new(ListStreamsRequest {}))
.await
.unwrap()
.into_inner()
.streams
.len(),
0
);
}
}

0 comments on commit 8ae3bd3

Please sign in to comment.