Skip to content

Commit

Permalink
Handle new index file folder structure that is optimized for sub-acco…
Browse files Browse the repository at this point in the history
…unt wildcards.
  • Loading branch information
gabehamilton committed Jul 31, 2023
1 parent 099234b commit 03e1d9c
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use tokio::task::JoinHandle;

pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake";
pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
pub const INDEXED_DATA_FILES_FOLDER: &str = "silver/contracts/action_receipt_actions/metadata";
pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions";
pub const METADATA_FOLDER: &str = "metadata";
pub const DATA_FOLDER: &str = "data";
pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes.
pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;

Expand Down Expand Up @@ -145,7 +147,7 @@ pub(crate) async fn process_historical_messages(
pub(crate) async fn last_indexed_block_from_metadata(
aws_config: &SdkConfig,
) -> anyhow::Result<BlockHeight> {
let key = format!("{}/{}", INDEXED_DATA_FILES_FOLDER, "latest_block.json");
let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json");
let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
let s3_client: S3Client = S3Client::from_conf(s3_config);
let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;
Expand Down Expand Up @@ -187,7 +189,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
s3::fetch_contract_index_files(
aws_config,
s3_bucket,
INDEXED_DATA_FILES_FOLDER,
INDEXED_ACTIONS_FILES_FOLDER,
start_date,
affected_account_id,
)
Expand Down
105 changes: 70 additions & 35 deletions indexer/queryapi_coordinator/src/s3.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::historical_block_processing::METADATA_FOLDER;
use anyhow::{bail, Context, Result};
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::Config;
Expand All @@ -9,8 +10,13 @@ use regex::Regex;
// Sanity check, if we hit this we have 1M S3 results.
// Currently that would be either 2,700 years of FunctionCall data or 1M contract folders.
// If we hit 1M contracts we should build an index to support efficient wildcard contract matching.
// temporarily increasing from 1000 to 30,000 until new wildcard handling and index structure is in place.
const MAX_S3_LIST_REQUESTS: usize = 30000;
const MAX_S3_LIST_REQUESTS: usize = 1000;

fn storage_path_for_account(account: &str) -> String {
let mut folders = account.split('.').collect::<Vec<&str>>();
folders.reverse();
folders.join("/")
}

pub async fn find_index_files_by_pattern(
aws_config: &SdkConfig,
Expand All @@ -20,18 +26,23 @@ pub async fn find_index_files_by_pattern(
) -> Result<Vec<String>> {
Ok(match pattern {
x if x.contains(',') => {
let contract_array = x.split(',');
let account_array = x.split(',');
let mut results = vec![];
for contract in contract_array {
let contract = contract.trim();
let sub_results = if contract.contains('*') {
list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &contract)
for account in account_array {
let account = account.trim();
let sub_results = if account.contains('*') {
list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &account)
.await?
} else {
list_s3_bucket_by_prefix(
aws_config,
s3_bucket,
&format!("{}/{}/", s3_folder, contract),
&format!(
"{}/{}/{}/",
s3_folder,
storage_path_for_account(account),
METADATA_FOLDER
),
)
.await?
};
Expand All @@ -46,7 +57,12 @@ pub async fn find_index_files_by_pattern(
list_s3_bucket_by_prefix(
aws_config,
s3_bucket,
&format!("{}/{}/", s3_folder, pattern),
&format!(
"{}/{}/{}/",
s3_folder,
storage_path_for_account(pattern),
METADATA_FOLDER
),
)
.await?
}
Expand All @@ -57,17 +73,18 @@ async fn list_index_files_by_wildcard(
aws_config: &SdkConfig,
s3_bucket: &str,
s3_folder: &str,
x: &&str,
pattern: &&str,
) -> Result<Vec<String>> {
// fetch all folders and filter by regex
// remove sub-account wildcard from pattern
let pattern = pattern.replace("*.", "");
let path = storage_path_for_account(&pattern);

let folders =
list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/", s3_folder)).await?;
let regex_string = &x.replace('.', "\\.").replace('*', ".*");
let re = Regex::new(regex_string).unwrap();
let matching_folders = folders.into_iter().filter(|folder| re.is_match(folder));
list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/{}/", s3_folder, path))
.await?;
// for each matching folder list files
let mut results = vec![];
for folder in matching_folders {
for folder in folders {
results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await?);
}
Ok(results)
Expand Down Expand Up @@ -199,23 +216,23 @@ fn file_name_date_after(start_date: DateTime<Utc>, file_name: &str) -> bool {

#[cfg(test)]
mod tests {
use crate::opts::{Opts, Parser};
use crate::opts::Opts;
use crate::s3::{find_index_files_by_pattern, list_s3_bucket_by_prefix};
use aws_types::SdkConfig;
use crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET;
use crate::historical_block_processing::INDEXED_ACTIONS_FILES_FOLDER;

/// Parses env vars from .env, Run with
/// cargo test s3::tests::list_delta_bucket -- mainnet from-latest;
#[tokio::test]
async fn list_delta_bucket() {
let opts = Opts::parse();
let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
let opts = Opts::test_opts_with_aws();

let list = list_s3_bucket_by_prefix(
aws_config,
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET,
&opts.lake_aws_sdk_config(),
INDEXED_DATA_FILES_BUCKET,
&format!(
"{}/",
crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER.to_string()
INDEXED_ACTIONS_FILES_FOLDER.to_string()
),
)
.await
Expand All @@ -226,12 +243,12 @@ mod tests {
/// cargo test s3::tests::list_with_single_contract -- mainnet from-latest
#[tokio::test]
async fn list_with_single_contract() {
let opts = Opts::parse();
let opts = Opts::test_opts_with_aws();

let list = find_index_files_by_pattern(
&opts.lake_aws_sdk_config(),
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET,
crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
INDEXED_DATA_FILES_BUCKET,
INDEXED_ACTIONS_FILES_FOLDER,
"hackathon.agency.near",
)
.await
Expand All @@ -242,12 +259,12 @@ mod tests {
/// cargo test s3::tests::list_with_csv_contracts -- mainnet from-latest
#[tokio::test]
async fn list_with_csv_contracts() {
let opts = Opts::parse();
let opts = Opts::test_opts_with_aws();

let list = find_index_files_by_pattern(
&opts.lake_aws_sdk_config(),
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET,
crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
INDEXED_DATA_FILES_BUCKET,
INDEXED_ACTIONS_FILES_FOLDER,
"hackathon.agency.near, hackathon.aurora-silo-dev.near, hackathon.sputnik-dao.near",
)
.await
Expand All @@ -258,12 +275,12 @@ mod tests {
/// cargo test s3::tests::list_with_wildcard_contracts -- mainnet from-latest
#[tokio::test]
async fn list_with_wildcard_contracts() {
let opts = Opts::parse();
let opts = Opts::test_opts_with_aws();

let list = find_index_files_by_pattern(
&opts.lake_aws_sdk_config(),
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET,
crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
INDEXED_DATA_FILES_BUCKET,
INDEXED_ACTIONS_FILES_FOLDER,
"*.keypom.near",
)
.await
Expand All @@ -274,16 +291,34 @@ mod tests {
/// cargo test s3::tests::list_with_csv_and_wildcard_contracts -- mainnet from-latest
#[tokio::test]
async fn list_with_csv_and_wildcard_contracts() {
let opts = Opts::parse();
let opts = Opts::test_opts_with_aws();

let list = find_index_files_by_pattern(
&opts.lake_aws_sdk_config(),
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET,
crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
INDEXED_DATA_FILES_BUCKET,
INDEXED_ACTIONS_FILES_FOLDER,
"*.keypom.near, hackathon.agency.near, *.nearcrowd.near",
)
.await
.unwrap();
assert!(list.len() > 1370);
}

#[test]
fn storage_path_for_account_splits_and_reverses_into_folders() {
let account = "buildnear.testnet";
let expected = "testnet/buildnear";
let actual = super::storage_path_for_account(account);
assert_eq!(expected, actual);

let account = "v2.keypom.near";
let expected = "near/keypom/v2";
let actual = super::storage_path_for_account(account);
assert_eq!(expected, actual);

let account = "0.app5.hipodev.near";
let expected = "near/hipodev/app5/0";
let actual = super::storage_path_for_account(account);
assert_eq!(expected, actual);
}
}

0 comments on commit 03e1d9c

Please sign in to comment.