diff --git a/block-streamer/examples/list_streams.rs b/block-streamer/examples/list_streams.rs index 7c3c28c58..f9095658c 100644 --- a/block-streamer/examples/list_streams.rs +++ b/block-streamer/examples/list_streams.rs @@ -5,7 +5,7 @@ use block_streamer::ListStreamsRequest; #[tokio::main] async fn main() -> Result<(), Box> { - let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + let mut client = BlockStreamerClient::connect("http://0.0.0.0:8002").await?; let response = client .list_streams(Request::new(ListStreamsRequest {})) diff --git a/block-streamer/examples/start_stream.rs b/block-streamer/examples/start_stream.rs index 79bead1ea..922219ac3 100644 --- a/block-streamer/examples/start_stream.rs +++ b/block-streamer/examples/start_stream.rs @@ -5,7 +5,7 @@ use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamReque #[tokio::main] async fn main() -> Result<(), Box> { - let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + let mut client = BlockStreamerClient::connect("http://0.0.0.0:8002").await?; let response = client .start_stream(Request::new(StartStreamRequest { diff --git a/block-streamer/examples/stop_stream.rs b/block-streamer/examples/stop_stream.rs index a723526e1..9576030f8 100644 --- a/block-streamer/examples/stop_stream.rs +++ b/block-streamer/examples/stop_stream.rs @@ -5,7 +5,7 @@ use block_streamer::StopStreamRequest; #[tokio::main] async fn main() -> Result<(), Box> { - let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + let mut client = BlockStreamerClient::connect("http://0.0.0.0:8002").await?; let response = client .stop_stream(Request::new(StopStreamRequest { diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 815c750f5..cc6cd24c9 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -9,6 +9,7 @@ use registry_types::Rule; /// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but /// we need this configurable for testing purposes. const LAKE_PREFETCH_SIZE: usize = 100; +const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"]; pub struct Task { handle: JoinHandle>, @@ -184,6 +185,16 @@ async fn process_delta_lake_blocks( affected_account_id, .. } => { + if affected_account_id + .split(",") + .any(|account_id| DELTA_LAKE_SKIP_ACCOUNTS.contains(&account_id.trim())) + { + tracing::debug!( + "Skipping fetching index files from delta lake due to wildcard contract filter present in {}", + affected_account_id + ); + return Ok(start_block_height); + } tracing::debug!( "Fetching block heights starting from {} from delta lake", start_block_height, @@ -358,4 +369,193 @@ mod tests { .await .unwrap(); } + + #[tokio::test] + async fn skips_delta_lake_for_star_filter() { + let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default(); + mock_delta_lake_client + .expect_get_latest_block_metadata() + .returning(|| { + Ok(crate::delta_lake_client::LatestBlockMetadata { + last_indexed_block: "107503700".to_string(), + processed_at_utc: "".to_string(), + first_indexed_block: "".to_string(), + last_indexed_block_date: "".to_string(), + first_indexed_block_date: "".to_string(), + }) + }); + mock_delta_lake_client + .expect_list_matching_block_heights() + .never(); + + let mock_lake_s3_config = + crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); + + let mut mock_redis_client = crate::redis::RedisClient::default(); + mock_redis_client + .expect_set::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields)); + Ok(()) + }) + .times(2); + mock_redis_client + .expect_xadd::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields[0].1)); + Ok(()) + }) + .times(2); + + let indexer_config = crate::indexer_config::IndexerConfig { + account_id: near_indexer_primitives::types::AccountId::try_from( + "morgs.near".to_string(), + ) + .unwrap(), + function_name: "test".to_string(), + rule: registry_types::Rule::ActionAny { + affected_account_id: "*".to_string(), + status: registry_types::Status::Success, + }, + }; + + start_block_stream( + 107503704, + &indexer_config, + std::sync::Arc::new(mock_redis_client), + std::sync::Arc::new(mock_delta_lake_client), + mock_lake_s3_config, + &ChainId::Mainnet, + 1, + "stream key".to_string(), + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn skips_delta_lake_for_multiple_star_filter() { + let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default(); + mock_delta_lake_client + .expect_get_latest_block_metadata() + .returning(|| { + Ok(crate::delta_lake_client::LatestBlockMetadata { + last_indexed_block: "107503700".to_string(), + processed_at_utc: "".to_string(), + first_indexed_block: "".to_string(), + last_indexed_block_date: "".to_string(), + first_indexed_block_date: "".to_string(), + }) + }); + mock_delta_lake_client + .expect_list_matching_block_heights() + .never(); + + let mock_lake_s3_config = + crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); + + let mut mock_redis_client = crate::redis::RedisClient::default(); + mock_redis_client + .expect_set::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields)); + Ok(()) + }) + .times(2); + mock_redis_client + .expect_xadd::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields[0].1)); + Ok(()) + }) + .times(2); + + let indexer_config = crate::indexer_config::IndexerConfig { + account_id: near_indexer_primitives::types::AccountId::try_from( + "morgs.near".to_string(), + ) + .unwrap(), + function_name: "test".to_string(), + rule: registry_types::Rule::ActionAny { + affected_account_id: "*, *.tg".to_string(), + status: registry_types::Status::Success, + }, + }; + + start_block_stream( + 107503704, + &indexer_config, + std::sync::Arc::new(mock_redis_client), + std::sync::Arc::new(mock_delta_lake_client), + mock_lake_s3_config, + &ChainId::Mainnet, + 1, + "stream key".to_string(), + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn skips_delta_lake_for_star_filter_after_normal_account() { + let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default(); + mock_delta_lake_client + .expect_get_latest_block_metadata() + .returning(|| { + Ok(crate::delta_lake_client::LatestBlockMetadata { + last_indexed_block: "107503700".to_string(), + processed_at_utc: "".to_string(), + first_indexed_block: "".to_string(), + last_indexed_block_date: "".to_string(), + first_indexed_block_date: "".to_string(), + }) + }); + mock_delta_lake_client + .expect_list_matching_block_heights() + .never(); + + let mock_lake_s3_config = + crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); + + let mut mock_redis_client = crate::redis::RedisClient::default(); + mock_redis_client + .expect_set::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields)); + Ok(()) + }) + .times(2); + mock_redis_client + .expect_xadd::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields[0].1)); + Ok(()) + }) + .times(2); + + let indexer_config = crate::indexer_config::IndexerConfig { + account_id: near_indexer_primitives::types::AccountId::try_from( + "morgs.near".to_string(), + ) + .unwrap(), + function_name: "test".to_string(), + rule: registry_types::Rule::ActionAny { + affected_account_id: "someone.near, *.kaiching".to_string(), + status: registry_types::Status::Success, + }, + }; + + start_block_stream( + 107503704, + &indexer_config, + std::sync::Arc::new(mock_redis_client), + std::sync::Arc::new(mock_delta_lake_client), + mock_lake_s3_config, + &ChainId::Mainnet, + 1, + "stream key".to_string(), + ) + .await + .unwrap(); + } }