diff --git a/frontend/widgets/examples/feed/src/QueryApi.Examples.Feed.Posts.jsx b/frontend/widgets/examples/feed/src/QueryApi.Examples.Feed.Posts.jsx index 9c59d7a71..8e9e0e813 100644 --- a/frontend/widgets/examples/feed/src/QueryApi.Examples.Feed.Posts.jsx +++ b/frontend/widgets/examples/feed/src/QueryApi.Examples.Feed.Posts.jsx @@ -4,6 +4,7 @@ const GRAPHQL_ENDPOINT = const sortOption = props.postsOrderOption || "blockHeight"; // following, blockHeight const LIMIT = 25; let accountsFollowing = props.accountsFollowing +const moderatorAccount = props?.moderatorAccount || "bosmod.near"; if (context.accountId && !accountsFollowing) { const graph = Social.keys(`${context.accountId}/graph/follow/*`, "final"); @@ -12,6 +13,25 @@ if (context.accountId && !accountsFollowing) { } } +let filterUsersRaw = Social.get( + `${moderatorAccount}/moderate/users`, + "optimistic", + { + subscribe: true, + } +); + +if (filterUsersRaw === null) { + // haven't loaded filter list yet, return early + return ""; +} + +const filterUsers = filterUsersRaw ? JSON.parse(filterUsersRaw) : []; + +const shouldFilter = (item) => { + return filterUsers.includes(item.accountId); +}; + State.init({ selectedTab: Storage.privateGet("selectedTab") || "all", posts: [], @@ -297,7 +317,7 @@ return ( )} - + !shouldFilter(i))}} /> diff --git a/indexer/indexer_rules_engine/src/lib.rs b/indexer/indexer_rules_engine/src/lib.rs index 1e694d3c3..e267b269f 100644 --- a/indexer/indexer_rules_engine/src/lib.rs +++ b/indexer/indexer_rules_engine/src/lib.rs @@ -30,8 +30,8 @@ pub fn reduce_indexer_rule_matches_sync( indexer_rule: &IndexerRule, streamer_message: &StreamerMessage, chain_id: ChainId, -) -> anyhow::Result<Vec<IndexerRuleMatch>> { - Ok(match &indexer_rule.matching_rule { +) -> Vec<IndexerRuleMatch> { + match &indexer_rule.matching_rule { MatchingRule::ActionAny { .. } | MatchingRule::ActionFunctionCall { .. } | MatchingRule::Event { .. } => { @@ -39,7 +39,7 @@ pub fn reduce_indexer_rule_matches_sync( indexer_rule, streamer_message, chain_id, - )?
+ ) } - }) + } } diff --git a/indexer/indexer_rules_engine/src/outcomes_reducer_sync.rs b/indexer/indexer_rules_engine/src/outcomes_reducer_sync.rs index e93a745c1..9e87efaf6 100644 --- a/indexer/indexer_rules_engine/src/outcomes_reducer_sync.rs +++ b/indexer/indexer_rules_engine/src/outcomes_reducer_sync.rs @@ -10,7 +10,7 @@ pub fn reduce_indexer_rule_matches_from_outcomes( indexer_rule: &IndexerRule, streamer_message: &StreamerMessage, chain_id: ChainId, -) -> anyhow::Result<Vec<IndexerRuleMatch>> { +) -> Vec<IndexerRuleMatch> { streamer_message .shards .iter() @@ -41,8 +41,8 @@ fn build_indexer_rule_match( block_header_hash: String, block_height: u64, chain_id: ChainId, -) -> anyhow::Result<IndexerRuleMatch> { - Ok(IndexerRuleMatch { +) -> IndexerRuleMatch { + IndexerRuleMatch { chain_id: chain_id.clone(), indexer_rule_id: indexer_rule.id, indexer_rule_name: indexer_rule.name.clone(), @@ -52,7 +52,7 @@ fn build_indexer_rule_match( block_header_hash, ), block_height, - }) + } } fn build_indexer_rule_match_payload( diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 9800d25d5..366fe7da8 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -2,12 +2,13 @@ use crate::indexer_types::{IndexerFunction, IndexerQueueMessage}; use crate::opts::{Opts, Parser}; use crate::queue; use crate::s3; +use anyhow::{bail, Context}; use aws_sdk_s3::Client as S3Client; use aws_sdk_s3::Config; use aws_sdk_sqs::Client; use aws_types::SdkConfig; use chrono::{DateTime, LocalResult, TimeZone, Utc}; -use indexer_rule_type::indexer_rule::{IndexerRule, MatchingRule}; +use indexer_rule_type::indexer_rule::MatchingRule; use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatchPayload}; use near_jsonrpc_client::JsonRpcClient; use near_jsonrpc_primitives::types::blocks::RpcBlockRequest; @@ -17,8 +18,9 @@ use tokio::task::JoinHandle; pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake"; pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-"; -pub const INDEXED_DATA_FILES_FOLDER: &str = "silver/contracts/action_receipt_actions/metadata"; +pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata"; pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // filtering two hours of blocks takes ~14 minutes.
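An aside on the folder rename above: index files are now keyed by reversed account segments rather than by raw contract name. A minimal sketch of the resulting S3 prefix for an account, assuming the same layout as the patch's own storage_path_for_account helper in s3.rs further below (the helper name s3_prefix_for_account here is illustrative only):

```rust
// Illustrative sketch only -- mirrors storage_path_for_account in s3.rs combined
// with the INDEXED_ACTIONS_FILES_FOLDER prefix; not code from this patch.
const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata";

fn s3_prefix_for_account(account: &str) -> String {
    // "v2.keypom.near" -> "near/keypom/v2"
    let mut folders = account.split('.').collect::<Vec<&str>>();
    folders.reverse();
    format!("{}/{}/", INDEXED_ACTIONS_FILES_FOLDER, folders.join("/"))
}

// s3_prefix_for_account("hackathon.agency.near")
//   == "silver/accounts/action_receipt_actions/metadata/near/agency/hackathon/"
```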
+pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20; pub fn spawn_historical_message_thread( block_height: BlockHeight, @@ -26,7 +28,7 @@ pub fn spawn_historical_message_thread( ) -> Option> { new_indexer_function.start_block_height.map(|_| { let new_indexer_function_copy = new_indexer_function.clone(); - tokio::spawn(process_historical_messages( + tokio::spawn(process_historical_messages_or_handle_error( block_height, new_indexer_function_copy, Opts::parse(), @@ -34,21 +36,39 @@ pub fn spawn_historical_message_thread( }) } -pub(crate) async fn process_historical_messages( +pub(crate) async fn process_historical_messages_or_handle_error( block_height: BlockHeight, indexer_function: IndexerFunction, opts: Opts, ) -> i64 { + match process_historical_messages(block_height, indexer_function, opts).await { + Ok(block_difference) => block_difference, + Err(err) => { + // todo: when Coordinator can send log messages to Runner, send this error to Runner + tracing::error!( + target: crate::INDEXER, + "Error processing historical messages: {:?}", + err + ); + 0 + } + } +} +pub(crate) async fn process_historical_messages( + block_height: BlockHeight, + indexer_function: IndexerFunction, + opts: Opts, +) -> anyhow::Result { let start_block = indexer_function.start_block_height.unwrap(); let block_difference: i64 = (block_height - start_block) as i64; match block_difference { i64::MIN..=-1 => { - tracing::error!(target: crate::INDEXER, "Skipping back fill, start_block_height is greater than current block height: {:?} {:?}", + bail!("Skipping back fill, start_block_height is greater than current block height: {:?} {:?}", indexer_function.account_id, indexer_function.function_name); } 0 => { - tracing::info!(target: crate::INDEXER, "Skipping back fill, start_block_height is equal to current block height: {:?} {:?}", + bail!("Skipping back fill, start_block_height is equal to current block height: {:?} {:?}", indexer_function.account_id, indexer_function.function_name); } @@ -67,35 +87,21 @@ pub(crate) async fn process_historical_messages( let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); let json_rpc_client = JsonRpcClient::connect(opts.rpc_url()); - let start_date = block_to_date(start_block, &json_rpc_client).await; - if start_date.is_none() { - tracing::error!( - target: crate::INDEXER, - "Failed to get start date for block {}", - start_block - ); - return block_difference; - } + let start_date = + lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?; let mut indexer_function = indexer_function.clone(); - let last_indexed_block = last_indexed_block_from_metadata(aws_config).await; - if last_indexed_block.is_err() { - tracing::error!( - target: crate::INDEXER, - last_indexed_block = ?last_indexed_block, - ); - return block_difference; - } - let last_indexed_block = last_indexed_block.unwrap(); + let last_indexed_block = last_indexed_block_from_metadata(aws_config).await?; + let last_indexed_block = last_indexed_block; let mut blocks_from_index = filter_matching_blocks_from_index_files( start_block, - &indexer_function.indexer_rule, + &indexer_function, aws_config, - start_date.unwrap(), + start_date, ) - .await; + .await?; // Check for the case where an index file is written right after we get the last_indexed_block metadata let last_block_in_data = blocks_from_index.last().unwrap_or(&start_block); @@ -109,19 +115,19 @@ pub(crate) async fn process_historical_messages( filter_matching_unindexed_blocks_from_lake( last_indexed_block, block_height, - &indexer_function.indexer_rule, 
+ &indexer_function, aws_config, chain_id.clone(), ) - .await; + .await?; blocks_from_index.append(&mut blocks_between_indexed_and_current_block); - let first_block_in_data = *blocks_from_index.first().unwrap_or(&start_block); + let first_block_in_index = *blocks_from_index.first().unwrap_or(&start_block); for current_block in blocks_from_index { send_execution_message( block_height, - first_block_in_data, + first_block_in_index, chain_id.clone(), &queue_client, queue_url.clone(), @@ -133,33 +139,24 @@ pub(crate) async fn process_historical_messages( } } } - block_difference + Ok(block_difference) } pub(crate) async fn last_indexed_block_from_metadata( aws_config: &SdkConfig, ) -> anyhow::Result { - let key = format!("{}/{}", INDEXED_DATA_FILES_FOLDER, "latest_block.json"); + let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json"); let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); let s3_client: S3Client = S3Client::from_conf(s3_config); - let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await; + let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?; let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap(); let last_indexed_block = metadata["last_indexed_block"].clone(); - let last_indexed_block = last_indexed_block.as_str(); - if last_indexed_block.is_none() { - return Err(anyhow::anyhow!( - "No last_indexed_block found in latest_block.json" - )); - } - let last_indexed_block = last_indexed_block.unwrap(); - let last_indexed_block = from_str(last_indexed_block); - if last_indexed_block.is_err() { - return Err(anyhow::anyhow!( - "last_indexed_block couldn't be converted to u64" - )); - } - let last_indexed_block = last_indexed_block.unwrap(); + let last_indexed_block = last_indexed_block + .as_str() + .context("No last_indexed_block found in latest_block.json")?; + let last_indexed_block = + from_str(last_indexed_block).context("last_indexed_block couldn't be converted to u64")?; tracing::info!( target: crate::INDEXER, "Last indexed block from latest_block.json: {:?}", @@ -170,13 +167,14 @@ pub(crate) async fn last_indexed_block_from_metadata( pub(crate) async fn filter_matching_blocks_from_index_files( start_block_height: BlockHeight, - indexer_rule: &IndexerRule, + indexer_function: &IndexerFunction, aws_config: &SdkConfig, start_date: DateTime, -) -> Vec { +) -> anyhow::Result> { let s3_bucket = INDEXED_DATA_FILES_BUCKET; let mut needs_dedupe_and_sort = false; + let indexer_rule = &indexer_function.indexer_rule; let index_files_content = match &indexer_rule.matching_rule { MatchingRule::ActionAny { @@ -189,38 +187,25 @@ pub(crate) async fn filter_matching_blocks_from_index_files( s3::fetch_contract_index_files( aws_config, s3_bucket, - INDEXED_DATA_FILES_FOLDER, + INDEXED_ACTIONS_FILES_FOLDER, start_date, affected_account_id, ) .await } MatchingRule::ActionFunctionCall { .. 
} => { - tracing::error!( - target: crate::INDEXER, - "ActionFunctionCall matching rule not supported for historical processing" - ); - return vec![]; - - // if affected_account_id.contains('*') || affected_account_id.contains(',) { - // needs_dedupe_and_sort = true; - // } - // let s3_prefix = format!("{}/{}", INDEXED_DATA_FILES_FOLDER, affected_account_id); - // fetch_contract_index_files(aws_config, s3_bucket, s3_prefix).await - // // todo implement, use function name selector + bail!("ActionFunctionCall matching rule not yet supported for historical processing, function: {:?} {:?}", indexer_function.account_id, indexer_function.function_name); } MatchingRule::Event { .. } => { - tracing::error!( - target: crate::INDEXER, - "Event matching rule not supported for historical processing" - ); - return vec![]; + bail!("Event matching rule not yet supported for historical processing, function {:?} {:?}", indexer_function.account_id, indexer_function.function_name); } - }; + }?; tracing::info!( target: crate::INDEXER, - "Found {file_count} index files matching rule {indexer_rule:?}", + "Found {file_count} index files for function {:?} {:?} with matching rule {indexer_rule:?}", + indexer_function.account_id, + indexer_function.function_name, file_count = index_files_content.len() ); let mut blocks_to_process: Vec = @@ -231,11 +216,13 @@ pub(crate) async fn filter_matching_blocks_from_index_files( } tracing::info!( target: crate::INDEXER, - "Found {block_count} indexed blocks to process.", + "Found {block_count} indexed blocks to process for function {:?} {:?}", + indexer_function.account_id, + indexer_function.function_name, block_count = blocks_to_process.len() ); - blocks_to_process + Ok(blocks_to_process) } fn parse_blocks_from_index_files( @@ -277,106 +264,84 @@ fn parse_blocks_from_index_files( async fn filter_matching_unindexed_blocks_from_lake( last_indexed_block: BlockHeight, ending_block_height: BlockHeight, - indexer_rule: &IndexerRule, + indexer_function: &IndexerFunction, aws_config: &SdkConfig, chain_id: ChainId, -) -> Vec { +) -> anyhow::Result> { let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); let s3_client: S3Client = S3Client::from_conf(s3_config); let lake_bucket = lake_bucket_for_chain(chain_id.clone()); + let indexer_rule = &indexer_function.indexer_rule; let count = ending_block_height - last_indexed_block; if count > MAX_UNINDEXED_BLOCKS_TO_PROCESS { - tracing::error!( - target: crate::INDEXER, - "Too many unindexed blocks to filter: {count}. Last indexed block is {last_indexed_block}.", + bail!( + "Too many unindexed blocks to filter: {count}. 
Last indexed block is {last_indexed_block} for function {:?} {:?}", + indexer_function.account_id, + indexer_function.function_name, ); - return vec![]; } tracing::info!( target: crate::INDEXER, - "Filtering {count} unindexed blocks from lake: from block {last_indexed_block} to {ending_block_height}", + "Filtering {count} unindexed blocks from lake: from block {last_indexed_block} to {ending_block_height} for function {:?} {:?}", + indexer_function.account_id, + indexer_function.function_name, ); + let mut blocks_to_process: Vec = vec![]; for current_block in (last_indexed_block + 1)..ending_block_height { // fetch block file from S3 let key = format!("{}/block.json", normalize_block_height(current_block)); - let block = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await; + let block = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await?; let block_view = serde_json::from_slice::< near_lake_framework::near_indexer_primitives::views::BlockView, - >(block.as_ref()); + >(block.as_ref()) + .with_context(|| format!("Error parsing block {} from S3", current_block))?; + let mut shards = vec![]; - match block_view { - Ok(block_view) => { - for shard_id in 0..block_view.chunks.len() as u64 { - let key = format!( - "{}/shard_{}.json", - normalize_block_height(current_block), - shard_id - ); - let shard = - s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await; - match serde_json::from_slice::< - near_lake_framework::near_indexer_primitives::IndexerShard, - >(shard.as_ref()) - { - Ok(parsed_shard) => { - shards.push(parsed_shard); - } - Err(e) => { - tracing::error!( - target: crate::INDEXER, - "Error parsing shard: {}", - e.to_string() - ); - // todo this needs better error handling - } - } + for shard_id in 0..block_view.chunks.len() as u64 { + let key = format!( + "{}/shard_{}.json", + normalize_block_height(current_block), + shard_id + ); + let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await?; + match serde_json::from_slice::( + shard.as_ref(), + ) { + Ok(parsed_shard) => { + shards.push(parsed_shard); } - let streamer_message = - near_lake_framework::near_indexer_primitives::StreamerMessage { - block: block_view, - shards, - }; - - // // filter block - let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync( - indexer_rule, - &streamer_message, - chain_id.clone(), - ); - match matches { - Ok(match_list) => { - if !match_list.is_empty() { - blocks_to_process.push(current_block); - } - } - Err(e) => { - tracing::warn!( - target: crate::INDEXER, - "Error matching block {} against S3 file: {:?}", - current_block, - e - ); - } + Err(e) => { + bail!("Error parsing shard: {}", e.to_string()); } } - Err(e) => { - tracing::error!( - target: crate::INDEXER, - "Error parsing block {} from S3: {:?}", - current_block, - e - ); - } + } + + let streamer_message = near_lake_framework::near_indexer_primitives::StreamerMessage { + block: block_view, + shards, + }; + + // filter block + let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync( + indexer_rule, + &streamer_message, + chain_id.clone(), + ); + if !matches.is_empty() { + blocks_to_process.push(current_block); } } + tracing::info!( target: crate::INDEXER, - "Found {block_count} unindexed blocks to process.", + "Found {block_count} unindexed blocks to process for function {:?} {:?}", + indexer_function.account_id, + indexer_function.function_name, block_count = blocks_to_process.len() ); - blocks_to_process + Ok(blocks_to_process) } fn 
lake_bucket_for_chain(chain_id: ChainId) -> String { @@ -423,27 +388,36 @@ async fn send_execution_message( } } -pub async fn block_to_date(block_height: u64, client: &JsonRpcClient) -> Option> { - let request = RpcBlockRequest { - block_reference: BlockReference::BlockId(BlockId::Height(block_height)), - }; +// if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks +pub async fn lookup_block_date_or_next_block_date( + block_height: u64, + client: &JsonRpcClient, +) -> anyhow::Result> { + let mut current_block_height = block_height; + let mut retry_count = 0; + loop { + let request = RpcBlockRequest { + block_reference: BlockReference::BlockId(BlockId::Height(current_block_height)), + }; - match client.call(request).await { - Ok(response) => { - let header = response.header; - let timestamp_nanosec = header.timestamp_nanosec; - match Utc.timestamp_opt((timestamp_nanosec / 1000000000) as i64, 0) { - LocalResult::Single(date) => Some(date), - LocalResult::Ambiguous(date, _) => Some(date), - LocalResult::None => { - tracing::error!("Unable to get block timestamp"); - None + match client.call(request).await { + Ok(response) => { + let header = response.header; + let timestamp_nanosec = header.timestamp_nanosec; + return match Utc.timestamp_opt((timestamp_nanosec / 1000000000) as i64, 0) { + LocalResult::Single(date) => Ok(date), + LocalResult::Ambiguous(date, _) => Ok(date), + LocalResult::None => Err(anyhow::anyhow!("Unable to get block timestamp")), + }; + } + Err(_) => { + tracing::debug!("RPC failed to get block: {:?}", current_block_height); + retry_count += 1; + if retry_count > MAX_RPC_BLOCKS_TO_PROCESS { + return Err(anyhow::anyhow!("Unable to get block")); } + current_block_height += 1; } } - Err(err) => { - tracing::error!("Unable to get block: {:?}", err); - None - } } } diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs index 3457974e7..c2ca90a3a 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs @@ -80,12 +80,13 @@ mod tests { historical_block_processing::last_indexed_block_from_metadata(aws_config) .await .unwrap(); - historical_block_processing::process_historical_messages( + let result = historical_block_processing::process_historical_messages( fake_block_height + 1, indexer_function, opts, ) .await; + assert!(result.unwrap() > 0); } /// Parses env vars from .env, Run with @@ -103,6 +104,15 @@ mod tests { id: None, name: None, }; + let indexer_function = IndexerFunction { + account_id: "buildnear.testnet".to_string().parse().unwrap(), + function_name: "index_stuff".to_string(), + code: "".to_string(), + start_block_height: Some(85376002), + schema: None, + provisioned: false, + indexer_rule: filter_rule, + }; let opts = Opts::test_opts_with_aws(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); @@ -115,15 +125,25 @@ mod tests { let datetime_utc = DateTime::::from_utc(naivedatetime_utc, Utc); let blocks = filter_matching_blocks_from_index_files( start_block_height, - &filter_rule, + &indexer_function, aws_config, datetime_utc, ) .await; - // // remove any blocks from after when the test was written -- not working, due to new contracts? 
- // let fixed_blocks: Vec = blocks.into_iter().filter(|&b| b <= 95175853u64).collect(); // 95175853u64 95242647u64 - assert!(blocks.len() >= 71899); + match blocks { + Ok(blocks) => { + // remove any blocks from after when the test was written + let fixed_blocks: Vec = + blocks.into_iter().filter(|&b| b <= 95175853u64).collect(); + println!("Found {} blocks", fixed_blocks.len()); + assert!(fixed_blocks.len() >= 71899); + } + Err(e) => { + println!("Error: {:?}", e); + assert!(false); + } + } } /// Parses env vars from .env, Run with @@ -141,6 +161,15 @@ mod tests { id: None, name: None, }; + let indexer_function = IndexerFunction { + account_id: "buildnear.testnet".to_string().parse().unwrap(), + function_name: "index_stuff".to_string(), + code: "".to_string(), + start_block_height: Some(85376002), + schema: None, + provisioned: false, + indexer_rule: filter_rule, + }; let opts = Opts::test_opts_with_aws(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); @@ -153,15 +182,16 @@ mod tests { let datetime_utc = DateTime::::from_utc(naivedatetime_utc, Utc); let blocks = filter_matching_blocks_from_index_files( start_block_height, - &filter_rule, + &indexer_function, aws_config, datetime_utc, ) .await; + let blocks = blocks.unwrap(); // remove any blocks from after when the test was written let fixed_blocks: Vec = blocks.into_iter().filter(|&b| b <= 95175853u64).collect(); - assert_eq!(fixed_blocks.len(), 6); // hackathon.agency.near = 45894627,45898423, hacker.agency.near = 45897358, hack.agency.near = 45894872,45895120,45896237 + assert_eq!(fixed_blocks.len(), 197); // hackathon.agency.near = 45894627,45898423, hacker.agency.near = 45897358, hack.agency.near = 45894872,45895120,45896237 } } diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs index 987a59cb1..3ab2ec060 100644 --- a/indexer/queryapi_coordinator/src/indexer_registry.rs +++ b/indexer/queryapi_coordinator/src/indexer_registry.rs @@ -138,12 +138,14 @@ fn index_and_process_register_calls( .entry(new_indexer_function.account_id.clone()) .or_default(); - match fns.get(new_indexer_function.function_name.as_str()) { + let functions = fns.get(new_indexer_function.function_name.as_str()); + match functions { // if there is no existing function then we will insert the new one with the default state of provisioned = false None => { tracing::info!( target: crate::INDEXER, - "indexed creation call to {registry_method_name}: {:?} {:?}", + "Block {}. Indexed creation call to {registry_method_name}: {:?} {:?}", + block_height, new_indexer_function.account_id.clone(), new_indexer_function.function_name.clone() ); @@ -153,7 +155,8 @@ fn index_and_process_register_calls( Some(old_indexer_function) => { tracing::info!( target: crate::INDEXER, - "indexed update call to {registry_method_name}: {:?} {:?}", + "Block {}. 
Indexed update call to {registry_method_name}: {:?} {:?}", + block_height, new_indexer_function.account_id.clone(), new_indexer_function.function_name.clone(), ); diff --git a/indexer/queryapi_coordinator/src/indexer_types.rs b/indexer/queryapi_coordinator/src/indexer_types.rs index 4a4993ce5..f3880320b 100644 --- a/indexer/queryapi_coordinator/src/indexer_types.rs +++ b/indexer/queryapi_coordinator/src/indexer_types.rs @@ -40,3 +40,9 @@ pub struct IndexerFunction { pub provisioned: bool, pub indexer_rule: IndexerRule, } + +impl IndexerFunction { + pub fn get_full_name(&self) -> String { + format!("{}/{}", self.account_id, self.function_name) + } +} diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index e447f3316..96cb059d5 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -10,7 +10,7 @@ use near_lake_framework::near_indexer_primitives::{types, StreamerMessage}; use crate::indexer_types::IndexerFunction; use indexer_types::{IndexerQueueMessage, IndexerRegistry}; use opts::{Opts, Parser}; -use storage::ConnectionManager; +use storage::{self, ConnectionManager}; pub(crate) mod cache; mod historical_block_processing; @@ -111,7 +111,11 @@ async fn main() -> anyhow::Result<()> { }) .buffer_unordered(1usize); - while let Some(_handle_message) = handlers.next().await {} + while let Some(handle_message) = handlers.next().await { + if let Err(err) = handle_message { + tracing::error!(target: INDEXER, "{:#?}", err); + } + } drop(handlers); // close the channel so the sender will stop // propagate errors from the sender @@ -195,6 +199,20 @@ async fn handle_streamer_message( if !indexer_function.provisioned { set_provisioned_flag(&mut indexer_registry_locked, &indexer_function); } + + storage::set( + context.redis_connection_manager, + &format!("{}:storage", indexer_function.get_full_name()), + serde_json::to_string(indexer_function)?, + ) + .await?; + + storage::add_to_registered_stream( + context.redis_connection_manager, + &format!("{}:stream", indexer_function.get_full_name()), + &[("block_height", block_height)], + ) + .await?; } stream::iter(indexer_function_messages.into_iter()) @@ -252,11 +270,17 @@ fn set_provisioned_flag( indexer_function.provisioned = true; } None => { + let keys = account_functions + .keys() + .map(|s| &**s) + .collect::>() + .join(", "); tracing::error!( target: INDEXER, - "Unable to set provisioning status, Indexer function with account_id {} and function_name {} not found in registry", + "Unable to set provisioning status, Indexer function with account_id {} and function_name {} not found in registry. Functions for this account are: {}", indexer_function.account_id, - indexer_function.function_name + indexer_function.function_name, + keys ); } } diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs index f5e63e753..6c29a06c6 100644 --- a/indexer/queryapi_coordinator/src/s3.rs +++ b/indexer/queryapi_coordinator/src/s3.rs @@ -1,80 +1,87 @@ +use anyhow::{bail, Context, Result}; use aws_sdk_s3::Client as S3Client; use aws_sdk_s3::Config; use aws_types::SdkConfig; use chrono::{DateTime, NaiveDate, Utc}; -use futures::future::join_all; -use regex::Regex; +use futures::future::try_join_all; // Sanity check, if we hit this we have 1M S3 results. // Currently that would be either 2,700 years of FunctionCall data or 1M contract folders. +// If we hit 1M contracts we should build an index to support efficient wildcard contract matching. 
const MAX_S3_LIST_REQUESTS: usize = 1000; +fn storage_path_for_account(account: &str) -> String { + let mut folders = account.split('.').collect::>(); + folders.reverse(); + folders.join("/") +} + pub async fn find_index_files_by_pattern( aws_config: &SdkConfig, s3_bucket: &str, s3_folder: &str, pattern: &str, -) -> Vec { - match pattern { +) -> Result> { + Ok(match pattern { x if x.contains(',') => { - let contract_array = x.split(','); + let account_array = x.split(','); let mut results = vec![]; - for contract in contract_array { - let contract = contract.trim(); - let sub_results = if contract.contains('*') { - list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &contract).await + for account in account_array { + let account = account.trim(); + let sub_results = if account.contains('*') { + list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &account).await? } else { list_s3_bucket_by_prefix( aws_config, s3_bucket, - &format!("{}/{}/", s3_folder, contract), + &format!("{}/{}/", s3_folder, storage_path_for_account(account)), ) - .await + .await? }; results.extend(sub_results); } results } x if x.contains('*') => { - list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &x).await + list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &x).await? } _ => { list_s3_bucket_by_prefix( aws_config, s3_bucket, - &format!("{}/{}/", s3_folder, pattern), + &format!("{}/{}/", s3_folder, storage_path_for_account(pattern),), ) - .await + .await? } - } - - // todo will need to dedupe and sort the block output now + }) } async fn list_index_files_by_wildcard( aws_config: &SdkConfig, s3_bucket: &str, s3_folder: &str, - x: &&str, -) -> Vec { - // fetch all folders and filter by regex - let folders = list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/", s3_folder)).await; - let regex_string = &x.replace('.', "\\.").replace('*', ".*"); - let re = Regex::new(regex_string).unwrap(); - let matching_folders = folders.into_iter().filter(|folder| re.is_match(folder)); + pattern: &&str, +) -> Result> { + // remove sub-account wildcard from pattern + let pattern = pattern.replace("*.", ""); + let path = storage_path_for_account(&pattern); + + let folders = + list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/{}/", s3_folder, path)) + .await?; // for each matching folder list files let mut results = vec![]; - for folder in matching_folders { - results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await); + for folder in folders { + results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await?); } - results + Ok(results) } async fn list_s3_bucket_by_prefix( aws_config: &SdkConfig, s3_bucket: &str, s3_prefix: &str, -) -> Vec { +) -> Result> { let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); let s3_client: S3Client = S3Client::from_conf(s3_config); @@ -93,37 +100,29 @@ async fn list_s3_bucket_by_prefix( configured_client = configured_client.continuation_token(continuation_token.unwrap()); } - match configured_client.send().await { - Ok(file_list) => { - if let Some(common_prefixes) = file_list.common_prefixes { - let keys: Vec = common_prefixes - .into_iter() - .map(|o| o.prefix.unwrap()) - .collect(); - results.extend(keys); - } - if let Some(objects) = file_list.contents { - let keys: Vec = objects.into_iter().map(|o| o.key.unwrap()).collect(); - results.extend(keys); - } - if file_list.next_continuation_token.is_some() { - continuation_token = file_list.next_continuation_token; - counter += 1; 
- if counter > MAX_S3_LIST_REQUESTS { - tracing::error!("Exceeded internal limit of {MAX_S3_LIST_REQUESTS}"); - break; - } - } else { - break; - } - } - Err(e) => { - tracing::error!("Error listing index files: {:?}", e); - break; + let file_list = configured_client.send().await?; + if let Some(common_prefixes) = file_list.common_prefixes { + let keys: Vec = common_prefixes + .into_iter() + .map(|o| o.prefix.unwrap()) + .collect(); + results.extend(keys); + } + if let Some(objects) = file_list.contents { + let keys: Vec = objects.into_iter().map(|o| o.key.unwrap()).collect(); + results.extend(keys); + } + if file_list.next_continuation_token.is_some() { + continuation_token = file_list.next_continuation_token; + counter += 1; + if counter > MAX_S3_LIST_REQUESTS { + bail!("Exceeded internal limit of {MAX_S3_LIST_REQUESTS}") } - }; + } else { + break; + } } - results + Ok(results) } pub async fn fetch_contract_index_files( @@ -132,13 +131,13 @@ pub async fn fetch_contract_index_files( s3_folder: &str, start_date: DateTime, contract_pattern: &str, -) -> Vec { +) -> Result> { let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); let s3_client: S3Client = S3Client::from_conf(s3_config); // list all index files let file_list = - find_index_files_by_pattern(aws_config, s3_bucket, s3_folder, contract_pattern).await; + find_index_files_by_pattern(aws_config, s3_bucket, s3_folder, contract_pattern).await?; let fetch_and_parse_tasks = file_list .into_iter() @@ -153,57 +152,34 @@ pub async fn fetch_contract_index_files( .collect::>(); // Execute all tasks in parallel and wait for completion - let file_contents: Vec = join_all(fetch_and_parse_tasks).await; - file_contents + let file_contents: Vec = try_join_all(fetch_and_parse_tasks).await?; + Ok(file_contents .into_iter() .filter(|file_contents| !file_contents.is_empty()) - .collect::>() + .collect::>()) } -pub async fn fetch_text_file_from_s3(s3_bucket: &str, key: String, s3_client: S3Client) -> String { +pub async fn fetch_text_file_from_s3( + s3_bucket: &str, + key: String, + s3_client: S3Client, +) -> Result { + // todo: can we retry if this fails like the lake s3_fetcher fn does? + // If so, can we differentiate between a file not existing (block height does not exist) and a network error? 
let get_object_output = s3_client .get_object() .bucket(s3_bucket) .key(key.clone()) .send() - .await; + .await + .with_context(|| format!("Error fetching index file {key}"))?; - match get_object_output { - Ok(object_output) => { - let bytes = object_output.body.collect().await; - match bytes { - Ok(bytes) => { - let file_contents = String::from_utf8(bytes.to_vec()); - match file_contents { - Ok(file_contents) => { - tracing::debug!( - target: crate::INDEXER, - "Fetched S3 file {}", - key.clone(), - ); - file_contents - } - Err(e) => { - tracing::error!( - target: crate::INDEXER, - "Error parsing index file: {:?}", - e - ); - "".to_string() - } - } - } - Err(e) => { - tracing::error!(target: crate::INDEXER, "Error fetching index file: {:?}", e); - "".to_string() - } - } - } - Err(e) => { - tracing::error!(target: crate::INDEXER, "Error fetching index file: {:?}", e); - "".to_string() - } - } + let bytes = get_object_output + .body + .collect() + .await + .with_context(|| format!("Error reading bytes of index file {key}"))?; + String::from_utf8(bytes.to_vec()).with_context(|| format!("Error parsing index file {key}")) } /// check whether the filename is a date after the start date @@ -214,7 +190,8 @@ fn file_name_date_after(start_date: DateTime, file_name: &str) -> bool { match file_name_date { Ok(file_name_date) => file_name_date >= start_date.date_naive(), Err(e) => { - tracing::error!( + // if we can't parse the date assume a file this code is not meant to handle + tracing::debug!( target: crate::INDEXER, "Error parsing file name date: {:?}", e @@ -226,86 +203,106 @@ fn file_name_date_after(start_date: DateTime, file_name: &str) -> bool { #[cfg(test)] mod tests { - use crate::opts::{Opts, Parser}; + use crate::historical_block_processing::INDEXED_ACTIONS_FILES_FOLDER; + use crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET; + use crate::opts::Opts; use crate::s3::{find_index_files_by_pattern, list_s3_bucket_by_prefix}; - use aws_types::SdkConfig; /// Parses env vars from .env, Run with /// cargo test s3::tests::list_delta_bucket -- mainnet from-latest; #[tokio::test] async fn list_delta_bucket() { - let opts = Opts::parse(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let opts = Opts::test_opts_with_aws(); let list = list_s3_bucket_by_prefix( - aws_config, - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - &format!( - "{}/", - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER.to_string() - ), + &opts.lake_aws_sdk_config(), + INDEXED_DATA_FILES_BUCKET, + &format!("{}/", INDEXED_ACTIONS_FILES_FOLDER.to_string()), ) - .await; - assert!(list.len() > 35000); + .await + .unwrap(); + assert_eq!(list.len(), 4); } /// cargo test s3::tests::list_with_single_contract -- mainnet from-latest #[tokio::test] async fn list_with_single_contract() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "hackathon.agency.near", ) - .await; + .await + .unwrap(); assert_eq!(list.len(), 1); } /// cargo test s3::tests::list_with_csv_contracts -- mainnet from-latest #[tokio::test] async fn list_with_csv_contracts() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - 
crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "hackathon.agency.near, hackathon.aurora-silo-dev.near, hackathon.sputnik-dao.near", ) - .await; - assert!(list.len() >= 13); // expecting 13 but these contracts could get randomly called sometime + .await + .unwrap(); + assert!(list.len() >= 15); // expecting 15 but these contracts could get randomly called sometime } /// cargo test s3::tests::list_with_wildcard_contracts -- mainnet from-latest #[tokio::test] async fn list_with_wildcard_contracts() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "*.keypom.near", ) - .await; + .await + .unwrap(); assert!(list.len() >= 550); } /// cargo test s3::tests::list_with_csv_and_wildcard_contracts -- mainnet from-latest #[tokio::test] async fn list_with_csv_and_wildcard_contracts() { - let opts = Opts::parse(); + let opts = Opts::test_opts_with_aws(); let list = find_index_files_by_pattern( &opts.lake_aws_sdk_config(), - crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET, - crate::historical_block_processing::INDEXED_DATA_FILES_FOLDER, + INDEXED_DATA_FILES_BUCKET, + INDEXED_ACTIONS_FILES_FOLDER, "*.keypom.near, hackathon.agency.near, *.nearcrowd.near", ) - .await; + .await + .unwrap(); assert!(list.len() > 1370); } + + #[test] + fn storage_path_for_account_splits_and_reverses_into_folders() { + let account = "buildnear.testnet"; + let expected = "testnet/buildnear"; + let actual = super::storage_path_for_account(account); + assert_eq!(expected, actual); + + let account = "v2.keypom.near"; + let expected = "near/keypom/v2"; + let actual = super::storage_path_for_account(account); + assert_eq!(expected, actual); + + let account = "0.app5.hipodev.near"; + let expected = "near/hipodev/app5/0"; + let actual = super::storage_path_for_account(account); + assert_eq!(expected, actual); + } } diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 59e8c8b04..98c88a297 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -2,6 +2,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; +const STREAMS_SET_KEY: &str = "streams"; + pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") } @@ -50,6 +52,61 @@ pub async fn get( tracing::debug!(target: STORAGE, "GET: {:?}: {:?}", &key, &value,); Ok(value) } + +async fn sadd( + redis_connection_manager: &ConnectionManager, + key: impl ToRedisArgs + std::fmt::Debug, + value: impl ToRedisArgs + std::fmt::Debug, +) -> anyhow::Result<()> { + tracing::debug!(target: STORAGE, "SADD: {:?}: {:?}", key, value); + + redis::cmd("SADD") + .arg(key) + .arg(value) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(()) +} + +async fn xadd( + redis_connection_manager: &ConnectionManager, + stream_key: &str, + fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], +) -> anyhow::Result<()> { + tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields); + + // TODO: Remove stream cap when we finally start processing it + 
redis::cmd("XTRIM") + .arg(stream_key) + .arg("MAXLEN") + .arg(100) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + let mut cmd = redis::cmd("XADD"); + cmd.arg(stream_key).arg("*"); + + for (field, value) in fields { + cmd.arg(*field).arg(value); + } + + cmd.query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(()) +} + +pub async fn add_to_registered_stream( + redis_connection_manager: &ConnectionManager, + key: &str, + fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], +) -> anyhow::Result<()> { + sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?; + xadd(redis_connection_manager, key, fields).await?; + + Ok(()) +} /// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage. /// Increments the counter `receipts_{transaction_hash}` by one. /// The counter holds how many Receipts related to the Transaction are in watching list