refactor: Speed up Historical Backfill process #379

Merged 10 commits on Nov 12, 2023
1 change: 1 addition & 0 deletions indexer/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions indexer/queryapi_coordinator/Cargo.toml
@@ -7,6 +7,7 @@ authors = ["Near Inc <[email protected]>"]
[dependencies]
anyhow = "1.0.57"
actix-web = "=4.0.1"
aws-config = "0.53.0"
borsh = "0.10.2"
cached = "0.23.0"
chrono = "0.4.25"
285 changes: 118 additions & 167 deletions indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -11,11 +11,8 @@ use near_lake_framework::near_indexer_primitives::types::{BlockHeight, BlockId,
use serde_json::from_str;
use tokio::task::JoinHandle;

pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake";
pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata";
pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes.
Collaborator

This kind of served as an alarm for when something went wrong with the index file creation process. Since we are removing it, we should have some metric or error that checks whether the latest index file failed to be created (a sketch of such a check follows this thread). It doesn't need to be here or in this PR, though.

Collaborator

+1

Collaborator Author

This was intentional; I forgot to comment on it, sorry.

Now that we're using near-lake-framework, the BPS should be significantly faster. I'm not sure this should be a concern anymore. I'll add a warning log for now, and we can add more later if this becomes a problem :)
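
A minimal sketch of the kind of staleness check described in this thread, reusing `last_indexed_block_from_metadata`, `UNINDEXED_BLOCKS_SOFT_LIMIT`, and the tracing target already present in this file; the function name and threshold choice are illustrative, not part of the PR:

```rust
// Hypothetical staleness check for the index-file creation process. If the
// latest_block.json metadata falls too far behind the chain head, something is
// likely wrong with the index-file creation pipeline.
async fn warn_if_index_files_stale(
    s3_client: &S3Client,
    current_block_height: BlockHeight,
) -> anyhow::Result<()> {
    let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?;
    if current_block_height.saturating_sub(last_indexed_block) > UNINDEXED_BLOCKS_SOFT_LIMIT {
        tracing::warn!(
            target: crate::INDEXER,
            "Index files appear stale: last indexed block {last_indexed_block}, current block {current_block_height}"
        );
    }
    Ok(())
}
```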

pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;
pub const UNINDEXED_BLOCKS_SOFT_LIMIT: u64 = 7200;

pub struct Task {
handle: JoinHandle<()>,
@@ -53,7 +50,7 @@ impl Streamer {
_ = cancellation_token_clone.cancelled() => {
tracing::info!(
target: crate::INDEXER,
"Cancelling existing historical backfill for indexer: {:?}",
"Cancelling existing historical backfill for indexer: {}",
indexer.get_full_name(),
);
},
@@ -64,7 +61,13 @@
&s3_client,
&chain_id,
&json_rpc_client,
) => { }
) => {
tracing::info!(
target: crate::INDEXER,
"Finished historical backfill for indexer: {}",
indexer.get_full_name(),
);
}
}
});

@@ -132,36 +135,72 @@ pub(crate) async fn process_historical_messages(
let block_difference: i64 = (current_block_height - start_block) as i64;
match block_difference {
i64::MIN..=-1 => {
bail!("Skipping back fill, start_block_height is greater than current block height: {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name);
bail!(
"Skipping back fill, start_block_height is greater than current block height: {}",
indexer_function.get_full_name(),
);
}
0 => {
bail!("Skipping back fill, start_block_height is equal to current block height: {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name);
bail!(
"Skipping back fill, start_block_height is equal to current block height: {}",
indexer_function.get_full_name(),
);
}
1..=i64::MAX => {
tracing::info!(
target: crate::INDEXER,
"Back filling {block_difference} blocks from {start_block} to current block height {current_block_height}: {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name
"Back filling {block_difference} blocks from {start_block} to current block height {current_block_height}: {}",
indexer_function.get_full_name(),
);

storage::del(
Collaborator

I think there might be a bug in the runner-side prefetch code related to this. If a historical process is kicked off and it fails, then the runner-side prefetch continuously pulls from its buffer array rather than the stream (although the stream message is still there). So even if you delete the stream and fill it with a new historical stream, the earlier failed message will continue to block execution. I'll test this and ship a PR to fix it if that's in fact true.

Collaborator Author

Good catch

Maybe we need to think of alternate ways of handling pre-fetch. Perhaps we could:

  • Update/overwrite the existing stream messages rather than maintaining an in-memory queue
  • Set an expirable key in Redis, similar to the real-time caching (see the sketch below)
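
A minimal sketch of the expirable-key idea, assuming the `redis` crate with its async (`tokio-comp`) feature enabled; the helper name, key format, and 300-second TTL are hypothetical, not part of this PR:

```rust
use redis::AsyncCommands;

// Hypothetical helper: mark a historical block as in flight with a TTL so a
// crashed or failed runner cannot block the stream indefinitely.
async fn mark_block_in_flight(
    redis_url: &str,
    indexer_full_name: &str,
    block_height: u64,
) -> redis::RedisResult<()> {
    let client = redis::Client::open(redis_url)?;
    let mut conn = client.get_async_connection().await?;
    let key = format!("{indexer_full_name}:historical:in_flight:{block_height}");

    // SET key value EX 300: the entry expires on its own if it is never acknowledged.
    conn.set_ex(key, block_height, 300).await
}
```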

redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
)
.await?;
storage::sadd(
redis_connection_manager,
storage::STREAMS_SET_KEY,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
)
.await?;
storage::set(
redis_connection_manager,
storage::generate_historical_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(&indexer_function)?,
None,
)
.await?;

let start_date =
lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?;

let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?;

let mut blocks_from_index = filter_matching_blocks_from_index_files(
let blocks_from_index = filter_matching_blocks_from_index_files(
start_block,
&indexer_function,
s3_client,
start_date,
)
.await?;

tracing::info!(
target: crate::INDEXER,
"Flushing {} block heights from index files to historical Stream for indexer: {}",
blocks_from_index.len(),
indexer_function.get_full_name(),
);

for block in &blocks_from_index {
storage::xadd(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
&[("block_height", block)],
)
.await?;
}

// Check for the case where an index file is written right after we get the last_indexed_block metadata
let last_block_in_data = blocks_from_index.last().unwrap_or(&start_block);
let last_indexed_block = if last_block_in_data > &last_indexed_block {
@@ -170,47 +209,65 @@
last_indexed_block
};

let mut blocks_between_indexed_and_current_block: Vec<BlockHeight> =
filter_matching_unindexed_blocks_from_lake(
last_indexed_block,
current_block_height,
&indexer_function,
s3_client,
chain_id,
)
.await?;
let unindexed_block_difference = current_block_height - last_indexed_block;

blocks_from_index.append(&mut blocks_between_indexed_and_current_block);
tracing::info!(
target: crate::INDEXER,
"Filtering {} unindexed blocks from lake: from block {last_indexed_block} to {current_block_height} for indexer: {}",
unindexed_block_difference,
indexer_function.get_full_name(),
);

if !blocks_from_index.is_empty() {
storage::del(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
)
.await?;
storage::sadd(
redis_connection_manager,
storage::STREAMS_SET_KEY,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
)
.await?;
storage::set(
redis_connection_manager,
storage::generate_historical_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(&indexer_function)?,
None,
)
.await?;
if unindexed_block_difference > UNINDEXED_BLOCKS_SOFT_LIMIT {
tracing::warn!(
target: crate::INDEXER,
"Unindexed block difference exceeds soft limit of: {UNINDEXED_BLOCKS_SOFT_LIMIT} for indexer: {}",
indexer_function.get_full_name(),
);
}

for current_block in blocks_from_index {
storage::xadd(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
&[("block_height", current_block)],
)
.await?;
let lake_config = match &chain_id {
ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
}
.start_block_height(last_indexed_block)
.build()
.context("Failed to build lake config")?;
Collaborator

If we fail to build the lake config, should we then purge the historical feed again? I think it could be confusing to the customer if it looks like their historical process ran, only to then have a gap between the block they specified and the blocks that actually ran. On that front, I feel it might be better to construct this earlier. Maybe also add a retry on it, depending on the error type (a retry sketch follows this thread).

Collaborator Author

There are many things that can fail and cause the historical process to exit; it's not limited to the construction of near-lake-framework.

At this point, I think it's worth pushing this out as is so we can test. We can then have a deeper conversation about error handling later.
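
A minimal sketch of the retry idea from this thread, wrapping the `LakeConfigBuilder` construction the PR introduces; it assumes the surrounding module's `ChainId` and `crate::INDEXER`, and the attempt count and backoff are illustrative only:

```rust
use anyhow::Context;
use std::time::Duration;

// Hypothetical retry wrapper around lake config construction. In practice the
// build step mostly fails on validation, so a real version would likely
// inspect the error type before retrying, as suggested above.
async fn build_lake_config_with_retry(
    chain_id: &ChainId,
    start_block_height: u64,
) -> anyhow::Result<near_lake_framework::LakeConfig> {
    let mut attempts: u64 = 0;
    loop {
        let result = match chain_id {
            ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
            ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
        }
        .start_block_height(start_block_height)
        .build();

        match result {
            Ok(config) => return Ok(config),
            Err(error) if attempts < 3 => {
                attempts += 1;
                tracing::warn!(
                    target: crate::INDEXER,
                    "Failed to build lake config (attempt {attempts}), retrying: {error}"
                );
                tokio::time::sleep(Duration::from_secs(attempts)).await;
            }
            Err(error) => return Err(error).context("Failed to build lake config"),
        }
    }
}
```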


let (sender, mut stream) = near_lake_framework::streamer(lake_config);

let mut filtered_block_count = 0;
while let Some(streamer_message) = stream.recv().await {
let block_height = streamer_message.block.header.height;
if block_height == current_block_height {
break;
}

let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync(
&indexer_function.indexer_rule,
&streamer_message,
chain_id.clone(),
);

if !matches.is_empty() {
filtered_block_count += 1;

storage::xadd(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
&[("block_height", block_height)],
)
.await?;
}
}
drop(sender);

tracing::info!(
target: crate::INDEXER,
"Flushed {} unindexed block heights to historical Stream for indexer: {}",
filtered_block_count,
indexer_function.get_full_name(),
);
}
}
Ok(block_difference)
@@ -219,8 +276,13 @@ pub(crate) async fn process_historical_messages(
pub(crate) async fn last_indexed_block_from_metadata(
s3_client: &S3Client,
) -> anyhow::Result<BlockHeight> {
let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json");
let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;
let key = format!(
"{}/{}",
s3::INDEXED_ACTIONS_FILES_FOLDER,
"latest_block.json"
);
let metadata =
s3::fetch_text_file_from_s3(s3::INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;

let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap();
let last_indexed_block = metadata["last_indexed_block"].clone();
@@ -243,7 +305,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
s3_client: &S3Client,
start_date: DateTime<Utc>,
) -> anyhow::Result<Vec<BlockHeight>> {
let s3_bucket = INDEXED_DATA_FILES_BUCKET;
let s3_bucket = s3::INDEXED_DATA_FILES_BUCKET;

let mut needs_dedupe_and_sort = false;
let indexer_rule = &indexer_function.indexer_rule;
@@ -259,7 +321,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
s3::fetch_contract_index_files(
s3_client,
s3_bucket,
INDEXED_ACTIONS_FILES_FOLDER,
s3::INDEXED_ACTIONS_FILES_FOLDER,
start_date,
affected_account_id,
)
@@ -333,117 +395,6 @@ fn parse_blocks_from_index_files(
.collect::<Vec<u64>>()
}

async fn filter_matching_unindexed_blocks_from_lake(
last_indexed_block: BlockHeight,
ending_block_height: BlockHeight,
indexer_function: &IndexerFunction,
s3_client: &S3Client,
chain_id: &ChainId,
) -> anyhow::Result<Vec<u64>> {
let lake_bucket = lake_bucket_for_chain(chain_id);

let indexer_rule = &indexer_function.indexer_rule;
let count = ending_block_height - last_indexed_block;
if count > MAX_UNINDEXED_BLOCKS_TO_PROCESS {
bail!(
"Too many unindexed blocks to filter: {count}. Last indexed block is {last_indexed_block} for function {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name,
);
}
tracing::info!(
target: crate::INDEXER,
"Filtering {count} unindexed blocks from lake: from block {last_indexed_block} to {ending_block_height} for function {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name,
);

let mut blocks_to_process: Vec<u64> = vec![];
for current_block in (last_indexed_block + 1)..ending_block_height {
// fetch block file from S3
let key = format!("{}/block.json", normalize_block_height(current_block));
let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await;

if s3_result.is_err() {
let error = s3_result.err().unwrap();
if error
.root_cause()
.downcast_ref::<aws_sdk_s3::error::NoSuchKey>()
.is_some()
{
tracing::info!(
target: crate::INDEXER,
"In manual filtering, skipping block number {} which was not found. For function {:?} {:?}",
current_block,
indexer_function.account_id,
indexer_function.function_name,
);
continue;
} else {
bail!(error);
}
}

let block = s3_result.unwrap();
let block_view = serde_json::from_slice::<
near_lake_framework::near_indexer_primitives::views::BlockView,
>(block.as_ref())
.with_context(|| format!("Error parsing block {} from S3", current_block))?;

let mut shards = vec![];
for shard_id in 0..block_view.chunks.len() as u64 {
let key = format!(
"{}/shard_{}.json",
normalize_block_height(current_block),
shard_id
);
let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await?;
match serde_json::from_slice::<near_lake_framework::near_indexer_primitives::IndexerShard>(
shard.as_ref(),
) {
Ok(parsed_shard) => {
shards.push(parsed_shard);
}
Err(e) => {
bail!("Error parsing shard: {}", e.to_string());
}
}
}

let streamer_message = near_lake_framework::near_indexer_primitives::StreamerMessage {
block: block_view,
shards,
};

// filter block
let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync(
indexer_rule,
&streamer_message,
chain_id.clone(),
);
if !matches.is_empty() {
blocks_to_process.push(current_block);
}
}

tracing::info!(
target: crate::INDEXER,
"Found {block_count} unindexed blocks to process for function {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name,
block_count = blocks_to_process.len()
);
Ok(blocks_to_process)
}

fn lake_bucket_for_chain(chain_id: &ChainId) -> String {
format!("{}{}", LAKE_BUCKET_PREFIX, chain_id)
}

fn normalize_block_height(block_height: BlockHeight) -> String {
format!("{:0>12}", block_height)
}

// if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks
pub async fn lookup_block_date_or_next_block_date(
block_height: u64,