Skip to content

Commit

Permalink
compile
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jan 19, 2024
1 parent dc53a50 commit cf0c83b
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 158 deletions.
4 changes: 2 additions & 2 deletions examples/stream_blocks_mainnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ use solana_sdk::transaction::TransactionError;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;

use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{
create_geyser_reconnecting_stream, GeyserFilter,
create_geyser_reconnecting_stream,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use tokio::time::{sleep, Duration};

Check warning on line 30 in examples/stream_blocks_mainnet.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet.rs
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};

Check warning on line 33 in examples/stream_blocks_mainnet.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet.rs

fn start_example_block_consumer(
multiplex_stream: impl Stream<Item = ProducedBlock> + Send + 'static,
Expand Down
10 changes: 5 additions & 5 deletions examples/stream_blocks_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::env;
use std::pin::pin;

use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{
create_geyser_reconnecting_stream, GeyserFilter,
create_geyser_reconnecting_stream,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
Expand All @@ -16,8 +16,8 @@ use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;

Check warning on line 17 in examples/stream_blocks_single.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_single.rs
use yellowstone_grpc_proto::prost::Message as _;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_reconnecting_task, Message};
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_autoconnection_task, Message};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};

fn start_example_blockmini_consumer(
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
Expand Down Expand Up @@ -96,13 +96,13 @@ pub async fn main() {

info!("Write Block stream..");

let (jh_geyser_task, mut green_stream) = create_geyser_reconnecting_task(
let (jh_geyser_task, mut green_stream) = create_geyser_autoconnection_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
);

tokio::spawn(async move {
while let Ok(message) = green_stream.recv().await {
while let Some(message) = green_stream.recv().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
// info!("got update: {:?}", subscriber_update.update_oneof.);
Expand Down
68 changes: 0 additions & 68 deletions src/grpc_subscription_autoreconnect_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,74 +44,6 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
WaitReconnect(Attempt),
}

#[derive(Clone)]
pub struct GeyserFilter(pub CommitmentConfig);

impl GeyserFilter {
pub fn blocks_and_txs(&self) -> SubscribeRequest {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);

SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: blocks_subs,
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}

pub fn blocks_meta(&self) -> SubscribeRequest {
let mut blocksmeta_subs = HashMap::new();
blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});

SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: blocksmeta_subs,
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
}

fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {
// solana_sdk -> yellowstone
match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Processed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Processed
}
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
}
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
}
_ => {
panic!(
"unsupported commitment level {}",
commitment_config.commitment
)
}
}
}

// Take geyser filter, connect to Geyser and return a generic stream of SubscribeUpdate
// note: stream never terminates
pub fn create_geyser_reconnecting_stream(
Expand Down
103 changes: 20 additions & 83 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::grpc_subscription_autoreconnect_tasks::TheState::*;
use crate::grpc_subscription_autoreconnect_tasks::State::*;

Check warning on line 1 in src/grpc_subscription_autoreconnect_tasks.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_tasks.rs
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use solana_sdk::commitment_config::CommitmentConfig;
Expand Down Expand Up @@ -38,82 +38,19 @@ pub enum Message {
Connecting(Attempt),
}

#[derive(Debug, Clone)]
pub enum AutoconnectionError {
AbortedFatalError,
}

enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt),
Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>),
Ready(Attempt, S),
WaitReconnect(Attempt),
}

#[derive(Clone)]
pub struct GeyserFilter(pub CommitmentConfig);

impl GeyserFilter {
pub fn blocks_and_txs(&self) -> SubscribeRequest {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);

SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: blocks_subs,
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}

pub fn blocks_meta(&self) -> SubscribeRequest {
let mut blocksmeta_subs = HashMap::new();
blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});

SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: blocksmeta_subs,
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
}

fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {
// solana_sdk -> yellowstone
match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Processed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Processed
}
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
}
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
}
_ => {
panic!(
"unsupported commitment level {}",
commitment_config.commitment
)
}
}
}

