Commit

fix: optimize
atanmarko committed Jul 19, 2024
1 parent 164c94b commit 04123fa
Showing 1 changed file with 43 additions and 35 deletions.
78 changes: 43 additions & 35 deletions zero_bin/prover/src/lib.rs
@@ -18,7 +18,10 @@ use trace_decoder::{
 use tracing::info;
 use zero_bin_common::fs::generate_block_proof_file_name;
 
-const GENERATION_INPUTS_AGG_CHUNK_SIZE: usize = 2;
+// Limit the number of segments that are proven in parallel
+// to prevent excessive memory consumption
+const GENERATION_INPUTS_AGG_CHUNK_SIZE_INNER: usize = 2;
+const GENERATION_INPUTS_AGG_CHUNK_SIZE: usize = 5;
 
 #[derive(Debug, Deserialize, Serialize)]
 pub struct BlockProverInput {
@@ -69,51 +72,55 @@ impl BlockProverInput {
         };
 
         // Aggregate transaction proofs
         let txn_agg_proof = ops::TxnAggProof {
             save_inputs_on_error,
         };
 
-        // Map the transactions to a stream of transaction proofs.
-        let tx_proof_futs : FuturesUnordered<_> = txs.chunks(GENERATION_INPUTS_AGG_CHUNK_SIZE)
-            .enumerate()
-            .map(|(idy, chunk)|
-            {
+        // Tweak inner and outer chunk size if needed for memory/utilization balance
+        let mut all_txn_aggregatable_proofs = Vec::new();
+        for chunk in txs.chunks(GENERATION_INPUTS_AGG_CHUNK_SIZE) {
+            // Map the chunk of transactions to transaction proofs.
             let chunk_tx_proof_futs: FuturesUnordered<_> = chunk
-                .iter()
-                .enumerate()
-                .map(|(idx, txn)| {
-                    let data_iterator = SegmentDataIterator {
-                        partial_next_data: None,
-                        inputs: txn,
-                        max_cpu_len_log: Some(max_cpu_len_log),
-                    };
+                .chunks(GENERATION_INPUTS_AGG_CHUNK_SIZE_INNER)
+                .map(|generation_inputs| {
+                    let chunk_tx_proof_futs: FuturesUnordered<_> = generation_inputs
+                        .iter()
+                        .enumerate()
+                        .map(|(idx, generation_inputs)| {
+                            let data_iterator = SegmentDataIterator {
+                                partial_next_data: None,
+                                inputs: generation_inputs,
+                                max_cpu_len_log: Some(max_cpu_len_log),
+                            };
 
-                    Directive::map(IndexedStream::from(data_iterator), &seg_ops)
-                        .fold(&agg_ops)
-                        .run(runtime)
-                        .map(move |e| {
-                            e.map(|p| (idx, proof_gen::proof_types::TxnAggregatableProof::from(p)))
-                        })
-                })
-                .collect();
+                            Directive::map(IndexedStream::from(data_iterator), &seg_ops)
+                                .fold(&agg_ops)
+                                .run(runtime)
+                                .map(move |e| {
+                                    e.map(|p| {
+                                        (idx, proof_gen::proof_types::TxnAggregatableProof::from(p))
+                                    })
+                                })
+                        })
+                        .collect();
 
-            let result = Directive::fold(
-                IndexedStream::new(chunk_tx_proof_futs),
-                &txn_agg_proof,
-            ).run(runtime);
-            result.map(move |p| (idy, p))
-            })
-            .collect();
+                    Directive::fold(IndexedStream::new(chunk_tx_proof_futs), &txn_agg_proof)
+                        .run(runtime)
+                })
+                .collect();
 
-        let mut txn_aggregatable_proofs = Vec::new();
-        for fut in tx_proof_futs {
-            let (_idx, txn_aggregatable_proof) = fut.await;
-            txn_aggregatable_proofs.push(txn_aggregatable_proof?);
+            // Await for the chunk to be computed
+            all_txn_aggregatable_proofs.extend(
+                futures::future::join_all(chunk_tx_proof_futs)
+                    .await
+                    .into_iter()
+                    .collect::<Result<Vec<_>, _>>()?,
+            );
         }
 
-        // Fold the transaction proof stream into a single transaction proof.
+        // Fold all the transaction proofs into a single transaction proof.
         let final_txn_proof = Directive::fold(
-            IndexedStream::from(txn_aggregatable_proofs.into_iter()),
+            IndexedStream::from(all_txn_aggregatable_proofs.into_iter()),
             &txn_agg_proof,
         )
         .run(runtime)
@@ -128,6 +135,7 @@ impl BlockProverInput {
             None => None,
         };
 
+        // Generate the block proof
         let block_proof = paladin::directive::Literal(proof)
             .map(&ops::BlockProof {
                 prev,
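The core of the change is the two-level chunking of the proving work. Below is a rough, self-contained sketch of that pattern, not the repository's actual code: prove_one, prove_all, OUTER_CHUNK_SIZE, and INNER_CHUNK_SIZE are hypothetical stand-ins for the real proving pipeline and the two constants above. Outer chunks are awaited one after another to bound peak memory, while the work inside each outer chunk runs concurrently via futures::future::join_all.

// Illustrative sketch only; assumes the `futures` crate, not the real prover API.
use futures::future::join_all;

const OUTER_CHUNK_SIZE: usize = 5; // outer chunks are proven one after another
const INNER_CHUNK_SIZE: usize = 2; // inner chunks are proven concurrently

// Hypothetical stand-in for proving and aggregating a single transaction.
async fn prove_one(tx: u64) -> Result<u64, String> {
    Ok(tx * 10)
}

async fn prove_all(txs: &[u64]) -> Result<Vec<u64>, String> {
    let mut all_proofs = Vec::new();
    // Awaiting each outer chunk before starting the next bounds peak memory use.
    for outer in txs.chunks(OUTER_CHUNK_SIZE) {
        // Within an outer chunk, each inner chunk becomes one future; they run concurrently.
        let futs: Vec<_> = outer
            .chunks(INNER_CHUNK_SIZE)
            .map(|inner| async move {
                let mut proofs = Vec::with_capacity(inner.len());
                for &tx in inner {
                    proofs.push(prove_one(tx).await?);
                }
                Ok::<_, String>(proofs)
            })
            .collect();
        // Collect the chunk's results, propagating the first error.
        for chunk_proofs in join_all(futs).await {
            all_proofs.extend(chunk_proofs?);
        }
    }
    Ok(all_proofs)
}

fn main() {
    let proofs = futures::executor::block_on(prove_all(&[1, 2, 3, 4, 5, 6, 7])).unwrap();
    println!("{proofs:?}");
}

In the real prover the inner work is dispatched through the paladin runtime rather than awaited in-process, but the trade-off is the same: a smaller outer chunk lowers peak memory, a larger one raises worker utilization.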
