From fa2c00a0a68f13804e55124b3642f0e5deba788b Mon Sep 17 00:00:00 2001
From: Gabe Hamilton 
Date: Fri, 28 Jul 2023 17:25:35 -0600
Subject: [PATCH 1/9] S3 & Historical processing errors are bubbled up to a
 single handler. RPC block date lookup handles missing blocks.

---
 indexer/indexer_rules_engine/src/lib.rs       |   8 +-
 .../src/outcomes_reducer_sync.rs              |   8 +-
 .../src/historical_block_processing.rs        | 290 ++++++++----------
 ...ical_block_processing_integration_tests.rs |  42 ++-
 indexer/queryapi_coordinator/src/s3.rs        | 166 +++++-----
 5 files changed, 248 insertions(+), 266 deletions(-)

diff --git a/indexer/indexer_rules_engine/src/lib.rs b/indexer/indexer_rules_engine/src/lib.rs
index 1e694d3c3..e267b269f 100644
--- a/indexer/indexer_rules_engine/src/lib.rs
+++ b/indexer/indexer_rules_engine/src/lib.rs
@@ -30,8 +30,8 @@ pub fn reduce_indexer_rule_matches_sync(
     indexer_rule: &IndexerRule,
     streamer_message: &StreamerMessage,
     chain_id: ChainId,
-) -> anyhow::Result<Vec<IndexerRuleMatch>> {
-    Ok(match &indexer_rule.matching_rule {
+) -> Vec<IndexerRuleMatch> {
+    match &indexer_rule.matching_rule {
         MatchingRule::ActionAny { .. }
         | MatchingRule::ActionFunctionCall { .. }
         | MatchingRule::Event { .. } => {
@@ -39,7 +39,7 @@ pub fn reduce_indexer_rule_matches_sync(
                 indexer_rule,
                 streamer_message,
                 chain_id,
-            )?
+            )
         }
-    })
+    }
 }
diff --git a/indexer/indexer_rules_engine/src/outcomes_reducer_sync.rs b/indexer/indexer_rules_engine/src/outcomes_reducer_sync.rs
index e93a745c1..9e87efaf6 100644
--- a/indexer/indexer_rules_engine/src/outcomes_reducer_sync.rs
+++ b/indexer/indexer_rules_engine/src/outcomes_reducer_sync.rs
@@ -10,7 +10,7 @@ pub fn reduce_indexer_rule_matches_from_outcomes(
     indexer_rule: &IndexerRule,
     streamer_message: &StreamerMessage,
     chain_id: ChainId,
-) -> anyhow::Result<Vec<IndexerRuleMatch>> {
+) -> Vec<IndexerRuleMatch> {
     streamer_message
         .shards
         .iter()
@@ -41,8 +41,8 @@ fn build_indexer_rule_match(
     block_header_hash: String,
     block_height: u64,
     chain_id: ChainId,
-) -> anyhow::Result<IndexerRuleMatch> {
-    Ok(IndexerRuleMatch {
+) -> IndexerRuleMatch {
+    IndexerRuleMatch {
         chain_id: chain_id.clone(),
         indexer_rule_id: indexer_rule.id,
         indexer_rule_name: indexer_rule.name.clone(),
@@ -52,7 +52,7 @@ fn build_indexer_rule_match(
             block_header_hash,
         ),
         block_height,
-    })
+    }
 }
 
 fn build_indexer_rule_match_payload(
diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs
index 9800d25d5..2031160e2 100644
--- a/indexer/queryapi_coordinator/src/historical_block_processing.rs
+++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -2,12 +2,13 @@ use crate::indexer_types::{IndexerFunction, IndexerQueueMessage};
 use crate::opts::{Opts, Parser};
 use crate::queue;
 use crate::s3;
+use anyhow::{bail, Context};
 use aws_sdk_s3::Client as S3Client;
 use aws_sdk_s3::Config;
 use aws_sdk_sqs::Client;
 use aws_types::SdkConfig;
 use chrono::{DateTime, LocalResult, TimeZone, Utc};
-use indexer_rule_type::indexer_rule::{IndexerRule, MatchingRule};
+use indexer_rule_type::indexer_rule::MatchingRule;
 use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatchPayload};
 use near_jsonrpc_client::JsonRpcClient;
 use near_jsonrpc_primitives::types::blocks::RpcBlockRequest;
@@ -19,6 +20,7 @@ pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake";
 pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
 pub const INDEXED_DATA_FILES_FOLDER: &str = "silver/contracts/action_receipt_actions/metadata";
 pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes.
+pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;
 
 pub fn spawn_historical_message_thread(
     block_height: BlockHeight,
@@ -26,7 +28,7 @@ pub fn spawn_historical_message_thread(
 ) -> Option<JoinHandle<i64>> {
     new_indexer_function.start_block_height.map(|_| {
         let new_indexer_function_copy = new_indexer_function.clone();
-        tokio::spawn(process_historical_messages(
+        tokio::spawn(process_historical_messages_or_handle_error(
             block_height,
             new_indexer_function_copy,
             Opts::parse(),
@@ -34,21 +36,39 @@ pub fn spawn_historical_message_thread(
     })
 }
 
-pub(crate) async fn process_historical_messages(
+pub(crate) async fn process_historical_messages_or_handle_error(
     block_height: BlockHeight,
     indexer_function: IndexerFunction,
     opts: Opts,
 ) -> i64 {
+    match process_historical_messages(block_height, indexer_function, opts).await {
+        Ok(block_difference) => block_difference,
+        Err(err) => {
+            // todo: when Coordinator can send log messages to Runner, send this error to Runner
+            tracing::error!(
+                target: crate::INDEXER,
+                "Error processing historical messages: {:?}",
+                err
+            );
+            0
+        }
+    }
+}
+pub(crate) async fn process_historical_messages(
+    block_height: BlockHeight,
+    indexer_function: IndexerFunction,
+    opts: Opts,
+) -> anyhow::Result<i64> {
     let start_block = indexer_function.start_block_height.unwrap();
     let block_difference: i64 = (block_height - start_block) as i64;
     match block_difference {
         i64::MIN..=-1 => {
-            tracing::error!(target: crate::INDEXER, "Skipping back fill, start_block_height is greater than current block height: {:?} {:?}",
+            bail!("Skipping back fill, start_block_height is greater than current block height: {:?} {:?}",
                                      indexer_function.account_id,
                                      indexer_function.function_name);
         }
         0 => {
-            tracing::info!(target: crate::INDEXER, "Skipping back fill, start_block_height is equal to current block height: {:?} {:?}",
+            bail!("Skipping back fill, start_block_height is equal to current block height: {:?} {:?}",
                                      indexer_function.account_id,
                                      indexer_function.function_name);
         }
@@ -67,35 +87,21 @@ pub(crate) async fn process_historical_messages(
             let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
             let json_rpc_client = JsonRpcClient::connect(opts.rpc_url());
 
-            let start_date = block_to_date(start_block, &json_rpc_client).await;
-            if start_date.is_none() {
-                tracing::error!(
-                    target: crate::INDEXER,
-                    "Failed to get start date for block {}",
-                    start_block
-                );
-                return block_difference;
-            }
+            let start_date =
+                lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?;
 
             let mut indexer_function = indexer_function.clone();
 
-            let last_indexed_block = last_indexed_block_from_metadata(aws_config).await;
-            if last_indexed_block.is_err() {
-                tracing::error!(
-                    target: crate::INDEXER,
-                    last_indexed_block = ?last_indexed_block,
-                );
-                return block_difference;
-            }
-            let last_indexed_block = last_indexed_block.unwrap();
+            let last_indexed_block = last_indexed_block_from_metadata(aws_config).await?;
 
             let mut blocks_from_index = filter_matching_blocks_from_index_files(
                 start_block,
-                &indexer_function.indexer_rule,
+                &indexer_function,
                 aws_config,
-                start_date.unwrap(),
+                start_date,
             )
-            .await;
+            .await?;
 
             // Check for the case where an index file is written right after we get the last_indexed_block metadata
             let last_block_in_data = blocks_from_index.last().unwrap_or(&start_block);
@@ -109,19 +115,19 @@ pub(crate) async fn process_historical_messages(
                 filter_matching_unindexed_blocks_from_lake(
                     last_indexed_block,
                     block_height,
-                    &indexer_function.indexer_rule,
+                    &indexer_function,
                     aws_config,
                     chain_id.clone(),
                 )
-                .await;
+                .await?;
 
                 blocks_from_index.append(&mut blocks_between_indexed_and_current_block);
             }
 
-            let first_block_in_data = *blocks_from_index.first().unwrap_or(&start_block);
+            let first_block_in_index = *blocks_from_index.first().unwrap_or(&start_block);
 
             for current_block in blocks_from_index {
                 send_execution_message(
                     block_height,
-                    first_block_in_data,
+                    first_block_in_index,
                     chain_id.clone(),
                     &queue_client,
                     queue_url.clone(),
@@ -133,7 +139,7 @@ pub(crate) async fn process_historical_messages(
             }
         }
     }
-    block_difference
+    Ok(block_difference)
 }
 
 pub(crate) async fn last_indexed_block_from_metadata(
@@ -142,24 +148,15 @@ pub(crate) async fn last_indexed_block_from_metadata(
     aws_config: &SdkConfig,
 ) -> anyhow::Result<BlockHeight> {
     let key = format!("{}/{}", INDEXED_DATA_FILES_FOLDER, "latest_block.json");
     let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
     let s3_client: S3Client = S3Client::from_conf(s3_config);
-    let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await;
+    let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;
 
     let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap();
     let last_indexed_block = metadata["last_indexed_block"].clone();
-    let last_indexed_block = last_indexed_block.as_str();
-    if last_indexed_block.is_none() {
-        return Err(anyhow::anyhow!(
-            "No last_indexed_block found in latest_block.json"
-        ));
-    }
-    let last_indexed_block = last_indexed_block.unwrap();
-    let last_indexed_block = from_str(last_indexed_block);
-    if last_indexed_block.is_err() {
-        return Err(anyhow::anyhow!(
-            "last_indexed_block couldn't be converted to u64"
-        ));
-    }
-    let last_indexed_block = last_indexed_block.unwrap();
+    let last_indexed_block = last_indexed_block
+        .as_str()
+        .context("No last_indexed_block found in latest_block.json")?;
+    let last_indexed_block =
+        from_str(last_indexed_block).context("last_indexed_block couldn't be converted to u64")?;
     tracing::info!(
         target: crate::INDEXER,
         "Last indexed block from latest_block.json: {:?}",
@@ -170,13 +167,14 @@ pub(crate) async fn last_indexed_block_from_metadata(
 
 pub(crate) async fn filter_matching_blocks_from_index_files(
     start_block_height: BlockHeight,
-    indexer_rule: &IndexerRule,
+    indexer_function: &IndexerFunction,
     aws_config: &SdkConfig,
     start_date: DateTime<Utc>,
-) -> Vec<BlockHeight> {
+) -> anyhow::Result<Vec<BlockHeight>> {
     let s3_bucket = INDEXED_DATA_FILES_BUCKET;
     let mut needs_dedupe_and_sort = false;
+    let indexer_rule = &indexer_function.indexer_rule;
 
     let index_files_content = match &indexer_rule.matching_rule {
         MatchingRule::ActionAny {
@@ -196,31 +194,18 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
             .await
         }
         MatchingRule::ActionFunctionCall { .. } => {
-            tracing::error!(
-                target: crate::INDEXER,
-                "ActionFunctionCall matching rule not supported for historical processing"
-            );
-            return vec![];
-
-            // if affected_account_id.contains('*') || affected_account_id.contains(',) {
-            //     needs_dedupe_and_sort = true;
-            // }
-            // let s3_prefix = format!("{}/{}", INDEXED_DATA_FILES_FOLDER, affected_account_id);
-            // fetch_contract_index_files(aws_config, s3_bucket, s3_prefix).await
-            // // todo implement, use function name selector
+            bail!("ActionFunctionCall matching rule not yet supported for historical processing, function: {:?} {:?}", indexer_function.account_id, indexer_function.function_name);
         }
         MatchingRule::Event { ..
} => {
-            tracing::error!(
-                target: crate::INDEXER,
-                "Event matching rule not supported for historical processing"
-            );
-            return vec![];
+            bail!("Event matching rule not yet supported for historical processing, function {:?} {:?}", indexer_function.account_id, indexer_function.function_name);
         }
-    };
+    }?;
 
     tracing::info!(
         target: crate::INDEXER,
-        "Found {file_count} index files matching rule {indexer_rule:?}",
+        "Found {file_count} index files for function {:?} {:?} with matching rule {indexer_rule:?}",
+        indexer_function.account_id,
+        indexer_function.function_name,
         file_count = index_files_content.len()
     );
     let mut blocks_to_process: Vec<BlockHeight> =
@@ -231,11 +216,13 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
     }
     tracing::info!(
         target: crate::INDEXER,
-        "Found {block_count} indexed blocks to process.",
+        "Found {block_count} indexed blocks to process for function {:?} {:?}",
+        indexer_function.account_id,
+        indexer_function.function_name,
         block_count = blocks_to_process.len()
     );
 
-    blocks_to_process
+    Ok(blocks_to_process)
 }
 
 fn parse_blocks_from_index_files(
@@ -277,106 +264,84 @@ fn parse_blocks_from_index_files(
 async fn filter_matching_unindexed_blocks_from_lake(
     last_indexed_block: BlockHeight,
     ending_block_height: BlockHeight,
-    indexer_rule: &IndexerRule,
+    indexer_function: &IndexerFunction,
     aws_config: &SdkConfig,
     chain_id: ChainId,
-) -> Vec<u64> {
+) -> anyhow::Result<Vec<u64>> {
     let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
     let s3_client: S3Client = S3Client::from_conf(s3_config);
     let lake_bucket = lake_bucket_for_chain(chain_id.clone());
+    let indexer_rule = &indexer_function.indexer_rule;
     let count = ending_block_height - last_indexed_block;
     if count > MAX_UNINDEXED_BLOCKS_TO_PROCESS {
-        tracing::error!(
-            target: crate::INDEXER,
-            "Too many unindexed blocks to filter: {count}. Last indexed block is {last_indexed_block}.",
+        bail!(
+            "Too many unindexed blocks to filter: {count}. Last indexed block is {last_indexed_block} for function {:?} {:?}",
+            indexer_function.account_id,
+            indexer_function.function_name,
         );
-        return vec![];
     }
     tracing::info!(
         target: crate::INDEXER,
-        "Filtering {count} unindexed blocks from lake: from block {last_indexed_block} to {ending_block_height}",
+        "Filtering {count} unindexed blocks from lake: from block {last_indexed_block} to {ending_block_height} for function {:?} {:?}",
+        indexer_function.account_id,
+        indexer_function.function_name,
     );
+
     let mut blocks_to_process: Vec<u64> = vec![];
     for current_block in (last_indexed_block + 1)..ending_block_height {
         // fetch block file from S3
         let key = format!("{}/block.json", normalize_block_height(current_block));
-        let block = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await;
+        let block = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await?;
         let block_view = serde_json::from_slice::<
             near_lake_framework::near_indexer_primitives::views::BlockView,
-        >(block.as_ref());
+        >(block.as_ref())
+        .with_context(|| format!("Error parsing block {} from S3", current_block))?;
+
         let mut shards = vec![];
-        match block_view {
-            Ok(block_view) => {
-                for shard_id in 0..block_view.chunks.len() as u64 {
-                    let key = format!(
-                        "{}/shard_{}.json",
-                        normalize_block_height(current_block),
-                        shard_id
-                    );
-                    let shard =
-                        s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await;
-                    match serde_json::from_slice::<
-                        near_lake_framework::near_indexer_primitives::IndexerShard,
-                    >(shard.as_ref())
-                    {
-                        Ok(parsed_shard) => {
-                            shards.push(parsed_shard);
-                        }
-                        Err(e) => {
-                            tracing::error!(
-                                target: crate::INDEXER,
-                                "Error parsing shard: {}",
-                                e.to_string()
-                            );
-                            // todo this needs better error handling
-                        }
-                    }
+        for shard_id in 0..block_view.chunks.len() as u64 {
+            let key = format!(
+                "{}/shard_{}.json",
+                normalize_block_height(current_block),
+                shard_id
+            );
+            let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await?;
+            match serde_json::from_slice::<near_lake_framework::near_indexer_primitives::IndexerShard>(
+                shard.as_ref(),
+            ) {
+                Ok(parsed_shard) => {
+                    shards.push(parsed_shard);
                 }
-                let streamer_message =
-                    near_lake_framework::near_indexer_primitives::StreamerMessage {
-                        block: block_view,
-                        shards,
-                    };
-
-                // // filter block
-                let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync(
-                    indexer_rule,
-                    &streamer_message,
-                    chain_id.clone(),
-                );
-                match matches {
-                    Ok(match_list) => {
-                        if !match_list.is_empty() {
-                            blocks_to_process.push(current_block);
-                        }
-                    }
-                    Err(e) => {
-                        tracing::warn!(
-                            target: crate::INDEXER,
-                            "Error matching block {} against S3 file: {:?}",
-                            current_block,
-                            e
-                        );
-                    }
+                Err(e) => {
+                    bail!("Error parsing shard: {}", e.to_string());
                 }
             }
-            Err(e) => {
-                tracing::error!(
-                    target: crate::INDEXER,
-                    "Error parsing block {} from S3: {:?}",
-                    current_block,
-                    e
-                );
-            }
+        }
+
+        let streamer_message = near_lake_framework::near_indexer_primitives::StreamerMessage {
+            block: block_view,
+            shards,
+        };
+
+        // filter block
+        let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync(
+            indexer_rule,
+            &streamer_message,
+            chain_id.clone(),
+        );
+        if !matches.is_empty() {
+            blocks_to_process.push(current_block);
         }
     }
+
    tracing::info!(
         target: crate::INDEXER,
-        "Found {block_count} unindexed blocks to process.",
+        "Found {block_count} unindexed blocks to process for function {:?} {:?}",
+        indexer_function.account_id,
+        indexer_function.function_name,
         block_count = blocks_to_process.len()
     );
 
-    blocks_to_process
+    Ok(blocks_to_process)
 }
 
 fn lake_bucket_for_chain(chain_id: ChainId) -> String {
@@ -423,27 +388,36 @@ async fn send_execution_message(
     }
 }
 
-pub async fn block_to_date(block_height: u64, client: &JsonRpcClient) -> Option<DateTime<Utc>> {
-    let request = RpcBlockRequest {
-        block_reference: BlockReference::BlockId(BlockId::Height(block_height)),
-    };
+// if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks
+pub async fn lookup_block_date_or_next_block_date(
+    block_height: u64,
+    client: &JsonRpcClient,
+) -> anyhow::Result<DateTime<Utc>> {
+    let mut current_block_height = block_height;
+    let mut retry_count = 0;
+    loop {
+        let request = RpcBlockRequest {
+            block_reference: BlockReference::BlockId(BlockId::Height(current_block_height)),
+        };
 
-    match client.call(request).await {
-        Ok(response) => {
-            let header = response.header;
-            let timestamp_nanosec = header.timestamp_nanosec;
-            match Utc.timestamp_opt((timestamp_nanosec / 1000000000) as i64, 0) {
-                LocalResult::Single(date) => Some(date),
-                LocalResult::Ambiguous(date, _) => Some(date),
-                LocalResult::None => {
-                    tracing::error!("Unable to get block timestamp");
-                    None
+        match client.call(request).await {
+            Ok(response) => {
+                let header = response.header;
+                let timestamp_nanosec = header.timestamp_nanosec;
+                return match Utc.timestamp_opt((timestamp_nanosec / 1000000000) as i64, 0) {
+                    LocalResult::Single(date) => Ok(date),
+                    LocalResult::Ambiguous(date, _) => Ok(date),
+                    LocalResult::None => Err(anyhow::anyhow!("Unable to get block timestamp")),
+                };
+            }
+            Err(_) => {
+                tracing::debug!("RPC failed to get block: {:?}", current_block_height);
+                retry_count += 1;
+                if retry_count > MAX_RPC_BLOCKS_TO_PROCESS {
+                    return Err(anyhow::anyhow!("Unable to get block"));
                 }
+                current_block_height += 1;
             }
         }
-        Err(err) => {
-            tracing::error!("Unable to get block: {:?}", err);
-            None
-        }
     }
 }
diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
index 3457974e7..9b9ae91d9 100644
--- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
+++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
@@ -80,12 +80,13 @@ mod tests {
         historical_block_processing::last_indexed_block_from_metadata(aws_config)
             .await
             .unwrap();
-        historical_block_processing::process_historical_messages(
+        let result = historical_block_processing::process_historical_messages(
             fake_block_height + 1,
             indexer_function,
             opts,
         )
         .await;
+        assert!(result.unwrap() > 0);
     }
 
     /// Parses env vars from .env, Run with
@@ -103,6 +104,15 @@ mod tests {
             id: None,
             name: None,
         };
+        let indexer_function = IndexerFunction {
+            account_id: "buildnear.testnet".to_string().parse().unwrap(),
+            function_name: "index_stuff".to_string(),
+            code: "".to_string(),
+            start_block_height: Some(85376002),
+            schema: None,
+            provisioned: false,
+            indexer_rule: filter_rule,
+        };
 
         let opts = Opts::test_opts_with_aws();
         let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
@@ -115,15 +125,25 @@ mod tests {
         let datetime_utc = DateTime::<Utc>::from_utc(naivedatetime_utc, Utc);
         let blocks = filter_matching_blocks_from_index_files(
             start_block_height,
-            &filter_rule,
+            &indexer_function,
             aws_config,
             datetime_utc,
         )
         .await;
 
-        // // remove any blocks from after when the test was written -- not working, due to new contracts?
-        // let fixed_blocks: Vec<u64> = blocks.into_iter().filter(|&b| b <= 95175853u64).collect(); // 95175853u64 95242647u64
-        assert!(blocks.len() >= 71899);
+        match blocks {
+            Ok(blocks) => {
+                // remove any blocks from after when the test was written
+                let fixed_blocks: Vec<u64> =
+                    blocks.into_iter().filter(|&b| b <= 95175853u64).collect();
+                println!("Found {} blocks", fixed_blocks.len());
+                assert!(fixed_blocks.len() >= 71899);
+            }
+            Err(e) => {
+                println!("Error: {:?}", e);
+                assert!(false);
+            }
+        }
     }
 
     /// Parses env vars from .env, Run with
@@ -141,6 +161,15 @@ mod tests {
             id: None,
             name: None,
         };
+        let indexer_function = IndexerFunction {
+            account_id: "buildnear.testnet".to_string().parse().unwrap(),
+            function_name: "index_stuff".to_string(),
+            code: "".to_string(),
+            start_block_height: Some(85376002),
+            schema: None,
+            provisioned: false,
+            indexer_rule: filter_rule,
+        };
 
         let opts = Opts::test_opts_with_aws();
         let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
@@ -153,11 +182,12 @@ mod tests {
         let datetime_utc = DateTime::<Utc>::from_utc(naivedatetime_utc, Utc);
         let blocks = filter_matching_blocks_from_index_files(
             start_block_height,
-            &filter_rule,
+            &indexer_function,
             aws_config,
             datetime_utc,
         )
         .await;
+        let blocks = blocks.unwrap();
 
         // remove any blocks from after when the test was written
         let fixed_blocks: Vec<BlockHeight> =
diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs
index f5e63e753..be640b437 100644
--- a/indexer/queryapi_coordinator/src/s3.rs
+++ b/indexer/queryapi_coordinator/src/s3.rs
@@ -1,42 +1,46 @@
+use anyhow::{bail, Context, Result};
 use aws_sdk_s3::Client as S3Client;
 use aws_sdk_s3::Config;
 use aws_types::SdkConfig;
 use chrono::{DateTime, NaiveDate, Utc};
-use futures::future::join_all;
+use futures::future::try_join_all;
 use regex::Regex;
 
 // Sanity check, if we hit this we have 1M S3 results.
 // Currently that would be either 2,700 years of FunctionCall data or 1M contract folders.
-const MAX_S3_LIST_REQUESTS: usize = 1000;
+// If we hit 1M contracts we should build an index to support efficient wildcard contract matching.
+// temporarily increasing from 1000 to 30,000 until new wildcard handling and index structure is in place.
+const MAX_S3_LIST_REQUESTS: usize = 30000;
 
 pub async fn find_index_files_by_pattern(
     aws_config: &SdkConfig,
     s3_bucket: &str,
     s3_folder: &str,
     pattern: &str,
-) -> Vec<String> {
-    match pattern {
+) -> Result<Vec<String>> {
+    Ok(match pattern {
         x if x.contains(',') => {
             let contract_array = x.split(',');
             let mut results = vec![];
             for contract in contract_array {
                 let contract = contract.trim();
                 let sub_results = if contract.contains('*') {
-                    list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &contract).await
+                    list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &contract)
+                        .await?
                 } else {
                     list_s3_bucket_by_prefix(
                         aws_config,
                         s3_bucket,
                         &format!("{}/{}/", s3_folder, contract),
                     )
-                    .await
+                    .await?
                 };
                 results.extend(sub_results);
             }
             results
         }
         x if x.contains('*') => {
-            list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &x).await
+            list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &x).await?
         }
         _ => {
             list_s3_bucket_by_prefix(
                 aws_config,
                 s3_bucket,
                 &format!("{}/{}/", s3_folder, pattern),
             )
-            .await
+            .await?
         }
-    }
-
-    // todo will need to dedupe and sort the block output now
+    })
 }
 
 async fn list_index_files_by_wildcard(
@@ -56,25 +58,26 @@ async fn list_index_files_by_wildcard(
     s3_bucket: &str,
     s3_folder: &str,
     x: &&str,
-) -> Vec<String> {
+) -> Result<Vec<String>> {
     // fetch all folders and filter by regex
-    let folders = list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/", s3_folder)).await;
+    let folders =
+        list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/", s3_folder)).await?;
     let regex_string = &x.replace('.', "\\.").replace('*', ".*");
     let re = Regex::new(regex_string).unwrap();
     let matching_folders = folders.into_iter().filter(|folder| re.is_match(folder));
     // for each matching folder list files
     let mut results = vec![];
     for folder in matching_folders {
-        results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await);
+        results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await?);
     }
-    results
+    Ok(results)
 }
 
 async fn list_s3_bucket_by_prefix(
     aws_config: &SdkConfig,
     s3_bucket: &str,
     s3_prefix: &str,
-) -> Vec<String> {
+) -> Result<Vec<String>> {
     let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
     let s3_client: S3Client = S3Client::from_conf(s3_config);
 
@@ -93,37 +96,29 @@ async fn list_s3_bucket_by_prefix(
         configured_client = configured_client.continuation_token(continuation_token.unwrap());
     }
 
-        match configured_client.send().await {
-            Ok(file_list) => {
-                if let Some(common_prefixes) = file_list.common_prefixes {
-                    let keys: Vec<String> = common_prefixes
-                        .into_iter()
-                        .map(|o| o.prefix.unwrap())
-                        .collect();
-                    results.extend(keys);
-                }
-                if let Some(objects) = file_list.contents {
-                    let keys: Vec<String> = objects.into_iter().map(|o| o.key.unwrap()).collect();
-                    results.extend(keys);
-                }
-                if file_list.next_continuation_token.is_some() {
-                    continuation_token = file_list.next_continuation_token;
-                    counter += 1;
-                    if counter > MAX_S3_LIST_REQUESTS {
-                        tracing::error!("Exceeded internal limit of {MAX_S3_LIST_REQUESTS}");
-                        break;
-                    }
-                } else {
-                    break;
-                }
-            }
-            Err(e) => {
-                tracing::error!("Error listing index files: {:?}", e);
-                break;
+        let file_list = configured_client.send().await?;
+        if let Some(common_prefixes) = file_list.common_prefixes {
+            let keys: Vec<String> = common_prefixes
+                .into_iter()
+                .map(|o| o.prefix.unwrap())
+                .collect();
+            results.extend(keys);
+        }
+        if let Some(objects) = file_list.contents {
+            let keys: Vec<String> = objects.into_iter().map(|o| o.key.unwrap()).collect();
+            results.extend(keys);
+        }
+        if file_list.next_continuation_token.is_some() {
+            continuation_token = file_list.next_continuation_token;
+            counter += 1;
+            if counter > MAX_S3_LIST_REQUESTS {
+                bail!("Exceeded internal limit of {MAX_S3_LIST_REQUESTS}")
             }
-        };
+        } else {
+            break;
+        }
     }
-    results
+    Ok(results)
 }
 
 pub async fn fetch_contract_index_files(
@@ -132,13 +127,13 @@ pub async fn fetch_contract_index_files(
     s3_folder: &str,
     start_date: DateTime<Utc>,
     contract_pattern: &str,
-) -> Vec<String> {
+) -> Result<Vec<String>> {
     let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
     let s3_client: S3Client = S3Client::from_conf(s3_config);
 
     // list all index files
     let file_list =
-        find_index_files_by_pattern(aws_config, s3_bucket, s3_folder, contract_pattern).await;
+        find_index_files_by_pattern(aws_config, s3_bucket, s3_folder, contract_pattern).await?;
 
     let fetch_and_parse_tasks = file_list
         .into_iter()
@@ -153,57 +148,34 @@ pub async fn fetch_contract_index_files(
         .collect::<Vec<_>>();
 
     // Execute all tasks in parallel and wait for completion
-    let file_contents: Vec<String> = join_all(fetch_and_parse_tasks).await;
-    file_contents
+    let file_contents: Vec<String> = try_join_all(fetch_and_parse_tasks).await?;
+    Ok(file_contents
         .into_iter()
         .filter(|file_contents| !file_contents.is_empty())
-        .collect::<Vec<String>>()
+        .collect::<Vec<String>>())
 }
 
-pub async fn fetch_text_file_from_s3(s3_bucket: &str, key: String, s3_client: S3Client) -> String {
+pub async fn fetch_text_file_from_s3(
+    s3_bucket: &str,
+    key: String,
+    s3_client: S3Client,
+) -> Result<String> {
+    // todo: can we retry if this fails like the lake s3_fetcher fn does?
+    // If so, can we differentiate between a file not existing (block height does not exist) and a network error?
     let get_object_output = s3_client
         .get_object()
         .bucket(s3_bucket)
         .key(key.clone())
         .send()
-        .await;
+        .await
+        .with_context(|| format!("Error fetching index file {key}"))?;
 
-    match get_object_output {
-        Ok(object_output) => {
-            let bytes = object_output.body.collect().await;
-            match bytes {
-                Ok(bytes) => {
-                    let file_contents = String::from_utf8(bytes.to_vec());
-                    match file_contents {
-                        Ok(file_contents) => {
-                            tracing::debug!(
-                                target: crate::INDEXER,
-                                "Fetched S3 file {}",
-                                key.clone(),
-                            );
-                            file_contents
-                        }
-                        Err(e) => {
-                            tracing::error!(
-                                target: crate::INDEXER,
-                                "Error parsing index file: {:?}",
-                                e
-                            );
-                            "".to_string()
-                        }
-                    }
-                }
-                Err(e) => {
-                    tracing::error!(target: crate::INDEXER, "Error fetching index file: {:?}", e);
-                    "".to_string()
-                }
-            }
-        }
-        Err(e) => {
-            tracing::error!(target: crate::INDEXER, "Error fetching index file: {:?}", e);
-            "".to_string()
-        }
-    }
+    let bytes = get_object_output
+        .body
+        .collect()
+        .await
+        .with_context(|| format!("Error reading bytes of index file {key}"))?;
+    String::from_utf8(bytes.to_vec()).with_context(|| format!("Error parsing index file {key}"))
 }
 
 /// check whether the filename is a date after the start date
@@ -214,7 +186,8 @@ fn file_name_date_after(start_date: DateTime<Utc>, file_name: &str) -> bool {
     match file_name_date {
         Ok(file_name_date) => file_name_date >= start_date.date_naive(),
         Err(e) => {
-            tracing::error!(
+            // if we can't parse the date assume a file this code is not meant to handle
+            tracing::debug!(
                 target: crate::INDEXER,
                 "Error parsing file name date: {:?}",
                 e
@@ -245,7 +218,8 @@ mod tests {
                 crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER.to_string()
             ),
         )
-        .await;
+        .await
+        .unwrap();
         assert!(list.len() > 35000);
     }
 
@@ -260,7 +234,8 @@ mod tests {
             crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
             "hackathon.agency.near",
         )
-        .await;
+        .await
+        .unwrap();
         assert_eq!(list.len(), 1);
     }
 
@@ -275,7 +250,8 @@ mod tests {
             crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
             "hackathon.agency.near, hackathon.aurora-silo-dev.near, hackathon.sputnik-dao.near",
         )
-        .await;
+        .await
+        .unwrap();
         assert!(list.len() >= 13); // expecting 13 but these contracts could get randomly called sometime
     }
 
@@ -290,7 +266,8 @@ mod tests {
             crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
             "*.keypom.near",
         )
-        .await;
+        .await
+        .unwrap();
         assert!(list.len() >= 550);
     }
 
@@ -305,7 +282,8 @@ mod tests {
            crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER,
             "*.keypom.near, hackathon.agency.near, *.nearcrowd.near",
         )
-        .await;
+        .await
+        .unwrap();
         assert!(list.len() > 1370);
     }
 }

From 03e1d9c9e52b3ef24a593646adf5cd2f93f6576e Mon Sep 17 00:00:00 2001
From: Gabe Hamilton 
Date: Mon, 31 Jul 2023 15:50:01 -0600
Subject: [PATCH 2/9] Handle new index file folder structure that is optimized
 for sub-account wildcards.
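
Index metadata now lives under a per-account prefix whose path is the account
name with its segments reversed, e.g. (illustrative)
v2.keypom.near -> .../near/keypom/v2/metadata/, so all sub-accounts share a
common prefix and a wildcard like *.keypom.near becomes a single prefix
listing instead of a regex scan.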
---
 .../src/historical_block_processing.rs        |   8 +-
 indexer/queryapi_coordinator/src/s3.rs        | 105 ++++++++++++------
 2 files changed, 75 insertions(+), 38 deletions(-)

diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs
index 2031160e2..09036fa20 100644
--- a/indexer/queryapi_coordinator/src/historical_block_processing.rs
+++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -18,7 +18,9 @@ use tokio::task::JoinHandle;
 
 pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake";
 pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
-pub const INDEXED_DATA_FILES_FOLDER: &str = "silver/contracts/action_receipt_actions/metadata";
+pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions";
+pub const METADATA_FOLDER: &str = "metadata";
+pub const DATA_FOLDER: &str = "data";
 pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes.
 pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;
 
@@ -145,7 +147,7 @@ pub(crate) async fn last_indexed_block_from_metadata(
     aws_config: &SdkConfig,
 ) -> anyhow::Result<BlockHeight> {
-    let key = format!("{}/{}", INDEXED_DATA_FILES_FOLDER, "latest_block.json");
+    let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json");
     let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
     let s3_client: S3Client = S3Client::from_conf(s3_config);
     let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;
@@ -187,7 +189,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
                 s3::fetch_contract_index_files(
                     aws_config,
                     s3_bucket,
-                    INDEXED_DATA_FILES_FOLDER,
+                    INDEXED_ACTIONS_FILES_FOLDER,
                     start_date,
                     affected_account_id,
                 )
diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs
index be640b437..819bc165f 100644
--- a/indexer/queryapi_coordinator/src/s3.rs
+++ b/indexer/queryapi_coordinator/src/s3.rs
@@ -1,3 +1,4 @@
+use crate::historical_block_processing::METADATA_FOLDER;
 use anyhow::{bail, Context, Result};
 use aws_sdk_s3::Client as S3Client;
 use aws_sdk_s3::Config;
@@ -9,8 +10,13 @@ use regex::Regex;
 // Sanity check, if we hit this we have 1M S3 results.
 // Currently that would be either 2,700 years of FunctionCall data or 1M contract folders.
 // If we hit 1M contracts we should build an index to support efficient wildcard contract matching.
-// temporarily increasing from 1000 to 30,000 until new wildcard handling and index structure is in place.
-const MAX_S3_LIST_REQUESTS: usize = 30000;
+const MAX_S3_LIST_REQUESTS: usize = 1000;
+
+fn storage_path_for_account(account: &str) -> String {
+    let mut folders = account.split('.').collect::<Vec<&str>>();
+    folders.reverse();
+    folders.join("/")
+}
 
 pub async fn find_index_files_by_pattern(
     aws_config: &SdkConfig,
@@ -20,18 +26,23 @@ pub async fn find_index_files_by_pattern(
 ) -> Result<Vec<String>> {
     Ok(match pattern {
         x if x.contains(',') => {
-            let contract_array = x.split(',');
+            let account_array = x.split(',');
             let mut results = vec![];
-            for contract in contract_array {
-                let contract = contract.trim();
-                let sub_results = if contract.contains('*') {
-                    list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &contract)
+            for account in account_array {
+                let account = account.trim();
+                let sub_results = if account.contains('*') {
+                    list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &account)
                         .await?
} else { list_s3_bucket_by_prefix( aws_config, s3_bucket, - &format!("{}/{}/", s3_folder, contract), + &format!( + "{}/{}/{}/", + s3_folder, + storage_path_for_account(account), + METADATA_FOLDER + ), ) .await? }; @@ -46,7 +57,12 @@ pub async fn find_index_files_by_pattern( list_s3_bucket_by_prefix( aws_config, s3_bucket, - &format!("{}/{}/", s3_folder, pattern), + &format!( + "{}/{}/{}/", + s3_folder, + storage_path_for_account(pattern), + METADATA_FOLDER + ), ) .await? } @@ -57,17 +73,18 @@ async fn list_index_files_by_wildcard( aws_config: &SdkConfig, s3_bucket: &str, s3_folder: &str, - x: &&str, + pattern: &&str, ) -> Result> { - // fetch all folders and filter by regex + // remove sub-account wildcard from pattern + let pattern = pattern.replace("*.", ""); + let path = storage_path_for_account(&pattern); + let folders = - list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/", s3_folder)).await?; - let regex_string = &x.replace('.', "\\.").replace('*', ".*"); - let re = Regex::new(regex_string).unwrap(); - let matching_folders = folders.into_iter().filter(|folder| re.is_match(folder)); + list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/{}/", s3_folder, path)) + .await?; // for each matching folder list files let mut results = vec![]; - for folder in matching_folders { + for folder in folders { results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await?); } Ok(results) @@ -199,23 +216,23 @@ fn file_name_date_after(start_date: DateTime, file_name: &str) -> bool { #[cfg(test)] mod tests { - use crate::opts::{Opts, Parser}; + use crate::opts::Opts; use crate::s3::{find_index_files_by_pattern, list_s3_bucket_by_prefix}; - use aws_types::SdkConfig; + use crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET; + use crate::historical_block_processing::INDEXED_ACTIONS_FILES_FOLDER; /// Parses env vars from .env, Run with /// cargo test s3::tests::list_delta_bucket -- mainnet from-latest; #[tokio::test] async fn list_delta_bucket() { - let opts = Opts::parse(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let opts = Opts::test_opts_with_aws(); let list = list_s3_bucket_by_prefix( - aws_config, - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, + &opts.lake_aws_sdk_config(), + INDEXED_DATA_FILES_BUCKET, &format!( "{}/", - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER.to_string() + INDEXED_ACTIONS_FILES_FOLDER.to_string() ), ) .await @@ -226,12 +243,12 @@ mod tests { /// cargo test s3::tests::list_with_single_contract -- mainnet from-latest #[tokio::test] async fn list_with_single_contract() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "hackathon.agency.near", ) .await @@ -242,12 +259,12 @@ mod tests { /// cargo test s3::tests::list_with_csv_contracts -- mainnet from-latest #[tokio::test] async fn list_with_csv_contracts() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "hackathon.agency.near, hackathon.aurora-silo-dev.near, 
hackathon.sputnik-dao.near", ) .await @@ -258,12 +275,12 @@ mod tests { /// cargo test s3::tests::list_with_wildcard_contracts -- mainnet from-latest #[tokio::test] async fn list_with_wildcard_contracts() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "*.keypom.near", ) .await @@ -274,16 +291,34 @@ mod tests { /// cargo test s3::tests::list_with_csv_and_wildcard_contracts -- mainnet from-latest #[tokio::test] async fn list_with_csv_and_wildcard_contracts() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "*.keypom.near, hackathon.agency.near, *.nearcrowd.near", ) .await .unwrap(); assert!(list.len() > 1370); } + + #[test] + fn storage_path_for_account_splits_and_reverses_into_folders() { + let account = "buildnear.testnet"; + let expected = "testnet/buildnear"; + let actual = super::storage_path_for_account(account); + assert_eq!(expected, actual); + + let account = "v2.keypom.near"; + let expected = "near/keypom/v2"; + let actual = super::storage_path_for_account(account); + assert_eq!(expected, actual); + + let account = "0.app5.hipodev.near"; + let expected = "near/hipodev/app5/0"; + let actual = super::storage_path_for_account(account); + assert_eq!(expected, actual); + } } From 53fb8a3f919521eaa0be41f177835b7fca18bcce Mon Sep 17 00:00:00 2001 From: Gabe Hamilton Date: Mon, 31 Jul 2023 15:53:29 -0600 Subject: [PATCH 3/9] cargo fmt --- indexer/queryapi_coordinator/src/s3.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs index 819bc165f..8a8c5a3d6 100644 --- a/indexer/queryapi_coordinator/src/s3.rs +++ b/indexer/queryapi_coordinator/src/s3.rs @@ -31,8 +31,7 @@ pub async fn find_index_files_by_pattern( for account in account_array { let account = account.trim(); let sub_results = if account.contains('*') { - list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &account) - .await? + list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &account).await? 
} else { list_s3_bucket_by_prefix( aws_config, @@ -216,10 +215,10 @@ fn file_name_date_after(start_date: DateTime, file_name: &str) -> bool { #[cfg(test)] mod tests { + use crate::historical_block_processing::INDEXED_ACTIONS_FILES_FOLDER; + use crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET; use crate::opts::Opts; use crate::s3::{find_index_files_by_pattern, list_s3_bucket_by_prefix}; - use crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET; - use crate::historical_block_processing::INDEXED_ACTIONS_FILES_FOLDER; /// Parses env vars from .env, Run with /// cargo test s3::tests::list_delta_bucket -- mainnet from-latest; @@ -230,10 +229,7 @@ mod tests { let list = list_s3_bucket_by_prefix( &opts.lake_aws_sdk_config(), INDEXED_DATA_FILES_BUCKET, - &format!( - "{}/", - INDEXED_ACTIONS_FILES_FOLDER.to_string() - ), + &format!("{}/", INDEXED_ACTIONS_FILES_FOLDER.to_string()), ) .await .unwrap(); From d2f44e857403eb1b1b6a7c8b534ed83c04db11a0 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Wed, 2 Aug 2023 07:57:35 +1200 Subject: [PATCH 4/9] DPLT-1074 Queue real-time messages on Redis Streams (#157) --- .../queryapi_coordinator/src/indexer_types.rs | 6 ++ indexer/queryapi_coordinator/src/main.rs | 16 +++++- indexer/storage/src/lib.rs | 56 +++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) diff --git a/indexer/queryapi_coordinator/src/indexer_types.rs b/indexer/queryapi_coordinator/src/indexer_types.rs index 4a4993ce5..f3880320b 100644 --- a/indexer/queryapi_coordinator/src/indexer_types.rs +++ b/indexer/queryapi_coordinator/src/indexer_types.rs @@ -40,3 +40,9 @@ pub struct IndexerFunction { pub provisioned: bool, pub indexer_rule: IndexerRule, } + +impl IndexerFunction { + pub fn get_full_name(&self) -> String { + format!("{}/{}", self.account_id, self.function_name) + } +} diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index e447f3316..6cd7398af 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -10,7 +10,7 @@ use near_lake_framework::near_indexer_primitives::{types, StreamerMessage}; use crate::indexer_types::IndexerFunction; use indexer_types::{IndexerQueueMessage, IndexerRegistry}; use opts::{Opts, Parser}; -use storage::ConnectionManager; +use storage::{self, ConnectionManager}; pub(crate) mod cache; mod historical_block_processing; @@ -195,6 +195,20 @@ async fn handle_streamer_message( if !indexer_function.provisioned { set_provisioned_flag(&mut indexer_registry_locked, &indexer_function); } + + storage::set( + context.redis_connection_manager, + &format!("{}:storage", indexer_function.get_full_name()), + serde_json::to_string(indexer_function)?, + ) + .await?; + + storage::add_to_registered_stream( + context.redis_connection_manager, + &format!("{}:stream", indexer_function.get_full_name()), + &[("block_height", block_height)], + ) + .await? 
} stream::iter(indexer_function_messages.into_iter()) diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 59e8c8b04..332d1c789 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -2,6 +2,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; +const STREAMS_SET_KEY: &str = "streams"; + pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") } @@ -50,6 +52,60 @@ pub async fn get( tracing::debug!(target: STORAGE, "GET: {:?}: {:?}", &key, &value,); Ok(value) } + +async fn sadd( + redis_connection_manager: &ConnectionManager, + key: impl ToRedisArgs + std::fmt::Debug, + value: impl ToRedisArgs + std::fmt::Debug, +) -> anyhow::Result<()> { + tracing::debug!(target: STORAGE, "SADD: {:?}: {:?}", key, value); + + redis::cmd("SADD") + .arg(key) + .arg(value) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(()) +} + +async fn xadd( + redis_connection_manager: &ConnectionManager, + stream_key: &str, + fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], +) -> anyhow::Result<()> { + tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields); + + // TODO: Remove stream cap when we finally start processing it + redis::cmd("XTRIM") + .arg("MAXLEN") + .arg(100) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + let mut cmd = redis::cmd("XADD"); + cmd.arg(stream_key).arg("*"); + + for (field, value) in fields { + cmd.arg(*field).arg(value); + } + + cmd.query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(()) +} + +pub async fn add_to_registered_stream( + redis_connection_manager: &ConnectionManager, + key: &str, + fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], +) -> anyhow::Result<()> { + sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?; + xadd(redis_connection_manager, key, fields).await?; + + Ok(()) +} /// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage. /// Increments the counter `receipts_{transaction_hash}` by one. 
/// The counter holds how many Receipts related to the Transaction are in watching list From 18d0f9f5d20e33de1fc645d9c32911016b850c74 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Wed, 2 Aug 2023 08:25:10 +1200 Subject: [PATCH 5/9] Revert "DPLT-1074 Queue real-time messages on Redis Streams" (#160) --- .../queryapi_coordinator/src/indexer_types.rs | 6 -- indexer/queryapi_coordinator/src/main.rs | 16 +----- indexer/storage/src/lib.rs | 56 ------------------- 3 files changed, 1 insertion(+), 77 deletions(-) diff --git a/indexer/queryapi_coordinator/src/indexer_types.rs b/indexer/queryapi_coordinator/src/indexer_types.rs index f3880320b..4a4993ce5 100644 --- a/indexer/queryapi_coordinator/src/indexer_types.rs +++ b/indexer/queryapi_coordinator/src/indexer_types.rs @@ -40,9 +40,3 @@ pub struct IndexerFunction { pub provisioned: bool, pub indexer_rule: IndexerRule, } - -impl IndexerFunction { - pub fn get_full_name(&self) -> String { - format!("{}/{}", self.account_id, self.function_name) - } -} diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index 6cd7398af..e447f3316 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -10,7 +10,7 @@ use near_lake_framework::near_indexer_primitives::{types, StreamerMessage}; use crate::indexer_types::IndexerFunction; use indexer_types::{IndexerQueueMessage, IndexerRegistry}; use opts::{Opts, Parser}; -use storage::{self, ConnectionManager}; +use storage::ConnectionManager; pub(crate) mod cache; mod historical_block_processing; @@ -195,20 +195,6 @@ async fn handle_streamer_message( if !indexer_function.provisioned { set_provisioned_flag(&mut indexer_registry_locked, &indexer_function); } - - storage::set( - context.redis_connection_manager, - &format!("{}:storage", indexer_function.get_full_name()), - serde_json::to_string(indexer_function)?, - ) - .await?; - - storage::add_to_registered_stream( - context.redis_connection_manager, - &format!("{}:stream", indexer_function.get_full_name()), - &[("block_height", block_height)], - ) - .await? 
} stream::iter(indexer_function_messages.into_iter()) diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 332d1c789..59e8c8b04 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -2,8 +2,6 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; -const STREAMS_SET_KEY: &str = "streams"; - pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") } @@ -52,60 +50,6 @@ pub async fn get( tracing::debug!(target: STORAGE, "GET: {:?}: {:?}", &key, &value,); Ok(value) } - -async fn sadd( - redis_connection_manager: &ConnectionManager, - key: impl ToRedisArgs + std::fmt::Debug, - value: impl ToRedisArgs + std::fmt::Debug, -) -> anyhow::Result<()> { - tracing::debug!(target: STORAGE, "SADD: {:?}: {:?}", key, value); - - redis::cmd("SADD") - .arg(key) - .arg(value) - .query_async(&mut redis_connection_manager.clone()) - .await?; - - Ok(()) -} - -async fn xadd( - redis_connection_manager: &ConnectionManager, - stream_key: &str, - fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], -) -> anyhow::Result<()> { - tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields); - - // TODO: Remove stream cap when we finally start processing it - redis::cmd("XTRIM") - .arg("MAXLEN") - .arg(100) - .query_async(&mut redis_connection_manager.clone()) - .await?; - - let mut cmd = redis::cmd("XADD"); - cmd.arg(stream_key).arg("*"); - - for (field, value) in fields { - cmd.arg(*field).arg(value); - } - - cmd.query_async(&mut redis_connection_manager.clone()) - .await?; - - Ok(()) -} - -pub async fn add_to_registered_stream( - redis_connection_manager: &ConnectionManager, - key: &str, - fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], -) -> anyhow::Result<()> { - sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?; - xadd(redis_connection_manager, key, fields).await?; - - Ok(()) -} /// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage. /// Increments the counter `receipts_{transaction_hash}` by one. 
/// The counter holds how many Receipts related to the Transaction are in watching list From 332b4a5aa65253665186699a95f08b9210b1ec3d Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Wed, 2 Aug 2023 14:48:56 +1200 Subject: [PATCH 6/9] Revert "Revert "DPLT-1074 Queue real-time messages on Redis Streams"" (#161) --- .../queryapi_coordinator/src/indexer_types.rs | 6 ++ indexer/queryapi_coordinator/src/main.rs | 22 ++++++- indexer/storage/src/lib.rs | 57 +++++++++++++++++++ 3 files changed, 83 insertions(+), 2 deletions(-) diff --git a/indexer/queryapi_coordinator/src/indexer_types.rs b/indexer/queryapi_coordinator/src/indexer_types.rs index 4a4993ce5..f3880320b 100644 --- a/indexer/queryapi_coordinator/src/indexer_types.rs +++ b/indexer/queryapi_coordinator/src/indexer_types.rs @@ -40,3 +40,9 @@ pub struct IndexerFunction { pub provisioned: bool, pub indexer_rule: IndexerRule, } + +impl IndexerFunction { + pub fn get_full_name(&self) -> String { + format!("{}/{}", self.account_id, self.function_name) + } +} diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index e447f3316..1ea48047c 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -10,7 +10,7 @@ use near_lake_framework::near_indexer_primitives::{types, StreamerMessage}; use crate::indexer_types::IndexerFunction; use indexer_types::{IndexerQueueMessage, IndexerRegistry}; use opts::{Opts, Parser}; -use storage::ConnectionManager; +use storage::{self, ConnectionManager}; pub(crate) mod cache; mod historical_block_processing; @@ -111,7 +111,11 @@ async fn main() -> anyhow::Result<()> { }) .buffer_unordered(1usize); - while let Some(_handle_message) = handlers.next().await {} + while let Some(handle_message) = handlers.next().await { + if let Err(err) = handle_message { + tracing::error!(target: INDEXER, "{:#?}", err); + } + } drop(handlers); // close the channel so the sender will stop // propagate errors from the sender @@ -195,6 +199,20 @@ async fn handle_streamer_message( if !indexer_function.provisioned { set_provisioned_flag(&mut indexer_registry_locked, &indexer_function); } + + storage::set( + context.redis_connection_manager, + &format!("{}:storage", indexer_function.get_full_name()), + serde_json::to_string(indexer_function)?, + ) + .await?; + + storage::add_to_registered_stream( + context.redis_connection_manager, + &format!("{}:stream", indexer_function.get_full_name()), + &[("block_height", block_height)], + ) + .await?; } stream::iter(indexer_function_messages.into_iter()) diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 59e8c8b04..98c88a297 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -2,6 +2,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; +const STREAMS_SET_KEY: &str = "streams"; + pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") } @@ -50,6 +52,61 @@ pub async fn get( tracing::debug!(target: STORAGE, "GET: {:?}: {:?}", &key, &value,); Ok(value) } + +async fn sadd( + redis_connection_manager: &ConnectionManager, + key: impl ToRedisArgs + std::fmt::Debug, + value: impl ToRedisArgs + std::fmt::Debug, +) -> anyhow::Result<()> { + tracing::debug!(target: STORAGE, "SADD: {:?}: {:?}", key, value); + + redis::cmd("SADD") + .arg(key) + .arg(value) + .query_async(&mut redis_connection_manager.clone()) + 
.await?; + + Ok(()) +} + +async fn xadd( + redis_connection_manager: &ConnectionManager, + stream_key: &str, + fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], +) -> anyhow::Result<()> { + tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields); + + // TODO: Remove stream cap when we finally start processing it + redis::cmd("XTRIM") + .arg(stream_key) + .arg("MAXLEN") + .arg(100) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + let mut cmd = redis::cmd("XADD"); + cmd.arg(stream_key).arg("*"); + + for (field, value) in fields { + cmd.arg(*field).arg(value); + } + + cmd.query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(()) +} + +pub async fn add_to_registered_stream( + redis_connection_manager: &ConnectionManager, + key: &str, + fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], +) -> anyhow::Result<()> { + sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?; + xadd(redis_connection_manager, key, fields).await?; + + Ok(()) +} /// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage. /// Increments the counter `receipts_{transaction_hash}` by one. /// The counter holds how many Receipts related to the Transaction are in watching list From cfc62714c7d8a9c55718fdf1cce147e10d1f2dbd Mon Sep 17 00:00:00 2001 From: Gabe Hamilton Date: Wed, 2 Aug 2023 21:36:00 -0600 Subject: [PATCH 7/9] Matches latest index files structure. Additional debugging information in logs. --- .../src/historical_block_processing.rs | 4 +--- ...istorical_block_processing_integration_tests.rs | 2 +- .../queryapi_coordinator/src/indexer_registry.rs | 9 ++++++--- indexer/queryapi_coordinator/src/main.rs | 10 ++++++++-- indexer/queryapi_coordinator/src/s3.rs | 14 +++++--------- 5 files changed, 21 insertions(+), 18 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 09036fa20..366fe7da8 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -18,9 +18,7 @@ use tokio::task::JoinHandle; pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake"; pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-"; -pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions"; -pub const METADATA_FOLDER: &str = "metadata"; -pub const DATA_FOLDER: &str = "data"; +pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata"; pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes. 
 pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;
 
diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
index 9b9ae91d9..c2ca90a3a 100644
--- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
+++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
@@ -192,6 +192,6 @@ mod tests {
         // remove any blocks from after when the test was written
         let fixed_blocks: Vec<BlockHeight> =
             blocks.into_iter().filter(|&b| b <= 95175853u64).collect();
-        assert_eq!(fixed_blocks.len(), 6); // hackathon.agency.near = 45894627,45898423, hacker.agency.near = 45897358, hack.agency.near = 45894872,45895120,45896237
+        assert_eq!(fixed_blocks.len(), 197); // hackathon.agency.near = 45894627,45898423, hacker.agency.near = 45897358, hack.agency.near = 45894872,45895120,45896237
     }
 }
diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs
index 987a59cb1..3ab2ec060 100644
--- a/indexer/queryapi_coordinator/src/indexer_registry.rs
+++ b/indexer/queryapi_coordinator/src/indexer_registry.rs
@@ -138,12 +138,14 @@ fn index_and_process_register_calls(
         .entry(new_indexer_function.account_id.clone())
         .or_default();
 
-    match fns.get(new_indexer_function.function_name.as_str()) {
+    let functions = fns.get(new_indexer_function.function_name.as_str());
+    match functions {
         // if there is no existing function then we will insert the new one with the default state of provisioned = false
         None => {
             tracing::info!(
                 target: crate::INDEXER,
-                "indexed creation call to {registry_method_name}: {:?} {:?}",
+                "Block {}. Indexed creation call to {registry_method_name}: {:?} {:?}",
+                block_height,
                 new_indexer_function.account_id.clone(),
                 new_indexer_function.function_name.clone()
             );
@@ -153,7 +155,8 @@ fn index_and_process_register_calls(
         Some(old_indexer_function) => {
             tracing::info!(
                 target: crate::INDEXER,
-                "indexed update call to {registry_method_name}: {:?} {:?}",
+                "Block {}. Indexed update call to {registry_method_name}: {:?} {:?}",
+                block_height,
                 new_indexer_function.account_id.clone(),
                 new_indexer_function.function_name.clone(),
             );
diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs
index e447f3316..ddec6df5a 100644
--- a/indexer/queryapi_coordinator/src/main.rs
+++ b/indexer/queryapi_coordinator/src/main.rs
@@ -252,11 +252,17 @@ fn set_provisioned_flag(
             indexer_function.provisioned = true;
         }
         None => {
+            let keys = account_functions
+                .keys()
+                .map(|s| &**s)
+                .collect::<Vec<&str>>()
+                .join(", ");
             tracing::error!(
                 target: INDEXER,
-                "Unable to set provisioning status, Indexer function with account_id {} and function_name {} not found in registry",
+                "Unable to set provisioning status, Indexer function with account_id {} and function_name {} not found in registry.
Functions for this account are: {}", indexer_function.account_id, - indexer_function.function_name + indexer_function.function_name, + keys ); } } diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs index 8a8c5a3d6..5c0f98b49 100644 --- a/indexer/queryapi_coordinator/src/s3.rs +++ b/indexer/queryapi_coordinator/src/s3.rs @@ -1,11 +1,9 @@ -use crate::historical_block_processing::METADATA_FOLDER; use anyhow::{bail, Context, Result}; use aws_sdk_s3::Client as S3Client; use aws_sdk_s3::Config; use aws_types::SdkConfig; use chrono::{DateTime, NaiveDate, Utc}; use futures::future::try_join_all; -use regex::Regex; // Sanity check, if we hit this we have 1M S3 results. // Currently that would be either 2,700 years of FunctionCall data or 1M contract folders. @@ -37,10 +35,9 @@ pub async fn find_index_files_by_pattern( aws_config, s3_bucket, &format!( - "{}/{}/{}/", + "{}/{}/", s3_folder, - storage_path_for_account(account), - METADATA_FOLDER + storage_path_for_account(account) ), ) .await? @@ -57,10 +54,9 @@ pub async fn find_index_files_by_pattern( aws_config, s3_bucket, &format!( - "{}/{}/{}/", + "{}/{}/", s3_folder, storage_path_for_account(pattern), - METADATA_FOLDER ), ) .await? @@ -233,7 +229,7 @@ mod tests { ) .await .unwrap(); - assert!(list.len() > 35000); + assert_eq!(list.len(), 4); } /// cargo test s3::tests::list_with_single_contract -- mainnet from-latest @@ -265,7 +261,7 @@ mod tests { ) .await .unwrap(); - assert!(list.len() >= 13); // expecting 13 but these contracts could get randomly called sometime + assert!(list.len() >= 15); // expecting 15 but these contracts could get randomly called sometime } /// cargo test s3::tests::list_with_wildcard_contracts -- mainnet from-latest From a829582000b377d8ff71992f6dc52a0d416dfcc4 Mon Sep 17 00:00:00 2001 From: Gabe Hamilton Date: Wed, 2 Aug 2023 21:40:26 -0600 Subject: [PATCH 8/9] cargo fmt of format --- indexer/queryapi_coordinator/src/s3.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs index 5c0f98b49..6c29a06c6 100644 --- a/indexer/queryapi_coordinator/src/s3.rs +++ b/indexer/queryapi_coordinator/src/s3.rs @@ -34,11 +34,7 @@ pub async fn find_index_files_by_pattern( list_s3_bucket_by_prefix( aws_config, s3_bucket, - &format!( - "{}/{}/", - s3_folder, - storage_path_for_account(account) - ), + &format!("{}/{}/", s3_folder, storage_path_for_account(account)), ) .await? }; @@ -53,11 +49,7 @@ pub async fn find_index_files_by_pattern( list_s3_bucket_by_prefix( aws_config, s3_bucket, - &format!( - "{}/{}/", - s3_folder, - storage_path_for_account(pattern), - ), + &format!("{}/{}/", s3_folder, storage_path_for_account(pattern),), ) .await? 
}

From 4f66ec30a37a3f7811e3e30bb4d98cfa0dd1887d Mon Sep 17 00:00:00 2001
From: Roshaan Siddiqui 
Date: Thu, 3 Aug 2023 10:31:47 -0500
Subject: [PATCH 9/9] DEC-1372 feat: add moderation list support for feed
 (#163)

---
 .../feed/src/QueryApi.Examples.Feed.Posts.jsx | 22 ++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/frontend/widgets/examples/feed/src/QueryApi.Examples.Feed.Posts.jsx b/frontend/widgets/examples/feed/src/QueryApi.Examples.Feed.Posts.jsx
index 9c59d7a71..8e9e0e813 100644
--- a/frontend/widgets/examples/feed/src/QueryApi.Examples.Feed.Posts.jsx
+++ b/frontend/widgets/examples/feed/src/QueryApi.Examples.Feed.Posts.jsx
@@ -4,6 +4,7 @@ const GRAPHQL_ENDPOINT =
 const sortOption = props.postsOrderOption || "blockHeight"; // following, blockHeight
 const LIMIT = 25;
 let accountsFollowing = props.accountsFollowing
+const moderatorAccount = props?.moderatorAccount || "bosmod.near";
 
 if (context.accountId && !accountsFollowing) {
   const graph = Social.keys(`${context.accountId}/graph/follow/*`, "final");
@@ -12,6 +13,25 @@ if (context.accountId && !accountsFollowing) {
   }
 }
 
+let filterUsersRaw = Social.get(
+  `${moderatorAccount}/moderate/users`,
+  "optimistic",
+  {
+    subscribe: true,
+  }
+);
+
+if (filterUsersRaw === null) {
+  // haven't loaded filter list yet, return early
+  return "";
+}
+
+const filterUsers = filterUsersRaw ? JSON.parse(filterUsersRaw) : [];
+
+const shouldFilter = (item) => {
+  return filterUsers.includes(item.accountId);
+};
+
 State.init({
   selectedTab: Storage.privateGet("selectedTab") || "all",
   posts: [],
@@ -297,7 +317,7 @@ return (
       )}
-      <Widget src={/* src elided in extraction */} props={{ /* props elided */ }} />
+      <Widget src={/* src elided in extraction */} props={{ /* props elided */ posts: state.posts.filter((i) => !shouldFilter(i))}} />
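
A note on the layout these patches converge on: PATCH 2 builds each account's
S3 prefix by reversing the segments of the account name, so wildcard patterns
reduce to plain prefix listings and the regex matching from PATCH 1 can be
dropped (PATCH 7 removes the `regex` import). A minimal standalone sketch of
that mapping, not part of the series itself; `main` and the sample patterns
are illustrative only:

```rust
// Sub-account path scheme from PATCH 2: reversing the segments of an account
// name makes every sub-account of keypom.near share the "near/keypom/" prefix.
fn storage_path_for_account(account: &str) -> String {
    let mut folders = account.split('.').collect::<Vec<&str>>();
    folders.reverse();
    folders.join("/")
}

fn main() {
    // A wildcard pattern first drops its leading "*." (as in
    // list_index_files_by_wildcard) and then becomes a plain prefix listing.
    let pattern = "*.keypom.near".replace("*.", "");
    assert_eq!(storage_path_for_account(&pattern), "near/keypom");

    // Deeper sub-accounts nest further under the same shared prefix.
    assert_eq!(
        storage_path_for_account("0.app5.hipodev.near"),
        "near/hipodev/app5/0"
    );
    println!("wildcard listing prefix: {}/", storage_path_for_account(&pattern));
}
```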