Skip to content

Commit

Permalink
Changing Notify channel to broadcast channel
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Apr 2, 2024
1 parent ce6ca26 commit 688e4d2
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 20 deletions.
5 changes: 2 additions & 3 deletions examples/stream_blocks_autoconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::sync::broadcast;

use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
Expand Down Expand Up @@ -89,7 +88,7 @@ pub async fn main() {

info!("Write Block stream..");

let exit_notify = Arc::new(Notify::new());
let (_, exit_notify) = broadcast::channel(1);

let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
green_config.clone(),
Expand Down
10 changes: 4 additions & 6 deletions examples/stream_blocks_mainnet_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::sync::Arc;
use tokio::sync::Notify;

use base64::Engine;
use itertools::Itertools;
Expand Down Expand Up @@ -120,7 +118,7 @@ pub async fn main() {
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let exit_notify = Arc::new(Notify::new());
let (_, exit_notify) = tokio::sync::broadcast::channel(1);

let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
Expand All @@ -134,19 +132,19 @@ pub async fn main() {
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify,
);
start_example_blockmeta_consumer(blockmeta_rx);

Expand Down
21 changes: 10 additions & 11 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message};
use futures::{Stream, StreamExt};

Check warning on line 2 in src/grpc_subscription_autoreconnect_tasks.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_tasks.rs
use log::{debug, error, info, log, trace, warn, Level};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Notify;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout, Instant};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
Expand Down Expand Up @@ -35,7 +34,7 @@ enum FatalErrorReason {
pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
exit_notify: Arc<Notify>,
exit_notify: broadcast::Receiver<()>,
) -> (JoinHandle<()>, Receiver<Message>) {
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);

Expand All @@ -56,7 +55,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
mpsc_downstream: tokio::sync::mpsc::Sender<Message>,
exit_notify: Arc<Notify>,
mut exit_notify: broadcast::Receiver<()>,
) -> JoinHandle<()> {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/

Expand Down Expand Up @@ -97,7 +96,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
Expand Down Expand Up @@ -156,7 +155,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
Expand Down Expand Up @@ -219,7 +218,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
_ = sleep(Duration::from_secs_f32(backoff_secs)) => {
//slept
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
Expand Down Expand Up @@ -253,7 +252,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
_ = sleep(Duration::from_secs_f32(backoff_secs)) => {
//slept
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
Expand All @@ -269,7 +268,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
Expand All @@ -292,7 +291,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
Expand All @@ -319,7 +318,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
res = mpsc_downstream.send(the_message)=> {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
Expand Down

0 comments on commit 688e4d2

Please sign in to comment.