Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/remove sqs from historical backfill #364

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 35 additions & 88 deletions indexer/queryapi_coordinator/src/historical_block_processing.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
use crate::indexer_types::{IndexerFunction, IndexerQueueMessage};
use crate::opts::{Opts, Parser};
use crate::queue;
use crate::indexer_types::IndexerFunction;
use crate::s3;
use anyhow::{bail, Context};
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::Config;
use aws_sdk_sqs::Client;
use aws_types::SdkConfig;
use chrono::{DateTime, LocalResult, TimeZone, Utc};
use indexer_rule_type::indexer_rule::MatchingRule;
use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatchPayload};
use indexer_rules_engine::types::indexer_rule_match::ChainId;
use near_jsonrpc_client::JsonRpcClient;
use near_jsonrpc_primitives::types::blocks::RpcBlockRequest;
use near_lake_framework::near_indexer_primitives::types::{BlockHeight, BlockId, BlockReference};
Expand All @@ -26,16 +21,25 @@ pub fn spawn_historical_message_thread(
block_height: BlockHeight,
new_indexer_function: &IndexerFunction,
redis_connection_manager: &storage::ConnectionManager,
s3_client: &S3Client,
chain_id: &ChainId,
json_rpc_client: &JsonRpcClient,
) -> Option<JoinHandle<i64>> {
let redis_connection_manager = redis_connection_manager.clone();
let s3_client = s3_client.clone();
let chain_id = chain_id.clone();
let json_rpc_client = json_rpc_client.clone();

new_indexer_function.start_block_height.map(|_| {
let new_indexer_function_copy = new_indexer_function.clone();
tokio::spawn(async move {
process_historical_messages_or_handle_error(
block_height,
new_indexer_function_copy,
Opts::parse(),
&redis_connection_manager,
&s3_client,
&chain_id,
&json_rpc_client,
)
.await
})
Expand All @@ -45,14 +49,18 @@ pub fn spawn_historical_message_thread(
pub(crate) async fn process_historical_messages_or_handle_error(
block_height: BlockHeight,
indexer_function: IndexerFunction,
opts: Opts,
redis_connection_manager: &storage::ConnectionManager,
s3_client: &S3Client,
chain_id: &ChainId,
json_rpc_client: &JsonRpcClient,
) -> i64 {
match process_historical_messages(
block_height,
indexer_function,
opts,
redis_connection_manager,
s3_client,
chain_id,
json_rpc_client,
)
.await
{
Expand All @@ -71,8 +79,10 @@ pub(crate) async fn process_historical_messages_or_handle_error(
pub(crate) async fn process_historical_messages(
block_height: BlockHeight,
indexer_function: IndexerFunction,
opts: Opts,
redis_connection_manager: &storage::ConnectionManager,
s3_client: &S3Client,
chain_id: &ChainId,
json_rpc_client: &JsonRpcClient,
) -> anyhow::Result<i64> {
let start_block = indexer_function.start_block_height.unwrap();
let block_difference: i64 = (block_height - start_block) as i64;
Expand All @@ -95,25 +105,16 @@ pub(crate) async fn process_historical_messages(
indexer_function.function_name
);

let chain_id = opts.chain_id().clone();
let aws_region = opts.aws_queue_region.clone();
let queue_client = queue::queue_client(aws_region, opts.queue_credentials());
let queue_url = opts.start_from_block_queue_url.clone();
let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();

let json_rpc_client = JsonRpcClient::connect(opts.rpc_url());
let start_date =
lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?;
lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?;

let mut indexer_function = indexer_function.clone();

let last_indexed_block = last_indexed_block_from_metadata(aws_config).await?;
let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?;
let last_indexed_block = last_indexed_block;

let mut blocks_from_index = filter_matching_blocks_from_index_files(
start_block,
&indexer_function,
aws_config,
s3_client,
start_date,
)
.await?;
Expand All @@ -131,15 +132,13 @@ pub(crate) async fn process_historical_messages(
last_indexed_block,
block_height,
&indexer_function,
aws_config,
chain_id.clone(),
s3_client,
chain_id,
)
.await?;

blocks_from_index.append(&mut blocks_between_indexed_and_current_block);

let first_block_in_index = *blocks_from_index.first().unwrap_or(&start_block);

if !blocks_from_index.is_empty() {
storage::sadd(
redis_connection_manager,
Expand All @@ -163,30 +162,16 @@ pub(crate) async fn process_historical_messages(
&[("block_height", current_block)],
)
.await?;

send_execution_message(
block_height,
first_block_in_index,
chain_id.clone(),
&queue_client,
queue_url.clone(),
&mut indexer_function,
current_block,
None,
)
.await;
}
}
}
Ok(block_difference)
}

pub(crate) async fn last_indexed_block_from_metadata(
aws_config: &SdkConfig,
s3_client: &S3Client,
) -> anyhow::Result<BlockHeight> {
let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json");
let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
let s3_client: S3Client = S3Client::from_conf(s3_config);
let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;

let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap();
Expand All @@ -207,7 +192,7 @@ pub(crate) async fn last_indexed_block_from_metadata(
pub(crate) async fn filter_matching_blocks_from_index_files(
start_block_height: BlockHeight,
indexer_function: &IndexerFunction,
aws_config: &SdkConfig,
s3_client: &S3Client,
start_date: DateTime<Utc>,
) -> anyhow::Result<Vec<BlockHeight>> {
let s3_bucket = INDEXED_DATA_FILES_BUCKET;
Expand All @@ -224,7 +209,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
needs_dedupe_and_sort = true;
}
s3::fetch_contract_index_files(
aws_config,
s3_client,
s3_bucket,
INDEXED_ACTIONS_FILES_FOLDER,
start_date,
Expand Down Expand Up @@ -304,12 +289,10 @@ async fn filter_matching_unindexed_blocks_from_lake(
last_indexed_block: BlockHeight,
ending_block_height: BlockHeight,
indexer_function: &IndexerFunction,
aws_config: &SdkConfig,
chain_id: ChainId,
s3_client: &S3Client,
chain_id: &ChainId,
) -> anyhow::Result<Vec<u64>> {
let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
let s3_client: S3Client = S3Client::from_conf(s3_config);
let lake_bucket = lake_bucket_for_chain(chain_id.clone());
let lake_bucket = lake_bucket_for_chain(chain_id);

let indexer_rule = &indexer_function.indexer_rule;
let count = ending_block_height - last_indexed_block;
Expand All @@ -331,7 +314,7 @@ async fn filter_matching_unindexed_blocks_from_lake(
for current_block in (last_indexed_block + 1)..ending_block_height {
// fetch block file from S3
let key = format!("{}/block.json", normalize_block_height(current_block));
let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await;
let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await;

if s3_result.is_err() {
let error = s3_result.err().unwrap();
Expand Down Expand Up @@ -362,7 +345,7 @@ async fn filter_matching_unindexed_blocks_from_lake(
normalize_block_height(current_block),
shard_id
);
let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await?;
let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, &s3_client).await?;
match serde_json::from_slice::<near_lake_framework::near_indexer_primitives::IndexerShard>(
shard.as_ref(),
) {
Expand Down Expand Up @@ -401,50 +384,14 @@ async fn filter_matching_unindexed_blocks_from_lake(
Ok(blocks_to_process)
}

fn lake_bucket_for_chain(chain_id: ChainId) -> String {
fn lake_bucket_for_chain(chain_id: &ChainId) -> String {
format!("{}{}", LAKE_BUCKET_PREFIX, chain_id)
}

/// Formats a block height as a decimal string left-padded with `0` to at
/// least twelve characters.
///
/// The result is used as the leading path segment of the per-block object
/// keys built elsewhere in this file (e.g. `"{height}/block.json"`). Heights
/// that already need more than twelve digits are returned unpadded and are
/// never truncated.
fn normalize_block_height(block_height: BlockHeight) -> String {
    // Minimum width of the rendered height; shorter values are zero-filled.
    const PADDED_WIDTH: usize = 12;

    format!("{:0>width$}", block_height, width = PADDED_WIDTH)
}

async fn send_execution_message(
block_height: BlockHeight,
first_block: BlockHeight,
chain_id: ChainId,
queue_client: &Client,
queue_url: String,
indexer_function: &mut IndexerFunction,
current_block: u64,
payload: Option<IndexerRuleMatchPayload>,
) {
// only request provisioning on the first block
if current_block != first_block {
indexer_function.provisioned = true;
}

let msg = IndexerQueueMessage {
chain_id,
indexer_rule_id: 0,
indexer_rule_name: indexer_function.function_name.clone(),
payload,
block_height: current_block,
indexer_function: indexer_function.clone(),
is_historical: true,
};

match queue::send_to_indexer_queue(queue_client, queue_url, vec![msg]).await {
Ok(_) => {}
Err(err) => tracing::error!(
target: crate::INDEXER,
"#{} an error occurred when sending messages to the queue\n{:#?}",
block_height,
err
),
}
}

// if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks
pub async fn lookup_block_date_or_next_block_date(
block_height: u64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ mod tests {
async fn test_indexing_metadata_file() {
let opts = Opts::test_opts_with_aws();
let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
let s3_client = aws_sdk_s3::Client::from_conf(s3_config);

let last_indexed_block =
historical_block_processing::last_indexed_block_from_metadata(aws_config)
historical_block_processing::last_indexed_block_from_metadata(&s3_client)
.await
.unwrap();
let a: Range<u64> = 90000000..9000000000; // valid for the next 300 years
Expand Down Expand Up @@ -75,19 +77,28 @@ mod tests {
};

let opts = Opts::test_opts_with_aws();

let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
let s3_client = aws_sdk_s3::Client::from_conf(s3_config);

let redis_connection_manager = storage::connect(&opts.redis_connection_string)
.await
.unwrap();

let json_rpc_client = near_jsonrpc_client::JsonRpcClient::connect(opts.rpc_url());

let fake_block_height =
historical_block_processing::last_indexed_block_from_metadata(aws_config)
historical_block_processing::last_indexed_block_from_metadata(&s3_client)
.await
.unwrap();
let result = historical_block_processing::process_historical_messages(
fake_block_height + 1,
indexer_function,
opts,
&redis_connection_manager,
&s3_client,
&opts.chain_id(),
&json_rpc_client,
)
.await;
assert!(result.unwrap() > 0);
Expand Down Expand Up @@ -120,6 +131,8 @@ mod tests {

let opts = Opts::test_opts_with_aws();
let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
let s3_client = aws_sdk_s3::Client::from_conf(s3_config);

let start_block_height = 77016214;
let naivedatetime_utc = NaiveDate::from_ymd_opt(2022, 10, 03)
Expand All @@ -130,7 +143,7 @@ mod tests {
let blocks = filter_matching_blocks_from_index_files(
start_block_height,
&indexer_function,
aws_config,
&s3_client,
datetime_utc,
)
.await;
Expand Down Expand Up @@ -177,6 +190,8 @@ mod tests {

let opts = Opts::test_opts_with_aws();
let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
let s3_client = aws_sdk_s3::Client::from_conf(s3_config);

let start_block_height = 45894620;
let naivedatetime_utc = NaiveDate::from_ymd_opt(2021, 08, 01)
Expand All @@ -187,7 +202,7 @@ mod tests {
let blocks = filter_matching_blocks_from_index_files(
start_block_height,
&indexer_function,
aws_config,
&s3_client,
datetime_utc,
)
.await;
Expand Down
5 changes: 4 additions & 1 deletion indexer/queryapi_coordinator/src/indexer_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,11 @@ fn index_and_process_register_calls(
if let Some(thread) =
crate::historical_block_processing::spawn_historical_message_thread(
block_height,
&mut new_indexer_function,
&new_indexer_function,
context.redis_connection_manager,
context.s3_client,
context.chain_id,
context.json_rpc_client,
)
{
spawned_start_from_block_threads.push(thread);
Expand Down
8 changes: 8 additions & 0 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub(crate) struct QueryApiContext<'a> {
pub streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage,
pub chain_id: &'a ChainId,
pub queue_client: &'a queue::QueueClient,
pub s3_client: &'a aws_sdk_s3::Client,
pub json_rpc_client: &'a JsonRpcClient,
pub queue_url: &'a str,
pub registry_contract_id: &'a str,
pub balance_cache: &'a BalanceCache,
Expand All @@ -62,6 +64,10 @@ async fn main() -> anyhow::Result<()> {
let queue_url = opts.queue_url.clone();
let registry_contract_id = opts.registry_contract_id.clone();

let aws_config = &opts.lake_aws_sdk_config();
let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
let s3_client = aws_sdk_s3::Client::from_conf(s3_config);

// We want to prevent unnecessary RPC queries to find previous balance
let balances_cache: BalanceCache =
std::sync::Arc::new(Mutex::new(SizedCache::with_size(100_000)));
Expand Down Expand Up @@ -106,6 +112,8 @@ async fn main() -> anyhow::Result<()> {
streamer_message,
chain_id,
queue_client: &queue_client,
json_rpc_client: &json_rpc_client,
s3_client: &s3_client,
};
handle_streamer_message(context, indexer_registry.clone())
})
Expand Down
Loading
Loading