diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 32cc3fbe44dda1..33dd38f9648265 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -65,7 +65,6 @@ pub mod unprocessed_transaction_storage; mod consume_worker; mod decision_maker; mod forward_packet_batches_by_accounts; -mod forward_worker; mod immutable_deserialized_packet; mod latest_unprocessed_votes; mod leader_slot_timing_metrics; @@ -74,7 +73,6 @@ mod packet_deserializer; mod packet_filter; mod packet_receiver; mod read_write_account_set; -#[allow(dead_code)] mod scheduler_messages; mod transaction_scheduler; diff --git a/core/src/banking_stage/forward_worker.rs b/core/src/banking_stage/forward_worker.rs deleted file mode 100644 index 61cf311f0a8cf8..00000000000000 --- a/core/src/banking_stage/forward_worker.rs +++ /dev/null @@ -1,219 +0,0 @@ -use { - super::{ - forwarder::Forwarder, - scheduler_messages::{FinishedForwardWork, ForwardWork}, - ForwardOption, - }, - crate::banking_stage::LikeClusterInfo, - crossbeam_channel::{Receiver, RecvError, SendError, Sender}, - thiserror::Error, -}; - -#[derive(Debug, Error)] -pub enum ForwardWorkerError { - #[error("Failed to receive work from scheduler: {0}")] - Recv(#[from] RecvError), - #[error("Failed to send finalized forward work to scheduler: {0}")] - Send(#[from] SendError), -} - -pub(crate) struct ForwardWorker { - forward_receiver: Receiver, - forward_option: ForwardOption, - forwarder: Forwarder, - forwarded_sender: Sender, -} - -#[allow(dead_code)] -impl ForwardWorker { - pub fn new( - forward_receiver: Receiver, - forward_option: ForwardOption, - forwarder: Forwarder, - forwarded_sender: Sender, - ) -> Self { - Self { - forward_receiver, - forward_option, - forwarder, - forwarded_sender, - } - } - - pub fn run(self) -> Result<(), ForwardWorkerError> { - loop { - let work = self.forward_receiver.recv()?; - self.forward_loop(work)?; - } - } - - fn forward_loop(&self, work: ForwardWork) -> Result<(), ForwardWorkerError> { - for work in try_drain_iter(work, &self.forward_receiver) { - let (res, _num_packets, _forward_us, _leader_pubkey) = self.forwarder.forward_packets( - &self.forward_option, - work.packets.iter().map(|p| p.original_packet()), - ); - match res { - Ok(()) => self.forwarded_sender.send(FinishedForwardWork { - work, - successful: true, - })?, - Err(_err) => return self.failed_forward_drain(work), - }; - } - Ok(()) - } - - fn failed_forward_drain(&self, work: ForwardWork) -> Result<(), ForwardWorkerError> { - for work in try_drain_iter(work, &self.forward_receiver) { - self.forwarded_sender.send(FinishedForwardWork { - work, - successful: false, - })?; - } - Ok(()) - } -} - -/// Helper function to create an non-blocking iterator over work in the receiver, -/// starting with the given work item. -fn try_drain_iter(work: T, receiver: &Receiver) -> impl Iterator + '_ { - std::iter::once(work).chain(receiver.try_iter()) -} - -#[cfg(test)] -mod tests { - use { - super::*, - crate::banking_stage::{ - immutable_deserialized_packet::ImmutableDeserializedPacket, - tests::{create_slow_genesis_config, new_test_cluster_info, simulate_poh}, - }, - crossbeam_channel::unbounded, - solana_client::connection_cache::ConnectionCache, - solana_gossip::cluster_info::ClusterInfo, - solana_ledger::{ - blockstore::Blockstore, genesis_utils::GenesisConfigInfo, - get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, - }, - solana_perf::packet::to_packet_batches, - solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, - solana_runtime::bank::Bank, - solana_sdk::{ - genesis_config::GenesisConfig, poh_config::PohConfig, pubkey::Pubkey, - signature::Keypair, system_transaction, - }, - std::{ - sync::{atomic::AtomicBool, Arc, RwLock}, - thread::JoinHandle, - }, - tempfile::TempDir, - }; - - // Helper struct to create tests that hold channels, files, etc. - // such that our tests can be more easily set up and run. - struct TestFrame { - mint_keypair: Keypair, - genesis_config: GenesisConfig, - _ledger_path: TempDir, - _entry_receiver: Receiver, - _poh_simulator: JoinHandle<()>, - - forward_sender: Sender, - forwarded_receiver: Receiver, - } - - fn setup_test_frame() -> (TestFrame, ForwardWorker>) { - let GenesisConfigInfo { - genesis_config, - mint_keypair, - .. - } = create_slow_genesis_config(10_000); - let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Blockstore::open(ledger_path.path()) - .expect("Expected to be able to open database ledger"); - let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - bank.clone(), - Some((4, 4)), - bank.ticks_per_slot(), - Arc::new(blockstore), - &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), - &PohConfig::default(), - Arc::new(AtomicBool::default()), - ); - let poh_recorder = Arc::new(RwLock::new(poh_recorder)); - let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - - let (_local_node, cluster_info) = new_test_cluster_info(None); - let cluster_info = Arc::new(cluster_info); - let forwarder = Forwarder::new( - poh_recorder, - bank_forks, - cluster_info, - Arc::new(ConnectionCache::new("test")), - Arc::default(), - ); - - let (forward_sender, forward_receiver) = unbounded(); - let (forwarded_sender, forwarded_receiver) = unbounded(); - let worker = ForwardWorker::new( - forward_receiver, - ForwardOption::ForwardTransaction, - forwarder, - forwarded_sender, - ); - - ( - TestFrame { - mint_keypair, - genesis_config, - _ledger_path: ledger_path, - _entry_receiver: entry_receiver, - _poh_simulator: poh_simulator, - forward_sender, - forwarded_receiver, - }, - worker, - ) - } - - #[test] - fn test_worker_forward_simple() { - let (test_frame, worker) = setup_test_frame(); - let TestFrame { - mint_keypair, - genesis_config, - forward_sender, - forwarded_receiver, - .. - } = &test_frame; - let worker_thread = std::thread::spawn(move || worker.run()); - - let pubkey1 = Pubkey::new_unique(); - let pubkey2 = Pubkey::new_unique(); - - let txs = vec![ - system_transaction::transfer(mint_keypair, &pubkey1, 2, genesis_config.hash()), - system_transaction::transfer(mint_keypair, &pubkey2, 2, genesis_config.hash()), - ]; - - let packets = to_packet_batches(&txs, 2); - assert_eq!(packets.len(), 1); - let packets = packets[0] - .into_iter() - .cloned() - .map(|p| ImmutableDeserializedPacket::new(p).unwrap()) - .map(Arc::new) - .collect(); - forward_sender.send(ForwardWork { packets }).unwrap(); - let forwarded = forwarded_receiver.recv().unwrap(); - assert!(forwarded.successful); - - drop(test_frame); - let _ = worker_thread.join().unwrap(); - } -} diff --git a/core/src/banking_stage/scheduler_messages.rs b/core/src/banking_stage/scheduler_messages.rs index ee5c4ebeef9738..d93d2d6dbb6c52 100644 --- a/core/src/banking_stage/scheduler_messages.rs +++ b/core/src/banking_stage/scheduler_messages.rs @@ -1,7 +1,6 @@ use { - super::immutable_deserialized_packet::ImmutableDeserializedPacket, solana_sdk::{clock::Slot, transaction::SanitizedTransaction}, - std::{fmt::Display, sync::Arc}, + std::fmt::Display, }; /// A unique identifier for a transaction batch. @@ -45,22 +44,9 @@ pub struct ConsumeWork { pub max_age_slots: Vec, } -/// Message: [Scheduler -> Worker] -/// Transactions to be forwarded to the next leader(s) -pub struct ForwardWork { - pub packets: Vec>, -} - /// Message: [Worker -> Scheduler] /// Processed transactions. pub struct FinishedConsumeWork { pub work: ConsumeWork, pub retryable_indexes: Vec, } - -/// Message: [Worker -> Scheduler] -/// Forwarded transactions. -pub struct FinishedForwardWork { - pub work: ForwardWork, - pub successful: bool, -}