From db8ce199a598183eda969d42cc7e544d1f413b8d Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 20 Feb 2024 10:22:06 -0800 Subject: [PATCH 1/7] feat: Support * contract filter --- block-streamer/src/block_stream.rs | 63 ++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 815c750f5..dc5720627 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -184,6 +184,13 @@ async fn process_delta_lake_blocks( affected_account_id, .. } => { + if affected_account_id.eq("*") { + tracing::debug!( + "Skipping fetching index files from delta lake due to wildcard contract filter {}", + affected_account_id + ); + return Ok(start_block_height); + } tracing::debug!( "Fetching block heights starting from {} from delta lake", start_block_height, @@ -358,4 +365,60 @@ mod tests { .await .unwrap(); } + + #[tokio::test] + async fn skips_delta_lake_for_star_filter() { + let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default(); + mock_delta_lake_client + .expect_get_latest_block_metadata() + .returning(|| { + Ok(crate::delta_lake_client::LatestBlockMetadata { + last_indexed_block: "107503703".to_string(), + processed_at_utc: "".to_string(), + first_indexed_block: "".to_string(), + last_indexed_block_date: "".to_string(), + first_indexed_block_date: "".to_string(), + }) + }); + mock_delta_lake_client.expect_list_matching_block_heights().times(0); + + let mock_lake_s3_config = crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); + + let mut mock_redis_client = crate::redis::RedisClient::default(); + mock_redis_client + .expect_set::() + .returning(|_,_| Ok(())) + .times(2); + mock_redis_client + .expect_xadd::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields[0].1)); + Ok(()) + }) + .times(2); + + let indexer_config = crate::indexer_config::IndexerConfig { + account_id: near_indexer_primitives::types::AccountId::try_from( + "morgs.near".to_string() + ).unwrap(), + function_name: "test".to_string(), + rule: registry_types::Rule::ActionAny { + affected_account_id: "*".to_string(), + status: registry_types::Status::Success, + } + }; + + start_block_stream( + 107503704, + &indexer_config, + std::sync::Arc::new(mock_redis_client), + std::sync::Arc::new(mock_delta_lake_client), + mock_lake_s3_config, + &ChainId::Mainnet, + 1, + "stream key".to_string(), + ) + .await + .unwrap(); + } } From 9bc433c5d6a86e9626dcc226d34c290223b2440a Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 20 Feb 2024 11:21:26 -0800 Subject: [PATCH 2/7] Adjust examples and manually test --- block-streamer/examples/list_streams.rs | 2 +- block-streamer/examples/start_stream.rs | 4 ++-- block-streamer/examples/stop_stream.rs | 2 +- block-streamer/src/block_stream.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/block-streamer/examples/list_streams.rs b/block-streamer/examples/list_streams.rs index 7c3c28c58..f9095658c 100644 --- a/block-streamer/examples/list_streams.rs +++ b/block-streamer/examples/list_streams.rs @@ -5,7 +5,7 @@ use block_streamer::ListStreamsRequest; #[tokio::main] async fn main() -> Result<(), Box> { - let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + let mut client = BlockStreamerClient::connect("http://0.0.0.0:8002").await?; let response = client .list_streams(Request::new(ListStreamsRequest {})) diff --git a/block-streamer/examples/start_stream.rs b/block-streamer/examples/start_stream.rs index 79bead1ea..2bb1c4adb 100644 --- a/block-streamer/examples/start_stream.rs +++ b/block-streamer/examples/start_stream.rs @@ -5,7 +5,7 @@ use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamReque #[tokio::main] async fn main() -> Result<(), Box> { - let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + let mut client = BlockStreamerClient::connect("http://0.0.0.0:8002").await?; let response = client .start_stream(Request::new(StartStreamRequest { @@ -15,7 +15,7 @@ async fn main() -> Result<(), Box> { version: 0, redis_stream: "morgs.near/test:block_stream".to_string(), rule: Some(Rule::ActionAnyRule(ActionAnyRule { - affected_account_id: "social.near".to_string(), + affected_account_id: "*".to_string(), status: Status::Success.into(), })), })) diff --git a/block-streamer/examples/stop_stream.rs b/block-streamer/examples/stop_stream.rs index a723526e1..9576030f8 100644 --- a/block-streamer/examples/stop_stream.rs +++ b/block-streamer/examples/stop_stream.rs @@ -5,7 +5,7 @@ use block_streamer::StopStreamRequest; #[tokio::main] async fn main() -> Result<(), Box> { - let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + let mut client = BlockStreamerClient::connect("http://0.0.0.0:8002").await?; let response = client .stop_stream(Request::new(StopStreamRequest { diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index dc5720627..4008a86b4 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -185,7 +185,7 @@ async fn process_delta_lake_blocks( .. } => { if affected_account_id.eq("*") { - tracing::debug!( + tracing::info!( "Skipping fetching index files from delta lake due to wildcard contract filter {}", affected_account_id ); From 4c3173945e17e46e150429f6fdccacb0e677c0b4 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 20 Feb 2024 12:15:44 -0800 Subject: [PATCH 3/7] Assert redis set doesn't use delta lake last indexed block --- block-streamer/src/block_stream.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 4008a86b4..0aa5fcc59 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -373,7 +373,7 @@ mod tests { .expect_get_latest_block_metadata() .returning(|| { Ok(crate::delta_lake_client::LatestBlockMetadata { - last_indexed_block: "107503703".to_string(), + last_indexed_block: "107503700".to_string(), processed_at_utc: "".to_string(), first_indexed_block: "".to_string(), last_indexed_block_date: "".to_string(), @@ -387,7 +387,10 @@ mod tests { let mut mock_redis_client = crate::redis::RedisClient::default(); mock_redis_client .expect_set::() - .returning(|_,_| Ok(())) + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields)); + Ok(()) + }) .times(2); mock_redis_client .expect_xadd::() From 2e4bbbfba754de319a7551c86d4f561803bf51d2 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 20 Feb 2024 12:18:18 -0800 Subject: [PATCH 4/7] fix: Cargo format --- block-streamer/src/block_stream.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 0aa5fcc59..b3ffd95d6 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -380,9 +380,12 @@ mod tests { first_indexed_block_date: "".to_string(), }) }); - mock_delta_lake_client.expect_list_matching_block_heights().times(0); + mock_delta_lake_client + .expect_list_matching_block_heights() + .times(0); - let mock_lake_s3_config = crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); + let mock_lake_s3_config = + crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); let mut mock_redis_client = crate::redis::RedisClient::default(); mock_redis_client @@ -399,16 +402,17 @@ mod tests { Ok(()) }) .times(2); - + let indexer_config = crate::indexer_config::IndexerConfig { account_id: near_indexer_primitives::types::AccountId::try_from( - "morgs.near".to_string() - ).unwrap(), + "morgs.near".to_string(), + ) + .unwrap(), function_name: "test".to_string(), - rule: registry_types::Rule::ActionAny { + rule: registry_types::Rule::ActionAny { affected_account_id: "*".to_string(), status: registry_types::Status::Success, - } + }, }; start_block_stream( From c3f82f1ed2d2c02907eb6d3dc040fe6316a6f476 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 20 Feb 2024 12:22:55 -0800 Subject: [PATCH 5/7] fix: Revert example start stream contract filter back to social.near --- block-streamer/examples/start_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block-streamer/examples/start_stream.rs b/block-streamer/examples/start_stream.rs index 2bb1c4adb..922219ac3 100644 --- a/block-streamer/examples/start_stream.rs +++ b/block-streamer/examples/start_stream.rs @@ -15,7 +15,7 @@ async fn main() -> Result<(), Box> { version: 0, redis_stream: "morgs.near/test:block_stream".to_string(), rule: Some(Rule::ActionAnyRule(ActionAnyRule { - affected_account_id: "*".to_string(), + affected_account_id: "social.near".to_string(), status: Status::Success.into(), })), })) From 420a658d545c3fef467dfeabb339992f56f74f07 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 20 Feb 2024 16:10:01 -0800 Subject: [PATCH 6/7] feat: Support multiple wildcard filters for skipping delta lake --- block-streamer/src/block_stream.rs | 136 ++++++++++++++++++++++++++++- 1 file changed, 133 insertions(+), 3 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index b3ffd95d6..423047d0d 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -9,6 +9,7 @@ use registry_types::Rule; /// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but /// we need this configurable for testing purposes. const LAKE_PREFETCH_SIZE: usize = 100; +const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"]; pub struct Task { handle: JoinHandle>, @@ -184,8 +185,11 @@ async fn process_delta_lake_blocks( affected_account_id, .. } => { - if affected_account_id.eq("*") { - tracing::info!( + if affected_account_id + .split(",") + .any(|account_id| DELTA_LAKE_SKIP_ACCOUNTS.contains(&account_id.trim())) + { + tracing::debug!( "Skipping fetching index files from delta lake due to wildcard contract filter {}", affected_account_id ); @@ -382,7 +386,7 @@ mod tests { }); mock_delta_lake_client .expect_list_matching_block_heights() - .times(0); + .never(); let mock_lake_s3_config = crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); @@ -428,4 +432,130 @@ mod tests { .await .unwrap(); } + + #[tokio::test] + async fn skips_delta_lake_for_multiple_star_filter() { + let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default(); + mock_delta_lake_client + .expect_get_latest_block_metadata() + .returning(|| { + Ok(crate::delta_lake_client::LatestBlockMetadata { + last_indexed_block: "107503700".to_string(), + processed_at_utc: "".to_string(), + first_indexed_block: "".to_string(), + last_indexed_block_date: "".to_string(), + first_indexed_block_date: "".to_string(), + }) + }); + mock_delta_lake_client + .expect_list_matching_block_heights() + .never(); + + let mock_lake_s3_config = + crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); + + let mut mock_redis_client = crate::redis::RedisClient::default(); + mock_redis_client + .expect_set::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields)); + Ok(()) + }) + .times(2); + mock_redis_client + .expect_xadd::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields[0].1)); + Ok(()) + }) + .times(2); + + let indexer_config = crate::indexer_config::IndexerConfig { + account_id: near_indexer_primitives::types::AccountId::try_from( + "morgs.near".to_string(), + ) + .unwrap(), + function_name: "test".to_string(), + rule: registry_types::Rule::ActionAny { + affected_account_id: "*, *.tg".to_string(), + status: registry_types::Status::Success, + }, + }; + + start_block_stream( + 107503704, + &indexer_config, + std::sync::Arc::new(mock_redis_client), + std::sync::Arc::new(mock_delta_lake_client), + mock_lake_s3_config, + &ChainId::Mainnet, + 1, + "stream key".to_string(), + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn skips_delta_lake_for_star_filter_after_normal_account() { + let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default(); + mock_delta_lake_client + .expect_get_latest_block_metadata() + .returning(|| { + Ok(crate::delta_lake_client::LatestBlockMetadata { + last_indexed_block: "107503700".to_string(), + processed_at_utc: "".to_string(), + first_indexed_block: "".to_string(), + last_indexed_block_date: "".to_string(), + first_indexed_block_date: "".to_string(), + }) + }); + mock_delta_lake_client + .expect_list_matching_block_heights() + .never(); + + let mock_lake_s3_config = + crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); + + let mut mock_redis_client = crate::redis::RedisClient::default(); + mock_redis_client + .expect_set::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields)); + Ok(()) + }) + .times(2); + mock_redis_client + .expect_xadd::() + .returning(|_, fields| { + assert!(vec![107503704, 107503705].contains(&fields[0].1)); + Ok(()) + }) + .times(2); + + let indexer_config = crate::indexer_config::IndexerConfig { + account_id: near_indexer_primitives::types::AccountId::try_from( + "morgs.near".to_string(), + ) + .unwrap(), + function_name: "test".to_string(), + rule: registry_types::Rule::ActionAny { + affected_account_id: "someone.near, *.kaiching".to_string(), + status: registry_types::Status::Success, + }, + }; + + start_block_stream( + 107503704, + &indexer_config, + std::sync::Arc::new(mock_redis_client), + std::sync::Arc::new(mock_delta_lake_client), + mock_lake_s3_config, + &ChainId::Mainnet, + 1, + "stream key".to_string(), + ) + .await + .unwrap(); + } } From 64d3dcfc3ef65c55fd9426d6777336541d1af846 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 20 Feb 2024 16:27:13 -0800 Subject: [PATCH 7/7] Rename log statement --- block-streamer/src/block_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 423047d0d..cc6cd24c9 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -190,7 +190,7 @@ async fn process_delta_lake_blocks( .any(|account_id| DELTA_LAKE_SKIP_ACCOUNTS.contains(&account_id.trim())) { tracing::debug!( - "Skipping fetching index files from delta lake due to wildcard contract filter {}", + "Skipping fetching index files from delta lake due to wildcard contract filter present in {}", affected_account_id ); return Ok(start_block_height);