Skip to content

Commit

Permalink
minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jan 22, 2024
1 parent 87a4bbc commit 6e15250
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 24 deletions.
18 changes: 9 additions & 9 deletions examples/stream_blocks_autoconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;

use geyser_grpc_connector::channel_plugger::{
spawn_broadcast_channel_plug, spawn_plugger_mpcs_to_broadcast,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{
create_geyser_autoconnection_task, Message,
Expand All @@ -18,7 +21,6 @@ use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message as _;
use geyser_grpc_connector::channel_plugger::spawn_plugger_mpcs_to_broadcast;

fn start_example_blockmini_consumer(
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
Expand Down Expand Up @@ -79,7 +81,7 @@ enum TestCases {
CloseAfterReceiving,
AbortTaskFromOutside,
}
const TEST_CASE: TestCases = TestCases::TemporaryLaggingReceiver;
const TEST_CASE: TestCases = TestCases::Basic;

#[tokio::main]
pub async fn main() {
Expand Down Expand Up @@ -107,13 +109,12 @@ pub async fn main() {

info!("Write Block stream..");

let (broadcast_tx, broadcast_rx) = tokio::sync::broadcast::channel(100);
let (jh_geyser_task, mut message_channel) = create_geyser_autoconnection_task(
let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
);
spawn_plugger_mpcs_to_broadcast(message_channel, broadcast_tx);
let mut message_channel = broadcast_rx;
let mut message_channel =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);

tokio::spawn(async move {
if let TestCases::SlowReceiverStartup = TEST_CASE {
Expand All @@ -131,8 +132,7 @@ pub async fn main() {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
message_count += 1;
// info!("got update: {:?}", subscriber_update.update_oneof.);
info!("got update!!!");
info!("got update - {} bytes", subscriber_update.encoded_len());

if let TestCases::CloseAfterReceiving = TEST_CASE {
info!("(testcase) closing stream after receiving");
Expand All @@ -155,5 +155,5 @@ pub async fn main() {
});

// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
sleep(Duration::from_secs(2000)).await;
}
15 changes: 0 additions & 15 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,21 +295,6 @@ pub fn create_geyser_autoconnection_task(
);
}
}
// {
// Ok(n_subscribers) => {
// trace!(
// "sent update message to {} subscribers (buffer={})",
// n_subscribers,
// sender.len()
// );
// continue 'recv_loop;
// }
// Err(SendError(_)) => {
// // note: error does not mean that future sends will also fail!
// trace!("no subscribers for update message");
// continue 'recv_loop;
// }
// };
}
Some(Err(tonic_status)) => {
// all tonic errors are recoverable
Expand Down

0 comments on commit 6e15250

Please sign in to comment.