Skip to content

Commit

Permalink
Add compression bool to create_geyser_autoconnection_task_with_mpsc
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Apr 23, 2024
1 parent 950559b commit 05f110c
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 110 deletions.
186 changes: 97 additions & 89 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions examples/stream_blocks_autoconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub async fn main() {
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
exit_notify,
false,
);
let mut message_channel =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);
Expand Down
6 changes: 6 additions & 0 deletions examples/stream_blocks_mainnet_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,17 @@ pub async fn main() {
let green_stream = create_geyser_reconnecting_stream(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
false,
);
let blue_stream = create_geyser_reconnecting_stream(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
false,
);
let toxiproxy_stream = create_geyser_reconnecting_stream(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
false,
);
let multiplex_stream = create_multiplexed_stream(
vec![green_stream, blue_stream, toxiproxy_stream],
Expand All @@ -164,14 +167,17 @@ pub async fn main() {
let green_stream = create_geyser_reconnecting_stream(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
false,
);
let blue_stream = create_geyser_reconnecting_stream(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
false,
);
let toxiproxy_stream = create_geyser_reconnecting_stream(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
false,
);
let multiplex_stream = create_multiplexed_stream(
vec![green_stream, blue_stream, toxiproxy_stream],
Expand Down
3 changes: 3 additions & 0 deletions examples/stream_blocks_mainnet_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,21 @@ pub async fn main() {
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.resubscribe(),
false,
);
let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.resubscribe(),
false,
);
let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify,
false,
);
start_example_blockmeta_consumer(blockmeta_rx);

Expand Down
2 changes: 2 additions & 0 deletions examples/stream_blocks_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ pub async fn main() {
let green_stream = create_geyser_reconnecting_stream(
config.clone(),
GeyserFilter(CommitmentConfig::processed()).accounts(),
false,
);

let blue_stream = create_geyser_reconnecting_stream(
config.clone(),
GeyserFilter(CommitmentConfig::processed()).blocks_and_txs(),
false,
);

tokio::spawn(async move {
Expand Down
18 changes: 10 additions & 8 deletions src/grpc_subscription_autoreconnect_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
compressed: bool,
) -> impl Stream<Item = Message> {
let mut state = ConnectionState::NotConnected(1);

Expand All @@ -47,14 +48,15 @@ pub fn create_geyser_reconnecting_stream(
async move {

let connect_result = connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter),
)
.await;
addr,
token,
config,
connect_timeout,
request_timeout,
GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter),
compressed,
)
.await;

let mut client = connect_result?;

Expand Down
21 changes: 13 additions & 8 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
exit_notify: broadcast::Receiver<()>,
compressed: bool,
) -> (JoinHandle<()>, Receiver<Message>) {
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);

Expand All @@ -47,6 +48,7 @@ pub fn create_geyser_autoconnection_task(
subscribe_filter,

Check warning on line 48 in src/grpc_subscription_autoreconnect_tasks.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_tasks.rs
sender,
exit_notify,
compressed
);

(join_handle, receiver_channel)
Expand All @@ -60,6 +62,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
subscribe_filter: SubscribeRequest,
mpsc_downstream: tokio::sync::mpsc::Sender<Message>,
mut exit_notify: broadcast::Receiver<()>,
compressed: bool,
) -> JoinHandle<()> {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/

Expand Down Expand Up @@ -131,14 +134,16 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
}

Check warning on line 134 in src/grpc_subscription_autoreconnect_tasks.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_tasks.rs
};

let fut_connector = connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
buffer_config,
);
let fut_connector =
connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
buffer_config,
compressed,
);

match await_or_exit(fut_connector, exit_notify.recv()).await {
MaybeExit::Continue(connection_result) => connection_handler(connection_result),
Expand Down
14 changes: 9 additions & 5 deletions src/yellowstone_grpc_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub async fn connect_with_timeout_with_buffers<E, T>(
connect_timeout: Option<Duration>,
request_timeout: Option<Duration>,
buffer_config: GeyserGrpcClientBufferConfig,
compression: bool,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
Expand Down Expand Up @@ -85,12 +86,15 @@ where
let interceptor = InterceptorXToken { x_token };

let channel = endpoint.connect_lazy();
let mut geyser_client = GeyserClient::with_interceptor(channel.clone(), interceptor.clone())
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size());

if compression {
geyser_client = geyser_client.accept_compressed(CompressionEncoding::Gzip).send_compressed(CompressionEncoding::Gzip);
}
let client = GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size())
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
HealthClient::with_interceptor(channel, interceptor),
geyser_client
);
Ok(client)
}

0 comments on commit 05f110c

Please sign in to comment.