From 07faf4a924b126e5eff457b8d896f700c28a25a2 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 9 Oct 2024 08:37:13 -0700 Subject: [PATCH] support completed slot status in geyser (#3069) * support completed slot status in geyser * missing interface file * Addressed some feedback from Brennan --- core/src/tvu.rs | 7 ++--- core/src/validator.rs | 28 ++++++++++--------- .../src/geyser_plugin_interface.rs | 4 +++ .../src/geyser_plugin_service.rs | 3 +- .../src/slot_status_notifier.rs | 21 ++++---------- .../src/slot_status_observer.rs | 6 ++-- rpc/src/lib.rs | 1 + rpc/src/rpc_completed_slots_service.rs | 6 +++- rpc/src/slot_status_notifier.rs | 23 +++++++++++++++ turbine/src/retransmit_stage.rs | 6 ++-- 10 files changed, 65 insertions(+), 40 deletions(-) create mode 100644 rpc/src/slot_status_notifier.rs diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 5d5e18bc241395..23e5d9b6562451 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -25,10 +25,7 @@ use { bytes::Bytes, crossbeam_channel::{unbounded, Receiver, Sender}, solana_client::connection_cache::ConnectionCache, - solana_geyser_plugin_manager::{ - block_metadata_notifier_interface::BlockMetadataNotifierArc, - slot_status_notifier::SlotStatusNotifier, - }, + solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc, solana_gossip::{ cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler, duplicate_shred_listener::DuplicateShredListener, @@ -41,7 +38,7 @@ use { solana_poh::poh_recorder::PohRecorder, solana_rpc::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig, - rpc_subscriptions::RpcSubscriptions, + rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier, }, solana_runtime::{ accounts_background_service::AbsRequestSender, bank_forks::BankForks, diff --git a/core/src/validator.rs b/core/src/validator.rs index c05a3fa8474357..ec1031c656715e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1091,19 +1091,21 @@ impl Validator { ) }; - let rpc_completed_slots_service = if !config.rpc_config.full_api { - None - } else { - let (completed_slots_sender, completed_slots_receiver) = - bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL); - blockstore.add_completed_slots_signal(completed_slots_sender); - - Some(RpcCompletedSlotsService::spawn( - completed_slots_receiver, - rpc_subscriptions.clone(), - exit.clone(), - )) - }; + let rpc_completed_slots_service = + if config.rpc_config.full_api || geyser_plugin_service.is_some() { + let (completed_slots_sender, completed_slots_receiver) = + bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL); + blockstore.add_completed_slots_signal(completed_slots_sender); + + Some(RpcCompletedSlotsService::spawn( + completed_slots_receiver, + rpc_subscriptions.clone(), + slot_status_notifier.clone(), + exit.clone(), + )) + } else { + None + }; let optimistically_confirmed_bank_tracker = Some(OptimisticallyConfirmedBankTracker::new( diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index 6ac7bb848b1444..97271310a99f5f 100644 --- a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -322,6 +322,9 @@ pub enum SlotStatus { /// First Shred Received FirstShredReceived, + + /// All shreds for the slot have been received. + Completed, } impl SlotStatus { @@ -331,6 +334,7 @@ impl SlotStatus { SlotStatus::Processed => "processed", SlotStatus::Rooted => "rooted", SlotStatus::FirstShredReceived => "first_shread_received", + SlotStatus::Completed => "completed", } } } diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index 8e293cbddbbeb0..61fca230030c11 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -5,7 +5,7 @@ use { block_metadata_notifier_interface::BlockMetadataNotifierArc, entry_notifier::EntryNotifierImpl, geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest}, - slot_status_notifier::{SlotStatusNotifier, SlotStatusNotifierImpl}, + slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver, transaction_notifier::TransactionNotifierImpl, }, @@ -15,6 +15,7 @@ use { solana_ledger::entry_notifier_interface::EntryNotifierArc, solana_rpc::{ optimistically_confirmed_bank_tracker::SlotNotification, + slot_status_notifier::SlotStatusNotifier, transaction_notifier_interface::TransactionNotifierArc, }, std::{ diff --git a/geyser-plugin-manager/src/slot_status_notifier.rs b/geyser-plugin-manager/src/slot_status_notifier.rs index 18ea942810ef41..573ed97d7787af 100644 --- a/geyser-plugin-manager/src/slot_status_notifier.rs +++ b/geyser-plugin-manager/src/slot_status_notifier.rs @@ -4,26 +4,11 @@ use { log::*, solana_measure::measure::Measure, solana_metrics::*, + solana_rpc::slot_status_notifier::SlotStatusNotifierInterface, solana_sdk::clock::Slot, std::sync::{Arc, RwLock}, }; -pub trait SlotStatusNotifierInterface { - /// Notified when a slot is optimistically confirmed - fn notify_slot_confirmed(&self, slot: Slot, parent: Option); - - /// Notified when a slot is marked frozen. - fn notify_slot_processed(&self, slot: Slot, parent: Option); - - /// Notified when a slot is rooted. - fn notify_slot_rooted(&self, slot: Slot, parent: Option); - - /// Notified when the first shred is received for a slot. - fn notify_first_shred_received(&self, slot: Slot); -} - -pub type SlotStatusNotifier = Arc>; - pub struct SlotStatusNotifierImpl { plugin_manager: Arc>, } @@ -44,6 +29,10 @@ impl SlotStatusNotifierInterface for SlotStatusNotifierImpl { fn notify_first_shred_received(&self, slot: Slot) { self.notify_slot_status(slot, None, SlotStatus::FirstShredReceived); } + + fn notify_completed(&self, slot: Slot) { + self.notify_slot_status(slot, None, SlotStatus::Completed); + } } impl SlotStatusNotifierImpl { diff --git a/geyser-plugin-manager/src/slot_status_observer.rs b/geyser-plugin-manager/src/slot_status_observer.rs index 7eba6e54eb6c58..99cdb568a8e384 100644 --- a/geyser-plugin-manager/src/slot_status_observer.rs +++ b/geyser-plugin-manager/src/slot_status_observer.rs @@ -1,7 +1,9 @@ use { - crate::slot_status_notifier::SlotStatusNotifier, crossbeam_channel::Receiver, - solana_rpc::optimistically_confirmed_bank_tracker::SlotNotification, + solana_rpc::{ + optimistically_confirmed_bank_tracker::SlotNotification, + slot_status_notifier::SlotStatusNotifier, + }, std::{ sync::{ atomic::{AtomicBool, Ordering}, diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 8dc81692d59790..9763ebd791a162 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -13,6 +13,7 @@ pub mod rpc_pubsub_service; pub mod rpc_service; pub mod rpc_subscription_tracker; pub mod rpc_subscriptions; +pub mod slot_status_notifier; pub mod transaction_notifier_interface; pub mod transaction_status_service; diff --git a/rpc/src/rpc_completed_slots_service.rs b/rpc/src/rpc_completed_slots_service.rs index cb9059b1a5d20f..0c5ecf3c3e6abd 100644 --- a/rpc/src/rpc_completed_slots_service.rs +++ b/rpc/src/rpc_completed_slots_service.rs @@ -1,5 +1,5 @@ use { - crate::rpc_subscriptions::RpcSubscriptions, + crate::{rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier}, crossbeam_channel::RecvTimeoutError, solana_ledger::blockstore::CompletedSlotsReceiver, solana_rpc_client_api::response::SlotUpdate, @@ -21,6 +21,7 @@ impl RpcCompletedSlotsService { pub fn spawn( completed_slots_receiver: CompletedSlotsReceiver, rpc_subscriptions: Arc, + slot_status_notifier: Option, exit: Arc, ) -> JoinHandle<()> { Builder::new() @@ -45,6 +46,9 @@ impl RpcCompletedSlotsService { slot, timestamp: timestamp(), }); + if let Some(slot_status_notifier) = &slot_status_notifier { + slot_status_notifier.read().unwrap().notify_completed(slot); + } } } } diff --git a/rpc/src/slot_status_notifier.rs b/rpc/src/slot_status_notifier.rs new file mode 100644 index 00000000000000..97a84da42f33bf --- /dev/null +++ b/rpc/src/slot_status_notifier.rs @@ -0,0 +1,23 @@ +use { + solana_sdk::clock::Slot, + std::sync::{Arc, RwLock}, +}; + +pub trait SlotStatusNotifierInterface { + /// Notified when a slot is optimistically confirmed + fn notify_slot_confirmed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is marked frozen. + fn notify_slot_processed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is rooted. + fn notify_slot_rooted(&self, slot: Slot, parent: Option); + + /// Notified when the first shred is received for a slot. + fn notify_first_shred_received(&self, slot: Slot); + + /// Notified when the slot is completed. + fn notify_completed(&self, slot: Slot); +} + +pub type SlotStatusNotifier = Arc>; diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index b5e67cd3203a40..e820851d03e4ba 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -9,7 +9,6 @@ use { lru::LruCache, rand::Rng, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, - solana_geyser_plugin_manager::slot_status_notifier::SlotStatusNotifier, solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol}, solana_ledger::{ leader_schedule_cache::LeaderScheduleCache, @@ -18,7 +17,10 @@ use { solana_measure::measure::Measure, solana_perf::deduper::Deduper, solana_rayon_threadlimit::get_thread_count, - solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, + solana_rpc::{ + max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions, + slot_status_notifier::SlotStatusNotifier, + }, solana_rpc_client_api::response::SlotUpdate, solana_runtime::{ bank::{Bank, MAX_LEADER_SCHEDULE_STAKES},