Skip to content

Commit

Permalink
feat: Support multiple wildcard filters for skipping delta lake
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Feb 21, 2024
1 parent c3f82f1 commit 420a658
Showing 1 changed file with 133 additions and 3 deletions.
136 changes: 133 additions & 3 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use registry_types::Rule;
/// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but
/// we need this configurable for testing purposes.
const LAKE_PREFETCH_SIZE: usize = 100;
const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
Expand Down Expand Up @@ -184,8 +185,11 @@ async fn process_delta_lake_blocks(
affected_account_id,
..
} => {
if affected_account_id.eq("*") {
tracing::info!(
if affected_account_id
.split(",")
.any(|account_id| DELTA_LAKE_SKIP_ACCOUNTS.contains(&account_id.trim()))
{
tracing::debug!(
"Skipping fetching index files from delta lake due to wildcard contract filter {}",
affected_account_id
);
Expand Down Expand Up @@ -382,7 +386,7 @@ mod tests {
});
mock_delta_lake_client
.expect_list_matching_block_heights()
.times(0);
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);
Expand Down Expand Up @@ -428,4 +432,130 @@ mod tests {
.await
.unwrap();
}

#[tokio::test]
async fn skips_delta_lake_for_multiple_star_filter() {
let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
mock_delta_lake_client
.expect_get_latest_block_metadata()
.returning(|| {
Ok(crate::delta_lake_client::LatestBlockMetadata {
last_indexed_block: "107503700".to_string(),
processed_at_utc: "".to_string(),
first_indexed_block: "".to_string(),
last_indexed_block_date: "".to_string(),
first_indexed_block_date: "".to_string(),
})
});
mock_delta_lake_client
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
"morgs.near".to_string(),
)
.unwrap(),
function_name: "test".to_string(),
rule: registry_types::Rule::ActionAny {
affected_account_id: "*, *.tg".to_string(),
status: registry_types::Status::Success,
},
};

start_block_stream(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
"stream key".to_string(),
)
.await
.unwrap();
}

#[tokio::test]
async fn skips_delta_lake_for_star_filter_after_normal_account() {
let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
mock_delta_lake_client
.expect_get_latest_block_metadata()
.returning(|| {
Ok(crate::delta_lake_client::LatestBlockMetadata {
last_indexed_block: "107503700".to_string(),
processed_at_utc: "".to_string(),
first_indexed_block: "".to_string(),
last_indexed_block_date: "".to_string(),
first_indexed_block_date: "".to_string(),
})
});
mock_delta_lake_client
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
"morgs.near".to_string(),
)
.unwrap(),
function_name: "test".to_string(),
rule: registry_types::Rule::ActionAny {
affected_account_id: "someone.near, *.kaiching".to_string(),
status: registry_types::Status::Success,
},
};

start_block_stream(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
"stream key".to_string(),
)
.await
.unwrap();
}
}

0 comments on commit 420a658

Please sign in to comment.