Skip to content

Commit

Permalink
geyser: fix x-request-snapshot handler (#413)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Aug 23, 2024
1 parent a9e2fa0 commit 3c08e27
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 30 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ The minor version will be incremented upon a breaking change and the patch versi

### Breaking

## 2024-08-23

- yellowstone-grpc-client-1.16.2+solana.2.0.5
- yellowstone-grpc-geyser-1.16.2+solana.2.0.5
- yellowstone-grpc-proto-1.15.0+solana.2.0.5
- yellowstone-grpc-tools-1.0.0-rc.12+solana.2.0.5

### Fixes

- geyser: fix `x-request-snapshot` handler ([#413](https://github.com/rpcpool/yellowstone-grpc/pull/413))

## 2024-08-22

- yellowstone-grpc-client-1.16.1+solana.2.0.5
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
resolver = "2"
members = [
"examples/rust", # 1.14.1+solana.2.0.5
"yellowstone-grpc-client", # 1.16.1+solana.2.0.5
"yellowstone-grpc-geyser", # 1.16.1+solana.2.0.5
"yellowstone-grpc-client", # 1.16.2+solana.2.0.5
"yellowstone-grpc-geyser", # 1.16.2+solana.2.0.5
"yellowstone-grpc-proto", # 1.15.0+solana.2.0.5
"yellowstone-grpc-tools", # 1.0.0-rc.12+solana.2.0.5
]
Expand Down Expand Up @@ -71,7 +71,7 @@ tracing = "0.1.37"
tracing-subscriber = "0.3.17"
uuid = "1.8.0"
vergen = "9.0.0"
yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.16.1+solana.2.0.5" }
yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.16.2+solana.2.0.5" }
yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.15.0+solana.2.0.5", default-features = false }

[profile.release]
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client"
version = "1.16.1+solana.2.0.5"
version = "1.16.2+solana.2.0.5"
authors = { workspace = true }
edition = { workspace = true }
description = "Yellowstone gRPC Geyser Simple Client"
Expand Down
6 changes: 3 additions & 3 deletions yellowstone-grpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,11 @@ impl GeyserGrpcBuilder {
}

// Include `x-request-snapshot`
pub fn set_x_request_snapshot(self, value: bool) -> GeyserGrpcBuilderResult<Self> {
Ok(Self {
pub fn set_x_request_snapshot(self, value: bool) -> Self {
Self {
x_request_snapshot: value,
..self
})
}
}

// Endpoint options
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "1.16.1+solana.2.0.5"
version = "1.16.2+solana.2.0.5"
authors = { workspace = true }
edition = { workspace = true }
description = "Yellowstone gRPC Geyser Plugin"
Expand Down
36 changes: 18 additions & 18 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,6 @@ impl GrpcService {
async fn client_loop(
id: usize,
endpoint: String,
x_request_snapshot: bool,
config_filters: Arc<ConfigGrpcFilters>,
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
mut client_rx: mpsc::UnboundedReceiver<Option<Filter>>,
Expand Down Expand Up @@ -1157,19 +1156,17 @@ impl GrpcService {
info!("client #{id}: new");

let mut is_alive = true;
if x_request_snapshot {
if let Some(snapshot_rx) = snapshot_rx.take() {
Self::client_loop_snapshot(
id,
&endpoint,
&stream_tx,
&mut client_rx,
snapshot_rx,
&mut is_alive,
&mut filter,
)
.await;
}
if let Some(snapshot_rx) = snapshot_rx.take() {
Self::client_loop_snapshot(
id,
&endpoint,
&stream_tx,
&mut client_rx,
snapshot_rx,
&mut is_alive,
&mut filter,
)
.await;
}

if is_alive {
Expand Down Expand Up @@ -1273,7 +1270,6 @@ impl GrpcService {
if stream_tx.send(Ok(msg)).await.is_err() {
error!("client #{id}: stream closed");
*is_alive = false;
break;
}
continue;
}
Expand Down Expand Up @@ -1332,7 +1328,13 @@ impl Geyser for GrpcService {
mut request: Request<Streaming<SubscribeRequest>>,
) -> TonicResult<Response<Self::SubscribeStream>> {
let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed);
let snapshot_rx = self.snapshot_rx.lock().await.take();

let x_request_snapshot = request.metadata().contains_key("x-request-snapshot");
let snapshot_rx = if x_request_snapshot {
self.snapshot_rx.lock().await.take()
} else {
None
};
let (stream_tx, stream_rx) = mpsc::channel(if snapshot_rx.is_some() {
self.config_snapshot_client_channel_capacity
} else {
Expand Down Expand Up @@ -1378,7 +1380,6 @@ impl Geyser for GrpcService {
.get("x-endpoint")
.and_then(|h| h.to_str().ok().map(|s| s.to_string()))
.unwrap_or_else(|| "".to_owned());
let x_request_snapshot = request.metadata().contains_key("x-request-snapshot");

let config_filters = Arc::clone(&self.config_filters);
let incoming_stream_tx = stream_tx.clone();
Expand Down Expand Up @@ -1425,7 +1426,6 @@ impl Geyser for GrpcService {
tokio::spawn(Self::client_loop(
id,
endpoint,
x_request_snapshot,
Arc::clone(&self.config_filters),
stream_tx,
client_rx,
Expand Down
19 changes: 17 additions & 2 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ use {
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult,
SlotStatus,
},
std::{concat, env, sync::Arc, time::Duration},
std::{
concat, env,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
},
tokio::{
runtime::{Builder, Runtime},
sync::{mpsc, Notify},
Expand All @@ -20,6 +27,7 @@ use {
pub struct PluginInner {
runtime: Runtime,
snapshot_channel: Option<crossbeam_channel::Sender<Option<Message>>>,
snapshot_channel_closed: AtomicBool,
grpc_channel: mpsc::UnboundedSender<Arc<Message>>,
grpc_shutdown: Arc<Notify>,
prometheus: PrometheusService,
Expand Down Expand Up @@ -94,6 +102,7 @@ impl GeyserPlugin for Plugin {
self.inner = Some(PluginInner {
runtime,
snapshot_channel,
snapshot_channel_closed: AtomicBool::new(false),
grpc_channel,
grpc_shutdown,
prometheus,
Expand Down Expand Up @@ -133,7 +142,13 @@ impl GeyserPlugin for Plugin {
if let Some(channel) = &inner.snapshot_channel {
match channel.send(Some(message)) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Err(_) => panic!("failed to send message to startup queue: channel closed"),
Err(_) => {
if !inner.snapshot_channel_closed.swap(true, Ordering::Relaxed) {
log::error!(
"failed to send message to startup queue: channel closed"
)
}
}
}
}
} else {
Expand Down

0 comments on commit 3c08e27

Please sign in to comment.