Skip to content

Commit

Permalink
set nodelay + add env for window+biffer sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jun 14, 2024
1 parent a4a93de commit c096a04
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
23 changes: 21 additions & 2 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::env;
use std::future::Future;
use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message};
use futures::{Stream, StreamExt};
Expand All @@ -13,7 +14,7 @@ use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::Status;
use crate::yellowstone_grpc_util::connect_with_timeout_with_buffers;
use crate::yellowstone_grpc_util::{connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig};

enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
NotConnected(Attempt),
Expand Down Expand Up @@ -88,7 +89,8 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
addr
);

let buffer_config = yellowstone_grpc_util::GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter);
// let buffer_config = yellowstone_grpc_util::GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter);
let buffer_config = buffer_config_from_env();
debug!("Using Grpc Buffer config {:?}", buffer_config);

let connection_handler = |connect_result| match connect_result {
Expand Down Expand Up @@ -374,6 +376,23 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
jh_geyser_task
}

fn buffer_config_from_env() -> GeyserGrpcClientBufferConfig {
if env::var("BUFFER_SIZE").is_err() || env::var("CONN_WINDOW").is_err() || env::var("STREAM_WINDOW").is_err() {
return GeyserGrpcClientBufferConfig::default();
}

let buffer_size = env::var("BUFFER_SIZE").expect("buffer_size").parse::<usize>().expect("integer(bytes)");
let conn_window = env::var("CONN_WINDOW").expect("conn_window").parse::<u32>().expect("integer(bytes)");
let stream_window = env::var("STREAM_WINDOW").expect("stream_window").parse::<u32>().expect("integer(bytes)");

// conn_window should be larger than stream_window
GeyserGrpcClientBufferConfig {
buffer_size: Some(buffer_size),
conn_window: Some(conn_window),
stream_window: Some(stream_window),
}
}


enum MaybeExit<T> {
Continue(T),
Expand Down
2 changes: 2 additions & 0 deletions src/yellowstone_grpc_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ where
{
// see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10
let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)?
.tcp_nodelay(true)
.http2_adaptive_window(true)
.buffer_size(buffer_config.buffer_size)
.initial_connection_window_size(buffer_config.conn_window)
.initial_stream_window_size(buffer_config.stream_window);
Expand Down

0 comments on commit c096a04

Please sign in to comment.