Skip to content

Commit

Permalink
WIP integrate window patch
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Apr 9, 2024
1 parent a821ca2 commit 71a9eae
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 54 deletions.
71 changes: 36 additions & 35 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ authors = ["GroovieGermanikus <[email protected]>"]
repository = "https://github.com/blockworks-foundation/geyser-grpc-connector"

[dependencies]
yellowstone-grpc-client = { version = "1.13.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
yellowstone-grpc-proto = { version = "1.12.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
yellowstone-grpc-client = "1.15.0"
yellowstone-grpc-proto = "1.14.0"


# required for CommitmentConfig
solana-sdk = "~1.17.15"
solana-sdk = "~1.17.28"

url = "2.5.0"
async-stream = "0.3.5"
Expand Down
67 changes: 51 additions & 16 deletions src/grpc_subscription_autoreconnect_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use log::{debug, info, log, trace, warn, Level};
use std::time::Duration;

Check warning on line 5 in src/grpc_subscription_autoreconnect_streams.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_streams.rs
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_client::{GeyserGrpcBuilder, GeyserGrpcBuilderResult, GeyserGrpcClient, GeyserGrpcClientResult, InterceptorXToken};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue;
use yellowstone_grpc_proto::tonic::Status;

enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
Expand All @@ -24,41 +25,48 @@ pub fn create_geyser_reconnecting_stream(
) -> impl Stream<Item = Message> {

Check warning on line 25 in src/grpc_subscription_autoreconnect_streams.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_streams.rs
let mut state = ConnectionState::NotConnected(1);



// in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro
let the_stream = stream! {

let addr = grpc_source.grpc_addr.clone();
let grpc_x_token = grpc_source.grpc_x_token.clone();
let tls_config = grpc_source.tls_config.clone();
// let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
let receive_timeout = Some(Duration::from_secs(10)); // FIXME

loop {
let yield_value;


(state, yield_value) = match state {

ConnectionState::NotConnected(attempt) => {

// let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
// let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
// let subscribe_timeout = grpc_source.timeouts.map(|t| t.subscribe_timeout);
let subscribe_timeout = Some(Duration::from_secs(10)); // FIXME




let connection_task = tokio::spawn({
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
let subscribe_filter = subscribe_filter.clone();
let builder = build_client(grpc_source.clone()).unwrap(); // TODO instead of unwrap, move to Fatal state
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
async move {

let connect_result = GeyserGrpcClient::connect_with_timeout(
addr, token, config,
connect_timeout,
request_timeout,
false)
.await;
let mut client = connect_result?;

let mut client = builder.connect().await.unwrap(); // FIXME

debug!("Subscribe with filter {:?}", subscribe_filter);

let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX),
client
.subscribe_once2(subscribe_filter))
.subscribe_once(subscribe_filter))
.await;

// maybe not optimal
Expand Down Expand Up @@ -88,7 +96,6 @@ pub fn create_geyser_reconnecting_stream(
}

ConnectionState::Ready(mut geyser_stream) => {
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await {
Ok(Some(Ok(update_message))) => {
trace!("> recv update message from {}", grpc_source);
Expand Down Expand Up @@ -130,6 +137,34 @@ pub fn create_geyser_reconnecting_stream(
the_stream

Check warning on line 137 in src/grpc_subscription_autoreconnect_streams.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_streams.rs
}

fn build_client(grpc_source_config: GrpcSourceConfig) -> GeyserGrpcBuilderResult<GeyserGrpcBuilder> {
let mut builder = GeyserGrpcClient::build_from_shared(grpc_source_config.grpc_addr)?;

if let Some(tls_config) = grpc_source_config.tls_config {
builder = builder.tls_config(tls_config)?;
}

Check warning on line 145 in src/grpc_subscription_autoreconnect_streams.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_streams.rs

if let Some(timeouts) = grpc_source_config.timeouts {

builder = builder.timeout(timeouts.connect_timeout);

builder = builder.timeout(timeouts.request_timeout);

Check warning on line 152 in src/grpc_subscription_autoreconnect_streams.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_streams.rs
// subscribe + receive timeout are handled somewhere else

}

let x_token: Option<AsciiMetadataValue> = match grpc_source_config.grpc_x_token {
Some(x_token) => Some(x_token.try_into()?),
None => None,
};
let interceptor = InterceptorXToken { x_token };

builder = builder.interceptor(interceptor);

Ok(builder)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::time::Duration;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate};
use yellowstone_grpc_proto::prost::bytes::Bytes;
use yellowstone_grpc_proto::tonic;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;

pub mod channel_plugger;
Expand Down Expand Up @@ -31,6 +33,7 @@ pub struct GrpcConnectionTimeouts {
pub receive_timeout: Duration,

Check warning on line 33 in src/lib.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/lib.rs
}


#[derive(Clone)]
pub struct GrpcSourceConfig {
pub grpc_addr: String,
Expand Down Expand Up @@ -78,6 +81,15 @@ impl GrpcSourceConfig {
timeouts: Some(timeouts),
}
}

pub fn build_tonic_endpoint(&self) -> tonic::transport::Endpoint {
let mut endpoint = tonic::transport::Endpoint::from_shared(self.grpc_addr.clone())

Check warning on line 86 in src/lib.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/lib.rs
.expect("grpc_addr must be a valid url");
if let Some(tls_config) = &self.tls_config {
endpoint = endpoint.tls_config(tls_config.clone()).expect("tls_config must be valid");
}
endpoint
}
}

#[derive(Clone)]
Expand Down

0 comments on commit 71a9eae

Please sign in to comment.