Skip to content

Commit

Permalink
Merge pull request #158 from near/dplt-1123_new_index_folder_structure
Browse files Browse the repository at this point in the history
DPLT-1123 Handle new index file folder structure
  • Loading branch information
gabehamilton authored Aug 3, 2023
2 parents 332b4a5 + 5d9d1d4 commit d35878c
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::task::JoinHandle;

pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake";
pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
pub const INDEXED_DATA_FILES_FOLDER: &str = "silver/contracts/action_receipt_actions/metadata";
pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata";
pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes.
pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;

Expand Down Expand Up @@ -145,7 +145,7 @@ pub(crate) async fn process_historical_messages(
pub(crate) async fn last_indexed_block_from_metadata(
aws_config: &SdkConfig,
) -> anyhow::Result<BlockHeight> {
let key = format!("{}/{}", INDEXED_DATA_FILES_FOLDER, "latest_block.json");
let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json");
let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
let s3_client: S3Client = S3Client::from_conf(s3_config);
let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;
Expand Down Expand Up @@ -187,7 +187,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
s3::fetch_contract_index_files(
aws_config,
s3_bucket,
INDEXED_DATA_FILES_FOLDER,
INDEXED_ACTIONS_FILES_FOLDER,
start_date,
affected_account_id,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,6 @@ mod tests {
// remove any blocks from after when the test was written
let fixed_blocks: Vec<BlockHeight> =
blocks.into_iter().filter(|&b| b <= 95175853u64).collect();
assert_eq!(fixed_blocks.len(), 6); // hackathon.agency.near = 45894627,45898423, hacker.agency.near = 45897358, hack.agency.near = 45894872,45895120,45896237
assert_eq!(fixed_blocks.len(), 197); // hackathon.agency.near = 45894627,45898423, hacker.agency.near = 45897358, hack.agency.near = 45894872,45895120,45896237
}
}
9 changes: 6 additions & 3 deletions indexer/queryapi_coordinator/src/indexer_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,14 @@ fn index_and_process_register_calls(
.entry(new_indexer_function.account_id.clone())
.or_default();

match fns.get(new_indexer_function.function_name.as_str()) {
let functions = fns.get(new_indexer_function.function_name.as_str());
match functions {
// if there is no existing function then we will insert the new one with the default state of provisioned = false
None => {
tracing::info!(
target: crate::INDEXER,
"indexed creation call to {registry_method_name}: {:?} {:?}",
"Block {}. Indexed creation call to {registry_method_name}: {:?} {:?}",
block_height,
new_indexer_function.account_id.clone(),
new_indexer_function.function_name.clone()
);
Expand All @@ -153,7 +155,8 @@ fn index_and_process_register_calls(
Some(old_indexer_function) => {
tracing::info!(
target: crate::INDEXER,
"indexed update call to {registry_method_name}: {:?} {:?}",
"Block {}. Indexed update call to {registry_method_name}: {:?} {:?}",
block_height,
new_indexer_function.account_id.clone(),
new_indexer_function.function_name.clone(),
);
Expand Down
10 changes: 8 additions & 2 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,17 @@ fn set_provisioned_flag(
indexer_function.provisioned = true;
}
None => {
let keys = account_functions
.keys()
.map(|s| &**s)
.collect::<Vec<_>>()
.join(", ");
tracing::error!(
target: INDEXER,
"Unable to set provisioning status, Indexer function with account_id {} and function_name {} not found in registry",
"Unable to set provisioning status, Indexer function with account_id {} and function_name {} not found in registry. Functions for this account are: {}",
indexer_function.account_id,
indexer_function.function_name
indexer_function.function_name,
keys
);
}
}
Expand Down
103 changes: 61 additions & 42 deletions indexer/queryapi_coordinator/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ use aws_sdk_s3::Config;
use aws_types::SdkConfig;
use chrono::{DateTime, NaiveDate, Utc};
use futures::future::try_join_all;
use regex::Regex;

// Sanity check, if we hit this we have 1M S3 results.
// Currently that would be either 2,700 years of FunctionCall data or 1M contract folders.
// If we hit 1M contracts we should build an index to support efficient wildcard contract matching.
// temporarily increasing from 1000 to 30,000 until new wildcard handling and index structure is in place.
const MAX_S3_LIST_REQUESTS: usize = 30000;
const MAX_S3_LIST_REQUESTS: usize = 1000;

fn storage_path_for_account(account: &str) -> String {
let mut folders = account.split('.').collect::<Vec<&str>>();
folders.reverse();
folders.join("/")
}

pub async fn find_index_files_by_pattern(
aws_config: &SdkConfig,
Expand All @@ -20,18 +24,17 @@ pub async fn find_index_files_by_pattern(
) -> Result<Vec<String>> {
Ok(match pattern {
x if x.contains(',') => {
let contract_array = x.split(',');
let account_array = x.split(',');
let mut results = vec![];
for contract in contract_array {
let contract = contract.trim();
let sub_results = if contract.contains('*') {
list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &contract)
.await?
for account in account_array {
let account = account.trim();
let sub_results = if account.contains('*') {
list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &account).await?
} else {
list_s3_bucket_by_prefix(
aws_config,
s3_bucket,
&format!("{}/{}/", s3_folder, contract),
&format!("{}/{}/", s3_folder, storage_path_for_account(account)),
)
.await?
};
Expand All @@ -46,7 +49,7 @@ pub async fn find_index_files_by_pattern(
list_s3_bucket_by_prefix(
aws_config,
s3_bucket,
&format!("{}/{}/", s3_folder, pattern),
&format!("{}/{}/", s3_folder, storage_path_for_account(pattern),),
)
.await?
}
Expand All @@ -57,17 +60,18 @@ async fn list_index_files_by_wildcard(
aws_config: &SdkConfig,
s3_bucket: &str,
s3_folder: &str,
x: &&str,
pattern: &&str,
) -> Result<Vec<String>> {
// fetch all folders and filter by regex
// remove sub-account wildcard from pattern
let pattern = pattern.replace("*.", "");
let path = storage_path_for_account(&pattern);

let folders =
list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/", s3_folder)).await?;
let regex_string = &x.replace('.', "\\.").replace('*', ".*");
let re = Regex::new(regex_string).unwrap();
let matching_folders = folders.into_iter().filter(|folder| re.is_match(folder));
list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/{}/", s3_folder, path))
.await?;
// for each matching folder list files
let mut results = vec![];
for folder in matching_folders {
for folder in folders {
results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await?);
}
Ok(results)
Expand Down Expand Up @@ -199,39 +203,36 @@ fn file_name_date_after(start_date: DateTime<Utc>, file_name: &str) -> bool {

#[cfg(test)]
mod tests {
use crate::opts::{Opts, Parser};
use crate::historical_block_processing::INDEXED_ACTIONS_FILES_FOLDER;
use crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET;
use crate::opts::Opts;
use crate::s3::{find_index_files_by_pattern, list_s3_bucket_by_prefix};
use aws_types::SdkConfig;

/// Parses env vars from .env, Run with
/// cargo test s3::tests::list_delta_bucket -- mainnet from-latest;
#[tokio::test]
async fn list_delta_bucket() {
let opts = Opts::parse();
let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
let opts = Opts::test_opts_with_aws();

let list = list_s3_bucket_by_prefix(
aws_config,
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET,
&format!(
"{}/",
crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER.to_string()
),
&opts.lake_aws_sdk_config(),
INDEXED_DATA_FILES_BUCKET,
&format!("{}/", INDEXED_ACTIONS_FILES_FOLDER.to_string()),
)
.await
.unwrap();
assert!(list.len() > 35000);
assert_eq!(list.len(), 4);
}

/// cargo test s3::tests::list_with_single_contract -- mainnet from-latest
#[tokio::test]
async fn list_with_single_contract() {
let opts = Opts::parse();
let opts = Opts::test_opts_with_aws();

let list = find_index_files_by_pattern(
&opts.lake_aws_sdk_config(),
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET,
crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
INDEXED_DATA_FILES_BUCKET,
INDEXED_ACTIONS_FILES_FOLDER,
"hackathon.agency.near",
)
.await
Expand All @@ -242,28 +243,28 @@ mod tests {
/// cargo test s3::tests::list_with_csv_contracts -- mainnet from-latest
#[tokio::test]
async fn list_with_csv_contracts() {
let opts = Opts::parse();
let opts = Opts::test_opts_with_aws();

let list = find_index_files_by_pattern(
&opts.lake_aws_sdk_config(),
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET,
crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
INDEXED_DATA_FILES_BUCKET,
INDEXED_ACTIONS_FILES_FOLDER,
"hackathon.agency.near, hackathon.aurora-silo-dev.near, hackathon.sputnik-dao.near",
)
.await
.unwrap();
assert!(list.len() >= 13); // expecting 13 but these contracts could get randomly called sometime
assert!(list.len() >= 15); // expecting 15 but these contracts could get randomly called sometime
}

/// cargo test s3::tests::list_with_wildcard_contracts -- mainnet from-latest
#[tokio::test]
async fn list_with_wildcard_contracts() {
let opts = Opts::parse();
let opts = Opts::test_opts_with_aws();

let list = find_index_files_by_pattern(
&opts.lake_aws_sdk_config(),
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET,
crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
INDEXED_DATA_FILES_BUCKET,
INDEXED_ACTIONS_FILES_FOLDER,
"*.keypom.near",
)
.await
Expand All @@ -274,16 +275,34 @@ mod tests {
/// cargo test s3::tests::list_with_csv_and_wildcard_contracts -- mainnet from-latest
#[tokio::test]
async fn list_with_csv_and_wildcard_contracts() {
let opts = Opts::parse();
let opts = Opts::test_opts_with_aws();

let list = find_index_files_by_pattern(
&opts.lake_aws_sdk_config(),
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET,
crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
INDEXED_DATA_FILES_BUCKET,
INDEXED_ACTIONS_FILES_FOLDER,
"*.keypom.near, hackathon.agency.near, *.nearcrowd.near",
)
.await
.unwrap();
assert!(list.len() > 1370);
}

#[test]
fn storage_path_for_account_splits_and_reverses_into_folders() {
let account = "buildnear.testnet";
let expected = "testnet/buildnear";
let actual = super::storage_path_for_account(account);
assert_eq!(expected, actual);

let account = "v2.keypom.near";
let expected = "near/keypom/v2";
let actual = super::storage_path_for_account(account);
assert_eq!(expected, actual);

let account = "0.app5.hipodev.near";
let expected = "near/hipodev/app5/0";
let actual = super::storage_path_for_account(account);
assert_eq!(expected, actual);
}
}

0 comments on commit d35878c

Please sign in to comment.