diff --git a/indexer/Cargo.lock b/indexer/Cargo.lock index ee1cdfeb4..5a9baf7e9 100644 --- a/indexer/Cargo.lock +++ b/indexer/Cargo.lock @@ -3324,6 +3324,7 @@ version = "0.1.0" dependencies = [ "actix-web", "anyhow", + "aws-config", "aws-credential-types", "aws-sdk-s3", "aws-types", diff --git a/indexer/queryapi_coordinator/Cargo.toml b/indexer/queryapi_coordinator/Cargo.toml index b606c1b6a..d40f4822f 100644 --- a/indexer/queryapi_coordinator/Cargo.toml +++ b/indexer/queryapi_coordinator/Cargo.toml @@ -7,6 +7,7 @@ authors = ["Near Inc "] [dependencies] anyhow = "1.0.57" actix-web = "=4.0.1" +aws-config = "0.53.0" borsh = "0.10.2" cached = "0.23.0" chrono = "0.4.25" diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 82118aea3..31af1c266 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -11,11 +11,8 @@ use near_lake_framework::near_indexer_primitives::types::{BlockHeight, BlockId, use serde_json::from_str; use tokio::task::JoinHandle; -pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake"; -pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-"; -pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata"; -pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes. pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20; +pub const UNINDEXED_BLOCKS_SOFT_LIMIT: u64 = 7200; pub struct Task { handle: JoinHandle<()>, @@ -53,7 +50,7 @@ impl Streamer { _ = cancellation_token_clone.cancelled() => { tracing::info!( target: crate::INDEXER, - "Cancelling existing historical backfill for indexer: {:?}", + "Cancelling existing historical backfill for indexer: {}", indexer.get_full_name(), ); }, @@ -64,7 +61,13 @@ impl Streamer { &s3_client, &chain_id, &json_rpc_client, - ) => { } + ) => { + tracing::info!( + target: crate::INDEXER, + "Finished historical backfill for indexer: {}", + indexer.get_full_name(), + ); + } } }); @@ -132,29 +135,49 @@ pub(crate) async fn process_historical_messages( let block_difference: i64 = (current_block_height - start_block) as i64; match block_difference { i64::MIN..=-1 => { - bail!("Skipping back fill, start_block_height is greater than current block height: {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name); + bail!( + "Skipping back fill, start_block_height is greater than current block height: {}", + indexer_function.get_full_name(), + ); } 0 => { - bail!("Skipping back fill, start_block_height is equal to current block height: {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name); + bail!( + "Skipping back fill, start_block_height is equal to current block height: {}", + indexer_function.get_full_name(), + ); } 1..=i64::MAX => { tracing::info!( target: crate::INDEXER, - "Back filling {block_difference} blocks from {start_block} to current block height {current_block_height}: {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name + "Back filling {block_difference} blocks from {start_block} to current block height {current_block_height}: {}", + indexer_function.get_full_name(), ); + storage::del( + redis_connection_manager, + storage::generate_historical_stream_key(&indexer_function.get_full_name()), + ) + .await?; + storage::sadd( + redis_connection_manager, + storage::STREAMS_SET_KEY, + storage::generate_historical_stream_key(&indexer_function.get_full_name()), + ) + .await?; + storage::set( + redis_connection_manager, + storage::generate_historical_storage_key(&indexer_function.get_full_name()), + serde_json::to_string(&indexer_function)?, + None, + ) + .await?; + let start_date = lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?; let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?; - let mut blocks_from_index = filter_matching_blocks_from_index_files( + let blocks_from_index = filter_matching_blocks_from_index_files( start_block, &indexer_function, s3_client, @@ -162,6 +185,22 @@ pub(crate) async fn process_historical_messages( ) .await?; + tracing::info!( + target: crate::INDEXER, + "Flushing {} block heights from index files to historical Stream for indexer: {}", + blocks_from_index.len(), + indexer_function.get_full_name(), + ); + + for block in &blocks_from_index { + storage::xadd( + redis_connection_manager, + storage::generate_historical_stream_key(&indexer_function.get_full_name()), + &[("block_height", block)], + ) + .await?; + } + // Check for the case where an index file is written right after we get the last_indexed_block metadata let last_block_in_data = blocks_from_index.last().unwrap_or(&start_block); let last_indexed_block = if last_block_in_data > &last_indexed_block { @@ -170,47 +209,65 @@ pub(crate) async fn process_historical_messages( last_indexed_block }; - let mut blocks_between_indexed_and_current_block: Vec = - filter_matching_unindexed_blocks_from_lake( - last_indexed_block, - current_block_height, - &indexer_function, - s3_client, - chain_id, - ) - .await?; + let unindexed_block_difference = current_block_height - last_indexed_block; - blocks_from_index.append(&mut blocks_between_indexed_and_current_block); + tracing::info!( + target: crate::INDEXER, + "Filtering {} unindexed blocks from lake: from block {last_indexed_block} to {current_block_height} for indexer: {}", + unindexed_block_difference, + indexer_function.get_full_name(), + ); - if !blocks_from_index.is_empty() { - storage::del( - redis_connection_manager, - storage::generate_historical_stream_key(&indexer_function.get_full_name()), - ) - .await?; - storage::sadd( - redis_connection_manager, - storage::STREAMS_SET_KEY, - storage::generate_historical_stream_key(&indexer_function.get_full_name()), - ) - .await?; - storage::set( - redis_connection_manager, - storage::generate_historical_storage_key(&indexer_function.get_full_name()), - serde_json::to_string(&indexer_function)?, - None, - ) - .await?; + if unindexed_block_difference > UNINDEXED_BLOCKS_SOFT_LIMIT { + tracing::warn!( + target: crate::INDEXER, + "Unindexed block difference exceeds soft limit of: {UNINDEXED_BLOCKS_SOFT_LIMIT} for indexer: {}", + indexer_function.get_full_name(), + ); } - for current_block in blocks_from_index { - storage::xadd( - redis_connection_manager, - storage::generate_historical_stream_key(&indexer_function.get_full_name()), - &[("block_height", current_block)], - ) - .await?; + let lake_config = match &chain_id { + ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(), + ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(), } + .start_block_height(last_indexed_block) + .build() + .context("Failed to build lake config")?; + + let (sender, mut stream) = near_lake_framework::streamer(lake_config); + + let mut filtered_block_count = 0; + while let Some(streamer_message) = stream.recv().await { + let block_height = streamer_message.block.header.height; + if block_height == current_block_height { + break; + } + + let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync( + &indexer_function.indexer_rule, + &streamer_message, + chain_id.clone(), + ); + + if !matches.is_empty() { + filtered_block_count += 1; + + storage::xadd( + redis_connection_manager, + storage::generate_historical_stream_key(&indexer_function.get_full_name()), + &[("block_height", block_height)], + ) + .await?; + } + } + drop(sender); + + tracing::info!( + target: crate::INDEXER, + "Flushed {} unindexed block heights to historical Stream for indexer: {}", + filtered_block_count, + indexer_function.get_full_name(), + ); } } Ok(block_difference) @@ -219,8 +276,13 @@ pub(crate) async fn process_historical_messages( pub(crate) async fn last_indexed_block_from_metadata( s3_client: &S3Client, ) -> anyhow::Result { - let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json"); - let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?; + let key = format!( + "{}/{}", + s3::INDEXED_ACTIONS_FILES_FOLDER, + "latest_block.json" + ); + let metadata = + s3::fetch_text_file_from_s3(s3::INDEXED_DATA_FILES_BUCKET, key, s3_client).await?; let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap(); let last_indexed_block = metadata["last_indexed_block"].clone(); @@ -243,7 +305,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files( s3_client: &S3Client, start_date: DateTime, ) -> anyhow::Result> { - let s3_bucket = INDEXED_DATA_FILES_BUCKET; + let s3_bucket = s3::INDEXED_DATA_FILES_BUCKET; let mut needs_dedupe_and_sort = false; let indexer_rule = &indexer_function.indexer_rule; @@ -259,7 +321,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files( s3::fetch_contract_index_files( s3_client, s3_bucket, - INDEXED_ACTIONS_FILES_FOLDER, + s3::INDEXED_ACTIONS_FILES_FOLDER, start_date, affected_account_id, ) @@ -333,117 +395,6 @@ fn parse_blocks_from_index_files( .collect::>() } -async fn filter_matching_unindexed_blocks_from_lake( - last_indexed_block: BlockHeight, - ending_block_height: BlockHeight, - indexer_function: &IndexerFunction, - s3_client: &S3Client, - chain_id: &ChainId, -) -> anyhow::Result> { - let lake_bucket = lake_bucket_for_chain(chain_id); - - let indexer_rule = &indexer_function.indexer_rule; - let count = ending_block_height - last_indexed_block; - if count > MAX_UNINDEXED_BLOCKS_TO_PROCESS { - bail!( - "Too many unindexed blocks to filter: {count}. Last indexed block is {last_indexed_block} for function {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name, - ); - } - tracing::info!( - target: crate::INDEXER, - "Filtering {count} unindexed blocks from lake: from block {last_indexed_block} to {ending_block_height} for function {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name, - ); - - let mut blocks_to_process: Vec = vec![]; - for current_block in (last_indexed_block + 1)..ending_block_height { - // fetch block file from S3 - let key = format!("{}/block.json", normalize_block_height(current_block)); - let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await; - - if s3_result.is_err() { - let error = s3_result.err().unwrap(); - if error - .root_cause() - .downcast_ref::() - .is_some() - { - tracing::info!( - target: crate::INDEXER, - "In manual filtering, skipping block number {} which was not found. For function {:?} {:?}", - current_block, - indexer_function.account_id, - indexer_function.function_name, - ); - continue; - } else { - bail!(error); - } - } - - let block = s3_result.unwrap(); - let block_view = serde_json::from_slice::< - near_lake_framework::near_indexer_primitives::views::BlockView, - >(block.as_ref()) - .with_context(|| format!("Error parsing block {} from S3", current_block))?; - - let mut shards = vec![]; - for shard_id in 0..block_view.chunks.len() as u64 { - let key = format!( - "{}/shard_{}.json", - normalize_block_height(current_block), - shard_id - ); - let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await?; - match serde_json::from_slice::( - shard.as_ref(), - ) { - Ok(parsed_shard) => { - shards.push(parsed_shard); - } - Err(e) => { - bail!("Error parsing shard: {}", e.to_string()); - } - } - } - - let streamer_message = near_lake_framework::near_indexer_primitives::StreamerMessage { - block: block_view, - shards, - }; - - // filter block - let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync( - indexer_rule, - &streamer_message, - chain_id.clone(), - ); - if !matches.is_empty() { - blocks_to_process.push(current_block); - } - } - - tracing::info!( - target: crate::INDEXER, - "Found {block_count} unindexed blocks to process for function {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name, - block_count = blocks_to_process.len() - ); - Ok(blocks_to_process) -} - -fn lake_bucket_for_chain(chain_id: &ChainId) -> String { - format!("{}{}", LAKE_BUCKET_PREFIX, chain_id) -} - -fn normalize_block_height(block_height: BlockHeight) -> String { - format!("{:0>12}", block_height) -} - // if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks pub async fn lookup_block_date_or_next_block_date( block_height: u64, diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs index 86c281bb5..acd115e37 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs @@ -4,7 +4,6 @@ mod tests { use crate::indexer_types::IndexerFunction; use crate::opts::{ChainId, Opts, StartOptions}; use crate::{historical_block_processing, opts}; - use aws_types::SdkConfig; use chrono::{DateTime, NaiveDate, Utc}; use indexer_rule_type::indexer_rule::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; use near_lake_framework::near_indexer_primitives::types::BlockHeight; @@ -14,12 +13,11 @@ mod tests { impl Opts { pub fn test_opts_with_aws() -> Self { dotenv::dotenv().ok(); - let lake_aws_access_key = env::var("LAKE_AWS_ACCESS_KEY").unwrap(); - let lake_aws_secret_access_key = env::var("LAKE_AWS_SECRET_ACCESS_KEY").unwrap(); Opts { + aws_access_key_id: env::var("AWS_ACCESS_KEY_ID").unwrap(), + aws_secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").unwrap(), + aws_region: "eu-central-1".to_string(), redis_connection_string: env::var("REDIS_CONNECTION_STRING").unwrap(), - lake_aws_access_key, - lake_aws_secret_access_key, registry_contract_id: "".to_string(), port: 0, chain_id: ChainId::Mainnet(StartOptions::FromLatest), @@ -31,10 +29,8 @@ mod tests { /// cargo test historical_block_processing_integration_tests::test_indexing_metadata_file; #[tokio::test] async fn test_indexing_metadata_file() { - let opts = Opts::test_opts_with_aws(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let last_indexed_block = historical_block_processing::last_indexed_block_from_metadata(&s3_client) @@ -73,9 +69,8 @@ mod tests { let opts = Opts::test_opts_with_aws(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let redis_connection_manager = storage::connect(&opts.redis_connection_string) .await @@ -124,10 +119,8 @@ mod tests { indexer_rule: filter_rule, }; - let opts = Opts::test_opts_with_aws(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let start_block_height = 77016214; let naivedatetime_utc = NaiveDate::from_ymd_opt(2022, 10, 3) @@ -183,10 +176,8 @@ mod tests { indexer_rule: filter_rule, }; - let opts = Opts::test_opts_with_aws(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let start_block_height = 45894620; let naivedatetime_utc = NaiveDate::from_ymd_opt(2021, 8, 1) diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index fe88af9c7..0ac7aa37e 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -51,9 +51,8 @@ async fn main() -> anyhow::Result<()> { let chain_id = &opts.chain_id(); let registry_contract_id = opts.registry_contract_id.clone(); - let aws_config = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); tracing::info!(target: INDEXER, "Connecting to redis..."); let redis_connection_manager = storage::connect(&opts.redis_connection_string).await?; diff --git a/indexer/queryapi_coordinator/src/opts.rs b/indexer/queryapi_coordinator/src/opts.rs index b99d580c8..83fb4f862 100644 --- a/indexer/queryapi_coordinator/src/opts.rs +++ b/indexer/queryapi_coordinator/src/opts.rs @@ -22,10 +22,13 @@ pub struct Opts { pub redis_connection_string: String, /// AWS Access Key with the rights to read from AWS S3 #[clap(long, env)] - pub lake_aws_access_key: String, - #[clap(long, env)] + pub aws_access_key_id: String, /// AWS Secret Access Key with the rights to read from AWS S3 - pub lake_aws_secret_access_key: String, + #[clap(long, env)] + pub aws_secret_access_key: String, + /// AWS Region to use for S3 + #[clap(long, env, default_value = "eu-central-1")] + pub aws_region: String, /// Registry contract to use #[clap(env)] pub registry_contract_id: String, @@ -72,26 +75,6 @@ impl Opts { } } - // Creates AWS Credentials for NEAR Lake - pub fn lake_credentials(&self) -> aws_credential_types::provider::SharedCredentialsProvider { - let provider = aws_credential_types::Credentials::new( - self.lake_aws_access_key.clone(), - self.lake_aws_secret_access_key.clone(), - None, - None, - "queryapi_coordinator_lake", - ); - aws_credential_types::provider::SharedCredentialsProvider::new(provider) - } - - /// Creates AWS Shared Config for NEAR Lake - pub fn lake_aws_sdk_config(&self) -> aws_types::sdk_config::SdkConfig { - aws_types::sdk_config::SdkConfig::builder() - .credentials_provider(self.lake_credentials()) - .region(aws_types::region::Region::new("eu-central-1")) - .build() - } - pub fn rpc_url(&self) -> &str { // To query metadata (timestamp) about blocks more than 5 epochs old we need an archival node match self.chain_id { @@ -103,9 +86,7 @@ impl Opts { impl Opts { pub async fn to_lake_config(&self) -> near_lake_framework::LakeConfig { - let s3_config = aws_sdk_s3::config::Builder::from(&self.lake_aws_sdk_config()).build(); - - let config_builder = near_lake_framework::LakeConfigBuilder::default().s3_config(s3_config); + let config_builder = near_lake_framework::LakeConfigBuilder::default(); match &self.chain_id { ChainId::Mainnet(_) => config_builder diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs index 9b7e8696b..29c00d568 100644 --- a/indexer/queryapi_coordinator/src/s3.rs +++ b/indexer/queryapi_coordinator/src/s3.rs @@ -7,6 +7,8 @@ use futures::future::try_join_all; // Currently that would be either 2,700 years of FunctionCall data or 1M contract folders. // If we hit 1M contracts we should build an index to support efficient wildcard contract matching. const MAX_S3_LIST_REQUESTS: usize = 1000; +pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake"; +pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata"; fn storage_path_for_account(account: &str) -> String { let mut folders = account.split('.').collect::>(); @@ -194,22 +196,18 @@ fn file_name_date_after(start_date: DateTime, file_name: &str) -> bool { #[cfg(test)] mod tests { + use crate::historical_block_processing::INDEXED_ACTIONS_FILES_FOLDER; use crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET; - use crate::historical_block_processing::{INDEXED_ACTIONS_FILES_FOLDER, LAKE_BUCKET_PREFIX}; - use crate::opts::Opts; use crate::s3::{ fetch_text_file_from_s3, find_index_files_by_pattern, list_s3_bucket_by_prefix, }; - use aws_sdk_s3::{Client as S3Client, Config}; /// Parses env vars from .env, Run with /// cargo test s3::tests::list_delta_bucket -- mainnet from-latest; #[tokio::test] async fn list_delta_bucket() { - let opts = Opts::test_opts_with_aws(); - let aws_config = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let list = list_s3_bucket_by_prefix( &s3_client, @@ -224,10 +222,8 @@ mod tests { /// cargo test s3::tests::list_with_single_contract -- mainnet from-latest #[tokio::test] async fn list_with_single_contract() { - let opts = Opts::test_opts_with_aws(); - let aws_config = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let list = find_index_files_by_pattern( &s3_client, @@ -243,10 +239,8 @@ mod tests { /// cargo test s3::tests::list_with_csv_contracts -- mainnet from-latest #[tokio::test] async fn list_with_csv_contracts() { - let opts = Opts::test_opts_with_aws(); - let aws_config = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let list = find_index_files_by_pattern( &s3_client, @@ -262,10 +256,8 @@ mod tests { /// cargo test s3::tests::list_with_wildcard_contracts -- mainnet from-latest #[tokio::test] async fn list_with_wildcard_contracts() { - let opts = Opts::test_opts_with_aws(); - let aws_config = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let list = find_index_files_by_pattern( &s3_client, @@ -281,10 +273,8 @@ mod tests { /// cargo test s3::tests::list_with_csv_and_wildcard_contracts -- mainnet from-latest #[tokio::test] async fn list_with_csv_and_wildcard_contracts() { - let opts = Opts::test_opts_with_aws(); - let aws_config = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let list = find_index_files_by_pattern( &s3_client, @@ -319,14 +309,11 @@ mod tests { async fn handle_key_404() { let mut success = false; - let opts = Opts::test_opts_with_aws(); - let s3_config: Config = - aws_sdk_s3::config::Builder::from(&opts.lake_aws_sdk_config()).build(); - - let s3_client: S3Client = S3Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let s3_result = fetch_text_file_from_s3( - format!("{}{}", LAKE_BUCKET_PREFIX, "mainnet").as_str(), + "near-lake-data-mainnet", "does_not_exist/block.json".to_string(), &s3_client, )