Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add caching for get_block #346

Merged
merged 6 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uint = "0.9.5"
url = "2.5.2"
lru = "0.12.3"

# zero-bin related dependencies
ops = { path = "zero_bin/ops" }
Expand Down
1 change: 1 addition & 0 deletions mpt_trie/src/trie_hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ mod tests {
const NUM_INSERTS_FOR_ETH_TRIE_CRATE_MASSIVE_TEST: usize = 1000;
const NODES_PER_BRANCH_FOR_HASH_REPLACEMENT_TEST: usize = 200;

#[allow(dead_code)]
#[derive(Copy, Clone, Debug)]
struct U256Rlpable(U256);

Expand Down
12 changes: 7 additions & 5 deletions zero_bin/leader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ pub(crate) async fn client_main(
block_interval: BlockInterval,
mut params: ProofParams,
) -> Result<()> {
let cached_provider = rpc::provider::CachedProvider::new(build_http_retry_provider(
rpc_params.rpc_url.clone(),
rpc_params.backoff,
rpc_params.max_retries,
));

let prover_input = rpc::prover_input(
&build_http_retry_provider(
rpc_params.rpc_url,
rpc_params.backoff,
rpc_params.max_retries,
),
&cached_provider,
block_interval,
params.checkpoint_block_number.into(),
rpc_params.rpc_type,
Expand Down
1 change: 1 addition & 0 deletions zero_bin/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ futures = { workspace = true }
url = { workspace = true }
__compat_primitive_types = { workspace = true }
tower = { workspace = true, features = ["retry"] }
lru = { workspace = true }

# Local dependencies
compat = { workspace = true }
Expand Down
12 changes: 8 additions & 4 deletions zero_bin/rpc/src/jerigon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use trace_decoder::trace_protocol::{
};

use super::fetch_other_block_data;
use crate::provider::CachedProvider;

/// Transaction traces retrieved from Erigon zeroTracer.
#[derive(Debug, Deserialize)]
Expand All @@ -23,7 +24,7 @@ pub struct ZeroTxResult {
pub struct ZeroBlockWitness(TrieCompact);

pub async fn block_prover_input<ProviderT, TransportT>(
provider: ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
target_block_id: BlockId,
checkpoint_state_trie_root: B256,
) -> anyhow::Result<BlockProverInput>
Expand All @@ -32,20 +33,23 @@ where
TransportT: Transport + Clone,
{
// Grab trace information
let tx_results = provider
let tx_results = cached_provider
.as_provider()
.raw_request::<_, Vec<ZeroTxResult>>(
"debug_traceBlockByNumber".into(),
(target_block_id, json!({"tracer": "zeroTracer"})),
)
.await?;

// Grab block witness info (packed as combined trie pre-images)
let block_witness = provider
let block_witness = cached_provider
.as_provider()
.raw_request::<_, ZeroBlockWitness>("eth_getWitness".into(), vec![target_block_id])
.await?;

let other_data =
fetch_other_block_data(provider, target_block_id, checkpoint_state_trie_root).await?;
fetch_other_block_data(cached_provider, target_block_id, checkpoint_state_trie_root)
.await?;

// Assemble
Ok(BlockProverInput {
Expand Down
48 changes: 30 additions & 18 deletions zero_bin/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ use zero_bin_common::block_interval::BlockInterval;

pub mod jerigon;
pub mod native;
pub mod provider;
pub mod retry;

use crate::provider::CachedProvider;

const PREVIOUS_HASHES_COUNT: usize = 256;

/// The RPC type.
Expand All @@ -28,7 +31,7 @@ pub enum RpcType {

/// Obtain the prover input for a given block interval
pub async fn prover_input<ProviderT, TransportT>(
provider: &ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
block_interval: BlockInterval,
checkpoint_block_id: BlockId,
rpc_type: RpcType,
Expand All @@ -38,10 +41,9 @@ where
TransportT: Transport + Clone,
{
// Grab interval checkpoint block state trie
let checkpoint_state_trie_root = provider
let checkpoint_state_trie_root = cached_provider
.get_block(checkpoint_block_id, BlockTransactionsKind::Hashes)
.await?
.context("block does not exist")?
.header
.state_root;

Expand All @@ -52,10 +54,12 @@ where
let block_id = BlockId::Number(BlockNumberOrTag::Number(block_num));
let block_prover_input = match rpc_type {
RpcType::Jerigon => {
jerigon::block_prover_input(&provider, block_id, checkpoint_state_trie_root).await?
jerigon::block_prover_input(cached_provider, block_id, checkpoint_state_trie_root)
.await?
}
RpcType::Native => {
native::block_prover_input(&provider, block_id, checkpoint_state_trie_root).await?
native::block_prover_input(cached_provider, block_id, checkpoint_state_trie_root)
.await?
}
};

Expand All @@ -68,35 +72,43 @@ where

/// Fetches other block data
async fn fetch_other_block_data<ProviderT, TransportT>(
provider: ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
target_block_id: BlockId,
checkpoint_state_trie_root: B256,
) -> anyhow::Result<OtherBlockData>
where
ProviderT: Provider<TransportT>,
TransportT: Transport + Clone,
{
let target_block = provider
let target_block = cached_provider
.get_block(target_block_id, BlockTransactionsKind::Hashes)
.await?
.context("target block does not exist")?;
.await?;
let target_block_number = target_block
.header
.number
.context("target block is missing field `number`")?;
let chain_id = provider.get_chain_id().await?;
let chain_id = cached_provider.as_provider().get_chain_id().await?;

// For one block, we will fetch 128 previous blocks to get hashes instead of
// 256. But for two consecutive blocks (odd and even) we would fetch 256
// previous blocks in total. To overcome this, we add an offset so that we
// always start fetching from an odd index and eventually skip the additional
// block for an even `target_block_number`.
let odd_offset: i128 = target_block_number as i128 % 2;

let previous_block_numbers =
std::iter::successors(Some(target_block_number as i128 - 1), |&it| Some(it - 1))
.take(PREVIOUS_HASHES_COUNT)
.filter(|i| *i >= 0)
.collect::<Vec<_>>();
std::iter::successors(Some(target_block_number as i128 - 1 + odd_offset), |&it| {
Some(it - 1)
})
.take(PREVIOUS_HASHES_COUNT)
.filter(|i| *i >= 0)
.collect::<Vec<_>>();
let concurrency = previous_block_numbers.len();
let collected_hashes = futures::stream::iter(
previous_block_numbers
.chunks(2) // we get hash for previous and current block with one request
.map(|block_numbers| {
let provider = &provider;
let cached_provider = &cached_provider;
let block_num = &block_numbers[0];
let previos_block_num = if block_numbers.len() > 1 {
Some(block_numbers[1])
Expand All @@ -105,11 +117,10 @@ where
None
};
async move {
let block = provider
let block = cached_provider
.get_block((*block_num as u64).into(), BlockTransactionsKind::Hashes)
.await
.context("couldn't get block")?
.context("no such block")?;
.context("couldn't get block")?;
anyhow::Ok([
(block.header.hash, Some(*block_num)),
(Some(block.header.parent_hash), previos_block_num),
Expand All @@ -126,6 +137,7 @@ where
collected_hashes
.into_iter()
.flatten()
.skip(odd_offset as usize)
.for_each(|(hash, block_num)| {
if let (Some(hash), Some(block_num)) = (hash, block_num) {
// Most recent previous block hash is expected at the end of the array
Expand Down
9 changes: 8 additions & 1 deletion zero_bin/rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io;

use alloy::rpc::types::eth::BlockId;
use clap::{Parser, ValueHint};
use rpc::provider::CachedProvider;
use rpc::{retry::build_http_retry_provider, RpcType};
use tracing_subscriber::{prelude::*, EnvFilter};
use url::Url;
Expand Down Expand Up @@ -53,9 +54,15 @@ impl Cli {
checkpoint_block_number.unwrap_or((start_block - 1).into());
let block_interval = BlockInterval::Range(start_block..end_block + 1);

let cached_provider = CachedProvider::new(build_http_retry_provider(
rpc_url.clone(),
backoff,
max_retries,
));

// Retrieve prover input from the Erigon node
let prover_input = rpc::prover_input(
&build_http_retry_provider(rpc_url, backoff, max_retries),
&cached_provider,
block_interval,
checkpoint_block_number,
rpc_type,
Expand Down
21 changes: 11 additions & 10 deletions zero_bin/rpc/src/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ use alloy::{
rpc::types::eth::{BlockId, BlockTransactionsKind},
transports::Transport,
};
use anyhow::Context as _;
use futures::try_join;
use prover::BlockProverInput;
use trace_decoder::trace_protocol::BlockTrace;

use crate::provider::CachedProvider;

mod state;
mod txn;

type CodeDb = HashMap<__compat_primitive_types::H256, Vec<u8>>;

/// Fetches the prover input for the given BlockId.
pub async fn block_prover_input<ProviderT, TransportT>(
provider: &ProviderT,
provider: &CachedProvider<ProviderT, TransportT>,
block_number: BlockId,
checkpoint_state_trie_root: B256,
) -> anyhow::Result<BlockProverInput>
Expand All @@ -27,8 +28,8 @@ where
TransportT: Transport + Clone,
{
let (block_trace, other_data) = try_join!(
process_block_trace(&provider, block_number),
crate::fetch_other_block_data(&provider, block_number, checkpoint_state_trie_root,)
process_block_trace(provider, block_number),
crate::fetch_other_block_data(provider, block_number, checkpoint_state_trie_root,)
)?;

Ok(BlockProverInput {
Expand All @@ -39,20 +40,20 @@ where

/// Processes the block with the given block number and returns the block trace.
async fn process_block_trace<ProviderT, TransportT>(
provider: &ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
block_number: BlockId,
) -> anyhow::Result<BlockTrace>
where
ProviderT: Provider<TransportT>,
TransportT: Transport + Clone,
{
let block = provider
let block = cached_provider
.get_block(block_number, BlockTransactionsKind::Full)
.await?
.context("target block does not exist")?;
.await?;

let (code_db, txn_info) = txn::process_transactions(&block, provider).await?;
let trie_pre_images = state::process_state_witness(provider, block, &txn_info).await?;
let (code_db, txn_info) =
txn::process_transactions(&block, cached_provider.as_provider()).await?;
let trie_pre_images = state::process_state_witness(cached_provider, block, &txn_info).await?;

Ok(BlockTrace {
txn_info,
Expand Down
17 changes: 10 additions & 7 deletions zero_bin/rpc/src/native/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ use trace_decoder::trace_protocol::{
SeparateTriePreImages, TrieDirect, TxnInfo,
};

use crate::provider::CachedProvider;
use crate::Compat;

/// Processes the state witness for the given block.
pub async fn process_state_witness<ProviderT, TransportT>(
provider: &ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
block: Block,
txn_infos: &[TxnInfo],
) -> anyhow::Result<BlockTraceTriePreImages>
Expand All @@ -32,15 +33,15 @@ where
.header
.number
.context("Block number not returned with block")?;
let prev_state_root = provider
let prev_state_root = cached_provider
.get_block((block_number - 1).into(), BlockTransactionsKind::Hashes)
.await?
.context("Failed to get previous block")?
.header
.state_root;

let (state, storage_proofs) =
generate_state_witness(prev_state_root, state_access, provider, block_number).await?;
generate_state_witness(prev_state_root, state_access, cached_provider, block_number)
.await?;

Ok(BlockTraceTriePreImages::Separate(SeparateTriePreImages {
state: SeparateTriePreImage::Direct(TrieDirect(state.build())),
Expand Down Expand Up @@ -97,7 +98,7 @@ pub fn process_states_access(
async fn generate_state_witness<ProviderT, TransportT>(
prev_state_root: B256,
accounts_state: HashMap<Address, HashSet<StorageKey>>,
provider: &ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
block_number: u64,
) -> anyhow::Result<(
PartialTrieBuilder<HashedPartialTrie>,
Expand All @@ -111,7 +112,7 @@ where
let mut storage_proofs = HashMap::<B256, PartialTrieBuilder<HashedPartialTrie>>::new();

let (account_proofs, next_account_proofs) =
fetch_proof_data(accounts_state, provider, block_number).await?;
fetch_proof_data(accounts_state, cached_provider, block_number).await?;

// Insert account proofs
for (address, proof) in account_proofs.into_iter() {
Expand Down Expand Up @@ -146,7 +147,7 @@ where
/// Fetches the proof data for the given accounts and associated storage keys.
async fn fetch_proof_data<ProviderT, TransportT>(
accounts_state: HashMap<Address, HashSet<StorageKey>>,
provider: &ProviderT,
provider: &CachedProvider<ProviderT, TransportT>,
block_number: u64,
) -> anyhow::Result<(
Vec<(Address, EIP1186AccountProofResponse)>,
Expand All @@ -161,6 +162,7 @@ where
.into_iter()
.map(|(address, keys)| async move {
let proof = provider
.as_provider()
.get_proof(address, keys.into_iter().collect())
.block_id((block_number - 1).into())
.await
Expand All @@ -173,6 +175,7 @@ where
.into_iter()
.map(|(address, keys)| async move {
let proof = provider
.as_provider()
.get_proof(address, keys.into_iter().collect())
.block_id(block_number.into())
.await
Expand Down
2 changes: 1 addition & 1 deletion zero_bin/rpc/src/native/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ where
.as_transactions()
.context("No transactions in block")?
.iter()
.map(|tx| super::txn::process_transaction(provider, tx))
.map(|tx| process_transaction(provider, tx))
.collect::<FuturesOrdered<_>>()
.try_fold(
(HashMap::new(), Vec::new()),
Expand Down
Loading
Loading