diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 2031160e2..09036fa20 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -18,7 +18,9 @@ use tokio::task::JoinHandle; pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake"; pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-"; -pub const INDEXED_DATA_FILES_FOLDER: &str = "silver/contracts/action_receipt_actions/metadata"; +pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions"; +pub const METADATA_FOLDER: &str = "metadata"; +pub const DATA_FOLDER: &str = "data"; pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes. pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20; @@ -145,7 +147,7 @@ pub(crate) async fn process_historical_messages( pub(crate) async fn last_indexed_block_from_metadata( aws_config: &SdkConfig, ) -> anyhow::Result { - let key = format!("{}/{}", INDEXED_DATA_FILES_FOLDER, "latest_block.json"); + let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json"); let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); let s3_client: S3Client = S3Client::from_conf(s3_config); let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?; @@ -187,7 +189,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files( s3::fetch_contract_index_files( aws_config, s3_bucket, - INDEXED_DATA_FILES_FOLDER, + INDEXED_ACTIONS_FILES_FOLDER, start_date, affected_account_id, ) diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs index be640b437..819bc165f 100644 --- a/indexer/queryapi_coordinator/src/s3.rs +++ b/indexer/queryapi_coordinator/src/s3.rs @@ -1,3 +1,4 @@ +use crate::historical_block_processing::METADATA_FOLDER; use anyhow::{bail, Context, Result}; use aws_sdk_s3::Client as S3Client; use aws_sdk_s3::Config; @@ -9,8 +10,13 @@ use regex::Regex; // Sanity check, if we hit this we have 1M S3 results. // Currently that would be either 2,700 years of FunctionCall data or 1M contract folders. // If we hit 1M contracts we should build an index to support efficient wildcard contract matching. -// temporarily increasing from 1000 to 30,000 until new wildcard handling and index structure is in place. -const MAX_S3_LIST_REQUESTS: usize = 30000; +const MAX_S3_LIST_REQUESTS: usize = 1000; + +fn storage_path_for_account(account: &str) -> String { + let mut folders = account.split('.').collect::>(); + folders.reverse(); + folders.join("/") +} pub async fn find_index_files_by_pattern( aws_config: &SdkConfig, @@ -20,18 +26,23 @@ pub async fn find_index_files_by_pattern( ) -> Result> { Ok(match pattern { x if x.contains(',') => { - let contract_array = x.split(','); + let account_array = x.split(','); let mut results = vec![]; - for contract in contract_array { - let contract = contract.trim(); - let sub_results = if contract.contains('*') { - list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &contract) + for account in account_array { + let account = account.trim(); + let sub_results = if account.contains('*') { + list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &account) .await? } else { list_s3_bucket_by_prefix( aws_config, s3_bucket, - &format!("{}/{}/", s3_folder, contract), + &format!( + "{}/{}/{}/", + s3_folder, + storage_path_for_account(account), + METADATA_FOLDER + ), ) .await? }; @@ -46,7 +57,12 @@ pub async fn find_index_files_by_pattern( list_s3_bucket_by_prefix( aws_config, s3_bucket, - &format!("{}/{}/", s3_folder, pattern), + &format!( + "{}/{}/{}/", + s3_folder, + storage_path_for_account(pattern), + METADATA_FOLDER + ), ) .await? } @@ -57,17 +73,18 @@ async fn list_index_files_by_wildcard( aws_config: &SdkConfig, s3_bucket: &str, s3_folder: &str, - x: &&str, + pattern: &&str, ) -> Result> { - // fetch all folders and filter by regex + // remove sub-account wildcard from pattern + let pattern = pattern.replace("*.", ""); + let path = storage_path_for_account(&pattern); + let folders = - list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/", s3_folder)).await?; - let regex_string = &x.replace('.', "\\.").replace('*', ".*"); - let re = Regex::new(regex_string).unwrap(); - let matching_folders = folders.into_iter().filter(|folder| re.is_match(folder)); + list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/{}/", s3_folder, path)) + .await?; // for each matching folder list files let mut results = vec![]; - for folder in matching_folders { + for folder in folders { results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await?); } Ok(results) @@ -199,23 +216,23 @@ fn file_name_date_after(start_date: DateTime, file_name: &str) -> bool { #[cfg(test)] mod tests { - use crate::opts::{Opts, Parser}; + use crate::opts::Opts; use crate::s3::{find_index_files_by_pattern, list_s3_bucket_by_prefix}; - use aws_types::SdkConfig; + use crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET; + use crate::historical_block_processing::INDEXED_ACTIONS_FILES_FOLDER; /// Parses env vars from .env, Run with /// cargo test s3::tests::list_delta_bucket -- mainnet from-latest; #[tokio::test] async fn list_delta_bucket() { - let opts = Opts::parse(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let opts = Opts::test_opts_with_aws(); let list = list_s3_bucket_by_prefix( - aws_config, - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, + &opts.lake_aws_sdk_config(), + INDEXED_DATA_FILES_BUCKET, &format!( "{}/", - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER.to_string() + INDEXED_ACTIONS_FILES_FOLDER.to_string() ), ) .await @@ -226,12 +243,12 @@ mod tests { /// cargo test s3::tests::list_with_single_contract -- mainnet from-latest #[tokio::test] async fn list_with_single_contract() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "hackathon.agency.near", ) .await @@ -242,12 +259,12 @@ mod tests { /// cargo test s3::tests::list_with_csv_contracts -- mainnet from-latest #[tokio::test] async fn list_with_csv_contracts() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "hackathon.agency.near, hackathon.aurora-silo-dev.near, hackathon.sputnik-dao.near", ) .await @@ -258,12 +275,12 @@ mod tests { /// cargo test s3::tests::list_with_wildcard_contracts -- mainnet from-latest #[tokio::test] async fn list_with_wildcard_contracts() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "*.keypom.near", ) .await @@ -274,16 +291,34 @@ mod tests { /// cargo test s3::tests::list_with_csv_and_wildcard_contracts -- mainnet from-latest #[tokio::test] async fn list_with_csv_and_wildcard_contracts() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "*.keypom.near, hackathon.agency.near, *.nearcrowd.near", ) .await .unwrap(); assert!(list.len() > 1370); } + + #[test] + fn storage_path_for_account_splits_and_reverses_into_folders() { + let account = "buildnear.testnet"; + let expected = "testnet/buildnear"; + let actual = super::storage_path_for_account(account); + assert_eq!(expected, actual); + + let account = "v2.keypom.near"; + let expected = "near/keypom/v2"; + let actual = super::storage_path_for_account(account); + assert_eq!(expected, actual); + + let account = "0.app5.hipodev.near"; + let expected = "near/hipodev/app5/0"; + let actual = super::storage_path_for_account(account); + assert_eq!(expected, actual); + } }