diff --git a/block-streamer/src/bitmap_processor.rs b/block-streamer/src/bitmap_processor.rs index 27e5ae28b..ae0a6b0a3 100644 --- a/block-streamer/src/bitmap_processor.rs +++ b/block-streamer/src/bitmap_processor.rs @@ -215,57 +215,6 @@ mod tests { } } - fn generate_block_with_timestamp(date: &str) -> String { - let naive_date = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") - .unwrap() - .and_hms_opt(0, 0, 0) - .unwrap(); - - let date_time_utc = chrono::Utc.from_utc_datetime(&naive_date).timestamp() * 1_000_000_000; - - format!( - r#"{{ - "author": "someone", - "header": {{ - "approvals": [], - "block_merkle_root": "ERiC7AJ2zbVz1HJHThR5NWDDN9vByhwdjcVfivmpY5B", - "block_ordinal": 92102682, - "challenges_result": [], - "challenges_root": "11111111111111111111111111111111", - "chunk_headers_root": "MDiJxDyvUQaZRKmUwa5jgQuV6XjwVvnm4tDrajCxwvz", - "chunk_mask": [], - "chunk_receipts_root": "n84wEo7kTKTCJsyqBZ2jndhjrAMeJAXMwKvnJR7vCuy", - "chunk_tx_root": "D8j64GMKBMvUfvnuHtWUyDtMHM5mJ2pA4G5VmYYJvo5G", - "chunks_included": 4, - "epoch_id": "2RMQiomr6CSSwUWpmB62YohxHbfadrHfcsaa3FVb4J9x", - "epoch_sync_data_hash": null, - "gas_price": "100000000", - "hash": "FA1z9RVm9fX3g3mgP3NToZGwWeeXYn8bvZs4nwwTgCpD", - "height": 102162333, - "last_ds_final_block": "Ax2a3MSYuv2hgybnCbpNJMdYmPrHDHdA2hHTUrBkD915", - "last_final_block": "8xkwjn6Lb6UhMBhxcbVQBf3318GafkdaXoHA8Jako1nn", - "latest_protocol_version": 62, - "next_bp_hash": "dmW84aEj2iVJMLwJodJwTfAyeA1LJaHEthvnoAsvTPt", - "next_epoch_id": "C9TDDYthANoduoTBZS7WYDsBSe9XCm4M2F9hRoVXVXWY", - "outcome_root": "6WxzWLVp4b4bFbxHzu18apVfXLvHGKY7CHoqD2Eq3TFJ", - "prev_hash": "Ax2a3MSYuv2hgybnCbpNJMdYmPrHDHdA2hHTUrBkD915", - "prev_height": 102162332, - "prev_state_root": "Aq2ndkyDiwroUWN69Ema9hHtnr6dPHoEBRNyfmd8v4gB", - "random_value": "7ruuMyDhGtTkYaCGYMy7PirPiM79DXa8GhVzQW1pHRoz", - "rent_paid": "0", - "signature": "ed25519:5gYYaWHkAEK5etB8tDpw7fmehkoYSprUxKPygaNqmhVDFCMkA1n379AtL1BBkQswLAPxWs1BZvypFnnLvBtHRknm", - "timestamp": 1695921400989555700, - "timestamp_nanosec": "{}", - "total_supply": "1155783047679681223245725102954966", - "validator_proposals": [], - "validator_reward": "0" - }}, - "chunks": [] - }}"#, - date_time_utc - ) - } - #[test] fn parse_exact_contract_patterns() { let sample_patterns = vec![ @@ -335,7 +284,7 @@ mod tests { mock_s3_client .expect_get_text_file() .returning(move |_, _| { - Ok(generate_block_with_timestamp( + Ok(crate::test_utils::generate_block_with_timestamp( &Utc::now().format("%Y-%m-%d").to_string(), )) }); @@ -371,7 +320,7 @@ mod tests { mock_s3_client .expect_get_text_file() .returning(move |_, _| { - Ok(generate_block_with_timestamp( + Ok(crate::test_utils::generate_block_with_timestamp( &(Utc::now() - Duration::days(2)) .format("%Y-%m-%d") .to_string(), diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 152ea6aac..bc0ad2ab9 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -289,11 +289,6 @@ async fn process_near_lake_blocks( .publish_block(indexer, redis_stream.clone(), block_height) .await?; } - - if block_height == 107503705 { - drop(sender); - return Ok(last_indexed_block); - } } drop(sender); @@ -311,66 +306,6 @@ mod tests { use mockall::predicate; use near_lake_framework::s3_client::GetObjectBytesError; - fn utc_date_time_from_date_string(date: &str) -> chrono::DateTime { - let naive_date_time: chrono::NaiveDateTime = - chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") - .unwrap() - .and_hms_opt(0, 0, 0) - .unwrap(); - chrono::TimeZone::from_utc_datetime(&chrono::Utc, &naive_date_time) - } - - fn generate_block_with_timestamp(date: &str) -> String { - let naive_date = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") - .unwrap() - .and_hms_opt(0, 0, 0) - .unwrap(); - - let date_time_utc = chrono::Utc.from_utc_datetime(&naive_date).timestamp() * 1_000_000_000; - - format!( - r#"{{ - "author": "someone", - "header": {{ - "approvals": [], - "block_merkle_root": "ERiC7AJ2zbVz1HJHThR5NWDDN9vByhwdjcVfivmpY5B", - "block_ordinal": 92102682, - "challenges_result": [], - "challenges_root": "11111111111111111111111111111111", - "chunk_headers_root": "MDiJxDyvUQaZRKmUwa5jgQuV6XjwVvnm4tDrajCxwvz", - "chunk_mask": [], - "chunk_receipts_root": "n84wEo7kTKTCJsyqBZ2jndhjrAMeJAXMwKvnJR7vCuy", - "chunk_tx_root": "D8j64GMKBMvUfvnuHtWUyDtMHM5mJ2pA4G5VmYYJvo5G", - "chunks_included": 4, - "epoch_id": "2RMQiomr6CSSwUWpmB62YohxHbfadrHfcsaa3FVb4J9x", - "epoch_sync_data_hash": null, - "gas_price": "100000000", - "hash": "FA1z9RVm9fX3g3mgP3NToZGwWeeXYn8bvZs4nwwTgCpD", - "height": 102162333, - "last_ds_final_block": "Ax2a3MSYuv2hgybnCbpNJMdYmPrHDHdA2hHTUrBkD915", - "last_final_block": "8xkwjn6Lb6UhMBhxcbVQBf3318GafkdaXoHA8Jako1nn", - "latest_protocol_version": 62, - "next_bp_hash": "dmW84aEj2iVJMLwJodJwTfAyeA1LJaHEthvnoAsvTPt", - "next_epoch_id": "C9TDDYthANoduoTBZS7WYDsBSe9XCm4M2F9hRoVXVXWY", - "outcome_root": "6WxzWLVp4b4bFbxHzu18apVfXLvHGKY7CHoqD2Eq3TFJ", - "prev_hash": "Ax2a3MSYuv2hgybnCbpNJMdYmPrHDHdA2hHTUrBkD915", - "prev_height": 102162332, - "prev_state_root": "Aq2ndkyDiwroUWN69Ema9hHtnr6dPHoEBRNyfmd8v4gB", - "random_value": "7ruuMyDhGtTkYaCGYMy7PirPiM79DXa8GhVzQW1pHRoz", - "rent_paid": "0", - "signature": "ed25519:5gYYaWHkAEK5etB8tDpw7fmehkoYSprUxKPygaNqmhVDFCMkA1n379AtL1BBkQswLAPxWs1BZvypFnnLvBtHRknm", - "timestamp": 1695921400989555700, - "timestamp_nanosec": "{}", - "total_supply": "1155783047679681223245725102954966", - "validator_proposals": [], - "validator_reward": "0" - }}, - "chunks": [] - }}"#, - date_time_utc - ) - } - fn exact_query_result( first_block_height: i64, bitmap: &str, @@ -421,8 +356,8 @@ mod tests { predicate::eq("000091940840/block.json"), ) .returning(move |_, _| { - Ok(generate_block_with_timestamp( - &chrono::Utc::now().format("%Y-%m-%d").to_string(), + Ok(crate::test_utils::generate_block_with_timestamp( + "2023-12-09", )) }); @@ -432,13 +367,15 @@ mod tests { .expect_get_bitmaps_exact() .with( predicate::eq(vec![contract_filter.to_owned()]), - predicate::eq(utc_date_time_from_date_string("2023-12-09")), + predicate::eq(crate::test_utils::utc_date_time_from_date_string( + "2023-12-09", + )), ) .returning(|_, _| Ok(vec![exact_query_result(107503702, "oA==")])); mock_graphql_client .expect_get_bitmaps_exact() - .returning(|_, _| Ok(vec![exact_query_result(107503702, "oA==")])); + .returning(|_, _| Ok(vec![])); let mock_bitmap_processor = crate::bitmap_processor::BitmapProcessor::new(mock_graphql_client, mock_s3_client); @@ -524,7 +461,7 @@ mod tests { predicate::eq("000107503704/block.json"), ) .returning(move |_, _| { - Ok(generate_block_with_timestamp( + Ok(crate::test_utils::generate_block_with_timestamp( &chrono::Utc::now().format("%Y-%m-%d").to_string(), )) }); diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index a2cb09443..406a44ef7 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -58,21 +58,12 @@ async fn main() -> anyhow::Result<()> { graphql_client, s3_client.clone(), )); - let delta_lake_client = - std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client)); let lake_s3_client = crate::lake_s3_client::SharedLakeS3Client::from_conf(s3_config); tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server")); - server::init( - &grpc_port, - redis_client, - bitmap_processor, - delta_lake_client, - lake_s3_client, - ) - .await?; + server::init(&grpc_port, redis_client, bitmap_processor, lake_s3_client).await?; Ok(()) } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index dc4cfa4f5..c0002a59d 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -15,7 +15,6 @@ use blockstreamer::*; pub struct BlockStreamerService { redis_client: std::sync::Arc, bitmap_processor: std::sync::Arc, - delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, chain_id: ChainId, block_streams: Mutex>, @@ -25,12 +24,10 @@ impl BlockStreamerService { pub fn new( redis_client: std::sync::Arc, bitmap_processor: std::sync::Arc, - delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, ) -> Self { Self { redis_client, - delta_lake_client, bitmap_processor, lake_s3_client, chain_id: ChainId::Mainnet, @@ -186,132 +183,140 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic } } -// #[cfg(test)] -// mod tests { -// use super::*; -// -// use blockstreamer::block_streamer_server::BlockStreamer; -// -// fn create_block_streamer_service() -> BlockStreamerService { -// let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default(); -// mock_delta_lake_client -// .expect_get_latest_block_metadata() -// .returning(|| { -// Ok(crate::delta_lake_client::LatestBlockMetadata { -// last_indexed_block: "107503703".to_string(), -// processed_at_utc: "".to_string(), -// first_indexed_block: "".to_string(), -// last_indexed_block_date: "".to_string(), -// first_indexed_block_date: "".to_string(), -// }) -// }); -// mock_delta_lake_client -// .expect_list_matching_block_heights() -// .returning(|_, _| Ok(vec![])); -// -// let mut mock_redis_client = crate::redis::RedisClient::default(); -// mock_redis_client -// .expect_xadd::() -// .returning(|_, _| Ok(())); -// -// let mut mock_lake_s3_client = crate::lake_s3_client::SharedLakeS3Client::default(); -// mock_lake_s3_client -// .expect_clone() -// .returning(crate::lake_s3_client::SharedLakeS3Client::default); -// -// BlockStreamerService::new( -// std::sync::Arc::new(mock_redis_client), -// std::sync::Arc::new(mock_delta_lake_client), -// mock_lake_s3_client, -// ) -// } -// -// #[tokio::test] -// async fn starts_a_block_stream() { -// let block_streamer_service = create_block_streamer_service(); -// -// { -// let lock = block_streamer_service.get_block_streams_lock().unwrap(); -// assert_eq!(lock.len(), 0); -// } -// -// block_streamer_service -// .start_stream(Request::new(StartStreamRequest { -// start_block_height: 0, -// account_id: "morgs.near".to_string(), -// function_name: "test".to_string(), -// version: 0, -// redis_stream: "stream".to_string(), -// rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { -// affected_account_id: "queryapi.dataplatform.near".to_string(), -// status: 1, -// })), -// })) -// .await -// .unwrap(); -// -// let lock = block_streamer_service.get_block_streams_lock().unwrap(); -// assert_eq!(lock.len(), 1); -// } -// -// #[tokio::test] -// async fn stops_a_block_stream() { -// let block_streamer_service = create_block_streamer_service(); -// -// assert_eq!( -// block_streamer_service -// .list_streams(Request::new(ListStreamsRequest {})) -// .await -// .unwrap() -// .into_inner() -// .streams -// .len(), -// 0 -// ); -// -// block_streamer_service -// .start_stream(Request::new(StartStreamRequest { -// start_block_height: 0, -// account_id: "morgs.near".to_string(), -// function_name: "test".to_string(), -// version: 0, -// redis_stream: "stream".to_string(), -// rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { -// affected_account_id: "queryapi.dataplatform.near".to_string(), -// status: 1, -// })), -// })) -// .await -// .unwrap(); -// -// assert_eq!( -// block_streamer_service -// .list_streams(Request::new(ListStreamsRequest {})) -// .await -// .unwrap() -// .into_inner() -// .streams -// .len(), -// 1 -// ); -// -// block_streamer_service -// .stop_stream(Request::new(StopStreamRequest { -// // ID for indexer morgs.near/test -// stream_id: "16210176318434468568".to_string(), -// })) -// .await -// .unwrap(); -// -// assert_eq!( -// block_streamer_service -// .list_streams(Request::new(ListStreamsRequest {})) -// .await -// .unwrap() -// .into_inner() -// .streams -// .len(), -// 0 -// ); -// } -// } +#[cfg(test)] +mod tests { + use super::*; + + use blockstreamer::block_streamer_server::BlockStreamer; + use mockall::predicate; + + fn create_block_streamer_service() -> BlockStreamerService { + let mut mock_s3_client = crate::s3_client::S3Client::default(); + + mock_s3_client + .expect_get_text_file() + .with( + predicate::eq("near-lake-data-mainnet".to_string()), + predicate::always(), + ) + .returning(move |_, _| { + Ok(crate::test_utils::generate_block_with_timestamp( + &chrono::Utc::now().format("%Y-%m-%d").to_string(), + )) + }); + + let mut mock_graphql_client = crate::graphql::client::GraphQLClient::default(); + + mock_graphql_client + .expect_get_bitmaps_exact() + .returning(|_, _| Ok(vec![])); + + let mock_bitmap_processor = + crate::bitmap_processor::BitmapProcessor::new(mock_graphql_client, mock_s3_client); + + let mut mock_redis_client = crate::redis::RedisClient::default(); + mock_redis_client + .expect_xadd::() + .returning(|_, _| Ok(())); + + let mut mock_lake_s3_client = crate::lake_s3_client::SharedLakeS3Client::default(); + mock_lake_s3_client + .expect_clone() + .returning(crate::lake_s3_client::SharedLakeS3Client::default); + + BlockStreamerService::new( + std::sync::Arc::new(mock_redis_client), + std::sync::Arc::new(mock_bitmap_processor), + mock_lake_s3_client, + ) + } + + #[tokio::test] + async fn starts_a_block_stream() { + let block_streamer_service = create_block_streamer_service(); + + { + let lock = block_streamer_service.get_block_streams_lock().unwrap(); + assert_eq!(lock.len(), 0); + } + + block_streamer_service + .start_stream(Request::new(StartStreamRequest { + start_block_height: 0, + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 0, + redis_stream: "stream".to_string(), + rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: 1, + })), + })) + .await + .unwrap(); + + let lock = block_streamer_service.get_block_streams_lock().unwrap(); + assert_eq!(lock.len(), 1); + } + + #[tokio::test] + async fn stops_a_block_stream() { + let block_streamer_service = create_block_streamer_service(); + + assert_eq!( + block_streamer_service + .list_streams(Request::new(ListStreamsRequest {})) + .await + .unwrap() + .into_inner() + .streams + .len(), + 0 + ); + + block_streamer_service + .start_stream(Request::new(StartStreamRequest { + start_block_height: 0, + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 0, + redis_stream: "stream".to_string(), + rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: 1, + })), + })) + .await + .unwrap(); + + assert_eq!( + block_streamer_service + .list_streams(Request::new(ListStreamsRequest {})) + .await + .unwrap() + .into_inner() + .streams + .len(), + 1 + ); + + block_streamer_service + .stop_stream(Request::new(StopStreamRequest { + // ID for indexer morgs.near/test + stream_id: "16210176318434468568".to_string(), + })) + .await + .unwrap(); + + assert_eq!( + block_streamer_service + .list_streams(Request::new(ListStreamsRequest {})) + .await + .unwrap() + .into_inner() + .streams + .len(), + 0 + ); + } +} diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs index ffc711e28..96cabd64c 100644 --- a/block-streamer/src/server/mod.rs +++ b/block-streamer/src/server/mod.rs @@ -10,7 +10,6 @@ pub async fn init( port: &str, redis_client: std::sync::Arc, bitmap_processor: std::sync::Arc, - delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, ) -> anyhow::Result<()> { let addr = format!("0.0.0.0:{}", port).parse()?; @@ -20,7 +19,6 @@ pub async fn init( let block_streamer_service = block_streamer_service::BlockStreamerService::new( redis_client, bitmap_processor, - delta_lake_client, lake_s3_client, ); diff --git a/block-streamer/src/test_utils.rs b/block-streamer/src/test_utils.rs index a4af388e8..0039cd616 100644 --- a/block-streamer/src/test_utils.rs +++ b/block-streamer/src/test_utils.rs @@ -1,5 +1,6 @@ use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient}; use aws_smithy_types::body::SdkBody; +use chrono::TimeZone; use near_lake_framework::near_indexer_primitives; fn generate_replay_events_for_block(block_height: u64) -> Vec { @@ -173,3 +174,63 @@ pub fn get_streamer_message(block_height: u64) -> near_indexer_primitives::Strea near_indexer_primitives::StreamerMessage { block, shards } } + +pub fn utc_date_time_from_date_string(date: &str) -> chrono::DateTime { + let naive_date_time: chrono::NaiveDateTime = + chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + chrono::TimeZone::from_utc_datetime(&chrono::Utc, &naive_date_time) +} + +pub fn generate_block_with_timestamp(date: &str) -> String { + let naive_date = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + + let date_time_utc = chrono::Utc.from_utc_datetime(&naive_date).timestamp() * 1_000_000_000; + + format!( + r#"{{ + "author": "someone", + "header": {{ + "approvals": [], + "block_merkle_root": "ERiC7AJ2zbVz1HJHThR5NWDDN9vByhwdjcVfivmpY5B", + "block_ordinal": 92102682, + "challenges_result": [], + "challenges_root": "11111111111111111111111111111111", + "chunk_headers_root": "MDiJxDyvUQaZRKmUwa5jgQuV6XjwVvnm4tDrajCxwvz", + "chunk_mask": [], + "chunk_receipts_root": "n84wEo7kTKTCJsyqBZ2jndhjrAMeJAXMwKvnJR7vCuy", + "chunk_tx_root": "D8j64GMKBMvUfvnuHtWUyDtMHM5mJ2pA4G5VmYYJvo5G", + "chunks_included": 4, + "epoch_id": "2RMQiomr6CSSwUWpmB62YohxHbfadrHfcsaa3FVb4J9x", + "epoch_sync_data_hash": null, + "gas_price": "100000000", + "hash": "FA1z9RVm9fX3g3mgP3NToZGwWeeXYn8bvZs4nwwTgCpD", + "height": 102162333, + "last_ds_final_block": "Ax2a3MSYuv2hgybnCbpNJMdYmPrHDHdA2hHTUrBkD915", + "last_final_block": "8xkwjn6Lb6UhMBhxcbVQBf3318GafkdaXoHA8Jako1nn", + "latest_protocol_version": 62, + "next_bp_hash": "dmW84aEj2iVJMLwJodJwTfAyeA1LJaHEthvnoAsvTPt", + "next_epoch_id": "C9TDDYthANoduoTBZS7WYDsBSe9XCm4M2F9hRoVXVXWY", + "outcome_root": "6WxzWLVp4b4bFbxHzu18apVfXLvHGKY7CHoqD2Eq3TFJ", + "prev_hash": "Ax2a3MSYuv2hgybnCbpNJMdYmPrHDHdA2hHTUrBkD915", + "prev_height": 102162332, + "prev_state_root": "Aq2ndkyDiwroUWN69Ema9hHtnr6dPHoEBRNyfmd8v4gB", + "random_value": "7ruuMyDhGtTkYaCGYMy7PirPiM79DXa8GhVzQW1pHRoz", + "rent_paid": "0", + "signature": "ed25519:5gYYaWHkAEK5etB8tDpw7fmehkoYSprUxKPygaNqmhVDFCMkA1n379AtL1BBkQswLAPxWs1BZvypFnnLvBtHRknm", + "timestamp": 1695921400989555700, + "timestamp_nanosec": "{}", + "total_supply": "1155783047679681223245725102954966", + "validator_proposals": [], + "validator_reward": "0" + }}, + "chunks": [] + }}"#, + date_time_utc + ) +}