Skip to content

Commit

Permalink
extract filter
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 15, 2023
1 parent b1790fb commit 366461b
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 43 deletions.
31 changes: 17 additions & 14 deletions examples/stream_blocks_mainnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::pin::pin;

use geyser_grpc_connector::experimental::mock_literpc_core::{map_produced_block, ProducedBlock};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GrpcConnectionTimeouts, GrpcSourceConfig};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};

Check warning on line 9 in examples/stream_blocks_mainnet.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet.rs
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneMapper};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
Expand Down Expand Up @@ -56,7 +56,7 @@ impl FromYellowstoneMapper for BlockMetaExtractor {
type Target = BlockMetaMini;
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> {
match update.update_oneof {
Some(UpdateOneof::Block(update_blockmeta_message)) => {
Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => {
let slot = update_blockmeta_message.slot;
let mini = BlockMetaMini {
slot,
Expand All @@ -75,6 +75,9 @@ pub async fn main() {
tracing_subscriber::fmt::init();
// console_subscriber::init();

let subscribe_blocks = true;
let subscribe_blockmeta = false;

let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
let grpc_addr_blue = env::var("GRPC_ADDR2").expect("need grpc url for blue");
Expand All @@ -98,34 +101,34 @@ pub async fn main() {
let toxiproxy_config =
GrpcSourceConfig::new_with_timeout("toxiproxy".to_string(), grpc_addr_toxiproxy, None, timeouts.clone());

{
if subscribe_blocks {
info!("Write Block stream..");
let green_stream =

Check warning on line 106 in examples/stream_blocks_mainnet.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet.rs
create_geyser_reconnecting_stream(green_config.clone(), CommitmentConfig::finalized());
create_geyser_reconnecting_stream(green_config.clone(), GeyserFilter::blocks(), CommitmentConfig::confirmed());
let blue_stream =
create_geyser_reconnecting_stream(blue_config.clone(), CommitmentConfig::finalized());
create_geyser_reconnecting_stream(blue_config.clone(), GeyserFilter::blocks(), CommitmentConfig::confirmed());
let toxiproxy_stream =
create_geyser_reconnecting_stream(toxiproxy_config.clone(), CommitmentConfig::finalized());
create_geyser_reconnecting_stream(toxiproxy_config.clone(), GeyserFilter::blocks(), CommitmentConfig::confirmed());
let multiplex_stream = create_multiplex(
vec![green_stream, blue_stream, toxiproxy_stream],
CommitmentConfig::finalized(),
BlockExtractor(CommitmentConfig::finalized()),
CommitmentConfig::confirmed(),
BlockExtractor(CommitmentConfig::confirmed()),
);
start_example_block_consumer(multiplex_stream);
}

{
if subscribe_blockmeta {
info!("Write BlockMeta stream..");
let green_stream =
create_geyser_reconnecting_stream(green_config.clone(), CommitmentConfig::finalized());
create_geyser_reconnecting_stream(green_config.clone(), GeyserFilter::blocks_meta(), CommitmentConfig::confirmed());
let blue_stream =
create_geyser_reconnecting_stream(blue_config.clone(), CommitmentConfig::finalized());
create_geyser_reconnecting_stream(blue_config.clone(), GeyserFilter::blocks_meta(), CommitmentConfig::confirmed());
let toxiproxy_stream =
create_geyser_reconnecting_stream(toxiproxy_config.clone(), CommitmentConfig::finalized());
create_geyser_reconnecting_stream(toxiproxy_config.clone(), GeyserFilter::blocks_meta(), CommitmentConfig::confirmed());
let multiplex_stream = create_multiplex(
vec![green_stream, blue_stream, toxiproxy_stream],
CommitmentConfig::finalized(),
BlockMetaExtractor(CommitmentConfig::finalized()),
CommitmentConfig::confirmed(),
BlockMetaExtractor(CommitmentConfig::confirmed()),
);
start_example_blockmeta_consumer(multiplex_stream);
}
Expand Down
79 changes: 52 additions & 27 deletions src/grpc_subscription_autoreconnect.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::grpc_subscription_autoreconnect::Message::Connecting;
use async_stream::stream;
use futures::{Stream, StreamExt};
use log::{debug, info, log, trace, warn, Level};
Expand All @@ -8,6 +7,7 @@ use std::ops::{Add, Sub};
use std::pin::Pin;
use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
use std::time::{Duration, Instant};
use anyhow::Context;
use tokio::task::JoinHandle;
use tokio::time::{sleep, sleep_until, timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
Expand Down Expand Up @@ -69,7 +69,8 @@ impl GrpcSourceConfig {

pub enum Message {
GeyserSubscribeUpdate(SubscribeUpdate),
Connecting,
// connect (attempt=1) or reconnect(attempt>1)
Connecting(u32),
}

enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
Expand All @@ -79,10 +80,33 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
WaitReconnect(u32),
}

#[derive(Clone)]
pub enum GeyserFilter {
Blocks(SubscribeRequestFilterBlocks),
BlocksMeta(SubscribeRequestFilterBlocksMeta),
}

impl GeyserFilter {
pub fn blocks() -> Self {
Self::Blocks(SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
})
}
pub fn blocks_meta() -> Self {
Self::BlocksMeta(SubscribeRequestFilterBlocksMeta {
})
}
}


// Takes geyser filter for geyser, connect to Geyser and return a generic stream of SubscribeUpdate
// note: stream never terminates
pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
filter: GeyserFilter,
commitment_config: CommitmentConfig,
) -> impl Stream<Item = Message> {
let label = grpc_source.label.clone();
Expand Down Expand Up @@ -118,6 +142,7 @@ pub fn create_geyser_reconnecting_stream(
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
let filter = filter.clone();
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
async move {

Expand All @@ -129,54 +154,54 @@ pub fn create_geyser_reconnecting_stream(
.await;
let mut client = connect_result?;

// TODO make filter configurable for caller
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);

let mut blocksmeta_subs = HashMap::new();
blocksmeta_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocksMeta {},
);

timeout(subscribe_timeout.unwrap_or(Duration::MAX),
match filter {
GeyserFilter::Blocks(filter) => {
blocks_subs.insert(
"client".to_string(),
filter,
);
}
GeyserFilter::BlocksMeta(filter) => {
blocksmeta_subs.insert(
"client".to_string(),
filter,
);
}
}

let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX),
client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
// FIXME extract as paramter
blocks_subs,
blocksmeta_subs,
Some(commitment_level),
Default::default(),
None,
))
.await.expect("timeout on subscribe_once")
.await;

subscribe_result.expect("timeout") // FIXME
}
});

(ConnectionState::Connecting(attempt, connection_task), Message::Connecting)
(ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt))
}

ConnectionState::Connecting(attempt, connection_task) => {
let subscribe_result = connection_task.await;

match subscribe_result {
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting),
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting(attempt)),
Ok(Err(geyser_error)) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_error);
(ConnectionState::WaitReconnect(attempt), Message::Connecting)
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
},
Err(geyser_grpc_task_error) => {
panic!("Task aborted - should not happen :{geyser_grpc_task_error}");
Expand All @@ -195,12 +220,12 @@ pub fn create_geyser_reconnecting_stream(
Some(Err(tonic_status)) => {
// TODO identify non-recoverable errors and cancel stream
debug!("! error on {} - retrying: {:?}", label, tonic_status);
(ConnectionState::WaitReconnect(attempt), Message::Connecting)
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
None => {
//TODO should not arrive. Mean the stream close.
warn!("! geyser stream closed on {} - retrying", label);
(ConnectionState::WaitReconnect(attempt), Message::Connecting)
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
}

Expand All @@ -210,7 +235,7 @@ pub fn create_geyser_reconnecting_stream(
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
info!("Waiting {} seconds, then connect to {}", backoff_secs, label);
sleep(Duration::from_secs_f32(backoff_secs)).await;
(ConnectionState::NotConnected(attempt), Message::Connecting)
(ConnectionState::NotConnected(attempt), Message::Connecting(attempt))
}

}; // -- match
Expand Down
6 changes: 4 additions & 2 deletions src/grpcmultiplex_fastestwins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ where
}
}
}
Message::Connecting => {
warn!("Stream-{} performs reconnect", stream_idx);
Message::Connecting(attempt) => {
if attempt > 1 {
warn!("Stream-{} performs reconnect attempt {}", stream_idx, attempt);
}
}
}
}
Expand Down

0 comments on commit 366461b

Please sign in to comment.