Skip to content

Commit

Permalink
refactor: Speed up Historical Backfill process (#379)
Browse files Browse the repository at this point in the history
## Problem

The Historical backfill is composed of two main steps:
1. Fetching matching blocks from `near-delta-lake`/Databricks index
files
2. Manually filtering all blocks between the `last_indexed_block` from
`near-delta-lake` and the `block_height` in which the historical process
was triggered

Only after each of these steps are completed are the blocks flushed to
Redis. This leads to a large delay from when the process was triggered,
to when the blocks are actually executed by Runner, creating the
appearance of things 'not working'.

## Changes
This PR makes the following changes to reduce the time between trigger,
and execution:

### Flush blocks indexed blocks immediately
Fetching blocks from the index file (Step 1.), is relatively quick
process. Rather than wait for (Step 2.), we can flush the blocks
immediately, and then continue on to the following step.

### Prefetch blocks in manual filtering
In manual filtering, blocks are processed sequentially. To speed this
process up, we can fetch ahead so that we minimise the time spent
waiting for S3 requests. Luckily, `near-lake-framework` does exactly
this, therefore manual filtering has been refactored to use this.

## Results
The following is from a backfill of 19,959,790 blocks run locally. I'd
expect the Before to be much lower given the geographical distance
compared to actual our infrastructure, but the results are still
positive :).

Time till first block appears on Redis:
- Before: 33 minutes
- After: 2 seconds

Time to completion:
- Before: 33 minutes
- After: 45 seconds
  • Loading branch information
morgsmccauley committed Nov 12, 2023
1 parent 2f5505b commit 530efd3
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 245 deletions.
1 change: 1 addition & 0 deletions indexer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions indexer/queryapi_coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ authors = ["Near Inc <[email protected]>"]
[dependencies]
anyhow = "1.0.57"
actix-web = "=4.0.1"
aws-config = "0.53.0"
borsh = "0.10.2"
cached = "0.23.0"
chrono = "0.4.25"
Expand Down
285 changes: 118 additions & 167 deletions indexer/queryapi_coordinator/src/historical_block_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ use near_lake_framework::near_indexer_primitives::types::{BlockHeight, BlockId,
use serde_json::from_str;
use tokio::task::JoinHandle;

pub const INDEXED_DATA_FILES_BUCKET: &str = "near-delta-lake";
pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_actions/metadata";
pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes.
pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;
pub const UNINDEXED_BLOCKS_SOFT_LIMIT: u64 = 7200;

pub struct Task {
handle: JoinHandle<()>,
Expand Down Expand Up @@ -53,7 +50,7 @@ impl Streamer {
_ = cancellation_token_clone.cancelled() => {
tracing::info!(
target: crate::INDEXER,
"Cancelling existing historical backfill for indexer: {:?}",
"Cancelling existing historical backfill for indexer: {}",
indexer.get_full_name(),
);
},
Expand All @@ -64,7 +61,13 @@ impl Streamer {
&s3_client,
&chain_id,
&json_rpc_client,
) => { }
) => {
tracing::info!(
target: crate::INDEXER,
"Finished historical backfill for indexer: {}",
indexer.get_full_name(),
);
}
}
});

