Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sigverify wraps each PacketBatch with Arc #3035

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use {
leader_schedule_cache::LeaderScheduleCache,
},
solana_measure::measure::Measure,
solana_perf::packet::{to_packet_batches, PacketBatch},
solana_perf::packet::{to_arc_packet_batches, PacketBatch},
solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry},
solana_runtime::{
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
Expand Down Expand Up @@ -185,7 +185,7 @@ fn make_transfer_transaction_with_compute_unit_price(
}

struct PacketsPerIteration {
packet_batches: Vec<PacketBatch>,
packet_batches: Vec<Arc<PacketBatch>>,
transactions: Vec<Transaction>,
packets_per_batch: usize,
}
Expand All @@ -209,7 +209,7 @@ impl PacketsPerIteration {
mint_txs_percentage,
);

let packet_batches: Vec<PacketBatch> = to_packet_batches(&transactions, packets_per_batch);
let packet_batches = to_arc_packet_batches(&transactions, packets_per_batch);
assert_eq!(packet_batches.len(), batches_per_iteration);
Self {
packet_batches,
Expand All @@ -224,7 +224,7 @@ impl PacketsPerIteration {
let sig: [u8; 64] = std::array::from_fn(|_| thread_rng().gen::<u8>());
tx.signatures[0] = Signature::from(sig);
}
self.packet_batches = to_packet_batches(&self.transactions, self.packets_per_batch);
self.packet_batches = to_arc_packet_batches(&self.transactions, self.packets_per_batch);
}
}

Expand Down
12 changes: 5 additions & 7 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use {
solana_core::validator::BlockProductionMethod,
solana_perf::packet::to_arc_packet_batches,
apfitzge marked this conversation as resolved.
Show resolved Hide resolved
solana_vote_program::{vote_state::TowerSync, vote_transaction::new_tower_sync_transaction},
};