enum TheState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
enum State<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
NotConnected(Attempt),
Connected(Attempt, GeyserGrpcClient<F>),
Ready(Attempt, S),
Expand All @@ -125,10 +62,10 @@ enum TheState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor>
}

/// return handler will exit on fatal error
pub fn create_geyser_reconnecting_task(
pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> (JoinHandle<()>, Receiver<Message>) {
) -> (JoinHandle<Result<(), AutoconnectionError>>, Receiver<Message>) {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
let (sender, receiver_stream) = tokio::sync::mpsc::channel::<Message>(1);

Expand Down Expand Up @@ -249,16 +186,16 @@ pub fn create_geyser_reconnecting_task(
FatalError(_) => {
// TOOD what to do
error!("! fatal error grpc connection - aborting");
bail!("! fatal error grpc connection - aborting");
return Err(AutoconnectionError::AbortedFatalError);
}
TheState::WaitReconnect(attempt) => {
State::WaitReconnect(attempt) => {
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
info!(
"! waiting {} seconds, then reconnect to {}",
backoff_secs, grpc_source
);
sleep(Duration::from_secs_f32(backoff_secs)).await;
TheState::NotConnected(attempt)
State::NotConnected(attempt)
}
Ready(attempt, mut geyser_stream) => {
'recv_loop: loop {
Expand All @@ -274,27 +211,27 @@ pub fn create_geyser_reconnecting_task(
match sender.send_timeout(Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold).await {
Ok(()) => {
messages_forwared += 1;
trace!("sent update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32());
trace!("sent update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32() * 1000.0);
continue 'recv_loop;
}
Err(SendTimeoutError::Timeout(_)) => {
Err(SendTimeoutError::Timeout(the_message)) => {
warn!("downstream receiver did not pick put message for {}ms - keep waiting", warning_threshold.as_millis());

match sender.send(Message::GeyserSubscribeUpdate(Box::new(update_message))).await {
match sender.send(the_message).await {
Ok(()) => {
messages_forwared += 1;
trace!("sent delayed update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32());
trace!("sent delayed update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32() * 1000.0);
}
Err(_send_error ) => {
warn!("downstream receiver closed, message is lost - aborting");
break 'recv_loop TheState::FatalError(attempt);
break 'recv_loop State::FatalError(attempt);
}
}

}
Err(SendTimeoutError::Closed(_)) => {
warn!("downstream receiver closed - aborting");
break 'recv_loop TheState::FatalError(attempt);
break 'recv_loop State::FatalError(attempt);
}
}
// {
Expand All @@ -316,11 +253,11 @@ pub fn create_geyser_reconnecting_task(
Some(Err(tonic_status)) => {
// all tonic errors are recoverable
warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status);
break 'recv_loop TheState::WaitReconnect(attempt);
break 'recv_loop State::WaitReconnect(attempt);
}
None => {
warn!("geyser stream closed on {} - retrying", grpc_source);
break 'recv_loop TheState::WaitReconnect(attempt);
break 'recv_loop State::WaitReconnect(attempt);
}
}
} // -- end loop
Expand Down
72 changes: 72 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::time::Duration;
use solana_sdk::commitment_config::CommitmentConfig;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta};
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;

pub mod grpc_subscription;
Expand Down Expand Up @@ -64,3 +67,72 @@ impl GrpcSourceConfig {
}
}
}


#[derive(Clone)]
pub struct GeyserFilter(pub CommitmentConfig);

impl GeyserFilter {
pub fn blocks_and_txs(&self) -> SubscribeRequest {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);

SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: blocks_subs,
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}

pub fn blocks_meta(&self) -> SubscribeRequest {
let mut blocksmeta_subs = HashMap::new();
blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});

SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: blocksmeta_subs,
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
}

fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {
// solana_sdk -> yellowstone
match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Processed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Processed
}
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
}
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
}
_ => {
panic!(
"unsupported commitment level {}",
commitment_config.commitment
)
}
}
}

0 comments on commit cf0c83b

Please sign in to comment.