Expand Down Expand Up @@ -132,36 +135,72 @@ pub(crate) async fn process_historical_messages(
let block_difference: i64 = (current_block_height - start_block) as i64;
match block_difference {
i64::MIN..=-1 => {
bail!("Skipping back fill, start_block_height is greater than current block height: {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name);
bail!(
"Skipping back fill, start_block_height is greater than current block height: {}",
indexer_function.get_full_name(),
);
}
0 => {
bail!("Skipping back fill, start_block_height is equal to current block height: {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name);
bail!(
"Skipping back fill, start_block_height is equal to current block height: {}",
indexer_function.get_full_name(),
);
}
1..=i64::MAX => {
tracing::info!(
target: crate::INDEXER,
"Back filling {block_difference} blocks from {start_block} to current block height {current_block_height}: {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name
"Back filling {block_difference} blocks from {start_block} to current block height {current_block_height}: {}",
indexer_function.get_full_name(),
);

storage::del(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
)
.await?;
storage::sadd(
redis_connection_manager,
storage::STREAMS_SET_KEY,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
)
.await?;
storage::set(
redis_connection_manager,
storage::generate_historical_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(&indexer_function)?,
None,
)
.await?;

let start_date =
lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?;

let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?;

let mut blocks_from_index = filter_matching_blocks_from_index_files(
let blocks_from_index = filter_matching_blocks_from_index_files(
start_block,
&indexer_function,
s3_client,
start_date,
)
.await?;

tracing::info!(
target: crate::INDEXER,
"Flushing {} block heights from index files to historical Stream for indexer: {}",
blocks_from_index.len(),
indexer_function.get_full_name(),
);

for block in &blocks_from_index {
storage::xadd(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
&[("block_height", block)],
)
.await?;
}

// Check for the case where an index file is written right after we get the last_indexed_block metadata
let last_block_in_data = blocks_from_index.last().unwrap_or(&start_block);
let last_indexed_block = if last_block_in_data > &last_indexed_block {
Expand All @@ -170,47 +209,65 @@ pub(crate) async fn process_historical_messages(
last_indexed_block
};

let mut blocks_between_indexed_and_current_block: Vec<BlockHeight> =
filter_matching_unindexed_blocks_from_lake(
last_indexed_block,
current_block_height,
&indexer_function,
s3_client,
chain_id,
)
.await?;
let unindexed_block_difference = current_block_height - last_indexed_block;

blocks_from_index.append(&mut blocks_between_indexed_and_current_block);
tracing::info!(
target: crate::INDEXER,
"Filtering {} unindexed blocks from lake: from block {last_indexed_block} to {current_block_height} for indexer: {}",
unindexed_block_difference,
indexer_function.get_full_name(),
);

if !blocks_from_index.is_empty() {
storage::del(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
)
.await?;
storage::sadd(
redis_connection_manager,
storage::STREAMS_SET_KEY,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
)
.await?;
storage::set(
redis_connection_manager,
storage::generate_historical_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(&indexer_function)?,
None,
)
.await?;
if unindexed_block_difference > UNINDEXED_BLOCKS_SOFT_LIMIT {
tracing::warn!(
target: crate::INDEXER,
"Unindexed block difference exceeds soft limit of: {UNINDEXED_BLOCKS_SOFT_LIMIT} for indexer: {}",
indexer_function.get_full_name(),
);
}

for current_block in blocks_from_index {
storage::xadd(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
&[("block_height", current_block)],
)
.await?;
let lake_config = match &chain_id {
ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
}
.start_block_height(last_indexed_block)
.build()
.context("Failed to build lake config")?;

let (sender, mut stream) = near_lake_framework::streamer(lake_config);

let mut filtered_block_count = 0;
while let Some(streamer_message) = stream.recv().await {
let block_height = streamer_message.block.header.height;
if block_height == current_block_height {
break;
}

let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync(
&indexer_function.indexer_rule,
&streamer_message,
chain_id.clone(),
);

if !matches.is_empty() {
filtered_block_count += 1;

storage::xadd(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
&[("block_height", block_height)],
)
.await?;
}
}
drop(sender);

tracing::info!(
target: crate::INDEXER,
"Flushed {} unindexed block heights to historical Stream for indexer: {}",
filtered_block_count,
indexer_function.get_full_name(),
);
}
}
Ok(block_difference)
Expand All @@ -219,8 +276,13 @@ pub(crate) async fn process_historical_messages(
pub(crate) async fn last_indexed_block_from_metadata(
s3_client: &S3Client,
) -> anyhow::Result<BlockHeight> {
let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json");
let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;
let key = format!(
"{}/{}",
s3::INDEXED_ACTIONS_FILES_FOLDER,
"latest_block.json"
);
let metadata =
s3::fetch_text_file_from_s3(s3::INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;

let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap();
let last_indexed_block = metadata["last_indexed_block"].clone();
Expand All @@ -243,7 +305,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
s3_client: &S3Client,
start_date: DateTime<Utc>,
) -> anyhow::Result<Vec<BlockHeight>> {
let s3_bucket = INDEXED_DATA_FILES_BUCKET;
let s3_bucket = s3::INDEXED_DATA_FILES_BUCKET;

let mut needs_dedupe_and_sort = false;
let indexer_rule = &indexer_function.indexer_rule;
Expand All @@ -259,7 +321,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
s3::fetch_contract_index_files(
s3_client,
s3_bucket,
INDEXED_ACTIONS_FILES_FOLDER,
s3::INDEXED_ACTIONS_FILES_FOLDER,
start_date,
affected_account_id,
)
Expand Down Expand Up @@ -333,117 +395,6 @@ fn parse_blocks_from_index_files(
.collect::<Vec<u64>>()
}

async fn filter_matching_unindexed_blocks_from_lake(
last_indexed_block: BlockHeight,
ending_block_height: BlockHeight,
indexer_function: &IndexerFunction,
s3_client: &S3Client,
chain_id: &ChainId,
) -> anyhow::Result<Vec<u64>> {
let lake_bucket = lake_bucket_for_chain(chain_id);

let indexer_rule = &indexer_function.indexer_rule;
let count = ending_block_height - last_indexed_block;
if count > MAX_UNINDEXED_BLOCKS_TO_PROCESS {
bail!(
"Too many unindexed blocks to filter: {count}. Last indexed block is {last_indexed_block} for function {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name,
);
}
tracing::info!(
target: crate::INDEXER,
"Filtering {count} unindexed blocks from lake: from block {last_indexed_block} to {ending_block_height} for function {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name,
);

let mut blocks_to_process: Vec<u64> = vec![];
for current_block in (last_indexed_block + 1)..ending_block_height {
// fetch block file from S3
let key = format!("{}/block.json", normalize_block_height(current_block));
let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await;

if s3_result.is_err() {
let error = s3_result.err().unwrap();
if error
.root_cause()
.downcast_ref::<aws_sdk_s3::error::NoSuchKey>()
.is_some()
{
tracing::info!(
target: crate::INDEXER,
"In manual filtering, skipping block number {} which was not found. For function {:?} {:?}",
current_block,
indexer_function.account_id,
indexer_function.function_name,
);
continue;
} else {
bail!(error);
}
}

let block = s3_result.unwrap();
let block_view = serde_json::from_slice::<
near_lake_framework::near_indexer_primitives::views::BlockView,
>(block.as_ref())
.with_context(|| format!("Error parsing block {} from S3", current_block))?;

let mut shards = vec![];
for shard_id in 0..block_view.chunks.len() as u64 {
let key = format!(
"{}/shard_{}.json",
normalize_block_height(current_block),
shard_id
);
let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await?;
match serde_json::from_slice::<near_lake_framework::near_indexer_primitives::IndexerShard>(
shard.as_ref(),
) {
Ok(parsed_shard) => {
shards.push(parsed_shard);
}
Err(e) => {
bail!("Error parsing shard: {}", e.to_string());
}
}
}

let streamer_message = near_lake_framework::near_indexer_primitives::StreamerMessage {
block: block_view,
shards,
};

// filter block
let matches = indexer_rules_engine::reduce_indexer_rule_matches_sync(
indexer_rule,
&streamer_message,
chain_id.clone(),
);
if !matches.is_empty() {
blocks_to_process.push(current_block);
}
}

tracing::info!(
target: crate::INDEXER,
"Found {block_count} unindexed blocks to process for function {:?} {:?}",
indexer_function.account_id,
indexer_function.function_name,
block_count = blocks_to_process.len()
);
Ok(blocks_to_process)
}

fn lake_bucket_for_chain(chain_id: &ChainId) -> String {
format!("{}{}", LAKE_BUCKET_PREFIX, chain_id)
}

fn normalize_block_height(block_height: BlockHeight) -> String {
format!("{:0>12}", block_height)
}

// if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks
pub async fn lookup_block_date_or_next_block_date(
block_height: u64,
Expand Down
Loading

0 comments on commit 530efd3

Please sign in to comment.