From 6e3d0a68d75f7435f3067269c550662c795ba07f Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 9 Nov 2023 16:30:16 +1300 Subject: [PATCH 01/10] chore: Log on historical backfill completion --- .../src/historical_block_processing.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 82118aea3..d7d751b0a 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -64,7 +64,13 @@ impl Streamer { &s3_client, &chain_id, &json_rpc_client, - ) => { } + ) => { + tracing::info!( + target: crate::INDEXER, + "Finished historical backfill for indexer: {:?}", + indexer.get_full_name(), + ); + } } }); From 58a34f735bcd7cf8b40dab34c5b5f341449ea849 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 9 Nov 2023 16:31:15 +1300 Subject: [PATCH 02/10] feat: Reset Redis state prior to historical backfill --- .../src/historical_block_processing.rs | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index d7d751b0a..216a3c813 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -155,6 +155,25 @@ pub(crate) async fn process_historical_messages( indexer_function.function_name ); + storage::del( + redis_connection_manager, + storage::generate_historical_stream_key(&indexer_function.get_full_name()), + ) + .await?; + storage::sadd( + redis_connection_manager, + storage::STREAMS_SET_KEY, + storage::generate_historical_stream_key(&indexer_function.get_full_name()), + ) + .await?; + storage::set( + redis_connection_manager, + storage::generate_historical_storage_key(&indexer_function.get_full_name()), + serde_json::to_string(&indexer_function)?, + None, + ) + .await?; + let start_date = lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?; @@ -188,27 +207,6 @@ pub(crate) async fn process_historical_messages( blocks_from_index.append(&mut blocks_between_indexed_and_current_block); - if !blocks_from_index.is_empty() { - storage::del( - redis_connection_manager, - storage::generate_historical_stream_key(&indexer_function.get_full_name()), - ) - .await?; - storage::sadd( - redis_connection_manager, - storage::STREAMS_SET_KEY, - storage::generate_historical_stream_key(&indexer_function.get_full_name()), - ) - .await?; - storage::set( - redis_connection_manager, - storage::generate_historical_storage_key(&indexer_function.get_full_name()), - serde_json::to_string(&indexer_function)?, - None, - ) - .await?; - } - for current_block in blocks_from_index { storage::xadd( redis_connection_manager, From fdd08ed1b3fe7eecf8cbb3a8bf3ea49ed5b59e76 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 9 Nov 2023 16:33:17 +1300 Subject: [PATCH 03/10] feat: Flush blocks from index before continuing historical backfill --- .../src/historical_block_processing.rs | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 216a3c813..8511ed2f6 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ 
b/indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -179,7 +179,7 @@ pub(crate) async fn process_historical_messages(
 
             let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?;
 
-            let mut blocks_from_index = filter_matching_blocks_from_index_files(
+            let blocks_from_index = filter_matching_blocks_from_index_files(
                 start_block,
                 &indexer_function,
                 s3_client,
@@ -187,6 +187,22 @@ pub(crate) async fn process_historical_messages(
             )
             .await?;
 
+            tracing::info!(
+                target: crate::INDEXER,
+                "Flushing {} blocks to historical Stream for indexer: {}",
+                blocks_from_index.len(),
+                indexer_function.get_full_name(),
+            );
+
+            for block in &blocks_from_index {
+                storage::xadd(
+                    redis_connection_manager,
+                    storage::generate_historical_stream_key(&indexer_function.get_full_name()),
+                    &[("block_height", block)],
+                )
+                .await?;
+            }
+
             // Check for the case where an index file is written right after we get the last_indexed_block metadata
             let last_block_in_data = blocks_from_index.last().unwrap_or(&start_block);
             let last_indexed_block = if last_block_in_data > &last_indexed_block {
@@ -195,7 +211,7 @@ pub(crate) async fn process_historical_messages(
                 last_indexed_block
             };
 
-            let mut blocks_between_indexed_and_current_block: Vec<u64> =
+            let blocks_between_indexed_and_current_block: Vec<u64> =
                 filter_matching_unindexed_blocks_from_lake(
                     last_indexed_block,
                     current_block_height,
@@ -205,13 +221,11 @@ pub(crate) async fn process_historical_messages(
                 )
                 .await?;
 
-            blocks_from_index.append(&mut blocks_between_indexed_and_current_block);
-
-            for current_block in blocks_from_index {
+            for block in blocks_between_indexed_and_current_block {
                 storage::xadd(
                     redis_connection_manager,
                     storage::generate_historical_stream_key(&indexer_function.get_full_name()),
-                    &[("block_height", current_block)],
+                    &[("block_height", block)],
                 )
                 .await?;
             }

From 882da056a59be3c4b008c72484920a55568f6974 Mon Sep 17 00:00:00 2001
From: Morgan Mccauley
Date: Fri, 10 Nov 2023 10:32:58 +1300
Subject: [PATCH 04/10] refactor: Configure AWS/Lake through environment

---
 indexer/Cargo.lock                            |  1 +
 indexer/queryapi_coordinator/Cargo.toml       |  1 +
 ...ical_block_processing_integration_tests.rs | 31 +++++--------
 indexer/queryapi_coordinator/src/main.rs      |  5 +--
 indexer/queryapi_coordinator/src/opts.rs      | 33 +++-----------
 indexer/queryapi_coordinator/src/s3.rs        | 43 ++++++-------------
 6 files changed, 36 insertions(+), 78 deletions(-)

diff --git a/indexer/Cargo.lock b/indexer/Cargo.lock
index ee1cdfeb4..5a9baf7e9 100644
--- a/indexer/Cargo.lock
+++ b/indexer/Cargo.lock
@@ -3324,6 +3324,7 @@ version = "0.1.0"
 dependencies = [
  "actix-web",
  "anyhow",
+ "aws-config",
  "aws-credential-types",
  "aws-sdk-s3",
  "aws-types",
diff --git a/indexer/queryapi_coordinator/Cargo.toml b/indexer/queryapi_coordinator/Cargo.toml
index b606c1b6a..d40f4822f 100644
--- a/indexer/queryapi_coordinator/Cargo.toml
+++ b/indexer/queryapi_coordinator/Cargo.toml
@@ -7,6 +7,7 @@ authors = ["Near Inc "]
 [dependencies]
 anyhow = "1.0.57"
 actix-web = "=4.0.1"
+aws-config = "0.53.0"
 borsh = "0.10.2"
 cached = "0.23.0"
 chrono = "0.4.25"
diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
index 86c281bb5..acd115e37 100644
--- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
+++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
@@ -4,7 +4,6 @@ mod tests {
     use 
crate::indexer_types::IndexerFunction; use crate::opts::{ChainId, Opts, StartOptions}; use crate::{historical_block_processing, opts}; - use aws_types::SdkConfig; use chrono::{DateTime, NaiveDate, Utc}; use indexer_rule_type::indexer_rule::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; use near_lake_framework::near_indexer_primitives::types::BlockHeight; @@ -14,12 +13,11 @@ mod tests { impl Opts { pub fn test_opts_with_aws() -> Self { dotenv::dotenv().ok(); - let lake_aws_access_key = env::var("LAKE_AWS_ACCESS_KEY").unwrap(); - let lake_aws_secret_access_key = env::var("LAKE_AWS_SECRET_ACCESS_KEY").unwrap(); Opts { + aws_access_key_id: env::var("AWS_ACCESS_KEY_ID").unwrap(), + aws_secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").unwrap(), + aws_region: "eu-central-1".to_string(), redis_connection_string: env::var("REDIS_CONNECTION_STRING").unwrap(), - lake_aws_access_key, - lake_aws_secret_access_key, registry_contract_id: "".to_string(), port: 0, chain_id: ChainId::Mainnet(StartOptions::FromLatest), @@ -31,10 +29,8 @@ mod tests { /// cargo test historical_block_processing_integration_tests::test_indexing_metadata_file; #[tokio::test] async fn test_indexing_metadata_file() { - let opts = Opts::test_opts_with_aws(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let last_indexed_block = historical_block_processing::last_indexed_block_from_metadata(&s3_client) @@ -73,9 +69,8 @@ mod tests { let opts = Opts::test_opts_with_aws(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let redis_connection_manager = storage::connect(&opts.redis_connection_string) .await @@ -124,10 +119,8 @@ mod tests { indexer_rule: filter_rule, }; - let opts = Opts::test_opts_with_aws(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let start_block_height = 77016214; let naivedatetime_utc = NaiveDate::from_ymd_opt(2022, 10, 3) @@ -183,10 +176,8 @@ mod tests { indexer_rule: filter_rule, }; - let opts = Opts::test_opts_with_aws(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let start_block_height = 45894620; let naivedatetime_utc = NaiveDate::from_ymd_opt(2021, 8, 1) diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index fe88af9c7..0ac7aa37e 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -51,9 +51,8 @@ async fn main() -> anyhow::Result<()> { let chain_id = &opts.chain_id(); let registry_contract_id = opts.registry_contract_id.clone(); - let aws_config = &opts.lake_aws_sdk_config(); - let 
s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
-    let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
+    let aws_config = aws_config::from_env().load().await;
+    let s3_client = aws_sdk_s3::Client::new(&aws_config);
 
     tracing::info!(target: INDEXER, "Connecting to redis...");
     let redis_connection_manager = storage::connect(&opts.redis_connection_string).await?;
diff --git a/indexer/queryapi_coordinator/src/opts.rs b/indexer/queryapi_coordinator/src/opts.rs
index b99d580c8..83fb4f862 100644
--- a/indexer/queryapi_coordinator/src/opts.rs
+++ b/indexer/queryapi_coordinator/src/opts.rs
@@ -22,10 +22,13 @@ pub struct Opts {
     pub redis_connection_string: String,
     /// AWS Access Key with the rights to read from AWS S3
     #[clap(long, env)]
-    pub lake_aws_access_key: String,
-    #[clap(long, env)]
+    pub aws_access_key_id: String,
     /// AWS Secret Access Key with the rights to read from AWS S3
-    pub lake_aws_secret_access_key: String,
+    #[clap(long, env)]
+    pub aws_secret_access_key: String,
+    /// AWS Region to use for S3
+    #[clap(long, env, default_value = "eu-central-1")]
+    pub aws_region: String,
     /// Registry contract to use
     #[clap(env)]
     pub registry_contract_id: String,
@@ -72,26 +75,6 @@ impl Opts {
         }
     }
 
-    // Creates AWS Credentials for NEAR Lake
-    pub fn lake_credentials(&self) -> aws_credential_types::provider::SharedCredentialsProvider {
-        let provider = aws_credential_types::Credentials::new(
-            self.lake_aws_access_key.clone(),
-            self.lake_aws_secret_access_key.clone(),
-            None,
-            None,
-            "queryapi_coordinator_lake",
-        );
-        aws_credential_types::provider::SharedCredentialsProvider::new(provider)
-    }
-
-    /// Creates AWS Shared Config for NEAR Lake
-    pub fn lake_aws_sdk_config(&self) -> aws_types::sdk_config::SdkConfig {
-        aws_types::sdk_config::SdkConfig::builder()
-            .credentials_provider(self.lake_credentials())
-            .region(aws_types::region::Region::new("eu-central-1"))
-            .build()
-    }
-
     pub fn rpc_url(&self) -> &str {
         // To query metadata (timestamp) about blocks more than 5 epochs old we need an archival node
         match self.chain_id {
@@ -103,9 +86,7 @@ impl Opts {
 
 impl Opts {
     pub async fn to_lake_config(&self) -> near_lake_framework::LakeConfig {
-        let s3_config = aws_sdk_s3::config::Builder::from(&self.lake_aws_sdk_config()).build();
-
-        let config_builder = near_lake_framework::LakeConfigBuilder::default().s3_config(s3_config);
+        let config_builder = near_lake_framework::LakeConfigBuilder::default();
 
         match &self.chain_id {
             ChainId::Mainnet(_) => config_builder
diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs
index 9b7e8696b..30bd7e52e 100644
--- a/indexer/queryapi_coordinator/src/s3.rs
+++ b/indexer/queryapi_coordinator/src/s3.rs
@@ -194,22 +194,18 @@ fn file_name_date_after(start_date: DateTime<Utc>, file_name: &str) -> bool {
 
 #[cfg(test)]
 mod tests {
+    use crate::historical_block_processing::INDEXED_ACTIONS_FILES_FOLDER;
     use crate::historical_block_processing::INDEXED_DATA_FILES_BUCKET;
-    use crate::historical_block_processing::{INDEXED_ACTIONS_FILES_FOLDER, LAKE_BUCKET_PREFIX};
-    use crate::opts::Opts;
     use crate::s3::{
         fetch_text_file_from_s3, find_index_files_by_pattern, list_s3_bucket_by_prefix,
     };
-    use aws_sdk_s3::{Client as S3Client, Config};
 
     /// Parses env vars from .env, Run with
     /// cargo test s3::tests::list_delta_bucket -- mainnet from-latest;
     #[tokio::test]
     async fn list_delta_bucket() {
-        let opts = Opts::test_opts_with_aws();
-        let aws_config = &opts.lake_aws_sdk_config();
-        let s3_config = 
aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let list = list_s3_bucket_by_prefix( &s3_client, @@ -224,10 +220,8 @@ mod tests { /// cargo test s3::tests::list_with_single_contract -- mainnet from-latest #[tokio::test] async fn list_with_single_contract() { - let opts = Opts::test_opts_with_aws(); - let aws_config = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let list = find_index_files_by_pattern( &s3_client, @@ -243,10 +237,8 @@ mod tests { /// cargo test s3::tests::list_with_csv_contracts -- mainnet from-latest #[tokio::test] async fn list_with_csv_contracts() { - let opts = Opts::test_opts_with_aws(); - let aws_config = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let list = find_index_files_by_pattern( &s3_client, @@ -262,10 +254,8 @@ mod tests { /// cargo test s3::tests::list_with_wildcard_contracts -- mainnet from-latest #[tokio::test] async fn list_with_wildcard_contracts() { - let opts = Opts::test_opts_with_aws(); - let aws_config = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let list = find_index_files_by_pattern( &s3_client, @@ -281,10 +271,8 @@ mod tests { /// cargo test s3::tests::list_with_csv_and_wildcard_contracts -- mainnet from-latest #[tokio::test] async fn list_with_csv_and_wildcard_contracts() { - let opts = Opts::test_opts_with_aws(); - let aws_config = &opts.lake_aws_sdk_config(); - let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let list = find_index_files_by_pattern( &s3_client, @@ -319,14 +307,11 @@ mod tests { async fn handle_key_404() { let mut success = false; - let opts = Opts::test_opts_with_aws(); - let s3_config: Config = - aws_sdk_s3::config::Builder::from(&opts.lake_aws_sdk_config()).build(); - - let s3_client: S3Client = S3Client::from_conf(s3_config); + let aws_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&aws_config); let s3_result = fetch_text_file_from_s3( - format!("{}{}", LAKE_BUCKET_PREFIX, "mainnet").as_str(), + "near-lake-data-mainnet", "does_not_exist/block.json".to_string(), &s3_client, ) From 745ff21198c682ef066601d650c2722630484f6c Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Fri, 10 Nov 2023 10:50:19 +1300 Subject: [PATCH 05/10] chore: Update historical backfill logs --- .../src/historical_block_processing.rs | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 
8511ed2f6..0be5238f8 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -53,7 +53,7 @@ impl Streamer { _ = cancellation_token_clone.cancelled() => { tracing::info!( target: crate::INDEXER, - "Cancelling existing historical backfill for indexer: {:?}", + "Cancelling existing historical backfill for indexer: {}", indexer.get_full_name(), ); }, @@ -67,7 +67,7 @@ impl Streamer { ) => { tracing::info!( target: crate::INDEXER, - "Finished historical backfill for indexer: {:?}", + "Finished historical backfill for indexer: {}", indexer.get_full_name(), ); } @@ -138,21 +138,22 @@ pub(crate) async fn process_historical_messages( let block_difference: i64 = (current_block_height - start_block) as i64; match block_difference { i64::MIN..=-1 => { - bail!("Skipping back fill, start_block_height is greater than current block height: {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name); + bail!( + "Skipping back fill, start_block_height is greater than current block height: {}", + indexer_function.get_full_name(), + ); } 0 => { - bail!("Skipping back fill, start_block_height is equal to current block height: {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name); + bail!( + "Skipping back fill, start_block_height is equal to current block height: {}", + indexer_function.get_full_name(), + ); } 1..=i64::MAX => { tracing::info!( target: crate::INDEXER, - "Back filling {block_difference} blocks from {start_block} to current block height {current_block_height}: {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name + "Back filling {block_difference} blocks from {start_block} to current block height {current_block_height}: {}", + indexer_function.get_full_name(), ); storage::del( @@ -189,7 +190,7 @@ pub(crate) async fn process_historical_messages( tracing::info!( target: crate::INDEXER, - "Flushing {} blocks to historical Stream for indexer: {}", + "Flushing {} blocks from index files to historical Stream for indexer: {}", blocks_from_index.len(), indexer_function.get_full_name(), ); From 53649ff6964d81941a7e0cd499373d5ff14f9bc7 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Fri, 10 Nov 2023 10:53:43 +1300 Subject: [PATCH 06/10] refactor: Use Lake Framework to process unindexed historical blocks --- .../src/historical_block_processing.rs | 176 +++++------------- 1 file changed, 47 insertions(+), 129 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 0be5238f8..30bf4f0bd 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -12,9 +12,7 @@ use serde_json::from_str; use tokio::task::JoinHandle; pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake"; -pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-"; pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata"; -pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes. 
 pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;
 
 pub struct Task {
@@ -212,24 +210,55 @@ pub(crate) async fn process_historical_messages(
                 last_indexed_block
             };
 
-            let blocks_between_indexed_and_current_block: Vec<u64> =
-                filter_matching_unindexed_blocks_from_lake(
-                    last_indexed_block,
-                    current_block_height,
-                    &indexer_function,
-                    s3_client,
-                    chain_id,
-                )
-                .await?;
+            tracing::info!(
+                target: crate::INDEXER,
+                "Filtering {} unindexed blocks from lake: from block {last_indexed_block} to {current_block_height} for indexer: {}",
+                current_block_height - last_indexed_block,
+                indexer_function.get_full_name(),
+            );
 
-            for block in blocks_between_indexed_and_current_block {
-                storage::xadd(
-                    redis_connection_manager,
-                    storage::generate_historical_stream_key(&indexer_function.get_full_name()),
-                    &[("block_height", block)],
-                )
-                .await?;
+            let lake_config = match &chain_id {
+                ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
+                ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
             }
+            .start_block_height(last_indexed_block + 1)
+            .build()
+            .context("Failed to build lake config")?;
+
+            let (sender, mut stream) = near_lake_framework::streamer(lake_config);
+
+            let mut filtered_block_count = 0;
+            while let Some(streamer_message) = stream.recv().await {
+                let block_height = streamer_message.block.header.height;
+                if block_height == current_block_height {
+                    break;
+                }
+
+                let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync(
+                    &indexer_function.indexer_rule,
+                    &streamer_message,
+                    chain_id.clone(),
+                );
+
+                if !matches.is_empty() {
+                    filtered_block_count += 1;
+
+                    storage::xadd(
+                        redis_connection_manager,
+                        storage::generate_historical_stream_key(&indexer_function.get_full_name()),
+                        &[("block_height", block_height)],
+                    )
+                    .await?;
+                }
+            }
+            drop(sender);
+
+            tracing::info!(
+                target: crate::INDEXER,
+                "Flushed {} unindexed blocks to historical Stream for indexer: {}",
+                filtered_block_count,
+                indexer_function.get_full_name(),
+            );
         }
     }
     Ok(block_difference)
@@ -352,117 +381,6 @@ fn parse_blocks_from_index_files(
         .collect::<Vec<u64>>()
 }
 
-async fn filter_matching_unindexed_blocks_from_lake(
-    last_indexed_block: BlockHeight,
-    ending_block_height: BlockHeight,
-    indexer_function: &IndexerFunction,
-    s3_client: &S3Client,
-    chain_id: &ChainId,
-) -> anyhow::Result<Vec<u64>> {
-    let lake_bucket = lake_bucket_for_chain(chain_id);
-
-    let indexer_rule = &indexer_function.indexer_rule;
-    let count = ending_block_height - last_indexed_block;
-    if count > MAX_UNINDEXED_BLOCKS_TO_PROCESS {
-        bail!(
-            "Too many unindexed blocks to filter: {count}. 
Last indexed block is {last_indexed_block} for function {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name, - ); - } - tracing::info!( - target: crate::INDEXER, - "Filtering {count} unindexed blocks from lake: from block {last_indexed_block} to {ending_block_height} for function {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name, - ); - - let mut blocks_to_process: Vec = vec![]; - for current_block in (last_indexed_block + 1)..ending_block_height { - // fetch block file from S3 - let key = format!("{}/block.json", normalize_block_height(current_block)); - let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await; - - if s3_result.is_err() { - let error = s3_result.err().unwrap(); - if error - .root_cause() - .downcast_ref::() - .is_some() - { - tracing::info!( - target: crate::INDEXER, - "In manual filtering, skipping block number {} which was not found. For function {:?} {:?}", - current_block, - indexer_function.account_id, - indexer_function.function_name, - ); - continue; - } else { - bail!(error); - } - } - - let block = s3_result.unwrap(); - let block_view = serde_json::from_slice::< - near_lake_framework::near_indexer_primitives::views::BlockView, - >(block.as_ref()) - .with_context(|| format!("Error parsing block {} from S3", current_block))?; - - let mut shards = vec![]; - for shard_id in 0..block_view.chunks.len() as u64 { - let key = format!( - "{}/shard_{}.json", - normalize_block_height(current_block), - shard_id - ); - let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await?; - match serde_json::from_slice::( - shard.as_ref(), - ) { - Ok(parsed_shard) => { - shards.push(parsed_shard); - } - Err(e) => { - bail!("Error parsing shard: {}", e.to_string()); - } - } - } - - let streamer_message = near_lake_framework::near_indexer_primitives::StreamerMessage { - block: block_view, - shards, - }; - - // filter block - let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync( - indexer_rule, - &streamer_message, - chain_id.clone(), - ); - if !matches.is_empty() { - blocks_to_process.push(current_block); - } - } - - tracing::info!( - target: crate::INDEXER, - "Found {block_count} unindexed blocks to process for function {:?} {:?}", - indexer_function.account_id, - indexer_function.function_name, - block_count = blocks_to_process.len() - ); - Ok(blocks_to_process) -} - -fn lake_bucket_for_chain(chain_id: &ChainId) -> String { - format!("{}{}", LAKE_BUCKET_PREFIX, chain_id) -} - -fn normalize_block_height(block_height: BlockHeight) -> String { - format!("{:0>12}", block_height) -} - // if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks pub async fn lookup_block_date_or_next_block_date( block_height: u64, From 87b09c4958efd579e0b6cf708e0d7391277f355e Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 13 Nov 2023 09:04:39 +1300 Subject: [PATCH 07/10] chore: Update historical log messages --- .../queryapi_coordinator/src/historical_block_processing.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 30bf4f0bd..9db703c0d 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -188,7 +188,7 @@ pub(crate) async fn process_historical_messages( tracing::info!( target: crate::INDEXER, - "Flushing {} blocks 
from index files to historical Stream for indexer: {}", + "Flushing {} block heights from index files to historical Stream for indexer: {}", blocks_from_index.len(), indexer_function.get_full_name(), ); @@ -255,7 +255,7 @@ pub(crate) async fn process_historical_messages( tracing::info!( target: crate::INDEXER, - "Flushed {} unindexed blocks to historical Stream for indexer: {}", + "Flushed {} unindexed block heights to historical Stream for indexer: {}", filtered_block_count, indexer_function.get_full_name(), ); From 1e6c24d62fcab94b095da8769bf805475116c1a5 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 13 Nov 2023 09:08:01 +1300 Subject: [PATCH 08/10] refactor: Move S3 related const to `s3` module --- .../src/historical_block_processing.rs | 15 +++++++++------ indexer/queryapi_coordinator/src/s3.rs | 2 ++ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 9db703c0d..5756950d0 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -11,8 +11,6 @@ use near_lake_framework::near_indexer_primitives::types::{BlockHeight, BlockId, use serde_json::from_str; use tokio::task::JoinHandle; -pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake"; -pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata"; pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20; pub struct Task { @@ -267,8 +265,13 @@ pub(crate) async fn process_historical_messages( pub(crate) async fn last_indexed_block_from_metadata( s3_client: &S3Client, ) -> anyhow::Result { - let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json"); - let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?; + let key = format!( + "{}/{}", + s3::INDEXED_ACTIONS_FILES_FOLDER, + "latest_block.json" + ); + let metadata = + s3::fetch_text_file_from_s3(s3::INDEXED_DATA_FILES_BUCKET, key, s3_client).await?; let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap(); let last_indexed_block = metadata["last_indexed_block"].clone(); @@ -291,7 +294,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files( s3_client: &S3Client, start_date: DateTime, ) -> anyhow::Result> { - let s3_bucket = INDEXED_DATA_FILES_BUCKET; + let s3_bucket = s3::INDEXED_DATA_FILES_BUCKET; let mut needs_dedupe_and_sort = false; let indexer_rule = &indexer_function.indexer_rule; @@ -307,7 +310,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files( s3::fetch_contract_index_files( s3_client, s3_bucket, - INDEXED_ACTIONS_FILES_FOLDER, + s3::INDEXED_ACTIONS_FILES_FOLDER, start_date, affected_account_id, ) diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs index 30bd7e52e..29c00d568 100644 --- a/indexer/queryapi_coordinator/src/s3.rs +++ b/indexer/queryapi_coordinator/src/s3.rs @@ -7,6 +7,8 @@ use futures::future::try_join_all; // Currently that would be either 2,700 years of FunctionCall data or 1M contract folders. // If we hit 1M contracts we should build an index to support efficient wildcard contract matching. 
 const MAX_S3_LIST_REQUESTS: usize = 1000;
+pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake";
+pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata";
 
 fn storage_path_for_account(account: &str) -> String {
     let mut folders = account.split('.').collect::<Vec<&str>>();

From 0bcd8d7dd86cd78867ebbc3c73af85fff83e438f Mon Sep 17 00:00:00 2001
From: Morgan Mccauley
Date: Mon, 13 Nov 2023 09:13:08 +1300
Subject: [PATCH 09/10] fix: Ensure unfiltered process starts where Databricks
 finished

---
 indexer/queryapi_coordinator/src/historical_block_processing.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs
index 5756950d0..c4af99f9e 100644
--- a/indexer/queryapi_coordinator/src/historical_block_processing.rs
+++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -219,7 +219,7 @@ pub(crate) async fn process_historical_messages(
                 ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
                 ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
             }
-            .start_block_height(last_indexed_block + 1)
+            .start_block_height(last_indexed_block)
             .build()
             .context("Failed to build lake config")?;
 

From 352c7e0cfc667d3440250a077aae002318a01abb Mon Sep 17 00:00:00 2001
From: Morgan Mccauley
Date: Mon, 13 Nov 2023 09:41:16 +1300
Subject: [PATCH 10/10] chore: Log when filtering large amounts of unindexed
 blocks

---
 .../src/historical_block_processing.rs        | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs
index c4af99f9e..31af1c266 100644
--- a/indexer/queryapi_coordinator/src/historical_block_processing.rs
+++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -12,6 +12,7 @@ use serde_json::from_str;
 use tokio::task::JoinHandle;
 
 pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;
+pub const UNINDEXED_BLOCKS_SOFT_LIMIT: u64 = 7200;
 
 pub struct Task {
     handle: JoinHandle<()>,
@@ -208,13 +209,23 @@ pub(crate) async fn process_historical_messages(
                 last_indexed_block
             };
 
+            let unindexed_block_difference = current_block_height - last_indexed_block;
+
             tracing::info!(
                 target: crate::INDEXER,
                 "Filtering {} unindexed blocks from lake: from block {last_indexed_block} to {current_block_height} for indexer: {}",
-                current_block_height - last_indexed_block,
+                unindexed_block_difference,
                 indexer_function.get_full_name(),
             );
 
+            if unindexed_block_difference > UNINDEXED_BLOCKS_SOFT_LIMIT {
+                tracing::warn!(
+                    target: crate::INDEXER,
+                    "Unindexed block difference exceeds soft limit of: {UNINDEXED_BLOCKS_SOFT_LIMIT} for indexer: {}",
+                    indexer_function.get_full_name(),
+                );
+            }
+
             let lake_config = match &chain_id {
                 ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
                 ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
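
Taken together, patches 02 through 10 leave the historical backfill with the following overall shape: reset the indexer's Redis state up front, flush the block heights recovered from the Databricks index files immediately, then stream the remaining unindexed range through near-lake-framework and filter it with the rules engine. The sketch below condenses that flow for illustration only. It reuses calls that appear verbatim in the diffs above (storage::del/sadd/set/xadd, near_lake_framework::streamer, indexer_rules_engine::reduce_indexer_rule_matches_sync), but the function name, its parameter list, and the storage::ConnectionManager type path are assumptions made for the example rather than code from any commit.

// Illustrative sketch, not part of the series: a condensed view of the
// backfill flow as it stands after [PATCH 10/10]. Project-internal items
// (storage, IndexerFunction, ChainId, indexer_rules_engine) are assumed
// to be in scope; `ConnectionManager` is an assumed type name.
use anyhow::Context;

async fn historical_backfill_sketch(
    redis_connection_manager: &storage::ConnectionManager,
    indexer_function: &IndexerFunction,
    chain_id: &ChainId,
    blocks_from_index: Vec<u64>, // heights matched from the Databricks index files
    last_indexed_block: u64,     // from last_indexed_block_from_metadata
    current_block_height: u64,
) -> anyhow::Result<()> {
    let stream_key = storage::generate_historical_stream_key(&indexer_function.get_full_name());

    // Patch 02: reset Redis state before filtering starts, so a re-registered
    // indexer always begins with a clean stream.
    storage::del(redis_connection_manager, stream_key.clone()).await?;
    storage::sadd(redis_connection_manager, storage::STREAMS_SET_KEY, stream_key.clone()).await?;
    storage::set(
        redis_connection_manager,
        storage::generate_historical_storage_key(&indexer_function.get_full_name()),
        serde_json::to_string(indexer_function)?,
        None,
    )
    .await?;

    // Patch 03: flush index-file matches immediately rather than buffering
    // them until the unindexed range has also been filtered.
    for block in &blocks_from_index {
        storage::xadd(redis_connection_manager, stream_key.clone(), &[("block_height", block)]).await?;
    }

    // Patches 06 and 09: stream the still-unindexed tail through
    // near-lake-framework, starting where the Databricks pipeline finished.
    let lake_config = match chain_id {
        ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
        ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
    }
    .start_block_height(last_indexed_block)
    .build()
    .context("Failed to build lake config")?;

    let (sender, mut stream) = near_lake_framework::streamer(lake_config);
    while let Some(streamer_message) = stream.recv().await {
        let block_height = streamer_message.block.header.height;
        if block_height == current_block_height {
            break; // the real-time process covers everything from here on
        }
        let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync(
            &indexer_function.indexer_rule,
            &streamer_message,
            chain_id.clone(),
        );
        if !matches.is_empty() {
            storage::xadd(redis_connection_manager, stream_key.clone(), &[("block_height", block_height)]).await?;
        }
    }
    drop(sender);

    Ok(())
}

One design point worth noting from the diffs: replacing the manual per-block S3 fetch of [PATCH 06/10] with the Lake streamer removed the hard MAX_UNINDEXED_BLOCKS_TO_PROCESS cap, which is why [PATCH 10/10] reintroduces the same 7200-block figure only as a soft limit that warns instead of bailing.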