diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 82b60b8c3..ddbefae19 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -1,15 +1,10 @@ -use crate::indexer_types::{IndexerFunction, IndexerQueueMessage}; -use crate::opts::{Opts, Parser}; -use crate::queue; +use crate::indexer_types::IndexerFunction; use crate::s3; use anyhow::{bail, Context}; use aws_sdk_s3::Client as S3Client; -use aws_sdk_s3::Config; -use aws_sdk_sqs::Client; -use aws_types::SdkConfig; use chrono::{DateTime, LocalResult, TimeZone, Utc}; use indexer_rule_type::indexer_rule::MatchingRule; -use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatchPayload}; +use indexer_rules_engine::types::indexer_rule_match::ChainId; use near_jsonrpc_client::JsonRpcClient; use near_jsonrpc_primitives::types::blocks::RpcBlockRequest; use near_lake_framework::near_indexer_primitives::types::{BlockHeight, BlockId, BlockReference}; @@ -26,16 +21,25 @@ pub fn spawn_historical_message_thread( block_height: BlockHeight, new_indexer_function: &IndexerFunction, redis_connection_manager: &storage::ConnectionManager, + s3_client: &S3Client, + chain_id: &ChainId, + json_rpc_client: &JsonRpcClient, ) -> Option> { let redis_connection_manager = redis_connection_manager.clone(); + let s3_client = s3_client.clone(); + let chain_id = chain_id.clone(); + let json_rpc_client = json_rpc_client.clone(); + new_indexer_function.start_block_height.map(|_| { let new_indexer_function_copy = new_indexer_function.clone(); tokio::spawn(async move { process_historical_messages_or_handle_error( block_height, new_indexer_function_copy, - Opts::parse(), &redis_connection_manager, + &s3_client, + &chain_id, + &json_rpc_client, ) .await }) @@ -45,14 +49,18 @@ pub fn spawn_historical_message_thread( pub(crate) async fn process_historical_messages_or_handle_error( block_height: BlockHeight, indexer_function: IndexerFunction, - opts: Opts, redis_connection_manager: &storage::ConnectionManager, + s3_client: &S3Client, + chain_id: &ChainId, + json_rpc_client: &JsonRpcClient, ) -> i64 { match process_historical_messages( block_height, indexer_function, - opts, redis_connection_manager, + s3_client, + chain_id, + json_rpc_client, ) .await { @@ -71,8 +79,10 @@ pub(crate) async fn process_historical_messages_or_handle_error( pub(crate) async fn process_historical_messages( block_height: BlockHeight, indexer_function: IndexerFunction, - opts: Opts, redis_connection_manager: &storage::ConnectionManager, + s3_client: &S3Client, + chain_id: &ChainId, + json_rpc_client: &JsonRpcClient, ) -> anyhow::Result { let start_block = indexer_function.start_block_height.unwrap(); let block_difference: i64 = (block_height - start_block) as i64; @@ -95,25 +105,16 @@ pub(crate) async fn process_historical_messages( indexer_function.function_name ); - let chain_id = opts.chain_id().clone(); - let aws_region = opts.aws_queue_region.clone(); - let queue_client = queue::queue_client(aws_region, opts.queue_credentials()); - let queue_url = opts.start_from_block_queue_url.clone(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); - - let json_rpc_client = JsonRpcClient::connect(opts.rpc_url()); let start_date = - lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?; + lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?; - let mut indexer_function = indexer_function.clone(); - - let last_indexed_block = last_indexed_block_from_metadata(aws_config).await?; + let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?; let last_indexed_block = last_indexed_block; let mut blocks_from_index = filter_matching_blocks_from_index_files( start_block, &indexer_function, - aws_config, + s3_client, start_date, ) .await?; @@ -131,15 +132,13 @@ pub(crate) async fn process_historical_messages( last_indexed_block, block_height, &indexer_function, - aws_config, - chain_id.clone(), + s3_client, + chain_id, ) .await?; blocks_from_index.append(&mut blocks_between_indexed_and_current_block); - let first_block_in_index = *blocks_from_index.first().unwrap_or(&start_block); - if !blocks_from_index.is_empty() { storage::sadd( redis_connection_manager, @@ -163,18 +162,6 @@ pub(crate) async fn process_historical_messages( &[("block_height", current_block)], ) .await?; - - send_execution_message( - block_height, - first_block_in_index, - chain_id.clone(), - &queue_client, - queue_url.clone(), - &mut indexer_function, - current_block, - None, - ) - .await; } } } @@ -182,11 +169,9 @@ pub(crate) async fn process_historical_messages( } pub(crate) async fn last_indexed_block_from_metadata( - aws_config: &SdkConfig, + s3_client: &S3Client, ) -> anyhow::Result { let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json"); - let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client: S3Client = S3Client::from_conf(s3_config); let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?; let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap(); @@ -207,7 +192,7 @@ pub(crate) async fn last_indexed_block_from_metadata( pub(crate) async fn filter_matching_blocks_from_index_files( start_block_height: BlockHeight, indexer_function: &IndexerFunction, - aws_config: &SdkConfig, + s3_client: &S3Client, start_date: DateTime, ) -> anyhow::Result> { let s3_bucket = INDEXED_DATA_FILES_BUCKET; @@ -224,7 +209,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files( needs_dedupe_and_sort = true; } s3::fetch_contract_index_files( - aws_config, + s3_client, s3_bucket, INDEXED_ACTIONS_FILES_FOLDER, start_date, @@ -304,12 +289,10 @@ async fn filter_matching_unindexed_blocks_from_lake( last_indexed_block: BlockHeight, ending_block_height: BlockHeight, indexer_function: &IndexerFunction, - aws_config: &SdkConfig, - chain_id: ChainId, + s3_client: &S3Client, + chain_id: &ChainId, ) -> anyhow::Result> { - let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client: S3Client = S3Client::from_conf(s3_config); - let lake_bucket = lake_bucket_for_chain(chain_id.clone()); + let lake_bucket = lake_bucket_for_chain(chain_id); let indexer_rule = &indexer_function.indexer_rule; let count = ending_block_height - last_indexed_block; @@ -331,7 +314,7 @@ async fn filter_matching_unindexed_blocks_from_lake( for current_block in (last_indexed_block + 1)..ending_block_height { // fetch block file from S3 let key = format!("{}/block.json", normalize_block_height(current_block)); - let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await; + let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await; if s3_result.is_err() { let error = s3_result.err().unwrap(); @@ -362,7 +345,7 @@ async fn filter_matching_unindexed_blocks_from_lake( normalize_block_height(current_block), shard_id ); - let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await?; + let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, &s3_client).await?; match serde_json::from_slice::( shard.as_ref(), ) { @@ -401,7 +384,7 @@ async fn filter_matching_unindexed_blocks_from_lake( Ok(blocks_to_process) } -fn lake_bucket_for_chain(chain_id: ChainId) -> String { +fn lake_bucket_for_chain(chain_id: &ChainId) -> String { format!("{}{}", LAKE_BUCKET_PREFIX, chain_id) } @@ -409,42 +392,6 @@ fn normalize_block_height(block_height: BlockHeight) -> String { format!("{:0>12}", block_height) } -async fn send_execution_message( - block_height: BlockHeight, - first_block: BlockHeight, - chain_id: ChainId, - queue_client: &Client, - queue_url: String, - indexer_function: &mut IndexerFunction, - current_block: u64, - payload: Option, -) { - // only request provisioning on the first block - if current_block != first_block { - indexer_function.provisioned = true; - } - - let msg = IndexerQueueMessage { - chain_id, - indexer_rule_id: 0, - indexer_rule_name: indexer_function.function_name.clone(), - payload, - block_height: current_block, - indexer_function: indexer_function.clone(), - is_historical: true, - }; - - match queue::send_to_indexer_queue(queue_client, queue_url, vec![msg]).await { - Ok(_) => {} - Err(err) => tracing::error!( - target: crate::INDEXER, - "#{} an error occurred when sending messages to the queue\n{:#?}", - block_height, - err - ), - } -} - // if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks pub async fn lookup_block_date_or_next_block_date( block_height: u64, diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs index 69ca67af9..f9b94185f 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs @@ -38,9 +38,11 @@ mod tests { async fn test_indexing_metadata_file() { let opts = Opts::test_opts_with_aws(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let last_indexed_block = - historical_block_processing::last_indexed_block_from_metadata(aws_config) + historical_block_processing::last_indexed_block_from_metadata(&s3_client) .await .unwrap(); let a: Range = 90000000..9000000000; // valid for the next 300 years @@ -75,19 +77,28 @@ mod tests { }; let opts = Opts::test_opts_with_aws(); + let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let redis_connection_manager = storage::connect(&opts.redis_connection_string) .await .unwrap(); + + let json_rpc_client = near_jsonrpc_client::JsonRpcClient::connect(opts.rpc_url()); + let fake_block_height = - historical_block_processing::last_indexed_block_from_metadata(aws_config) + historical_block_processing::last_indexed_block_from_metadata(&s3_client) .await .unwrap(); let result = historical_block_processing::process_historical_messages( fake_block_height + 1, indexer_function, - opts, &redis_connection_manager, + &s3_client, + &opts.chain_id(), + &json_rpc_client, ) .await; assert!(result.unwrap() > 0); @@ -120,6 +131,8 @@ mod tests { let opts = Opts::test_opts_with_aws(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let start_block_height = 77016214; let naivedatetime_utc = NaiveDate::from_ymd_opt(2022, 10, 03) @@ -130,7 +143,7 @@ mod tests { let blocks = filter_matching_blocks_from_index_files( start_block_height, &indexer_function, - aws_config, + &s3_client, datetime_utc, ) .await; @@ -177,6 +190,8 @@ mod tests { let opts = Opts::test_opts_with_aws(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let start_block_height = 45894620; let naivedatetime_utc = NaiveDate::from_ymd_opt(2021, 08, 01) @@ -187,7 +202,7 @@ mod tests { let blocks = filter_matching_blocks_from_index_files( start_block_height, &indexer_function, - aws_config, + &s3_client, datetime_utc, ) .await; diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs index 2c79cc70c..73aca2d0b 100644 --- a/indexer/queryapi_coordinator/src/indexer_registry.rs +++ b/indexer/queryapi_coordinator/src/indexer_registry.rs @@ -171,8 +171,11 @@ fn index_and_process_register_calls( if let Some(thread) = crate::historical_block_processing::spawn_historical_message_thread( block_height, - &mut new_indexer_function, + &new_indexer_function, context.redis_connection_manager, + context.s3_client, + context.chain_id, + context.json_rpc_client, ) { spawned_start_from_block_threads.push(thread); diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index a67f5653a..da011ec9e 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -42,6 +42,8 @@ pub(crate) struct QueryApiContext<'a> { pub streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage, pub chain_id: &'a ChainId, pub queue_client: &'a queue::QueueClient, + pub s3_client: &'a aws_sdk_s3::Client, + pub json_rpc_client: &'a JsonRpcClient, pub queue_url: &'a str, pub registry_contract_id: &'a str, pub balance_cache: &'a BalanceCache, @@ -62,6 +64,10 @@ async fn main() -> anyhow::Result<()> { let queue_url = opts.queue_url.clone(); let registry_contract_id = opts.registry_contract_id.clone(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + // We want to prevent unnecessary RPC queries to find previous balance let balances_cache: BalanceCache = std::sync::Arc::new(Mutex::new(SizedCache::with_size(100_000))); @@ -106,6 +112,8 @@ async fn main() -> anyhow::Result<()> { streamer_message, chain_id, queue_client: &queue_client, + json_rpc_client: &json_rpc_client, + s3_client: &s3_client, }; handle_streamer_message(context, indexer_registry.clone()) }) diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs index f88323688..cf1defc67 100644 --- a/indexer/queryapi_coordinator/src/s3.rs +++ b/indexer/queryapi_coordinator/src/s3.rs @@ -1,7 +1,5 @@ use anyhow::{bail, Context, Result}; use aws_sdk_s3::Client as S3Client; -use aws_sdk_s3::Config; -use aws_types::SdkConfig; use chrono::{DateTime, NaiveDate, Utc}; use futures::future::try_join_all; @@ -17,7 +15,7 @@ fn storage_path_for_account(account: &str) -> String { } pub async fn find_index_files_by_pattern( - aws_config: &SdkConfig, + s3_client: &S3Client, s3_bucket: &str, s3_folder: &str, pattern: &str, @@ -29,10 +27,10 @@ pub async fn find_index_files_by_pattern( for account in account_array { let account = account.trim(); let sub_results = if account.contains('*') { - list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &account).await? + list_index_files_by_wildcard(s3_client, s3_bucket, s3_folder, &account).await? } else { list_s3_bucket_by_prefix( - aws_config, + s3_client, s3_bucket, &format!("{}/{}/", s3_folder, storage_path_for_account(account)), ) @@ -43,11 +41,11 @@ pub async fn find_index_files_by_pattern( results } x if x.contains('*') => { - list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &x).await? + list_index_files_by_wildcard(s3_client, s3_bucket, s3_folder, &x).await? } _ => { list_s3_bucket_by_prefix( - aws_config, + s3_client, s3_bucket, &format!("{}/{}/", s3_folder, storage_path_for_account(pattern),), ) @@ -57,7 +55,7 @@ pub async fn find_index_files_by_pattern( } async fn list_index_files_by_wildcard( - aws_config: &SdkConfig, + s3_client: &S3Client, s3_bucket: &str, s3_folder: &str, pattern: &&str, @@ -67,24 +65,20 @@ async fn list_index_files_by_wildcard( let path = storage_path_for_account(&pattern); let folders = - list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/{}/", s3_folder, path)) - .await?; + list_s3_bucket_by_prefix(s3_client, s3_bucket, &format!("{}/{}/", s3_folder, path)).await?; // for each matching folder list files let mut results = vec![]; for folder in folders { - results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await?); + results.extend(list_s3_bucket_by_prefix(s3_client, s3_bucket, &folder).await?); } Ok(results) } async fn list_s3_bucket_by_prefix( - aws_config: &SdkConfig, + s3_client: &S3Client, s3_bucket: &str, s3_prefix: &str, ) -> Result> { - let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client: S3Client = S3Client::from_conf(s3_config); - let mut results = vec![]; let mut continuation_token: Option = None; @@ -126,18 +120,15 @@ async fn list_s3_bucket_by_prefix( } pub async fn fetch_contract_index_files( - aws_config: &SdkConfig, + s3_client: &S3Client, s3_bucket: &str, s3_folder: &str, start_date: DateTime, contract_pattern: &str, ) -> Result> { - let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client: S3Client = S3Client::from_conf(s3_config); - // list all index files let file_list = - find_index_files_by_pattern(aws_config, s3_bucket, s3_folder, contract_pattern).await?; + find_index_files_by_pattern(s3_client, s3_bucket, s3_folder, contract_pattern).await?; let fetch_and_parse_tasks = file_list .into_iter() @@ -146,7 +137,7 @@ pub async fn fetch_contract_index_files( let s3_client = s3_client.clone(); async move { // Fetch the file - fetch_text_file_from_s3(s3_bucket, key, s3_client).await + fetch_text_file_from_s3(s3_bucket, key, &s3_client).await } }) .collect::>(); @@ -162,7 +153,7 @@ pub async fn fetch_contract_index_files( pub async fn fetch_text_file_from_s3( s3_bucket: &str, key: String, - s3_client: S3Client, + s3_client: &S3Client, ) -> Result { // todo: can we retry if this fails like the lake s3_fetcher fn does? // If so, can we differentiate between a file not existing (block height does not exist) and a network error? @@ -216,9 +207,12 @@ mod tests { #[tokio::test] async fn list_delta_bucket() { let opts = Opts::test_opts_with_aws(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let list = list_s3_bucket_by_prefix( - &opts.lake_aws_sdk_config(), + &s3_client, INDEXED_DATA_FILES_BUCKET, &format!("{}/", INDEXED_ACTIONS_FILES_FOLDER.to_string()), ) @@ -231,9 +225,12 @@ mod tests { #[tokio::test] async fn list_with_single_contract() { let opts = Opts::test_opts_with_aws(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let list = find_index_files_by_pattern( - &opts.lake_aws_sdk_config(), + &s3_client, INDEXED_DATA_FILES_BUCKET, INDEXED_ACTIONS_FILES_FOLDER, "hackathon.agency.near", @@ -247,9 +244,12 @@ mod tests { #[tokio::test] async fn list_with_csv_contracts() { let opts = Opts::test_opts_with_aws(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let list = find_index_files_by_pattern( - &opts.lake_aws_sdk_config(), + &s3_client, INDEXED_DATA_FILES_BUCKET, INDEXED_ACTIONS_FILES_FOLDER, "hackathon.agency.near, hackathon.aurora-silo-dev.near, hackathon.sputnik-dao.near", @@ -263,9 +263,12 @@ mod tests { #[tokio::test] async fn list_with_wildcard_contracts() { let opts = Opts::test_opts_with_aws(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let list = find_index_files_by_pattern( - &opts.lake_aws_sdk_config(), + &s3_client, INDEXED_DATA_FILES_BUCKET, INDEXED_ACTIONS_FILES_FOLDER, "*.keypom.near", @@ -279,9 +282,12 @@ mod tests { #[tokio::test] async fn list_with_csv_and_wildcard_contracts() { let opts = Opts::test_opts_with_aws(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let list = find_index_files_by_pattern( - &opts.lake_aws_sdk_config(), + &s3_client, INDEXED_DATA_FILES_BUCKET, INDEXED_ACTIONS_FILES_FOLDER, "*.keypom.near, hackathon.agency.near, *.nearcrowd.near", @@ -322,7 +328,7 @@ mod tests { let s3_result = fetch_text_file_from_s3( format!("{}{}", LAKE_BUCKET_PREFIX, "mainnet").as_str(), "does_not_exist/block.json".to_string(), - s3_client, + &s3_client, ) .await;