Skip to content

Commit

Permalink
minor cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 15, 2023
1 parent 16d9019 commit 6081426
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 30 deletions.
17 changes: 10 additions & 7 deletions examples/stream_blocks_mainnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::pin::pin;

Check warning on line 5 in examples/stream_blocks_mainnet.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet.rs

Check warning on line 5 in examples/stream_blocks_mainnet.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet.rs

use geyser_grpc_connector::experimental::mock_literpc_core::{map_produced_block, ProducedBlock};
use geyser_grpc_connector::grpc_subscription_autoreconnect::GrpcSourceConfig;
use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GrpcSourceConfig};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneMapper};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;

fn start_example_consumer(block_stream: impl Stream<Item = ProducedBlock> + Send + 'static) {
fn start_example_consumer(multiplex_stream: impl Stream<Item = ProducedBlock> + Send + 'static) {
tokio::spawn(async move {
let mut block_stream = pin!(block_stream);
let mut block_stream = pin!(multiplex_stream);
while let Some(block) = block_stream.next().await {
info!("received block #{}", block.slot,);
info!("received block #{} from multiplexer", block.slot,);
}
});
}
Expand All @@ -37,7 +37,7 @@ impl FromYellowstoneMapper for ExtractBlock {

#[tokio::main]
pub async fn main() {
// RUST_LOG=info,grpc_using_streams=debug
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
tracing_subscriber::fmt::init();
// console_subscriber::init();

Expand All @@ -56,10 +56,13 @@ pub async fn main() {
let toxiproxy_config =

Check warning on line 56 in examples/stream_blocks_mainnet.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet.rs

Check warning on line 56 in examples/stream_blocks_mainnet.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet.rs
GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None);

let green_stream = create_geyser_reconnecting_stream(green_config.clone(), CommitmentConfig::finalized());
let blue_stream = create_geyser_reconnecting_stream(blue_config.clone(), CommitmentConfig::finalized());
let toxiproxy_stream = create_geyser_reconnecting_stream(toxiproxy_config.clone(), CommitmentConfig::finalized());
let multiplex_stream = create_multiplex(
vec![green_config, blue_config, toxiproxy_config],
vec![green_stream, blue_stream, toxiproxy_stream],
CommitmentConfig::finalized(),
ExtractBlock(CommitmentConfig::confirmed()),
ExtractBlock(CommitmentConfig::finalized()),
);

start_example_consumer(multiplex_stream);
Expand Down
17 changes: 13 additions & 4 deletions src/grpc_subscription_autoreconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use log::{debug, info, trace, warn};
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::{AtomicI32, Ordering};
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
Expand Down Expand Up @@ -67,22 +68,25 @@ pub fn create_geyser_reconnecting_stream(
_ => panic!("Only CONFIRMED and FINALIZED is supported/suggested"),
};

// NOT_CONNECTED; CONNECTING
let mut state = ConnectionState::NotConnected;
let connection_attempts = AtomicI32::new(0);

// in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro
stream! {
loop{
loop {
let yield_value;

(state, yield_value) = match state {

ConnectionState::NotConnected => {

let connection_task = tokio::spawn({
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
// let (block_filter, blockmeta_filter) = blocks_filters.clone();
info!("Connecting attempt #{} to {}", connection_attempts.fetch_add(1, Ordering::Relaxed), addr);
async move {

let connect_result = GeyserGrpcClient::connect_with_timeout(
Expand Down Expand Up @@ -128,6 +132,7 @@ pub fn create_geyser_reconnecting_stream(

(ConnectionState::Connecting(connection_task), None)
}

ConnectionState::Connecting(connection_task) => {
let subscribe_result = connection_task.await;

Expand All @@ -144,11 +149,12 @@ pub fn create_geyser_reconnecting_stream(
}

}

ConnectionState::Ready(mut geyser_stream) => {

match geyser_stream.next().await {
Some(Ok(update_message)) => {
trace!("> update message on {}", label);
trace!("> recv update message from {}", label);
(ConnectionState::Ready(geyser_stream), Some(update_message))
}
Some(Err(tonic_status)) => {
Expand All @@ -164,12 +170,15 @@ pub fn create_geyser_reconnecting_stream(
}

}

ConnectionState::WaitReconnect => {
// TODO implement backoff
info!("Waiting a bit, then connect to {}", label);
sleep(Duration::from_secs(1)).await;
(ConnectionState::NotConnected, None)
}

}; // -- match

yield yield_value
}

Expand Down
33 changes: 14 additions & 19 deletions src/grpcmultiplex_fastestwins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ pub trait FromYellowstoneMapper {
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)>;
}

pub fn create_multiplex<E>(
// TODO provide list of streams
grpc_sources: Vec<GrpcSourceConfig>,
/// use streams created by ``create_geyser_reconnecting_stream``
/// note: this is agnostic to the type of the stream
pub fn create_multiplex<M>(
grpc_source_streams: Vec<impl Stream<Item = Option<SubscribeUpdate>>>,
commitment_config: CommitmentConfig,
extractor: E,
) -> impl Stream<Item = E::Target>
mapper: M,
) -> impl Stream<Item = M::Target>
where
E: FromYellowstoneMapper,
M: FromYellowstoneMapper,
{
assert!(
commitment_config == CommitmentConfig::confirmed()
Expand All @@ -29,29 +30,23 @@ where
);
// note: PROCESSED blocks are not sequential in presense of forks; this will break the logic

if grpc_sources.is_empty() {
if grpc_source_streams.is_empty() {
panic!("Must have at least one source");
}

info!(
"Starting multiplexer with {} sources: {}",
grpc_sources.len(),
grpc_sources
.iter()
.map(|source| source.label.clone())
.join(", ")
"Starting multiplexer with {} sources",
grpc_source_streams.len(),
);

// use merge
let mut futures = futures::stream::SelectAll::new();

for grpc_source in grpc_sources {
futures.push(Box::pin(create_geyser_reconnecting_stream(
grpc_source.clone(),
commitment_config,
)));
for grpc_source in grpc_source_streams {
futures.push(Box::pin(grpc_source));
}

map_updates(futures, extractor)
map_updates(futures, mapper)
}

fn map_updates<S, E>(geyser_stream: S, mapper: E) -> impl Stream<Item = E::Target>
Expand Down

0 comments on commit 6081426

Please sign in to comment.