From 34e9932c7ca10d6f4becd7edf9ce59460f880f56 Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Wed, 11 Sep 2024 21:30:29 +0900
Subject: [PATCH] Introduce ledger-tool simulate-block-production (#2733)

* Introduce ledger-tool simulate-block-production
* Move counting code out of time-sensitive loop
* Avoid misleading ::clone() altogether
* Use while instead of loop+break
* Add comment on using BTreeMap
* Reduce simulation jitter due to mem deallocs
* Rename to CostTracker::new_from_parent_limits()
* Make ::load() take a slice
* Clean up retracer code a bit
* Add comment about BankingTracer even inside sim
* Remove redundant dcou dev-dependencies
* Apply suggestions from code review

Co-authored-by: Andrew Fitzgerald

* Fix up and promote to doc comments
* Make warm-up code and doc simpler
* Further clean up timed_batches_to_send
* Fix wrong units...
* Replace new_with_dummy_keypair() with traits
* Tweak --no-block-cost-limits description
* Remove redundant dev-dependencies
* Use RwLock to mimic real ClusterInfo
* Fix typo
* Refactor too long BankingSimulator::start()
* Reduce indent
* Calculate required_duration in advance
* Use correct format specifier instead of cast
* Align formatting by using ::*
* Make envs overridable
* Add comment for SOLANA_VALIDATOR_EXIT_TIMEOUT
* Clarify comment a bit
* Fix typos
* Fix typos

Co-authored-by: Andrew Fitzgerald

* Use correct variant name: DeserializeError
* Remove SimulatorLoopLogger::new()
* Fix typos more
* Add explicit _batch in field names
* Avoid unneeded events: Vec<_> buffering
* Manually adjust logging code styles
* Align name: spawn_sender_loop/enter_simulator_loop
* Refactor by introducing {Sender,Simulator}Loop
* Fix out-of-sync sim due to timed preprocessing
* Fix too-early base_simulation_time creation
* Don't log confusing info! after leader slots
* Add justification comment of BroadcastStage
* Align timeout values
* Comment about snapshot_slot=50
* Don't squash all errors unconditionally
* Remove repetitive existence check
* Promote no_block_cost_limits logging level
* Make ci/run-sanity.sh more robust
* Improve wording of --enable-hash-overrides
* Remove marker-file based abortion mechanism
* Remove needless touch

---------

Co-authored-by: Andrew Fitzgerald
---
 ci/run-sanity.sh                         |  42 +-
 core/Cargo.toml                          |   5 +-
 core/benches/forwarder.rs                |   2 +-
 core/src/banking_simulation.rs           | 920 ++++++++++++++++++
 core/src/banking_stage.rs                |  46 +-
 core/src/banking_stage/forward_worker.rs |  12 +-
 core/src/banking_stage/forwarder.rs      |  15 +-
 .../scheduler_controller.rs              |  17 +-
 core/src/banking_trace.rs                |  10 +-
 core/src/lib.rs                          |   1 +
 core/src/next_leader.rs                  |   5 +-
 cost-model/src/cost_tracker.rs           |  14 +
 ledger-tool/Cargo.toml                   |   2 +-
 ledger-tool/src/args.rs                  |   4 +
 ledger-tool/src/main.rs                  | 188 +++-
 ledger/src/blockstore.rs                 |  14 +-
 ledger/src/blockstore_processor.rs       |  38 +-
 runtime/src/bank.rs                      | 113 ++-
 validator/src/admin_rpc_service.rs       |   9 +-
 19 files changed, 1401 insertions(+), 56 deletions(-)
 create mode 100644 core/src/banking_simulation.rs

diff --git a/ci/run-sanity.sh b/ci/run-sanity.sh
index 88a6f40b1adf28..17c47a7a956de8 100755
--- a/ci/run-sanity.sh
+++ b/ci/run-sanity.sh
@@ -5,9 +5,27 @@ cd "$(dirname "$0")/.."
 # shellcheck source=multinode-demo/common.sh
 source multinode-demo/common.sh
 
+if [[ -z $CI ]]; then
+  # Build eagerly if needed for local development. Otherwise, odd timing errors occur...
+  $solana_keygen --version
+  $solana_genesis --version
+  $solana_faucet --version
+  $solana_cli --version
+  $agave_validator --version
+  $solana_ledger_tool --version
+fi
+
 rm -rf config/run/init-completed config/ledger config/snapshot-ledger
 
-SOLANA_RUN_SH_VALIDATOR_ARGS="--full-snapshot-interval-slots 200" timeout 120 ./scripts/run.sh &
+# Sanity-check that agave-validator can successfully terminate itself without relying on
+# process::exit(), by extending the timeout...
+# Also the banking_tracer thread needs some extra time to flush due to
+# unsynchronized and buffered IO.
+validator_timeout="${SOLANA_VALIDATOR_EXIT_TIMEOUT:-120}"
+SOLANA_RUN_SH_VALIDATOR_ARGS="${SOLANA_RUN_SH_VALIDATOR_ARGS} --full-snapshot-interval-slots 200" \
+  SOLANA_VALIDATOR_EXIT_TIMEOUT="$validator_timeout" \
+  timeout "$validator_timeout" ./scripts/run.sh &
+
 pid=$!
 
 attempts=20
@@ -21,7 +39,10 @@ while [[ ! -f config/run/init-completed ]]; do
   fi
 done
 
-snapshot_slot=1
+# Needs a bunch of slots for simulate-block-production.
+# Better yet, run for ~20 secs so the run is longer than its warm-up.
+# As a bonus, this works as a sanity test of general slot-rooting behavior.
+snapshot_slot=50
 latest_slot=0
 
 # wait a bit longer than snapshot_slot
@@ -39,5 +60,18 @@ $solana_ledger_tool create-snapshot --ledger config/ledger "$snapshot_slot" conf
 cp config/ledger/genesis.tar.bz2 config/snapshot-ledger
 $solana_ledger_tool copy --ledger config/ledger \
   --target-db config/snapshot-ledger --starting-slot "$snapshot_slot" --ending-slot "$latest_slot"
-$solana_ledger_tool verify --ledger config/snapshot-ledger --block-verification-method blockstore-processor
-$solana_ledger_tool verify --ledger config/snapshot-ledger --block-verification-method unified-scheduler
+$solana_ledger_tool verify --abort-on-invalid-block \
+  --ledger config/snapshot-ledger --block-verification-method blockstore-processor
+$solana_ledger_tool verify --abort-on-invalid-block \
+  --ledger config/snapshot-ledger --block-verification-method unified-scheduler
+
+first_simulated_slot=$((latest_slot / 2))
+purge_slot=$((first_simulated_slot + latest_slot / 4))
+echo "First simulated slot: ${first_simulated_slot}"
+# Purge some slots so that later verify fails if sim is broken
+$solana_ledger_tool purge --ledger config/ledger "$purge_slot"
+$solana_ledger_tool simulate-block-production --ledger config/ledger \
+  --first-simulated-slot $first_simulated_slot
+# Slots should be available and correctly replayable up to snapshot_slot at least.
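+# Note: --enable-hash-overrides below makes verify load the blockhash/bank-hash overrides
+# recorded in the banking trace event files, so that the hash-overridden blocks produced by
+# simulate-block-production above replay cleanly.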
+$solana_ledger_tool verify --abort-on-invalid-block \
+  --ledger config/ledger --enable-hash-overrides --halt-at-slot "$snapshot_slot"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index bde6144e142ae0..bbceb94dbc659e 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -105,7 +105,6 @@ solana-ledger = { workspace = true, features = ["dev-context-only-utils"] }
 solana-logger = { workspace = true }
 solana-poh = { workspace = true, features = ["dev-context-only-utils"] }
 solana-program-runtime = { workspace = true }
-solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
 solana-sdk = { workspace = true, features = ["dev-context-only-utils"] }
 solana-stake-program = { workspace = true }
 solana-unified-scheduler-pool = { workspace = true, features = [
@@ -123,7 +122,9 @@ sysctl = { workspace = true }
 rustc_version = { workspace = true, optional = true }
 
 [features]
-dev-context-only-utils = []
+dev-context-only-utils = [
+    "solana-runtime/dev-context-only-utils",
+]
 frozen-abi = [
     "dep:rustc_version",
     "dep:solana-frozen-abi",
diff --git a/core/benches/forwarder.rs b/core/benches/forwarder.rs
index bf40e1be7ac17e..10a050f3d97d4b 100644
--- a/core/benches/forwarder.rs
+++ b/core/benches/forwarder.rs
@@ -34,7 +34,7 @@ use {
 struct BenchSetup {
     exit: Arc<AtomicBool>,
     poh_service: PohService,
-    forwarder: Forwarder,
+    forwarder: Forwarder<Arc<ClusterInfo>>,
     unprocessed_packet_batches: UnprocessedTransactionStorage,
     tracker: LeaderSlotMetricsTracker,
     stats: BankingStageStats,
diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs
new file mode 100644
index 00000000000000..a8a67b3e5653b2
--- /dev/null
+++ b/core/src/banking_simulation.rs
@@ -0,0 +1,920 @@
+#![cfg(feature = "dev-context-only-utils")]
+use {
+    crate::{
+        banking_stage::{BankingStage, LikeClusterInfo},
+        banking_trace::{
+            BankingPacketBatch, BankingTracer, ChannelLabel, TimedTracedEvent, TracedEvent,
+            TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME,
+        },
+        validator::BlockProductionMethod,
+    },
+    bincode::deserialize_from,
+    crossbeam_channel::{unbounded, Sender},
+    itertools::Itertools,
+    log::*,
+    solana_client::connection_cache::ConnectionCache,
+    solana_gossip::{
+        cluster_info::{ClusterInfo, Node},
+        contact_info::ContactInfo,
+    },
+    solana_ledger::{
+        blockstore::{Blockstore, PurgeType},
+        leader_schedule_cache::LeaderScheduleCache,
+    },
+    solana_poh::{
+        poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
+        poh_service::{PohService, DEFAULT_HASHES_PER_BATCH, DEFAULT_PINNED_CPU_CORE},
+    },
+    solana_runtime::{
+        bank::{Bank, HashOverrides},
+        bank_forks::BankForks,
+        installed_scheduler_pool::BankWithScheduler,
+        prioritization_fee_cache::PrioritizationFeeCache,
+    },
+    solana_sdk::{
+        clock::{Slot, DEFAULT_MS_PER_SLOT, HOLD_TRANSACTIONS_SLOT_OFFSET},
+        genesis_config::GenesisConfig,
+        pubkey::Pubkey,
+        shred_version::compute_shred_version,
+        signature::Signer,
+        signer::keypair::Keypair,
+    },
+    solana_streamer::socket::SocketAddrSpace,
+    solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
+    std::{
+        collections::BTreeMap,
+        fmt::Display,
+        fs::File,
+        io::{self, BufRead, BufReader},
+        net::{Ipv4Addr, UdpSocket},
+        path::PathBuf,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc, RwLock,
+        },
+        thread::{self, sleep, JoinHandle},
+        time::{Duration, SystemTime},
+    },
+    thiserror::Error,
+};
+
+/// This creates a simulated environment around `BankingStage` to produce leader's blocks based on
+/// recorded banking trace events (`TimedTracedEvent`).
+///
+/// At a high level, the task of `BankingStage` is to pack transactions into assigned,
+/// fixed-duration, leader blocks. So, there are 3 abstract inputs to simulate: blocks, time, and
+/// transactions.
+///
+/// In the context of simulation, the first two are simple; both are well defined.
+///
+/// For ancestor blocks, we first replay a certain number of blocks immediately up to the target
+/// simulation leader's slot with the `halt_at_slot` mechanism, ultimately freezing the ancestor
+/// block with expected and deterministic hashes. This has the added possible benefit of warming
+/// caches that may be used during simulation.
+///
+/// After replay, a minor tweak is applied during simulation: we forcibly override the leader's
+/// hashes as the simulated `BankingStage` creates them, using recorded `BlockAndBankHash` events.
+/// This is to provide indistinguishable sysvars to TX execution and identical TX age resolution
+/// as the simulation goes on. Otherwise, the vast majority of TX processing would differ, because
+/// the simulated block's hashes would differ from the recorded ones, as differences in block
+/// composition are inevitable.
+///
+/// As in the real environment, for PoH time we use the `PohRecorder`. This is simply a 400ms
+/// timer, external to `BankingStage` and thus mostly irrelevant to `BankingStage` performance.
+/// For wall time, we use the first `BankStatus::BlockAndBankHash` and `SystemTime::now()` to
+/// define T=0 for the simulation. Then, simulation progress is timed accordingly. For context,
+/// this syncing is necessary because all trace events are recorded in UTC, not relative to PoH
+/// nor to the leader schedule, for simplicity at recording time.
+///
+/// Lastly, the last and most complicated input to simulate: transactions.
+///
+/// A closer look at the transaction load profile is below, regardless of internal banking
+/// implementation and simulation:
+///
+/// Due to Solana's general tx broadcast strategy of client submission and optional node
+/// forwarding, many transactions often arrive before the first leader slot begins. Thus, the
+/// initial leader block creation typically starts with a rather large number of schedule-able
+/// transactions. Also, note that additional transactions arrive during the 4 leader slot window
+/// (roughly ~1.6 seconds).
+///
+/// Simulation must mimic this load pattern while being as agnostic to the internal banking
+/// implementation as possible. For that agnostic objective, `TracedSender`s were introduced into
+/// the `SigVerify` stage and gossip subsystem by `BankingTracer` to trace **all**
+/// `BankingPacketBatch`s' exact payload and _sender_'s timing with `SystemTime::now()` for all
+/// `ChannelLabel`s. This deliberate placement of tracing ensures it is affected neither by any
+/// internal `BankingStage` capacity (if any) nor by its channel consumption pattern.
+///
+/// `BankingSimulator` consists of 2 phases chronologically: warm-up and on-the-fly. The 2 phases
+/// are segregated by the aforementioned T=0.
+///
+/// Both phases just send `BankingPacketBatch` in the same fashion, pretending to be the
+/// `SigVerifyStage`/gossip from a single thread that busy-loops to hit precise T=N at ~1us
+/// granularity.
+///
+/// Warm-up starts at T=-WARMUP_DURATION (~ 13 secs). As soon as warm-up is initiated, we invoke
+/// `BankingStage::new_num_threads()` as well to simulate the pre-leader slot's tx-buffering time.
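+///
+/// For orientation, a minimal sketch of the intended call sequence is shown below (the event-file
+/// path and the slot are hypothetical; ledger-tool's simulate-block-production subcommand does
+/// the real wiring, including ledger loading):
+///
+/// ```ignore
+/// use std::path::PathBuf;
+/// use solana_core::{
+///     banking_simulation::{BankingSimulator, BankingTraceEvents},
+///     validator::BlockProductionMethod,
+/// };
+///
+/// // Load all rotated event files (events, events.1, ...) under <ledger>/banking_trace.
+/// let events = BankingTraceEvents::load(&[PathBuf::from("ledger/banking_trace/events")])?;
+/// let simulator = BankingSimulator::new(events, /* first_simulated_slot: */ 100);
+/// // genesis_config, bank_forks, and blockstore come from normal ledger loading, halted at
+/// // simulator.parent_slot().
+/// simulator.start(genesis_config, bank_forks, blockstore, BlockProductionMethod::default())?;
+/// ```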
+pub struct BankingSimulator {
+    banking_trace_events: BankingTraceEvents,
+    first_simulated_slot: Slot,
+}
+
+#[derive(Error, Debug)]
+pub enum SimulateError {
+    #[error("IO Error: {0}")]
+    IoError(#[from] io::Error),
+
+    #[error("Deserialization Error: {0}")]
+    DeserializeError(#[from] bincode::Error),
+}
+
+// Defined to be enough to cover the holding phase prior to leader slots with some idling (+5 secs)
+const WARMUP_DURATION: Duration =
+    Duration::from_millis(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_MS_PER_SLOT + 5000);
+
+/// BTreeMap is intentional because events could be unordered slightly due to tracing jitter.
+type PacketBatchesByTime = BTreeMap<SystemTime, (ChannelLabel, BankingPacketBatch)>;
+
+type FreezeTimeBySlot = BTreeMap<Slot, SystemTime>;
+
+type TimedBatchesToSend = Vec<(
+    (Duration, (ChannelLabel, BankingPacketBatch)),
+    (usize, usize),
+)>;
+
+type EventSenderThread = JoinHandle<(TracedSender, TracedSender, TracedSender)>;
+
+#[derive(Default)]
+pub struct BankingTraceEvents {
+    packet_batches_by_time: PacketBatchesByTime,
+    freeze_time_by_slot: FreezeTimeBySlot,
+    hash_overrides: HashOverrides,
+}
+
+impl BankingTraceEvents {
+    fn read_event_file(
+        event_file_path: &PathBuf,
+        mut callback: impl FnMut(TimedTracedEvent),
+    ) -> Result<(), SimulateError> {
+        let mut reader = BufReader::new(File::open(event_file_path)?);
+
+        // EOF is reached at a correct deserialization boundary, or the file is simply empty.
+        // We want to look ahead in the buf, so NOT calling reader.consume(..) is correct.
+        while !reader.fill_buf()?.is_empty() {
+            callback(deserialize_from(&mut reader)?);
+        }
+
+        Ok(())
+    }
+
+    pub fn load(event_file_paths: &[PathBuf]) -> Result<Self, SimulateError> {
+        let mut event_count = 0;
+        let mut events = Self::default();
+        for event_file_path in event_file_paths {
+            let old_event_count = event_count;
+            let read_result = Self::read_event_file(event_file_path, |event| {
+                event_count += 1;
+                events.load_event(event);
+            });
+            info!(
+                "Read {} events from {:?}",
+                event_count - old_event_count,
+                event_file_path,
+            );
+
+            if matches!(
+                read_result,
+                Err(SimulateError::DeserializeError(ref deser_err))
+                    if matches!(
+                        &**deser_err,
+                        bincode::ErrorKind::Io(io_err)
+                            if io_err.kind() == std::io::ErrorKind::UnexpectedEof
+                    )
+            ) {
+                // Silence errors here as this can happen under normal operation...
+                warn!(
+                    "Reading {:?} failed {:?} due to file corruption or unclean validator shutdown",
+                    event_file_path, read_result,
+                );
+            } else {
+                read_result?
+            }
+        }
+
+        Ok(events)
+    }
+
+    fn load_event(&mut self, TimedTracedEvent(event_time, event): TimedTracedEvent) {
+        match event {
+            TracedEvent::PacketBatch(label, batch) => {
+                // Deserialized PacketBatches will mostly be ordered by event_time, but this
+                // isn't guaranteed when traced, because times are measured by multiple _sender_
+                // threads without synchronization among them to avoid overhead.
+                //
+                // Also, there's a possibility of system clock change. In this case,
+                // the simulation is meaningless, though...
+                //
+                // Somewhat naively assume that event_times (nanosecond resolution) won't
+                // collide.
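+                // The assert!(is_new) below enforces that assumption; a collision would
+                // otherwise silently drop a traced batch.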
+                let is_new = self
+                    .packet_batches_by_time
+                    .insert(event_time, (label, batch))
+                    .is_none();
+                assert!(is_new);
+            }
+            TracedEvent::BlockAndBankHash(slot, blockhash, bank_hash) => {
+                let is_new = self.freeze_time_by_slot.insert(slot, event_time).is_none();
+                self.hash_overrides.add_override(slot, blockhash, bank_hash);
+                assert!(is_new);
+            }
+        }
+    }
+
+    pub fn hash_overrides(&self) -> &HashOverrides {
+        &self.hash_overrides
+    }
+}
+
+struct DummyClusterInfo {
+    // Artificially wrap Pubkey with RwLock to induce lock contention (if any), to mimic the
+    // real ClusterInfo.
+    id: RwLock<Pubkey>,
+}
+
+impl LikeClusterInfo for Arc<DummyClusterInfo> {
+    fn id(&self) -> Pubkey {
+        *self.id.read().unwrap()
+    }
+
+    fn lookup_contact_info<F, Y>(&self, _id: &Pubkey, _map: F) -> Option<Y>
+    where
+        F: FnOnce(&ContactInfo) -> Y,
+    {
+        None
+    }
+}
+
+struct SimulatorLoopLogger {
+    simulated_leader: Pubkey,
+    freeze_time_by_slot: FreezeTimeBySlot,
+    base_event_time: SystemTime,
+    base_simulation_time: SystemTime,
+}
+
+impl SimulatorLoopLogger {
+    fn bank_costs(bank: &Bank) -> (u64, u64) {
+        bank.read_cost_tracker()
+            .map(|t| (t.block_cost(), t.vote_cost()))
+            .unwrap()
+    }
+
+    fn log_frozen_bank_cost(&self, bank: &Bank) {
+        info!(
+            "bank cost: slot: {} {:?} (frozen)",
+            bank.slot(),
+            Self::bank_costs(bank),
+        );
+    }
+
+    fn log_ongoing_bank_cost(&self, bank: &Bank) {
+        debug!(
+            "bank cost: slot: {} {:?} (ongoing)",
+            bank.slot(),
+            Self::bank_costs(bank),
+        );
+    }
+
+    fn log_jitter(&self, bank: &Bank) {
+        let old_slot = bank.slot();
+        if let Some(event_time) = self.freeze_time_by_slot.get(&old_slot) {
+            if log_enabled!(log::Level::Info) {
+                let current_simulation_time = SystemTime::now();
+                let elapsed_simulation_time = current_simulation_time
+                    .duration_since(self.base_simulation_time)
+                    .unwrap();
+                let elapsed_event_time = event_time.duration_since(self.base_event_time).unwrap();
+                info!(
+                    "jitter(parent_slot: {}): {}{:?} (sim: {:?} event: {:?})",
+                    old_slot,
+                    if elapsed_simulation_time > elapsed_event_time {
+                        "+"
+                    } else {
+                        "-"
+                    },
+                    if elapsed_simulation_time > elapsed_event_time {
+                        elapsed_simulation_time - elapsed_event_time
+                    } else {
+                        elapsed_event_time - elapsed_simulation_time
+                    },
+                    elapsed_simulation_time,
+                    elapsed_event_time,
+                );
+            }
+        }
+    }
+
+    fn on_new_leader(&self, bank: &Bank, new_slot: Slot, new_leader: Pubkey) {
+        self.log_frozen_bank_cost(bank);
+        info!(
+            "{} isn't leader anymore at slot {}; new leader: {}",
+            self.simulated_leader, new_slot, new_leader
+        );
+    }
+}
+
+struct SenderLoop {
+    parent_slot: Slot,
+    first_simulated_slot: Slot,
+    non_vote_sender: TracedSender,
+    tpu_vote_sender: TracedSender,
+    gossip_vote_sender: TracedSender,
+    exit: Arc<AtomicBool>,
+    raw_base_event_time: SystemTime,
+    total_batch_count: usize,
+    timed_batches_to_send: TimedBatchesToSend,
+}
+
+impl SenderLoop {
+    fn log_starting(&self) {
+        info!(
+            "simulating events: {} (out of {}), starting at slot {} (based on {} from traced event slot: {}) (warmup: -{:?})",
+            self.timed_batches_to_send.len(), self.total_batch_count, self.first_simulated_slot,
+            SenderLoopLogger::format_as_timestamp(self.raw_base_event_time),
+            self.parent_slot, WARMUP_DURATION,
+        );
+    }
+
+    fn spawn(self, base_simulation_time: SystemTime) -> Result<EventSenderThread, SimulateError> {
+        let handle = thread::Builder::new()
+            .name("solSimSender".into())
+            .spawn(move || self.start(base_simulation_time))?;
+        Ok(handle)
+    }
+
+    fn start(
+        mut self,
+        base_simulation_time: SystemTime,
+    ) -> (TracedSender, TracedSender, TracedSender) {
+        let mut logger = SenderLoopLogger::new(
+            &self.non_vote_sender,
+            &self.tpu_vote_sender,
+            &self.gossip_vote_sender,
+        );
+        let mut simulation_duration = Duration::default();
+        for ((required_duration, (label, batches_with_stats)), (batch_count, tx_count)) in
+            self.timed_batches_to_send.drain(..)
+        {
+            // Busy loop for most accurate sending timings
+            while simulation_duration < required_duration {
+                let current_simulation_time = SystemTime::now();
+                simulation_duration = current_simulation_time
+                    .duration_since(base_simulation_time)
+                    .unwrap();
+            }
+
+            let sender = match label {
+                ChannelLabel::NonVote => &self.non_vote_sender,
+                ChannelLabel::TpuVote => &self.tpu_vote_sender,
+                ChannelLabel::GossipVote => &self.gossip_vote_sender,
+                ChannelLabel::Dummy => unreachable!(),
+            };
+            sender.send(batches_with_stats).unwrap();
+
+            logger.on_sending_batches(&simulation_duration, label, batch_count, tx_count);
+            if self.exit.load(Ordering::Relaxed) {
+                break;
+            }
+        }
+        logger.on_terminating();
+        drop(self.timed_batches_to_send);
+        // hold these senders in join_handle to control banking stage termination!
+        (
+            self.non_vote_sender,
+            self.tpu_vote_sender,
+            self.gossip_vote_sender,
+        )
+    }
+}
+
+struct SimulatorLoop {
+    bank: BankWithScheduler,
+    parent_slot: Slot,
+    first_simulated_slot: Slot,
+    freeze_time_by_slot: FreezeTimeBySlot,
+    base_event_time: SystemTime,
+    poh_recorder: Arc<RwLock<PohRecorder>>,
+    simulated_leader: Pubkey,
+    bank_forks: Arc<RwLock<BankForks>>,
+    blockstore: Arc<Blockstore>,
+    leader_schedule_cache: Arc<LeaderScheduleCache>,
+    retransmit_slots_sender: Sender<Slot>,
+    retracer: Arc<BankingTracer>,
+}
+
+impl SimulatorLoop {
+    fn enter(
+        self,
+        base_simulation_time: SystemTime,
+        sender_thread: EventSenderThread,
+    ) -> (EventSenderThread, Sender<Slot>) {
+        sleep(WARMUP_DURATION);
+        info!("warmup done!");
+        self.start(base_simulation_time, sender_thread)
+    }
+
+    fn start(
+        self,
+        base_simulation_time: SystemTime,
+        sender_thread: EventSenderThread,
+    ) -> (EventSenderThread, Sender<Slot>) {
+        let logger = SimulatorLoopLogger {
+            simulated_leader: self.simulated_leader,
+            base_event_time: self.base_event_time,
+            base_simulation_time,
+            freeze_time_by_slot: self.freeze_time_by_slot,
+        };
+        let mut bank = self.bank;
+        loop {
+            if self.poh_recorder.read().unwrap().bank().is_none() {
+                let next_leader_slot = self.leader_schedule_cache.next_leader_slot(
+                    &self.simulated_leader,
+                    bank.slot(),
+                    &bank,
+                    Some(&self.blockstore),
+                    GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS,
+                );
+                debug!("{next_leader_slot:?}");
+                self.poh_recorder
+                    .write()
+                    .unwrap()
+                    .reset(bank.clone_without_scheduler(), next_leader_slot);
+                info!("Bank::new_from_parent()!");
+
+                logger.log_jitter(&bank);
+                bank.freeze();
+                let new_slot = if bank.slot() == self.parent_slot {
+                    info!("initial leader block!");
+                    self.first_simulated_slot
+                } else {
+                    info!("next leader block!");
+                    bank.slot() + 1
+                };
+                let new_leader = self
+                    .leader_schedule_cache
+                    .slot_leader_at(new_slot, None)
+                    .unwrap();
+                if new_leader != self.simulated_leader {
+                    logger.on_new_leader(&bank, new_slot, new_leader);
+                    break;
+                } else if sender_thread.is_finished() {
+                    warn!("sender thread exited, maybe due to completion of sending traced events");
+                    break;
+                } else {
+                    info!("new leader bank slot: {new_slot}");
+                }
+                let new_bank = Bank::new_from_parent(
+                    bank.clone_without_scheduler(),
+                    &self.simulated_leader,
+                    new_slot,
+                );
+                // make sure parent is frozen for finalized hashes via the above
+                // new()-ing of its child bank
+                self.retracer
+                    .hash_event(bank.slot(), &bank.last_blockhash(), &bank.hash());
+                if *bank.collector_id() == self.simulated_leader {
+                    logger.log_frozen_bank_cost(&bank);
+                }
+                self.retransmit_slots_sender.send(bank.slot()).unwrap();
+                self.bank_forks.write().unwrap().insert(new_bank);
+                bank = self
+                    .bank_forks
+                    .read()
+                    .unwrap()
+                    .working_bank_with_scheduler()
+                    .clone_with_scheduler();
+                self.poh_recorder
+                    .write()
+                    .unwrap()
+                    .set_bank(bank.clone_with_scheduler(), false);
+            } else {
+                logger.log_ongoing_bank_cost(&bank);
+            }
+
+            sleep(Duration::from_millis(10));
+        }
+
+        (sender_thread, self.retransmit_slots_sender)
+    }
+}
+
+struct SimulatorThreads {
+    poh_service: PohService,
+    banking_stage: BankingStage,
+    broadcast_stage: BroadcastStage,
+    retracer_thread: TracerThread,
+    exit: Arc<AtomicBool>,
+}
+
+impl SimulatorThreads {
+    fn finish(self, sender_thread: EventSenderThread, retransmit_slots_sender: Sender<Slot>) {
+        info!("Sleeping a bit before signaling exit");
+        sleep(Duration::from_millis(100));
+        self.exit.store(true, Ordering::Relaxed);
+
+        // The order is important. Consuming sender_thread by joining will drop some channels.
+        // That triggers termination of banking_stage, and in turn the retracer thread will be
+        // terminated.
+        sender_thread.join().unwrap();
+        self.banking_stage.join().unwrap();
+        self.poh_service.join().unwrap();
+        if let Some(retracer_thread) = self.retracer_thread {
+            retracer_thread.join().unwrap().unwrap();
+        }
+
+        info!("Joining broadcast stage...");
+        drop(retransmit_slots_sender);
+        self.broadcast_stage.join().unwrap();
+    }
+}
+
+struct SenderLoopLogger<'a> {
+    non_vote_sender: &'a TracedSender,
+    tpu_vote_sender: &'a TracedSender,
+    gossip_vote_sender: &'a TracedSender,
+    last_log_duration: Duration,
+    last_tx_count: usize,
+    last_non_vote_tx_count: usize,
+    last_tpu_vote_tx_count: usize,
+    last_gossip_vote_tx_count: usize,
+    non_vote_batch_count: usize,
+    non_vote_tx_count: usize,
+    tpu_vote_batch_count: usize,
+    tpu_vote_tx_count: usize,
+    gossip_vote_batch_count: usize,
+    gossip_vote_tx_count: usize,
+}
+
+impl<'a> SenderLoopLogger<'a> {
+    fn new(
+        non_vote_sender: &'a TracedSender,
+        tpu_vote_sender: &'a TracedSender,
+        gossip_vote_sender: &'a TracedSender,
+    ) -> Self {
+        Self {
+            non_vote_sender,
+            tpu_vote_sender,
+            gossip_vote_sender,
+            last_log_duration: Duration::default(),
+            last_tx_count: 0,
+            last_non_vote_tx_count: 0,
+            last_tpu_vote_tx_count: 0,
+            last_gossip_vote_tx_count: 0,
+            non_vote_batch_count: 0,
+            non_vote_tx_count: 0,
+            tpu_vote_batch_count: 0,
+            tpu_vote_tx_count: 0,
+            gossip_vote_batch_count: 0,
+            gossip_vote_tx_count: 0,
+        }
+    }
+
+    fn on_sending_batches(
+        &mut self,
+        &simulation_duration: &Duration,
+        label: ChannelLabel,
+        batch_count: usize,
+        tx_count: usize,
+    ) {
+        debug!(
+            "sent {:?} {} batches ({} txes)",
+            label, batch_count, tx_count
+        );
+
+        use ChannelLabel::*;
+        let (total_batch_count, total_tx_count) = match label {
+            NonVote => (&mut self.non_vote_batch_count, &mut self.non_vote_tx_count),
+            TpuVote => (&mut self.tpu_vote_batch_count, &mut self.tpu_vote_tx_count),
+            GossipVote => (
+                &mut self.gossip_vote_batch_count,
+                &mut self.gossip_vote_tx_count,
+            ),
+            Dummy => unreachable!(),
+        };
+        *total_batch_count += batch_count;
+        *total_tx_count += tx_count;
+
+        let log_interval = simulation_duration - self.last_log_duration;
+        if log_interval > Duration::from_millis(100) {
+            let current_tx_count =
+                self.non_vote_tx_count + self.tpu_vote_tx_count + self.gossip_vote_tx_count;
+            let duration = log_interval.as_secs_f64();
+            let tps = (current_tx_count - self.last_tx_count) as f64 / duration;
+            let non_vote_tps =
+                (self.non_vote_tx_count - self.last_non_vote_tx_count) as f64 / duration;
+            let tpu_vote_tps =
+                (self.tpu_vote_tx_count - self.last_tpu_vote_tx_count) as f64 / duration;
+            let gossip_vote_tps =
+                (self.gossip_vote_tx_count - self.last_gossip_vote_tx_count) as f64 / duration;
+            info!(
+                "senders(non-,tpu-,gossip-vote): tps: {:.0} (={:.0}+{:.0}+{:.0}) over {:?} not-recved: ({}+{}+{})",
+                tps, non_vote_tps, tpu_vote_tps, gossip_vote_tps, log_interval,
+                self.non_vote_sender.len(), self.tpu_vote_sender.len(), self.gossip_vote_sender.len(),
+            );
+            self.last_log_duration = simulation_duration;
+            self.last_tx_count = current_tx_count;
+            (
+                self.last_non_vote_tx_count,
+                self.last_tpu_vote_tx_count,
+                self.last_gossip_vote_tx_count,
+            ) = (
+                self.non_vote_tx_count,
+                self.tpu_vote_tx_count,
+                self.gossip_vote_tx_count,
+            );
+        }
+    }
+
+    fn on_terminating(self) {
+        info!(
+            "terminating to send...: non_vote: {} ({}), tpu_vote: {} ({}), gossip_vote: {} ({})",
+            self.non_vote_batch_count,
+            self.non_vote_tx_count,
+            self.tpu_vote_batch_count,
+            self.tpu_vote_tx_count,
+            self.gossip_vote_batch_count,
+            self.gossip_vote_tx_count,
+        );
+    }
+
+    fn format_as_timestamp(time: SystemTime) -> impl Display {
+        let time: chrono::DateTime<chrono::Local> = time.into();
+        time.format("%Y-%m-%d %H:%M:%S.%f")
+    }
+}
+
+impl BankingSimulator {
+    pub fn new(banking_trace_events: BankingTraceEvents, first_simulated_slot: Slot) -> Self {
+        Self {
+            banking_trace_events,
+            first_simulated_slot,
+        }
+    }
+
+    pub fn parent_slot(&self) -> Option<Slot> {
+        self.banking_trace_events
+            .freeze_time_by_slot
+            .range(..self.first_simulated_slot)
+            .last()
+            .map(|(slot, _time)| slot)
+            .copied()
+    }
+
+    fn prepare_simulation(
+        self,
+        genesis_config: GenesisConfig,
+        bank_forks: Arc<RwLock<BankForks>>,
+        blockstore: Arc<Blockstore>,
+        block_production_method: BlockProductionMethod,
+    ) -> (SenderLoop, SimulatorLoop, SimulatorThreads) {
+        let parent_slot = self.parent_slot().unwrap();
+        let mut packet_batches_by_time = self.banking_trace_events.packet_batches_by_time;
+        let freeze_time_by_slot = self.banking_trace_events.freeze_time_by_slot;
+        let bank = bank_forks
+            .read()
+            .unwrap()
+            .working_bank_with_scheduler()
+            .clone_with_scheduler();
+
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        assert_eq!(parent_slot, bank.slot());
+
+        let simulated_leader = leader_schedule_cache
+            .slot_leader_at(self.first_simulated_slot, None)
+            .unwrap();
+        info!(
+            "Simulated leader and slot: {}, {}",
+            simulated_leader, self.first_simulated_slot,
+        );
+
+        let exit = Arc::new(AtomicBool::default());
+
+        if let Some(end_slot) = blockstore
+            .slot_meta_iterator(self.first_simulated_slot)
+            .unwrap()
+            .map(|(s, _)| s)
+            .last()
+        {
+            info!("purging slots {}, {}", self.first_simulated_slot, end_slot);
+            blockstore.purge_from_next_slots(self.first_simulated_slot, end_slot);
+            blockstore.purge_slots(self.first_simulated_slot, end_slot, PurgeType::Exact);
+            info!("done: purging");
+        } else {
+            info!("skipping purging...");
+        }
+
+        info!("Poh is starting!");
+
+        let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new_with_clear_signal(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            None,
+            bank.ticks_per_slot(),
+            false,
+            blockstore.clone(),
+            blockstore.get_new_shred_signal(0),
+            &leader_schedule_cache,
+            &genesis_config.poh_config,
+            None,
+            exit.clone(),
+        );
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+        let poh_service = PohService::new(
+            poh_recorder.clone(),
+            &genesis_config.poh_config,
+            exit.clone(),
+            bank.ticks_per_slot(),
+            DEFAULT_PINNED_CPU_CORE,
+            DEFAULT_HASHES_PER_BATCH,
+            record_receiver,
+        );
+
+        // Enable BankingTracer to approximate the real environment as closely as possible,
+        // because it's not expected that BankingTracer is disabled in production environments.
+        //
+        // It's not likely to affect the banking stage performance noticeably. So, make sure
+        // that assumption is held here. That said, it incurs additional channel sending,
+        // SystemTime::now() and buffered seq IO, and indirectly functions as a background dropper
+        // of `BankingPacketBatch`.
+        //
+        // Lastly, the actual retraced events can be used to evaluate simulation timing accuracy
+        // in the future.
+        let (retracer, retracer_thread) = BankingTracer::new(Some((
+            &blockstore.banking_retracer_path(),
+            exit.clone(),
+            BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
+        )))
+        .unwrap();
+        assert!(retracer.is_enabled());
+        info!(
+            "Enabled banking retracer (dir_byte_limit: {})",
+            BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
+        );
+
+        let (non_vote_sender, non_vote_receiver) = retracer.create_channel_non_vote();
+        let (tpu_vote_sender, tpu_vote_receiver) = retracer.create_channel_tpu_vote();
+        let (gossip_vote_sender, gossip_vote_receiver) = retracer.create_channel_gossip_vote();
+
+        let connection_cache = Arc::new(ConnectionCache::new("connection_cache_sim"));
+        let (replay_vote_sender, _replay_vote_receiver) = unbounded();
+        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
+        let shred_version = compute_shred_version(
+            &genesis_config.hash(),
+            Some(&bank_forks.read().unwrap().root_bank().hard_forks()),
+        );
+        let (sender, _receiver) = tokio::sync::mpsc::channel(1);
+
+        // Create a completely-dummy ClusterInfo for the broadcast stage.
+        // We only need it to write shreds into the blockstore and it seems the given ClusterInfo
+        // is irrelevant for the necessary minimum work of this simulation.
+        let random_keypair = Arc::new(Keypair::new());
+        let cluster_info = Arc::new(ClusterInfo::new(
+            Node::new_localhost_with_pubkey(&random_keypair.pubkey()).info,
+            random_keypair,
+            SocketAddrSpace::Unspecified,
+        ));
+        // Broadcast stage is needed to save the simulated blocks for post-run analysis by
+        // inserting produced shreds into the blockstore.
+        let broadcast_stage = BroadcastStageType::Standard.new_broadcast_stage(
+            vec![UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap()],
+            cluster_info.clone(),
+            entry_receiver,
+            retransmit_slots_receiver,
+            exit.clone(),
+            blockstore.clone(),
+            bank_forks.clone(),
+            shred_version,
+            sender,
+        );
+
+        info!("Start banking stage!...");
+        // Create a partially-dummy ClusterInfo for the banking stage.
+        let cluster_info = Arc::new(DummyClusterInfo {
+            id: simulated_leader.into(),
+        });
+        let prioritization_fee_cache = &Arc::new(PrioritizationFeeCache::new(0u64));
+        let banking_stage = BankingStage::new_num_threads(
+            block_production_method.clone(),
+            &cluster_info,
+            &poh_recorder,
+            non_vote_receiver,
+            tpu_vote_receiver,
+            gossip_vote_receiver,
+            BankingStage::num_threads(),
+            None,
+            replay_vote_sender,
+            None,
+            connection_cache,
+            bank_forks.clone(),
+            prioritization_fee_cache,
+            false,
+        );
+
+        let (&_slot, &raw_base_event_time) = freeze_time_by_slot
+            .range(parent_slot..)
+            .next()
+            .expect("timed hashes");
+        let base_event_time = raw_base_event_time - WARMUP_DURATION;
+
+        let total_batch_count = packet_batches_by_time.len();
+        let timed_batches_to_send = packet_batches_by_time.split_off(&base_event_time);
+        let batch_and_tx_counts = timed_batches_to_send
+            .values()
+            .map(|(_label, batches_with_stats)| {
+                let batches = &batches_with_stats.0;
+                (
+                    batches.len(),
+                    batches.iter().map(|batch| batch.len()).sum::<usize>(),
+                )
+            })
+            .collect::<Vec<_>>();
+        // Convert to a large plain old Vec and drain on it, finally dropping it outside
+        // the simulation loop to avoid jitter due to interleaved deallocs of BTreeMap.
+        let timed_batches_to_send = timed_batches_to_send
+            .into_iter()
+            .map(|(event_time, batches)| {
+                (event_time.duration_since(base_event_time).unwrap(), batches)
+            })
+            .zip_eq(batch_and_tx_counts)
+            .collect::<Vec<_>>();
+
+        let sender_loop = SenderLoop {
+            parent_slot,
+            first_simulated_slot: self.first_simulated_slot,
+            non_vote_sender,
+            tpu_vote_sender,
+            gossip_vote_sender,
+            exit: exit.clone(),
+            raw_base_event_time,
+            total_batch_count,
+            timed_batches_to_send,
+        };
+
+        let simulator_loop = SimulatorLoop {
+            bank,
+            parent_slot,
+            first_simulated_slot: self.first_simulated_slot,
+            freeze_time_by_slot,
+            base_event_time,
+            poh_recorder,
+            simulated_leader,
+            bank_forks,
+            blockstore,
+            leader_schedule_cache,
+            retransmit_slots_sender,
+            retracer,
+        };
+
+        let simulator_threads = SimulatorThreads {
+            poh_service,
+            banking_stage,
+            broadcast_stage,
+            retracer_thread,
+            exit,
+        };
+
+        (sender_loop, simulator_loop, simulator_threads)
+    }
+
+    pub fn start(
+        self,
+        genesis_config: GenesisConfig,
+        bank_forks: Arc<RwLock<BankForks>>,
+        blockstore: Arc<Blockstore>,
+        block_production_method: BlockProductionMethod,
+    ) -> Result<(), SimulateError> {
+        let (sender_loop, simulator_loop, simulator_threads) = self.prepare_simulation(
+            genesis_config,
+            bank_forks,
+            blockstore,
+            block_production_method,
+        );
+
+        sender_loop.log_starting();
+        let base_simulation_time = SystemTime::now();
+        // Spawning and entering these two loops must be done at the same time as they're timed.
+        // So, all the mundane setup must be done in advance.
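+        // (Any extra work placed between taking base_simulation_time above and spawning the
+        // sender below would shift every replayed send by that amount.)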
+        let sender_thread = sender_loop.spawn(base_simulation_time)?;
+        let (sender_thread, retransmit_slots_sender) =
+            simulator_loop.enter(base_simulation_time, sender_thread);
+
+        simulator_threads.finish(sender_thread, retransmit_slots_sender);
+
+        Ok(())
+    }
+
+    pub fn event_file_name(index: usize) -> String {
+        if index == 0 {
+            BASENAME.to_string()
+        } else {
+            format!("{BASENAME}.{index}")
+        }
+    }
+}
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 5c9768aa0bc215..8dff75832106a9 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -31,7 +31,7 @@ use {
     crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
     histogram::Histogram,
     solana_client::connection_cache::ConnectionCache,
-    solana_gossip::cluster_info::ClusterInfo,
+    solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
     solana_ledger::blockstore_processor::TransactionStatusSender,
     solana_measure::measure_us,
     solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
@@ -40,9 +40,10 @@ use {
         bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
         vote_sender_types::ReplayVoteSender,
     },
-    solana_sdk::timing::AtomicInterval,
+    solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
     std::{
         cmp, env,
+        ops::Deref,
         sync::{
             atomic::{AtomicU64, AtomicUsize, Ordering},
             Arc, RwLock,
@@ -323,12 +324,33 @@ pub struct FilterForwardingResults {
     pub(crate) total_filter_packets_us: u64,
 }
 
+pub trait LikeClusterInfo: Send + Sync + 'static + Clone {
+    fn id(&self) -> Pubkey;
+
+    fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
+    where
+        F: FnOnce(&ContactInfo) -> Y;
+}
+
+impl LikeClusterInfo for Arc<ClusterInfo> {
+    fn id(&self) -> Pubkey {
+        self.deref().id()
+    }
+
+    fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
+    where
+        F: FnOnce(&ContactInfo) -> Y,
+    {
+        self.deref().lookup_contact_info(id, map)
+    }
+}
+
 impl BankingStage {
     /// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
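+    ///
+    /// `cluster_info` accepts any `LikeClusterInfo` implementation: production code passes
+    /// `Arc<ClusterInfo>`, while tests and simulation can supply a stub. A minimal sketch of
+    /// such a stub (hypothetical type, modeled on `DummyClusterInfo` in banking_simulation.rs):
+    ///
+    /// ```ignore
+    /// struct StubClusterInfo {
+    ///     id: Pubkey,
+    /// }
+    ///
+    /// impl LikeClusterInfo for Arc<StubClusterInfo> {
+    ///     fn id(&self) -> Pubkey {
+    ///         self.id
+    ///     }
+    ///
+    ///     fn lookup_contact_info<F, Y>(&self, _id: &Pubkey, _map: F) -> Option<Y>
+    ///     where
+    ///         F: FnOnce(&ContactInfo) -> Y,
+    ///     {
+    ///         None
+    ///     }
+    /// }
+    /// ```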
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         block_production_method: BlockProductionMethod,
-        cluster_info: &Arc<ClusterInfo>,
+        cluster_info: &impl LikeClusterInfo,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         non_vote_receiver: BankingPacketReceiver,
         tpu_vote_receiver: BankingPacketReceiver,
@@ -362,7 +384,7 @@ impl BankingStage {
     #[allow(clippy::too_many_arguments)]
     pub fn new_num_threads(
         block_production_method: BlockProductionMethod,
-        cluster_info: &Arc<ClusterInfo>,
+        cluster_info: &impl LikeClusterInfo,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         non_vote_receiver: BankingPacketReceiver,
         tpu_vote_receiver: BankingPacketReceiver,
@@ -413,7 +435,7 @@ impl BankingStage {
 
     #[allow(clippy::too_many_arguments)]
     pub fn new_thread_local_multi_iterator(
-        cluster_info: &Arc<ClusterInfo>,
+        cluster_info: &impl LikeClusterInfo,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         non_vote_receiver: BankingPacketReceiver,
         tpu_vote_receiver: BankingPacketReceiver,
@@ -497,7 +519,7 @@ impl BankingStage {
 
     #[allow(clippy::too_many_arguments)]
     pub fn new_central_scheduler(
-        cluster_info: &Arc<ClusterInfo>,
+        cluster_info: &impl LikeClusterInfo,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         non_vote_receiver: BankingPacketReceiver,
         tpu_vote_receiver: BankingPacketReceiver,
@@ -629,7 +651,7 @@ impl BankingStage {
         Self { bank_thread_hdls }
     }
 
-    fn spawn_thread_local_multi_iterator_thread(
+    fn spawn_thread_local_multi_iterator_thread<T: LikeClusterInfo>(
         id: u32,
         packet_receiver: BankingPacketReceiver,
         bank_forks: Arc<RwLock<BankForks>>,
@@ -637,7 +659,7 @@ impl BankingStage {
         committer: Committer,
         transaction_recorder: TransactionRecorder,
         log_messages_bytes_limit: Option<usize>,
-        mut forwarder: Forwarder,
+        mut forwarder: Forwarder<T>,
         unprocessed_transaction_storage: UnprocessedTransactionStorage,
     ) -> JoinHandle<()> {
         let mut packet_receiver = PacketReceiver::new(id, packet_receiver, bank_forks);
@@ -664,9 +686,9 @@ impl BankingStage {
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn process_buffered_packets(
+    fn process_buffered_packets<T: LikeClusterInfo>(
         decision_maker: &DecisionMaker,
-        forwarder: &mut Forwarder,
+        forwarder: &mut Forwarder<T>,
         consumer: &Consumer,
         unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
         banking_stage_stats: &BankingStageStats,
@@ -730,10 +752,10 @@ impl BankingStage {
         }
     }
 
-    fn process_loop(
+    fn process_loop<T: LikeClusterInfo>(
         packet_receiver: &mut PacketReceiver,
         decision_maker: &DecisionMaker,
-        forwarder: &mut Forwarder,
+        forwarder: &mut Forwarder<T>,
         consumer: &Consumer,
         id: u32,
         mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
diff --git a/core/src/banking_stage/forward_worker.rs b/core/src/banking_stage/forward_worker.rs
index 6c9fd45e029c2f..61cf311f0a8cf8 100644
--- a/core/src/banking_stage/forward_worker.rs
+++ b/core/src/banking_stage/forward_worker.rs
@@ -4,6 +4,7 @@ use {
         scheduler_messages::{FinishedForwardWork, ForwardWork},
         ForwardOption,
     },
+    crate::banking_stage::LikeClusterInfo,
     crossbeam_channel::{Receiver, RecvError, SendError, Sender},
     thiserror::Error,
 };
@@ -16,19 +17,19 @@ pub enum ForwardWorkerError {
     Send(#[from] SendError<FinishedForwardWork>),
 }
 
-pub(crate) struct ForwardWorker {
+pub(crate) struct ForwardWorker<T: LikeClusterInfo> {
     forward_receiver: Receiver<ForwardWork>,
     forward_option: ForwardOption,
-    forwarder: Forwarder,
+    forwarder: Forwarder<T>,
     forwarded_sender: Sender<FinishedForwardWork>,
 }
 
 #[allow(dead_code)]
-impl ForwardWorker {
+impl<T: LikeClusterInfo> ForwardWorker<T> {
     pub fn new(
         forward_receiver: Receiver<ForwardWork>,
         forward_option: ForwardOption,
-        forwarder: Forwarder,
+        forwarder: Forwarder<T>,
         forwarded_sender: Sender<FinishedForwardWork>,
     ) -> Self {
         Self {
@@ -90,6 +91,7 @@ mod tests {
     },
     crossbeam_channel::unbounded,
     solana_client::connection_cache::ConnectionCache,
+    solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::{
         blockstore::Blockstore, genesis_utils::GenesisConfigInfo,
         get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache,
@@ -121,7 +123,7 @@ mod tests {
         forwarded_receiver: Receiver<FinishedForwardWork>,
     }
 
-    fn setup_test_frame() -> (TestFrame, ForwardWorker) {
+    fn setup_test_frame() -> (TestFrame, ForwardWorker<Arc<ClusterInfo>>) {
         let GenesisConfigInfo {
             genesis_config,
             mint_keypair,
diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs
index 563c93861cd30e..82af221842dd0b 100644
--- a/core/src/banking_stage/forwarder.rs
+++ b/core/src/banking_stage/forwarder.rs
@@ -6,14 +6,15 @@ use {
         ForwardOption,
     },
     crate::{
-        banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket,
+        banking_stage::{
+            immutable_deserialized_packet::ImmutableDeserializedPacket, LikeClusterInfo,
+        },
         next_leader::{next_leader, next_leader_tpu_vote},
         tracer_packet_stats::TracerPacketStats,
     },
     solana_client::connection_cache::ConnectionCache,
     solana_connection_cache::client_connection::ClientConnection as TpuConnection,
     solana_feature_set::FeatureSet,
-    solana_gossip::cluster_info::ClusterInfo,
     solana_measure::measure_us,
     solana_perf::{data_budget::DataBudget, packet::Packet},
     solana_poh::poh_recorder::PohRecorder,
@@ -27,21 +28,21 @@ use {
     },
 };
 
-pub struct Forwarder {
+pub struct Forwarder<T: LikeClusterInfo> {
     poh_recorder: Arc<RwLock<PohRecorder>>,
     bank_forks: Arc<RwLock<BankForks>>,
     socket: UdpSocket,
-    cluster_info: Arc<ClusterInfo>,
+    cluster_info: T,
     connection_cache: Arc<ConnectionCache>,
     data_budget: Arc<DataBudget>,
     forward_packet_batches_by_accounts: ForwardPacketBatchesByAccounts,
 }
 
-impl Forwarder {
+impl<T: LikeClusterInfo> Forwarder<T> {
     pub fn new(
         poh_recorder: Arc<RwLock<PohRecorder>>,
         bank_forks: Arc<RwLock<BankForks>>,
-        cluster_info: Arc<ClusterInfo>,
+        cluster_info: T,
         connection_cache: Arc<ConnectionCache>,
         data_budget: Arc<DataBudget>,
     ) -> Self {
@@ -307,7 +308,7 @@ mod tests {
         unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches},
         unprocessed_transaction_storage::ThreadType,
     },
-    solana_gossip::cluster_info::Node,
+    solana_gossip::cluster_info::{ClusterInfo, Node},
     solana_ledger::{blockstore::Blockstore, genesis_utils::GenesisConfigInfo},
     solana_perf::packet::PacketFlags,
     solana_poh::{poh_recorder::create_test_recorder, poh_service::PohService},
diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
index b576fd1576511d..9966a0527d0286 100644
--- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
+++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -19,7 +19,7 @@ use {
         forwarder::Forwarder,
         immutable_deserialized_packet::ImmutableDeserializedPacket,
         packet_deserializer::PacketDeserializer,
-        ForwardOption, TOTAL_BUFFERED_PACKETS,
+        ForwardOption, LikeClusterInfo, TOTAL_BUFFERED_PACKETS,
     },
     arrayvec::ArrayVec,
     crossbeam_channel::RecvTimeoutError,
@@ -44,7 +44,7 @@ use {
 };
 
 /// Controls packet and transaction flow into scheduler, and scheduling execution.
-pub(crate) struct SchedulerController {
+pub(crate) struct SchedulerController<T: LikeClusterInfo> {
     /// Decision maker for determining what should be done with transactions.
     decision_maker: DecisionMaker,
     /// Packet/Transaction ingress.
@@ -68,17 +68,17 @@ pub(crate) struct SchedulerController {
     /// Metric report handles for the worker threads.
     worker_metrics: Vec<Arc<ConsumeWorkerMetrics>>,
     /// State for forwarding packets to the leader, if enabled.
-    forwarder: Option<Forwarder>,
+    forwarder: Option<Forwarder<T>>,
 }
 
-impl SchedulerController {
+impl<T: LikeClusterInfo> SchedulerController<T> {
     pub fn new(
         decision_maker: DecisionMaker,
         packet_deserializer: PacketDeserializer,
         bank_forks: Arc<RwLock<BankForks>>,
         scheduler: PrioGraphScheduler,
         worker_metrics: Vec<Arc<ConsumeWorkerMetrics>>,
-        forwarder: Option<Forwarder>,
+        forwarder: Option<Forwarder<T>>,
     ) -> Self {
         Self {
             decision_maker,
@@ -670,6 +670,7 @@ mod tests {
     },
     crossbeam_channel::{unbounded, Receiver, Sender},
     itertools::Itertools,
+    solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::{
         blockstore::Blockstore, genesis_utils::GenesisConfigInfo,
         get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache,
@@ -705,7 +706,7 @@ mod tests {
         finished_consume_work_sender: Sender<FinishedConsumeWork>,
     }
 
-    fn create_test_frame(num_threads: usize) -> (TestFrame, SchedulerController) {
+    fn create_test_frame(num_threads: usize) -> (TestFrame, SchedulerController<Arc<ClusterInfo>>) {
         let GenesisConfigInfo {
             mut genesis_config,
             mint_keypair,
@@ -800,7 +801,9 @@ mod tests {
     // in order to keep the decision as recent as possible for processing.
     // In the tests, the decision will not become stale, so it is more convenient
     // to receive first and then schedule.
-    fn test_receive_then_schedule(scheduler_controller: &mut SchedulerController) {
+    fn test_receive_then_schedule(
+        scheduler_controller: &mut SchedulerController<Arc<ClusterInfo>>,
+    ) {
         let decision = scheduler_controller
             .decision_maker
             .make_consume_or_forward_decision();
diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs
index cc077dfa2c2755..150e9a33e1940f 100644
--- a/core/src/banking_trace.rs
+++ b/core/src/banking_trace.rs
@@ -42,7 +42,7 @@ pub enum TraceError {
     TooSmallDirByteLimit(DirByteLimit, DirByteLimit),
 }
 
-const BASENAME: &str = "events";
+pub(crate) const BASENAME: &str = "events";
 const TRACE_FILE_ROTATE_COUNT: u64 = 14; // target 2 weeks retention under normal load
 const TRACE_FILE_WRITE_INTERVAL_MS: u64 = 100;
 const BUF_WRITER_CAPACITY: usize = 10 * 1024 * 1024;
@@ -359,6 +359,14 @@ impl TracedSender {
         }
         self.sender.send(batch)
     }
+
+    pub fn len(&self) -> usize {
+        self.sender.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
 }
 
 #[cfg(any(test, feature = "dev-context-only-utils"))]
diff --git a/core/src/lib.rs b/core/src/lib.rs
index da9d69ed508875..2ba671ca62b580 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -10,6 +10,7 @@
 pub mod accounts_hash_verifier;
 pub mod admin_rpc_post_init;
+pub mod banking_simulation;
 pub mod banking_stage;
 pub mod banking_trace;
 pub mod cache_block_meta_service;
diff --git a/core/src/next_leader.rs b/core/src/next_leader.rs
index 7e77ecd869e4a1..738e728dcc4a30 100644
--- a/core/src/next_leader.rs
+++ b/core/src/next_leader.rs
@@ -1,4 +1,5 @@
 use {
+    crate::banking_stage::LikeClusterInfo,
     itertools::Itertools,
     solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
     solana_poh::poh_recorder::PohRecorder,
@@ -34,14 +35,14 @@ pub(crate) fn upcoming_leader_tpu_vote_sockets(
 }
 
 pub(crate) fn next_leader_tpu_vote(
-    cluster_info: &ClusterInfo,
+    cluster_info: &impl LikeClusterInfo,
     poh_recorder: &RwLock<PohRecorder>,
 ) -> Option<(Pubkey, SocketAddr)> {
     next_leader(cluster_info, poh_recorder, ContactInfo::tpu_vote)
 }
 
 pub(crate) fn next_leader<F>(
-    cluster_info: &ClusterInfo,
+    cluster_info: &impl LikeClusterInfo,
     poh_recorder: &RwLock<PohRecorder>,
     port_selector: F,
 ) -> Option<(Pubkey, SocketAddr)>
diff --git a/cost-model/src/cost_tracker.rs b/cost-model/src/cost_tracker.rs
index 23583068fb13b3..55d905047990a5 100644
--- a/cost-model/src/cost_tracker.rs
+++ b/cost-model/src/cost_tracker.rs
@@ -106,6 +116,16 @@ impl Default for CostTracker {
 }
 
 impl CostTracker {
+    pub fn new_from_parent_limits(&self) -> Self {
+        let mut new = Self::default();
+        new.set_limits(
+            self.account_cost_limit,
+            self.block_cost_limit,
+            self.vote_cost_limit,
+        );
+        new
+    }
+
     pub fn reset(&mut self) {
         self.cost_by_writable_accounts.clear();
         self.block_cost = 0;
@@ -192,6 +202,10 @@ impl CostTracker {
         self.block_cost
     }
 
+    pub fn vote_cost(&self) -> u64 {
+        self.vote_cost
+    }
+
     pub fn transaction_count(&self) -> u64 {
         self.transaction_count
     }
diff --git a/ledger-tool/Cargo.toml b/ledger-tool/Cargo.toml
index 39061de55f2382..abfa07ade49d38 100644
--- a/ledger-tool/Cargo.toml
+++ b/ledger-tool/Cargo.toml
@@ -32,7 +32,7 @@ solana-bpf-loader-program = { workspace = true }
 solana-clap-utils = { workspace = true }
 solana-cli-output = { workspace = true }
 solana-compute-budget = { workspace = true }
-solana-core = { workspace = true }
+solana-core = { workspace = true, features = ["dev-context-only-utils"] }
 solana-cost-model = { workspace = true }
 solana-entry = { workspace = true }
 solana-feature-set = { workspace = true }
diff --git a/ledger-tool/src/args.rs b/ledger-tool/src/args.rs
index d2bc0f691e130f..d01c542465256f 100644
--- a/ledger-tool/src/args.rs
+++ b/ledger-tool/src/args.rs
@@ -214,6 +214,8 @@ pub fn parse_process_options(ledger_path: &Path, arg_matches: &ArgMatches<'_>) -
     let debug_keys = pubkeys_of(arg_matches, "debug_key")
         .map(|pubkeys| Arc::new(pubkeys.into_iter().collect::<HashSet<_>>()));
     let allow_dead_slots = arg_matches.is_present("allow_dead_slots");
+    let abort_on_invalid_block = arg_matches.is_present("abort_on_invalid_block");
+    let no_block_cost_limits = arg_matches.is_present("no_block_cost_limits");
 
     ProcessOptions {
         new_hard_forks,
@@ -230,6 +232,8 @@ pub fn parse_process_options(ledger_path: &Path, arg_matches: &ArgMatches<'_>) -
         allow_dead_slots,
         halt_at_slot,
         use_snapshot_archives_at_startup,
+        abort_on_invalid_block,
+        no_block_cost_limits,
         ..ProcessOptions::default()
     }
 }
diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs
index 0c74d53f3e41e2..a9b9a864bd9b42 100644
--- a/ledger-tool/src/main.rs
+++ b/ledger-tool/src/main.rs
@@ -31,13 +31,14 @@ use {
     },
     solana_cli_output::OutputFormat,
     solana_core::{
+        banking_simulation::{BankingSimulator, BankingTraceEvents},
         system_monitor_service::{SystemMonitorService, SystemMonitorStatsReportConfig},
-        validator::BlockVerificationMethod,
+        validator::{BlockProductionMethod, BlockVerificationMethod},
     },
     solana_cost_model::{cost_model::CostModel, cost_tracker::CostTracker},
     solana_feature_set::{self as feature_set, FeatureSet},
     solana_ledger::{
-        blockstore::{create_new_ledger, Blockstore},
+        blockstore::{banking_trace_path, create_new_ledger, Blockstore},
         blockstore_options::{AccessType, LedgerColumnOptions},
         blockstore_processor::{
             ProcessSlotCallback, TransactionStatusMessage, TransactionStatusSender,
@@ -83,8 +84,8 @@ use {
     },
     std::{
         collections::{HashMap, HashSet},
-        ffi::OsStr,
-        fs::File,
+        ffi::{OsStr, OsString},
+        fs::{read_dir, File},
         io::{self, Write},
         mem::swap,
         path::{Path, PathBuf},
@@ -536,6 +537,70 @@ fn assert_capitalization(bank: &Bank) {
     assert!(bank.calculate_and_verify_capitalization(debug_verify));
 }
 
+fn load_banking_trace_events_or_exit(ledger_path: &Path) -> BankingTraceEvents {
+    let file_paths = read_banking_trace_event_file_paths_or_exit(banking_trace_path(ledger_path));
+
+    info!("Using: banking trace event files: {file_paths:?}");
+    match BankingTraceEvents::load(&file_paths) {
+        Ok(banking_trace_events) => banking_trace_events,
+        Err(error) => {
+            eprintln!("Failed to load banking trace events: {error:?}");
+            exit(1)
+        }
+    }
+}
+
+fn read_banking_trace_event_file_paths_or_exit(banking_trace_path: PathBuf) -> Vec<PathBuf> {
+    info!("Using: banking trace events dir: {banking_trace_path:?}");
+
+    let entries = match read_dir(&banking_trace_path) {
+        Ok(entries) => entries,
+        Err(error) => {
+            eprintln!("Error: failed to open banking_trace_path: {error:?}");
+            exit(1);
+        }
+    };
+
+    let mut entry_names = entries
+        .flat_map(|entry| entry.ok().map(|entry| entry.file_name()))
+        .collect::<HashSet<_>>();
+
+    let mut event_file_paths = vec![];
+
+    if entry_names.is_empty() {
+        warn!("banking_trace_path dir is empty.");
+        return event_file_paths;
+    }
+
+    for index in 0.. {
+        let event_file_name: OsString = BankingSimulator::event_file_name(index).into();
+        if entry_names.remove(&event_file_name) {
+            event_file_paths.push(banking_trace_path.join(event_file_name));
+        } else {
+            break;
+        }
+    }
+
+    if event_file_paths.is_empty() {
+        warn!("Error: no event files found");
+    }
+
+    if !entry_names.is_empty() {
+        let full_names = entry_names
+            .into_iter()
+            .map(|name| banking_trace_path.join(name))
+            .collect::<Vec<_>>();
+        warn!(
+            "Some files in {banking_trace_path:?} are ignored due to gapped events file rotation \
+             or unrecognized names: {full_names:?}"
+        );
+    }
+
+    // Reverse to load in chronological order (note that this isn't strictly needed)
+    event_file_paths.reverse();
+    event_file_paths
+}
+
 struct SlotRecorderConfig {
     transaction_recorder: Option<JoinHandle<()>>,
     transaction_status_sender: Option<TransactionStatusSender>,
@@ -1146,6 +1211,30 @@ fn main() {
                         "geyser_plugin_config",
                     ])
                     .help("In addition to the bank hash, optionally include accounts and/or transactions details for the slot"),
+            )
+            .arg(
+                Arg::with_name("abort_on_invalid_block")
+                    .long("abort-on-invalid-block")
+                    .takes_value(false)
+                    .help(
+                        "Exits with failed status early as soon as any bad block is detected",
+                    ),
+            )
+            .arg(
+                Arg::with_name("no_block_cost_limits")
+                    .long("no-block-cost-limits")
+                    .takes_value(false)
+                    .help("Disable block cost limits effectively by setting them to the max"),
+            )
+            .arg(
+                Arg::with_name("enable_hash_overrides")
+                    .long("enable-hash-overrides")
+                    .takes_value(false)
+                    .help(
+                        "Enable override of blockhashes and bank hashes from banking trace \
+                         event files to correctly verify blocks produced by \
+                         the simulate-block-production subcommand",
+                    ),
            ),
        )
        .subcommand(
@@ -1388,6 +1477,36 @@ fn main() {
                    .help("If snapshot creation should succeed with a capitalization delta."),
            ),
        )
+        .subcommand(
+            SubCommand::with_name("simulate-block-production")
+                .about("Simulate producing blocks with banking trace event files in the ledger")
+                .arg(&load_genesis_config_arg)
+                .args(&accounts_db_config_args)
+                .args(&snapshot_config_args)
+                .arg(
+                    Arg::with_name("block_production_method")
+                        .long("block-production-method")
+                        .value_name("METHOD")
+                        .takes_value(true)
+                        .possible_values(BlockProductionMethod::cli_names())
+                        .help(BlockProductionMethod::cli_message()),
+                )
+                .arg(
+                    Arg::with_name("first_simulated_slot")
+                        .long("first-simulated-slot")
+                        .value_name("SLOT")
+                        .validator(is_slot)
+                        .takes_value(true)
+                        .required(true)
+                        .help("Start simulation at the given slot")
+                )
+                .arg(
+                    Arg::with_name("no_block_cost_limits")
+                        .long("no-block-cost-limits")
+                        .takes_value(false)
+                        .help("Disable block cost limits effectively by setting them to the max"),
+                ),
+        )
        .subcommand(
            SubCommand::with_name("accounts")
                .about("Print account stats and contents after processing the ledger")
ledger") @@ -1662,6 +1781,12 @@ fn main() { ); let mut process_options = parse_process_options(&ledger_path, arg_matches); + if arg_matches.is_present("enable_hash_overrides") { + let banking_trace_events = load_banking_trace_events_or_exit(&ledger_path); + process_options.hash_overrides = + Some(banking_trace_events.hash_overrides().clone()); + } + let (slot_callback, slot_recorder_config) = setup_slot_recording(arg_matches); process_options.slot_callback = slot_callback; let transaction_status_sender = slot_recorder_config @@ -2350,6 +2475,61 @@ fn main() { system_monitor_service.join().unwrap(); } } + ("simulate-block-production", Some(arg_matches)) => { + let mut process_options = parse_process_options(&ledger_path, arg_matches); + + let banking_trace_events = load_banking_trace_events_or_exit(&ledger_path); + process_options.hash_overrides = + Some(banking_trace_events.hash_overrides().clone()); + + let slot = value_t!(arg_matches, "first_simulated_slot", Slot).unwrap(); + let simulator = BankingSimulator::new(banking_trace_events, slot); + let Some(parent_slot) = simulator.parent_slot() else { + eprintln!( + "Couldn't determine parent_slot of first_simulated_slot: {slot} \ + due to missing banking_trace_event data." + ); + exit(1); + }; + process_options.halt_at_slot = Some(parent_slot); + + let blockstore = Arc::new(open_blockstore( + &ledger_path, + arg_matches, + AccessType::Primary, // needed for purging already existing simulated block shreds... + )); + let genesis_config = open_genesis_config_by(&ledger_path, arg_matches); + let LoadAndProcessLedgerOutput { bank_forks, .. } = + load_and_process_ledger_or_exit( + arg_matches, + &genesis_config, + blockstore.clone(), + process_options, + None, // transaction status sender + ); + + let block_production_method = value_t!( + arg_matches, + "block_production_method", + BlockProductionMethod + ) + .unwrap_or_default(); + + info!("Using: block-production-method: {block_production_method}"); + + match simulator.start( + genesis_config, + bank_forks, + blockstore, + block_production_method, + ) { + Ok(()) => println!("Ok"), + Err(error) => { + eprintln!("{error:?}"); + exit(1); + } + }; + } ("accounts", Some(arg_matches)) => { let process_options = parse_process_options(&ledger_path, arg_matches); let genesis_config = open_genesis_config_by(&ledger_path, arg_matches); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 2101896d9a0558..54f612483be958 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -299,6 +299,14 @@ impl SlotMetaWorkingSetEntry { } } +pub fn banking_trace_path(path: &Path) -> PathBuf { + path.join("banking_trace") +} + +pub fn banking_retrace_path(path: &Path) -> PathBuf { + path.join("banking_retrace") +} + impl Blockstore { pub fn db(self) -> Arc { self.db @@ -309,7 +317,11 @@ impl Blockstore { } pub fn banking_trace_path(&self) -> PathBuf { - self.ledger_path.join("banking_trace") + banking_trace_path(&self.ledger_path) + } + + pub fn banking_retracer_path(&self) -> PathBuf { + banking_retrace_path(&self.ledger_path) } /// Opens a Ledger in directory, provides "infinite" window of shreds diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index b34bdee591dd9c..89a013531f407c 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "dev-context-only-utils")] -use qualifier_attr::qualifiers; use { crate::{ block_error::BlockError, @@ -76,6 +74,8 @@ use { thiserror::Error, 
diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index b34bdee591dd9c..89a013531f407c 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -1,5 +1,3 @@
-#[cfg(feature = "dev-context-only-utils")]
-use qualifier_attr::qualifiers;
 use {
     crate::{
         block_error::BlockError,
@@ -76,6 +74,8 @@ use {
     thiserror::Error,
     ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen},
 };
+#[cfg(feature = "dev-context-only-utils")]
+use {qualifier_attr::qualifiers, solana_runtime::bank::HashOverrides};
 
 pub struct TransactionBatchWithIndexes<'a, 'b> {
     pub batch: TransactionBatch<'a, 'b>,
@@ -768,6 +768,10 @@ pub struct ProcessOptions {
     /// This is useful for debugging.
     pub run_final_accounts_hash_calc: bool,
     pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
+    #[cfg(feature = "dev-context-only-utils")]
+    pub hash_overrides: Option<HashOverrides>,
+    pub abort_on_invalid_block: bool,
+    pub no_block_cost_limits: bool,
 }
 
 pub fn test_process_blockstore(
@@ -903,6 +907,20 @@ pub fn process_blockstore_from_root(
         // Starting slot must be a root, and thus has no parents
         assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
         let bank = bank_forks.read().unwrap().root_bank();
+        #[cfg(feature = "dev-context-only-utils")]
+        if let Some(hash_overrides) = &opts.hash_overrides {
+            info!(
+                "Will override the following slots' hashes: {:#?}",
+                hash_overrides
+            );
+            bank.set_hash_overrides(hash_overrides.clone());
+        }
+        if opts.no_block_cost_limits {
+            warn!("setting block cost limits to MAX");
+            bank.write_cost_tracker()
+                .unwrap()
+                .set_limits(u64::MAX, u64::MAX, u64::MAX);
+        }
         assert!(bank.parent().is_none());
         (bank.slot(), bank.hash())
     };
@@ -1837,7 +1855,7 @@ fn load_frozen_forks(
             let mut progress = ConfirmationProgress::new(last_entry_hash);
             let mut m = Measure::start("process_single_slot");
             let bank = bank_forks.write().unwrap().insert_from_ledger(bank);
-            if process_single_slot(
+            if let Err(error) = process_single_slot(
                 blockstore,
                 &bank,
                 replay_tx_thread_pool,
@@ -1849,10 +1867,11 @@ fn load_frozen_forks(
                 entry_notification_sender,
                 None,
                 timing,
-            )
-            .is_err()
-            {
+            ) {
                 assert!(bank_forks.write().unwrap().remove(bank.slot()).is_some());
+                if opts.abort_on_invalid_block {
+                    Err(error)?
+                }
                 continue;
             }
             txs += progress.num_txs;
@@ -2055,6 +2074,13 @@ pub fn process_single_slot(
         replay_vote_sender,
         timing,
     )
+    .and_then(|()| {
+        if let Some((result, completed_timings)) = bank.wait_for_completed_scheduler() {
+            timing.accumulate(&completed_timings);
+            result?
+        }
+        Ok(())
+    })
     .map_err(|err| {
         let slot = bank.slot();
         warn!("slot {} failed to verify: {}", slot, err);
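The load_frozen_forks change above decides what an invalid block costs: by default the offending bank is dropped and replay continues, while opts.abort_on_invalid_block turns the first failure into a hard error. A self-contained sketch of that control flow (block results simplified to strings):

    fn replay_all(
        blocks: &[Result<(), String>],
        abort_on_invalid_block: bool,
    ) -> Result<usize, String> {
        let mut processed = 0;
        for block in blocks {
            if let Err(error) = block {
                // Mirrors the `Err(error)?` above: bail out only on request.
                if abort_on_invalid_block {
                    return Err(error.clone());
                }
                // Otherwise drop the bad block and keep replaying.
                continue;
            }
            processed += 1;
        }
        Ok(processed)
    }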
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index e0072749a4c078..ee6e785a1e6b68 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -506,6 +506,8 @@ impl PartialEq for Bank {
         if std::ptr::eq(self, other) {
             return true;
         }
+        // Suppress rustfmt until https://github.com/rust-lang/rustfmt/issues/5920 is fixed ...
+        #[rustfmt::skip]
         let Self {
             skipped_rewrites: _,
             rc: _,
@@ -544,6 +546,8 @@ impl PartialEq for Bank {
             stakes_cache,
             epoch_stakes,
             is_delta,
+            #[cfg(feature = "dev-context-only-utils")]
+            hash_overrides,
             // TODO: Confirm if all these fields are intentionally ignored!
             rewards: _,
             cluster_type: _,
@@ -601,6 +605,10 @@ impl PartialEq for Bank {
             && *stakes_cache.stakes() == *other.stakes_cache.stakes()
             && epoch_stakes == &other.epoch_stakes
             && is_delta.load(Relaxed) == other.is_delta.load(Relaxed)
+            // No deadlock is possible when Arc::ptr_eq() returns false, because these are
+            // different Mutexes.
+            && (Arc::ptr_eq(hash_overrides, &other.hash_overrides) ||
+                *hash_overrides.lock().unwrap() == *other.hash_overrides.lock().unwrap())
     }
 }
 
@@ -669,6 +677,50 @@ pub trait DropCallback: fmt::Debug {
 #[derive(Debug, Default)]
 pub struct OptionalDropCallback(Option<Box<dyn DropCallback + Send + Sync>>);
 
+#[derive(Default, Debug, Clone, PartialEq)]
+#[cfg(feature = "dev-context-only-utils")]
+pub struct HashOverrides {
+    hashes: HashMap<Slot, HashOverride>,
+}
+
+#[cfg(feature = "dev-context-only-utils")]
+impl HashOverrides {
+    fn get_hash_override(&self, slot: Slot) -> Option<&HashOverride> {
+        self.hashes.get(&slot)
+    }
+
+    fn get_blockhash_override(&self, slot: Slot) -> Option<&Hash> {
+        self.get_hash_override(slot)
+            .map(|hash_override| &hash_override.blockhash)
+    }
+
+    fn get_bank_hash_override(&self, slot: Slot) -> Option<&Hash> {
+        self.get_hash_override(slot)
+            .map(|hash_override| &hash_override.bank_hash)
+    }
+
+    pub fn add_override(&mut self, slot: Slot, blockhash: Hash, bank_hash: Hash) {
+        let is_new = self
+            .hashes
+            .insert(
+                slot,
+                HashOverride {
+                    blockhash,
+                    bank_hash,
+                },
+            )
+            .is_none();
+        assert!(is_new);
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+#[cfg(feature = "dev-context-only-utils")]
+struct HashOverride {
+    blockhash: Hash,
+    bank_hash: Hash,
+}
+
 /// Manager for the state of all accounts and programs after processing its entries.
 #[derive(Debug)]
 pub struct Bank {
@@ -845,6 +897,11 @@ pub struct Bank {
 
     /// Fee structure to use for assessing transaction fees.
     fee_structure: FeeStructure,
+
+    /// blockhash and bank_hash overrides keyed by slot for simulated block production.
+    /// This _field_ itself needed to be DCOU-ed to avoid taking 2 extra locks per bank freeze...
+    #[cfg(feature = "dev-context-only-utils")]
+    hash_overrides: Arc<Mutex<HashOverrides>>,
 }
 
 struct VoteWithStakeDelegations {
@@ -963,6 +1020,8 @@ impl Bank {
             compute_budget: None,
             transaction_account_lock_limit: None,
             fee_structure: FeeStructure::default(),
+            #[cfg(feature = "dev-context-only-utils")]
+            hash_overrides: Arc::new(Mutex::new(HashOverrides::default())),
         };
 
         bank.transaction_processor =
@@ -1204,7 +1263,7 @@ impl Bank {
                     .map(|drop_callback| drop_callback.clone_box()),
             )),
             freeze_started: AtomicBool::new(false),
-            cost_tracker: RwLock::new(CostTracker::default()),
+            cost_tracker: RwLock::new(parent.read_cost_tracker().unwrap().new_from_parent_limits()),
             accounts_data_size_initial,
             accounts_data_size_delta_on_chain: AtomicI64::new(0),
             accounts_data_size_delta_off_chain: AtomicI64::new(0),
@@ -1215,6 +1274,8 @@ impl Bank {
             compute_budget: parent.compute_budget,
             transaction_account_lock_limit: parent.transaction_account_lock_limit,
             fee_structure: parent.fee_structure.clone(),
+            #[cfg(feature = "dev-context-only-utils")]
+            hash_overrides: parent.hash_overrides.clone(),
         };
 
         let (_, ancestors_time_us) = measure_us!({
@@ -1591,6 +1652,8 @@ impl Bank {
             compute_budget: runtime_config.compute_budget,
             transaction_account_lock_limit: runtime_config.transaction_account_lock_limit,
             fee_structure: FeeStructure::default(),
+            #[cfg(feature = "dev-context-only-utils")]
+            hash_overrides: Arc::new(Mutex::new(HashOverrides::default())),
        };
 
         bank.transaction_processor =
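Before the freeze-path hunks below, a quick sketch of how the HashOverrides API above is meant to be driven. This is hypothetical usage (slot numbers and hashes are placeholders), and the type only exists under the dev-context-only-utils feature:

    use solana_runtime::bank::HashOverrides;
    use solana_sdk::hash::Hash;

    fn build_overrides() -> HashOverrides {
        let mut hash_overrides = HashOverrides::default();
        // One entry per simulated slot; add_override() asserts that a slot
        // is never inserted twice.
        hash_overrides.add_override(100, Hash::new_unique(), Hash::new_unique());
        hash_overrides.add_override(101, Hash::new_unique(), Hash::new_unique());
        hash_overrides
    }

In the real flow the overrides come out of the recorded trace events and reach the bank via Bank::set_hash_overrides() during ledger processing.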
@@ -3139,6 +3202,27 @@ impl Bank {
         // readers can starve this write lock acquisition and ticks would be slowed down too
         // much if the write lock is acquired for each tick.
         let mut w_blockhash_queue = self.blockhash_queue.write().unwrap();
+
+        #[cfg(feature = "dev-context-only-utils")]
+        let blockhash_override = self
+            .hash_overrides
+            .lock()
+            .unwrap()
+            .get_blockhash_override(self.slot())
+            .copied()
+            .inspect(|blockhash_override| {
+                if blockhash_override != blockhash {
+                    info!(
+                        "bank: slot: {}: overrode blockhash: {} with {}",
+                        self.slot(),
+                        blockhash,
+                        blockhash_override
+                    );
+                }
+            });
+        #[cfg(feature = "dev-context-only-utils")]
+        let blockhash = blockhash_override.as_ref().unwrap_or(blockhash);
+
         w_blockhash_queue.register_hash(blockhash, self.fee_rate_governor.lamports_per_signature);
         self.update_recent_blockhashes_locked(&w_blockhash_queue);
     }
@@ -5264,6 +5348,29 @@ impl Bank {
             hash = hard_forked_hash;
         }
 
+        #[cfg(feature = "dev-context-only-utils")]
+        let hash_override = self
+            .hash_overrides
+            .lock()
+            .unwrap()
+            .get_bank_hash_override(slot)
+            .copied()
+            .inspect(|&hash_override| {
+                if hash_override != hash {
+                    info!(
+                        "bank: slot: {}: overrode bank hash: {} with {}",
+                        self.slot(),
+                        hash,
+                        hash_override
+                    );
+                }
+            });
+        // Avoid letting the super-smart rustc optimize out `hash` along with the whole
+        // computation. hash_override is used by ledger-tool's simulate-block-production,
+        // which prefers to run the actual bank freezing processing for accurate simulation.
+        #[cfg(feature = "dev-context-only-utils")]
+        let hash = hash_override.unwrap_or(std::hint::black_box(hash));
+
         let bank_hash_stats = self
             .rc
             .accounts
@@ -6821,6 +6928,10 @@ impl Bank {
             None => Err(TransactionError::AccountNotFound),
         }
     }
+
+    pub fn set_hash_overrides(&self, hash_overrides: HashOverrides) {
+        *self.hash_overrides.lock().unwrap() = hash_overrides;
+    }
 }
 
 /// Compute how much an account has changed size. This function is useful when the data size delta
diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs
index 99ef4b53a0b94d..45af1f26dfd183 100644
--- a/validator/src/admin_rpc_service.rs
+++ b/validator/src/admin_rpc_service.rs
@@ -26,7 +26,7 @@ use {
     },
     std::{
         collections::{HashMap, HashSet},
-        error,
+        env, error,
         fmt::{self, Display},
         net::SocketAddr,
         path::{Path, PathBuf},
@@ -266,7 +266,12 @@ impl AdminRpc for AdminRpcImpl {
             // (rocksdb background processing or some other stuck thread perhaps?).
             //
             // If the process is still alive after five seconds, exit harder
-            thread::sleep(Duration::from_secs(5));
+            thread::sleep(Duration::from_secs(
+                env::var("SOLANA_VALIDATOR_EXIT_TIMEOUT")
+                    .ok()
+                    .and_then(|x| x.parse().ok())
+                    .unwrap_or(5),
+            ));
             warn!("validator exit timeout");
             std::process::exit(0);
         })
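One behavioral note on the SOLANA_VALIDATOR_EXIT_TIMEOUT override above, as a runnable sketch: an unset or malformed value silently falls back to the original five-second default instead of failing.

    fn parse_timeout_secs(raw: Option<&str>) -> u64 {
        raw.and_then(|x| x.parse().ok()).unwrap_or(5)
    }

    fn main() {
        assert_eq!(parse_timeout_secs(None), 5); // unset => default
        assert_eq!(parse_timeout_secs(Some("oops")), 5); // malformed => default
        assert_eq!(parse_timeout_secs(Some("120")), 120); // valid => override
    }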