Skip to content

Commit

Permalink
support completed slot status in geyser (#3069)
Browse files Browse the repository at this point in the history
* support completed slot status in geyser

* missing interface file

* Addressed some feedback from Brennan
  • Loading branch information
lijunwangs authored Oct 9, 2024
1 parent 6e62af0 commit 07faf4a
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 40 deletions.
7 changes: 2 additions & 5 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ use {
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, Sender},
solana_client::connection_cache::ConnectionCache,
solana_geyser_plugin_manager::{
block_metadata_notifier_interface::BlockMetadataNotifierArc,
slot_status_notifier::SlotStatusNotifier,
},
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
solana_gossip::{
cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler,
duplicate_shred_listener::DuplicateShredListener,
Expand All @@ -41,7 +38,7 @@ use {
solana_poh::poh_recorder::PohRecorder,
solana_rpc::{
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
rpc_subscriptions::RpcSubscriptions,
rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier,
},
solana_runtime::{
accounts_background_service::AbsRequestSender, bank_forks::BankForks,
Expand Down
28 changes: 15 additions & 13 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,19 +1091,21 @@ impl Validator {
)
};

let rpc_completed_slots_service = if !config.rpc_config.full_api {
None
} else {
let (completed_slots_sender, completed_slots_receiver) =
bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
blockstore.add_completed_slots_signal(completed_slots_sender);

Some(RpcCompletedSlotsService::spawn(
completed_slots_receiver,
rpc_subscriptions.clone(),
exit.clone(),
))
};
let rpc_completed_slots_service =
if config.rpc_config.full_api || geyser_plugin_service.is_some() {
let (completed_slots_sender, completed_slots_receiver) =
bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
blockstore.add_completed_slots_signal(completed_slots_sender);

Some(RpcCompletedSlotsService::spawn(
completed_slots_receiver,
rpc_subscriptions.clone(),
slot_status_notifier.clone(),
exit.clone(),
))
} else {
None
};

let optimistically_confirmed_bank_tracker =
Some(OptimisticallyConfirmedBankTracker::new(
Expand Down
4 changes: 4 additions & 0 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ pub enum SlotStatus {

/// First Shred Received
FirstShredReceived,

/// All shreds for the slot have been received.
Completed,
}

impl SlotStatus {
Expand All @@ -331,6 +334,7 @@ impl SlotStatus {
SlotStatus::Processed => "processed",
SlotStatus::Rooted => "rooted",
SlotStatus::FirstShredReceived => "first_shread_received",
SlotStatus::Completed => "completed",
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
block_metadata_notifier_interface::BlockMetadataNotifierArc,
entry_notifier::EntryNotifierImpl,
geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
slot_status_notifier::{SlotStatusNotifier, SlotStatusNotifierImpl},
slot_status_notifier::SlotStatusNotifierImpl,
slot_status_observer::SlotStatusObserver,
transaction_notifier::TransactionNotifierImpl,
},
Expand All @@ -15,6 +15,7 @@ use {
solana_ledger::entry_notifier_interface::EntryNotifierArc,
solana_rpc::{
optimistically_confirmed_bank_tracker::SlotNotification,
slot_status_notifier::SlotStatusNotifier,
transaction_notifier_interface::TransactionNotifierArc,
},
std::{
Expand Down
21 changes: 5 additions & 16 deletions geyser-plugin-manager/src/slot_status_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,11 @@ use {
log::*,
solana_measure::measure::Measure,
solana_metrics::*,
solana_rpc::slot_status_notifier::SlotStatusNotifierInterface,
solana_sdk::clock::Slot,
std::sync::{Arc, RwLock},
};

pub trait SlotStatusNotifierInterface {
/// Notified when a slot is optimistically confirmed
fn notify_slot_confirmed(&self, slot: Slot, parent: Option<Slot>);

/// Notified when a slot is marked frozen.
fn notify_slot_processed(&self, slot: Slot, parent: Option<Slot>);

/// Notified when a slot is rooted.
fn notify_slot_rooted(&self, slot: Slot, parent: Option<Slot>);

/// Notified when the first shred is received for a slot.
fn notify_first_shred_received(&self, slot: Slot);
}

pub type SlotStatusNotifier = Arc<RwLock<dyn SlotStatusNotifierInterface + Sync + Send>>;

pub struct SlotStatusNotifierImpl {
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
}
Expand All @@ -44,6 +29,10 @@ impl SlotStatusNotifierInterface for SlotStatusNotifierImpl {
fn notify_first_shred_received(&self, slot: Slot) {
self.notify_slot_status(slot, None, SlotStatus::FirstShredReceived);
}

fn notify_completed(&self, slot: Slot) {
self.notify_slot_status(slot, None, SlotStatus::Completed);
}
}

impl SlotStatusNotifierImpl {
Expand Down
6 changes: 4 additions & 2 deletions geyser-plugin-manager/src/slot_status_observer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use {
crate::slot_status_notifier::SlotStatusNotifier,
crossbeam_channel::Receiver,
solana_rpc::optimistically_confirmed_bank_tracker::SlotNotification,
solana_rpc::{
optimistically_confirmed_bank_tracker::SlotNotification,
slot_status_notifier::SlotStatusNotifier,
},
std::{
sync::{
atomic::{AtomicBool, Ordering},
Expand Down
1 change: 1 addition & 0 deletions rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod rpc_pubsub_service;
pub mod rpc_service;
pub mod rpc_subscription_tracker;
pub mod rpc_subscriptions;
pub mod slot_status_notifier;
pub mod transaction_notifier_interface;
pub mod transaction_status_service;

Expand Down
6 changes: 5 additions & 1 deletion rpc/src/rpc_completed_slots_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::rpc_subscriptions::RpcSubscriptions,
crate::{rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier},
crossbeam_channel::RecvTimeoutError,
solana_ledger::blockstore::CompletedSlotsReceiver,
solana_rpc_client_api::response::SlotUpdate,
Expand All @@ -21,6 +21,7 @@ impl RpcCompletedSlotsService {
pub fn spawn(
completed_slots_receiver: CompletedSlotsReceiver,
rpc_subscriptions: Arc<RpcSubscriptions>,
slot_status_notifier: Option<SlotStatusNotifier>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
Expand All @@ -45,6 +46,9 @@ impl RpcCompletedSlotsService {
slot,
timestamp: timestamp(),
});
if let Some(slot_status_notifier) = &slot_status_notifier {
slot_status_notifier.read().unwrap().notify_completed(slot);
}
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions rpc/src/slot_status_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use {
solana_sdk::clock::Slot,
std::sync::{Arc, RwLock},
};

pub trait SlotStatusNotifierInterface {
/// Notified when a slot is optimistically confirmed
fn notify_slot_confirmed(&self, slot: Slot, parent: Option<Slot>);

/// Notified when a slot is marked frozen.
fn notify_slot_processed(&self, slot: Slot, parent: Option<Slot>);

/// Notified when a slot is rooted.
fn notify_slot_rooted(&self, slot: Slot, parent: Option<Slot>);

/// Notified when the first shred is received for a slot.
fn notify_first_shred_received(&self, slot: Slot);

/// Notified when the slot is completed.
fn notify_completed(&self, slot: Slot);
}

pub type SlotStatusNotifier = Arc<RwLock<dyn SlotStatusNotifierInterface + Sync + Send>>;
6 changes: 4 additions & 2 deletions turbine/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use {
lru::LruCache,
rand::Rng,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_geyser_plugin_manager::slot_status_notifier::SlotStatusNotifier,
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache,
Expand All @@ -18,7 +17,10 @@ use {
solana_measure::measure::Measure,
solana_perf::deduper::Deduper,
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_rpc::{
max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions,
slot_status_notifier::SlotStatusNotifier,
},
solana_rpc_client_api::response::SlotUpdate,
solana_runtime::{
bank::{Bank, MAX_LEADER_SCHEDULE_STAKES},
Expand Down

0 comments on commit 07faf4a

Please sign in to comment.