Expand Down Expand Up @@ -34,10 +35,7 @@ use {
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path_auto_delete,
},
solana_perf::{
packet::{to_packet_batches, Packet},
test_tx::test_tx,
},
solana_perf::{packet::Packet, test_tx::test_tx},
solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry},
solana_runtime::{
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
Expand Down Expand Up @@ -266,11 +264,11 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
assert!(r.is_ok(), "sanity parallel execution");
}
bank.clear_signatures();
let verified: Vec<_> = to_packet_batches(&transactions, PACKETS_PER_BATCH);
let verified: Vec<_> = to_arc_packet_batches(&transactions, PACKETS_PER_BATCH);
let vote_packets = vote_txs.map(|vote_txs| {
let mut packet_batches = to_packet_batches(&vote_txs, PACKETS_PER_BATCH);
let mut packet_batches = to_arc_packet_batches(&vote_txs, PACKETS_PER_BATCH);
for batch in packet_batches.iter_mut() {
for packet in batch.iter_mut() {
for packet in Arc::make_mut(batch).iter_mut() {
apfitzge marked this conversation as resolved.
Show resolved Hide resolved
packet.meta_mut().set_simple_vote(true);
}
}
Expand Down
21 changes: 11 additions & 10 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ mod tests {
get_tmp_ledger_path_auto_delete,
leader_schedule_cache::LeaderScheduleCache,
},
solana_perf::packet::{to_packet_batches, PacketBatch},
solana_perf::packet::{to_arc_packet_batches, PacketBatch},
solana_poh::{
poh_recorder::{
create_test_recorder, PohRecorderError, Record, RecordTransactionsSummary,
Expand Down Expand Up @@ -994,10 +994,11 @@ mod tests {
}

pub fn convert_from_old_verified(
mut with_vers: Vec<(PacketBatch, Vec<u8>)>,
) -> Vec<PacketBatch> {
mut with_vers: Vec<(Arc<PacketBatch>, Vec<u8>)>,
) -> Vec<Arc<PacketBatch>> {
with_vers.iter_mut().for_each(|(b, v)| {
b.iter_mut()
Arc::make_mut(b)
.iter_mut()
.zip(v)
.for_each(|(p, f)| p.meta_mut().set_discard(*f == 0))
});
Expand Down Expand Up @@ -1072,7 +1073,7 @@ mod tests {
let tx_anf = system_transaction::transfer(&keypair, &to3, 1, start_hash);

// send 'em over
let packet_batches = to_packet_batches(&[tx_no_ver, tx_anf, tx], 3);
let packet_batches = to_arc_packet_batches(&[tx_no_ver, tx_anf, tx], 3);

// glad they all fit
assert_eq!(packet_batches.len(), 1);
Expand Down Expand Up @@ -1160,7 +1161,7 @@ mod tests {
let tx =
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, genesis_config.hash());

let packet_batches = to_packet_batches(&[tx], 1);
let packet_batches = to_arc_packet_batches(&[tx], 1);
let packet_batches = packet_batches
.into_iter()
.map(|batch| (batch, vec![1u8]))
Expand All @@ -1173,7 +1174,7 @@ mod tests {
// Process a second batch that uses the same from account, so conflicts with above TX
let tx =
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, genesis_config.hash());
let packet_batches = to_packet_batches(&[tx], 1);
let packet_batches = to_arc_packet_batches(&[tx], 1);
let packet_batches = packet_batches
.into_iter()
.map(|batch| (batch, vec![1u8]))
Expand Down Expand Up @@ -1472,9 +1473,9 @@ mod tests {
})
.collect_vec();

let non_vote_packet_batches = to_packet_batches(&txs, 10);
let tpu_packet_batches = to_packet_batches(&tpu_votes, 10);
let gossip_packet_batches = to_packet_batches(&gossip_votes, 10);
let non_vote_packet_batches = to_arc_packet_batches(&txs, 10);
let tpu_packet_batches = to_arc_packet_batches(&tpu_votes, 10);
let gossip_packet_batches = to_arc_packet_batches(&gossip_votes, 10);

// Send em all
[
Expand Down
10 changes: 6 additions & 4 deletions core/src/banking_stage/packet_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl PacketDeserializer {
mod tests {
use {
super::*,
solana_perf::packet::to_packet_batches,
solana_perf::packet::to_arc_packet_batches,
solana_sdk::{
hash::Hash, pubkey::Pubkey, signature::Keypair, system_transaction,
transaction::Transaction,
Expand All @@ -270,7 +270,7 @@ mod tests {
#[test]
fn test_deserialize_and_collect_packets_simple_batches() {
let transactions = vec![random_transfer(), random_transfer()];
let packet_batches = to_packet_batches(&transactions, 1);
let packet_batches = to_arc_packet_batches(&transactions, 1);
assert_eq!(packet_batches.len(), 2);

let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum();
Expand All @@ -289,9 +289,11 @@ mod tests {
#[test]
fn test_deserialize_and_collect_packets_simple_batches_with_failure() {
let transactions = vec![random_transfer(), random_transfer()];
let mut packet_batches = to_packet_batches(&transactions, 1);
let mut packet_batches = to_arc_packet_batches(&transactions, 1);
assert_eq!(packet_batches.len(), 2);
packet_batches[0][0].meta_mut().set_discard(true);
Arc::make_mut(&mut packet_batches[0])[0]
.meta_mut()
.set_discard(true);

let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum();
let results = PacketDeserializer::deserialize_and_collect_packets(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,6 @@ mod tests {
tests::create_slow_genesis_config,
},
banking_trace::BankingPacketBatch,
sigverify::SigverifyTracerPacketStats,
},
crossbeam_channel::{unbounded, Receiver, Sender},
itertools::Itertools,
Expand All @@ -675,7 +674,7 @@ mod tests {
blockstore::Blockstore, genesis_utils::GenesisConfigInfo,
get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache,
},
solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS},
solana_perf::packet::{to_arc_packet_batches, NUM_PACKETS},
solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry},
solana_runtime::bank::Bank,
solana_sdk::{
Expand All @@ -700,7 +699,7 @@ mod tests {
_entry_receiver: Receiver<WorkingBankEntry>,
_record_receiver: Receiver<Record>,
poh_recorder: Arc<RwLock<PohRecorder>>,
banking_packet_sender: Sender<Arc<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>)>>,
banking_packet_sender: Sender<BankingPacketBatch>,

consume_work_receivers: Vec<Receiver<ConsumeWork>>,
finished_consume_work_sender: Sender<FinishedConsumeWork>,
Expand Down Expand Up @@ -790,7 +789,7 @@ mod tests {
}

fn to_banking_packet_batch(txs: &[Transaction]) -> BankingPacketBatch {
let packet_batch = to_packet_batches(txs, NUM_PACKETS);
let packet_batch = to_arc_packet_batches(txs, NUM_PACKETS);
Arc::new((packet_batch, None))
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/banking_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use {
thiserror::Error,
};

pub type BankingPacketBatch = Arc<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>)>;
pub type BankingPacketBatch = Arc<(Vec<Arc<PacketBatch>>, Option<SigverifyTracerPacketStats>)>;
pub type BankingPacketSender = TracedSender;
pub type BankingPacketReceiver = Receiver<BankingPacketBatch>;
pub type TracerThreadResult = Result<(), TraceError>;
Expand Down Expand Up @@ -65,7 +65,7 @@ pub struct BankingTracer {
#[cfg_attr(
feature = "frozen-abi",
derive(AbiExample),
frozen_abi(digest = "F5GH1poHbPqipU4DB3MczhSxHZw4o27f3C7QnMVirFci")
frozen_abi(digest = "GxZSv4cLjY97v6UospZnyfKurN6UciYqDvFgZJByP9KW")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No actual change in serialization because arc itself isnt' serialized, only contents.

)]
#[derive(Serialize, Deserialize, Debug)]
pub struct TimedTracedEvent(pub std::time::SystemTime, pub TracedEvent);
Expand Down Expand Up @@ -373,12 +373,12 @@ impl TracedSender {
pub mod for_test {
use {
super::*,
solana_perf::{packet::to_packet_batches, test_tx::test_tx},
solana_perf::{packet::to_arc_packet_batches, test_tx::test_tx},
tempfile::TempDir,
};

pub fn sample_packet_batch() -> BankingPacketBatch {
BankingPacketBatch::new((to_packet_batches(&vec![test_tx(); 4], 10), None))
BankingPacketBatch::new((to_arc_packet_batches(&vec![test_tx(); 4], 10), None))
}

pub fn drop_and_clean_temp_dir_unless_suppressed(temp_dir: TempDir) {
Expand Down
6 changes: 3 additions & 3 deletions core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl ClusterInfoVoteListener {
fn verify_votes(
votes: Vec<Transaction>,
bank_forks: &RwLock<BankForks>,
) -> (Vec<Transaction>, Vec<PacketBatch>) {
) -> (Vec<Transaction>, Vec<Arc<PacketBatch>>) {
let mut packet_batches = packet::to_packet_batches(&votes, 1);

// Votes should already be filtered by this point.
Expand Down Expand Up @@ -299,7 +299,7 @@ impl ClusterInfoVoteListener {
if !keys.any(|(i, key)| tx.message.is_signer(i) && key == authorized_voter) {
return None;
}
Some((tx, packet_batch))
Some((tx, Arc::new(packet_batch)))
})
.unzip()
}
Expand Down Expand Up @@ -1447,7 +1447,7 @@ mod tests {
assert!(packets.is_empty());
}

fn verify_packets_len(packets: &[PacketBatch], ref_value: usize) {
fn verify_packets_len(packets: &[Arc<PacketBatch>], ref_value: usize) {
let num_packets: usize = packets.iter().map(|pb| pb.len()).sum();
assert_eq!(num_packets, ref_value);
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use {
},
solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify},
solana_sdk::{packet::Packet, saturating_add_assign},
std::sync::Arc,
};

#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
Expand Down Expand Up @@ -128,7 +129,7 @@ impl SigVerifier for TransactionSigVerifier {
) -> Result<(), SigVerifyServiceError<Self::SendType>> {
let tracer_packet_stats_to_send = std::mem::take(&mut self.tracer_packet_stats);
self.packet_sender.send(BankingPacketBatch::new((
packet_batches,
packet_batches.into_iter().map(Arc::new).collect(),
apfitzge marked this conversation as resolved.
Show resolved Hide resolved
Some(tracer_packet_stats_to_send),
)))?;
Ok(())
Expand Down
34 changes: 24 additions & 10 deletions perf/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use {
net::SocketAddr,
ops::{Index, IndexMut},
slice::{Iter, IterMut, SliceIndex},
sync::Arc,
},
};

Expand Down Expand Up @@ -210,17 +211,30 @@ impl From<PacketBatch> for Vec<Packet> {
}
}

fn to_packet_batch_iter<T: Serialize>(
items: &[T],
chunk_size: usize,
) -> impl Iterator<Item = PacketBatch> + '_ {
items.chunks(chunk_size).map(|batch_items| {
let mut batch = PacketBatch::with_capacity(batch_items.len());
batch.resize(batch_items.len(), Packet::default());
for (item, packet) in batch_items.iter().zip(batch.packets.iter_mut()) {
Packet::populate_packet(packet, None, item).expect("serialize request");
}
batch
})
}

pub fn to_packet_batches<T: Serialize>(items: &[T], chunk_size: usize) -> Vec<PacketBatch> {
items
.chunks(chunk_size)
.map(|batch_items| {
let mut batch = PacketBatch::with_capacity(batch_items.len());
batch.resize(batch_items.len(), Packet::default());
for (item, packet) in batch_items.iter().zip(batch.packets.iter_mut()) {
Packet::populate_packet(packet, None, item).expect("serialize request");
}
batch
})
to_packet_batch_iter(items, chunk_size).collect()
}

pub fn to_arc_packet_batches<T: Serialize>(
items: &[T],
chunk_size: usize,
) -> Vec<Arc<PacketBatch>> {
to_packet_batch_iter(items, chunk_size)
.map(Arc::new)
.collect()
}

Expand Down
Loading