Support notify first shred received in geyser #3030

Merged
1 change: 1 addition & 0 deletions Cargo.lock

8 changes: 7 additions & 1 deletion core/src/tvu.rs
@@ -25,7 +25,10 @@ use {
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, Sender},
solana_client::connection_cache::ConnectionCache,
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
solana_geyser_plugin_manager::{
block_metadata_notifier_interface::BlockMetadataNotifierArc,
slot_status_notifier::SlotStatusNotifier,
},
solana_gossip::{
cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler,
duplicate_shred_listener::DuplicateShredListener,
@@ -161,6 +164,7 @@ impl Tvu {
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
cluster_slots: Arc<ClusterSlots>,
wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
slot_status_notifier: Option<SlotStatusNotifier>,
) -> Result<Self, String> {
let in_wen_restart = wen_restart_repair_slots.is_some();

@@ -210,6 +214,7 @@ impl Tvu {
retransmit_receiver,
max_slots.clone(),
Some(rpc_subscriptions.clone()),
slot_status_notifier,
);

let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();
@@ -542,6 +547,7 @@ pub mod tests {
outstanding_repair_requests,
cluster_slots,
wen_restart_repair_slots,
None,
)
.expect("assume success");
if enable_wen_restart {
5 changes: 5 additions & 0 deletions core/src/validator.rs
@@ -674,6 +674,10 @@ impl Validator {
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());

let slot_status_notifier = geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_slot_status_notifier());

info!(
"Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
entry_notifier: {}",
@@ -1405,6 +1409,7 @@ impl Validator {
outstanding_repair_requests.clone(),
cluster_slots.clone(),
wen_restart_repair_slots.clone(),
slot_status_notifier,
)
.map_err(ValidatorError::Other)?;

4 changes: 4 additions & 0 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
@@ -319,6 +319,9 @@ pub enum SlotStatus {

/// The highest slot that has been voted on by supermajority of the cluster, ie. is confirmed.
Confirmed,

/// First Shred Received

Review thread:

Reviewer: Are the doc comments for the other SlotStatus variants wrong? They almost seem like they were copy-pasted from confirmation levels. We send these notifications out each time a slot transitions to the new state, right (assuming it makes it to that state)?

Author: Yes -- they were largely copied. Which one is wrong?

Reviewer: I think the "The highest slot" phrasing makes more sense in the context of confirmation levels. I would probably rewrite them like this:

    /// First shred has been received for this slot.
    FirstShredReceived,

    /// This slot has been processed.
    Processed,

    /// This slot has been optimistically confirmed by a supermajority of the cluster.
    Confirmed,

    /// This slot has been confirmed and reached max vote lockout on the canonical fork.
    Rooted,

I acknowledge this is somewhat out of scope for this PR - I just noticed it because your comment reads more like what I would expect for a slot status notification, but it doesn't quite match the others.
FirstShredReceived,
}

impl SlotStatus {
@@ -327,6 +330,7 @@
SlotStatus::Confirmed => "confirmed",
SlotStatus::Processed => "processed",
SlotStatus::Rooted => "rooted",
SlotStatus::FirstShredReceived => "first_shred_received",
}
}
}
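
For plugin authors, the new variant surfaces through the existing `update_slot_status` callback on the `GeyserPlugin` trait. A minimal consumer sketch (not part of this PR; the callback signature is assumed from the interface crate at the time of this change, and the plugin name is illustrative):

```rust
use solana_geyser_plugin_interface::geyser_plugin_interface::{
    GeyserPlugin, Result as PluginResult, SlotStatus,
};

#[derive(Debug)]
struct FirstShredLogger;

impl GeyserPlugin for FirstShredLogger {
    fn name(&self) -> &'static str {
        "FirstShredLogger"
    }

    fn update_slot_status(
        &self,
        slot: u64,
        _parent: Option<u64>,
        status: SlotStatus,
    ) -> PluginResult<()> {
        // Fires the first time the retransmit stage sees a shred for this
        // slot, well before the slot reaches Processed or Confirmed.
        if matches!(status, SlotStatus::FirstShredReceived) {
            println!("first shred received for slot {slot}");
        }
        Ok(())
    }
}
```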
16 changes: 12 additions & 4 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
@@ -5,7 +5,7 @@ use {
block_metadata_notifier_interface::BlockMetadataNotifierArc,
entry_notifier::EntryNotifierImpl,
geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
slot_status_notifier::SlotStatusNotifierImpl,
slot_status_notifier::{SlotStatusNotifier, SlotStatusNotifierImpl},
slot_status_observer::SlotStatusObserver,
transaction_notifier::TransactionNotifierImpl,
},
@@ -37,6 +37,7 @@ pub struct GeyserPluginService {
transaction_notifier: Option<TransactionNotifierArc>,
entry_notifier: Option<EntryNotifierArc>,
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
slot_status_notifier: Option<SlotStatusNotifier>,
}

impl GeyserPluginService {
@@ -107,9 +108,10 @@ impl GeyserPluginService {
None
};

let (slot_status_observer, block_metadata_notifier): (
let (slot_status_observer, block_metadata_notifier, slot_status_notifier): (
Option<SlotStatusObserver>,
Option<BlockMetadataNotifierArc>,
Option<SlotStatusNotifier>,
) = if account_data_notifications_enabled
|| transaction_notifications_enabled
|| entry_notifications_enabled
@@ -119,14 +121,15 @@
(
Some(SlotStatusObserver::new(
confirmed_bank_receiver,
slot_status_notifier,
slot_status_notifier.clone(),
)),
Some(Arc::new(BlockMetadataNotifierImpl::new(
plugin_manager.clone(),
))),
Some(slot_status_notifier),
)
} else {
(None, None)
(None, None, None)
};

// Initialize plugin manager rpc handler thread if needed
@@ -143,6 +146,7 @@
transaction_notifier,
entry_notifier,
block_metadata_notifier,
slot_status_notifier,
})
}

@@ -172,6 +176,10 @@
self.block_metadata_notifier.clone()
}

pub fn get_slot_status_notifier(&self) -> Option<SlotStatusNotifier> {
self.slot_status_notifier.clone()
}

pub fn join(self) -> thread::Result<()> {
if let Some(mut slot_status_observer) = self.slot_status_observer {
slot_status_observer.join()?;
7 changes: 7 additions & 0 deletions geyser-plugin-manager/src/slot_status_notifier.rs
@@ -17,6 +17,9 @@ pub trait SlotStatusNotifierInterface {

/// Notified when a slot is rooted.
fn notify_slot_rooted(&self, slot: Slot, parent: Option<Slot>);

/// Notified when the first shred is received for a slot.
fn notify_first_shred_received(&self, slot: Slot);
}

pub type SlotStatusNotifier = Arc<RwLock<dyn SlotStatusNotifierInterface + Sync + Send>>;
@@ -37,6 +40,10 @@ impl SlotStatusNotifierInterface for SlotStatusNotifierImpl {
fn notify_slot_rooted(&self, slot: Slot, parent: Option<Slot>) {
self.notify_slot_status(slot, parent, SlotStatus::Rooted);
}

fn notify_first_shred_received(&self, slot: Slot) {
self.notify_slot_status(slot, None, SlotStatus::FirstShredReceived);
}
}

impl SlotStatusNotifierImpl {
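
Because `SlotStatusNotifier` is a shared trait object (`Arc<RwLock<dyn ...>>`), code that accepts one, such as the retransmit stage below, can be driven with a stub. A test-style sketch; `RecordingNotifier` is invented for illustration, and the `notify_slot_confirmed`/`notify_slot_processed` signatures are assumed to mirror the `notify_slot_rooted` method shown above:

```rust
use {
    solana_geyser_plugin_manager::slot_status_notifier::{
        SlotStatusNotifier, SlotStatusNotifierInterface,
    },
    solana_sdk::clock::Slot,
    std::sync::{Arc, Mutex, RwLock},
};

// Illustrative stub that records which slots reported a first shred.
#[derive(Default)]
struct RecordingNotifier {
    first_shreds: Mutex<Vec<Slot>>,
}

impl SlotStatusNotifierInterface for RecordingNotifier {
    fn notify_slot_confirmed(&self, _slot: Slot, _parent: Option<Slot>) {}
    fn notify_slot_processed(&self, _slot: Slot, _parent: Option<Slot>) {}
    fn notify_slot_rooted(&self, _slot: Slot, _parent: Option<Slot>) {}
    fn notify_first_shred_received(&self, slot: Slot) {
        self.first_shreds.lock().unwrap().push(slot);
    }
}

#[test]
fn records_first_shred() {
    let recording = Arc::new(RwLock::new(RecordingNotifier::default()));
    // Unsized coercion into the Arc<RwLock<dyn ...>> alias used by callers.
    let notifier: SlotStatusNotifier = recording.clone();
    notifier.read().unwrap().notify_first_shred_received(42);
    assert_eq!(*recording.read().unwrap().first_shreds.lock().unwrap(), vec![42]);
}
```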
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

1 change: 1 addition & 0 deletions turbine/Cargo.toml
@@ -25,6 +25,7 @@ rayon = { workspace = true }
rustls = { workspace = true }
solana-entry = { workspace = true }
solana-feature-set = { workspace = true }
solana-geyser-plugin-manager = { workspace = true }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-measure = { workspace = true }
1 change: 1 addition & 0 deletions turbine/benches/retransmit_stage.rs
@@ -126,6 +126,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
shreds_receiver,
Arc::default(), // solana_rpc::max_slots::MaxSlots
None,
None,
);

let mut index = 0;
24 changes: 23 additions & 1 deletion turbine/src/retransmit_stage.rs
@@ -9,6 +9,7 @@ use {
lru::LruCache,
rand::Rng,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_geyser_plugin_manager::slot_status_notifier::SlotStatusNotifier,
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache,
@@ -184,6 +185,7 @@ fn retransmit(
shred_deduper: &mut ShredDeduper<2>,
max_slots: &MaxSlots,
rpc_subscriptions: Option<&RpcSubscriptions>,
slot_status_notifier: Option<&SlotStatusNotifier>,
) -> Result<(), RecvTimeoutError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?;
@@ -299,7 +301,12 @@
.reduce(HashMap::new, RetransmitSlotStats::merge)
})
};
stats.upsert_slot_stats(slot_stats, root_bank.slot(), rpc_subscriptions);
stats.upsert_slot_stats(
slot_stats,
root_bank.slot(),
rpc_subscriptions,
slot_status_notifier,
);
timer_start.stop();
stats.total_time += timer_start.as_us();
stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache);
@@ -381,6 +388,7 @@ pub fn retransmitter(
shreds_receiver: Receiver<Vec</*shred:*/ Vec<u8>>>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
slot_status_notifier: Option<SlotStatusNotifier>,
) -> JoinHandle<()> {
let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
@@ -412,6 +420,7 @@
&mut shred_deduper,
&max_slots,
rpc_subscriptions.as_deref(),
slot_status_notifier.as_ref(),
) {
Ok(()) => (),
Err(RecvTimeoutError::Timeout) => (),
@@ -435,6 +444,7 @@ impl RetransmitStage {
retransmit_receiver: Receiver<Vec</*shred:*/ Vec<u8>>>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
slot_status_notifier: Option<SlotStatusNotifier>,
) -> Self {
let retransmit_thread_handle = retransmitter(
retransmit_sockets,
@@ -445,6 +455,7 @@
retransmit_receiver,
max_slots,
rpc_subscriptions,
slot_status_notifier,
);

Self {
@@ -507,6 +518,7 @@ impl RetransmitStats {
feed: I,
root: Slot,
rpc_subscriptions: Option<&RpcSubscriptions>,
slot_status_notifier: Option<&SlotStatusNotifier>,
) where
I: IntoIterator<Item = (Slot, RetransmitSlotStats)>,
{
@@ -523,6 +535,16 @@
datapoint_info!("retransmit-first-shred", ("slot", slot, i64));
}
}

if let Some(slot_status_notifier) = slot_status_notifier {
if slot > root {
slot_status_notifier
.read()
.unwrap()
.notify_first_shred_received(slot);
}
}

self.slot_stats.put(slot, slot_stats);
}
Some(entry) => {
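
For orientation, the notification reuses the path that already emits the `retransmit-first-shred` datapoint: `upsert_slot_stats` takes this branch only when the slot has no entry yet in the `slot_stats` LRU cache, so plugins hear about a slot at most once (per cache lifetime), and slots at or below the current root are skipped. A condensed sketch of that control flow, with simplified stand-in types rather than the real `RetransmitStats`:

```rust
use lru::LruCache;

/// Simplified stand-in for RetransmitStats::upsert_slot_stats.
fn upsert_slot_stats(
    slot_stats: &mut LruCache<u64, ()>,
    slot: u64,
    root: u64,
    notify_first_shred: &mut dyn FnMut(u64),
) {
    match slot_stats.get_mut(&slot) {
        None => {
            // First time retransmit has seen shreds for this slot: report the
            // datapoint (elided) and, with this PR, notify Geyser plugins.
            if slot > root {
                notify_first_shred(slot);
            }
            slot_stats.put(slot, ());
        }
        Some(_entry) => {
            // Slot already tracked: merge stats only (elided), no notification.
        }
    }
}
```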