From 63a48ed08191cf2191f223a8acda296ae201230c Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 31 Aug 2024 21:48:47 -0600 Subject: [PATCH 1/4] geyser: wrap message into `Box` in snapshot channel --- CHANGELOG.md | 2 ++ yellowstone-grpc-geyser/src/grpc.rs | 8 ++++---- yellowstone-grpc-geyser/src/plugin.rs | 7 ++++--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10f1ae4a..b41eb5b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features +- geyser: wrap message into `Box` in snapshot channel ([#???](https://github.com/rpcpool/yellowstone-grpc/pull/???)) + ### Breaking ## 2024-08-26 diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index eb0a1233..f2f43b69 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -728,7 +728,7 @@ pub struct GrpcService { config_filters: Arc, blocks_meta: Option, subscribe_id: AtomicUsize, - snapshot_rx: Mutex>>>, + snapshot_rx: Mutex>>>>, broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>>)>, debug_clients_tx: Option>, } @@ -741,7 +741,7 @@ impl GrpcService { debug_clients_tx: Option>, is_reload: bool, ) -> anyhow::Result<( - Option>>, + Option>>>, mpsc::UnboundedSender>, Arc, )> { @@ -1125,7 +1125,7 @@ impl GrpcService { config_filters: Arc, stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, - mut snapshot_rx: Option>>, + mut snapshot_rx: Option>>>, mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>>)>, debug_client_tx: Option>, drop_client: impl FnOnce(), @@ -1256,7 +1256,7 @@ impl GrpcService { endpoint: &str, stream_tx: &mpsc::Sender>, client_rx: &mut mpsc::UnboundedReceiver>, - snapshot_rx: crossbeam_channel::Receiver>, + snapshot_rx: crossbeam_channel::Receiver>>, is_alive: &mut bool, filter: &mut Filter, ) { diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index 65730ef1..9c2e1e36 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -26,7 +26,7 @@ use { #[derive(Debug)] pub struct PluginInner { runtime: Runtime, - snapshot_channel: Option>>, + snapshot_channel: Option>>>, snapshot_channel_closed: AtomicBool, grpc_channel: mpsc::UnboundedSender>, grpc_shutdown: Arc, @@ -137,10 +137,10 @@ impl GeyserPlugin for Plugin { ReplicaAccountInfoVersions::V0_0_3(info) => info, }; - let message = Message::Account((account, slot, is_startup).into()); if is_startup { if let Some(channel) = &inner.snapshot_channel { - match channel.send(Some(message)) { + let message = Message::Account((account, slot, is_startup).into()); + match channel.send(Some(Box::new(message))) { Ok(()) => MESSAGE_QUEUE_SIZE.inc(), Err(_) => { if !inner.snapshot_channel_closed.swap(true, Ordering::Relaxed) { @@ -152,6 +152,7 @@ impl GeyserPlugin for Plugin { } } } else { + let message = Message::Account((account, slot, is_startup).into()); inner.send_message(message); } From 50fb536879e9299105c3b7fbf981cfccd56075f7 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 31 Aug 2024 22:32:50 -0600 Subject: [PATCH 2/4] drop sender on end of startup --- yellowstone-grpc-geyser/src/plugin.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index 9c2e1e36..be6082b8 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -13,7 +13,7 @@ use { concat, env, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Mutex, }, time::Duration, }, @@ -26,7 +26,7 @@ use { #[derive(Debug)] pub struct PluginInner { runtime: Runtime, - snapshot_channel: Option>>>, + snapshot_channel: Mutex>>>>, snapshot_channel_closed: AtomicBool, grpc_channel: mpsc::UnboundedSender>, grpc_shutdown: Arc, @@ -101,7 +101,7 @@ impl GeyserPlugin for Plugin { self.inner = Some(PluginInner { runtime, - snapshot_channel, + snapshot_channel: Mutex::new(snapshot_channel), snapshot_channel_closed: AtomicBool::new(false), grpc_channel, grpc_shutdown, @@ -138,7 +138,7 @@ impl GeyserPlugin for Plugin { }; if is_startup { - if let Some(channel) = &inner.snapshot_channel { + if let Some(channel) = inner.snapshot_channel.lock().unwrap().as_ref() { let message = Message::Account((account, slot, is_startup).into()); match channel.send(Some(Box::new(message))) { Ok(()) => MESSAGE_QUEUE_SIZE.inc(), @@ -162,7 +162,7 @@ impl GeyserPlugin for Plugin { fn notify_end_of_startup(&self) -> PluginResult<()> { self.with_inner(|inner| { - if let Some(channel) = &inner.snapshot_channel { + if let Some(channel) = inner.snapshot_channel.lock().unwrap().take() { match channel.send(None) { Ok(()) => MESSAGE_QUEUE_SIZE.inc(), Err(_) => panic!("failed to send message to startup queue: channel closed"), From 1cb2b55ae4c73d3781ef3521502cd515e371d96a Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 31 Aug 2024 22:35:55 -0600 Subject: [PATCH 3/4] remove useless option --- yellowstone-grpc-geyser/src/grpc.rs | 16 ++++++---------- yellowstone-grpc-geyser/src/plugin.rs | 11 +++-------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index f2f43b69..fd82ebac 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -728,7 +728,7 @@ pub struct GrpcService { config_filters: Arc, blocks_meta: Option, subscribe_id: AtomicUsize, - snapshot_rx: Mutex>>>>, + snapshot_rx: Mutex>>>, broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>>)>, debug_clients_tx: Option>, } @@ -741,7 +741,7 @@ impl GrpcService { debug_clients_tx: Option>, is_reload: bool, ) -> anyhow::Result<( - Option>>>, + Option>>, mpsc::UnboundedSender>, Arc, )> { @@ -1125,7 +1125,7 @@ impl GrpcService { config_filters: Arc, stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, - mut snapshot_rx: Option>>>, + mut snapshot_rx: Option>>, mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>>)>, debug_client_tx: Option>, drop_client: impl FnOnce(), @@ -1256,7 +1256,7 @@ impl GrpcService { endpoint: &str, stream_tx: &mpsc::Sender>, client_rx: &mut mpsc::UnboundedReceiver>, - snapshot_rx: crossbeam_channel::Receiver>>, + snapshot_rx: crossbeam_channel::Receiver>, is_alive: &mut bool, filter: &mut Filter, ) { @@ -1292,18 +1292,14 @@ impl GrpcService { let message = match snapshot_rx.try_recv() { Ok(message) => { MESSAGE_QUEUE_SIZE.dec(); - match message { - Some(message) => message, - None => break, - } + message } Err(crossbeam_channel::TryRecvError::Empty) => { sleep(Duration::from_millis(1)).await; continue; } Err(crossbeam_channel::TryRecvError::Disconnected) => { - error!("client #{id}: snapshot channel disconnected"); - *is_alive = false; + info!("client #{id}: end of startup"); break; } }; diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index be6082b8..c221bbbc 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -26,7 +26,7 @@ use { #[derive(Debug)] pub struct PluginInner { runtime: Runtime, - snapshot_channel: Mutex>>>>, + snapshot_channel: Mutex>>>, snapshot_channel_closed: AtomicBool, grpc_channel: mpsc::UnboundedSender>, grpc_shutdown: Arc, @@ -140,7 +140,7 @@ impl GeyserPlugin for Plugin { if is_startup { if let Some(channel) = inner.snapshot_channel.lock().unwrap().as_ref() { let message = Message::Account((account, slot, is_startup).into()); - match channel.send(Some(Box::new(message))) { + match channel.send(Box::new(message)) { Ok(()) => MESSAGE_QUEUE_SIZE.inc(), Err(_) => { if !inner.snapshot_channel_closed.swap(true, Ordering::Relaxed) { @@ -162,12 +162,7 @@ impl GeyserPlugin for Plugin { fn notify_end_of_startup(&self) -> PluginResult<()> { self.with_inner(|inner| { - if let Some(channel) = inner.snapshot_channel.lock().unwrap().take() { - match channel.send(None) { - Ok(()) => MESSAGE_QUEUE_SIZE.inc(), - Err(_) => panic!("failed to send message to startup queue: channel closed"), - } - } + let _snapshot_channel = inner.snapshot_channel.lock().unwrap().take(); Ok(()) }) } From 4205f8a21582015e7678e9dfd54d31ce71c354c2 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 31 Aug 2024 22:37:03 -0600 Subject: [PATCH 4/4] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b41eb5b7..04c5d651 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features -- geyser: wrap message into `Box` in snapshot channel ([#???](https://github.com/rpcpool/yellowstone-grpc/pull/???)) +- geyser: wrap message into `Box` in snapshot channel ([#418](https://github.com/rpcpool/yellowstone-grpc/pull/418)) ### Breaking