Skip to content

Commit

Permalink
example
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jan 22, 2024
1 parent 6e15250 commit b3808da
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
23 changes: 21 additions & 2 deletions src/channel_plugger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,26 @@ use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc::error::SendTimeoutError;

Check warning on line 4 in src/channel_plugger.rs

View workflow job for this annotation

GitHub Actions / test

unused import: `tokio::sync::mpsc::error::SendTimeoutError`

Check warning on line 4 in src/channel_plugger.rs

View workflow job for this annotation

GitHub Actions / test

unused import: `tokio::sync::mpsc::error::SendTimeoutError`
use tokio::time::{sleep, timeout};

Check warning on line 5 in src/channel_plugger.rs

View workflow job for this annotation

GitHub Actions / test

unused imports: `sleep`, `timeout`

Check warning on line 5 in src/channel_plugger.rs

View workflow job for this annotation

GitHub Actions / test

unused imports: `sleep`, `timeout`

/// usage: see plug_pattern test
pub fn spawn_broadcast_channel_plug<T: Send + 'static>(
downstream_broadcast: (
tokio::sync::broadcast::Sender<T>,
tokio::sync::broadcast::Receiver<T>,
),
upstream: tokio::sync::mpsc::Receiver<T>,
) -> tokio::sync::broadcast::Receiver<T> {
spawn_plugger_mpcs_to_broadcast(upstream, downstream_broadcast.0);
downstream_broadcast.1
}

/// note: backpressure will NOT get propagated to upstream
pub fn spawn_plugger_mpcs_to_broadcast<T: Send + 'static>(
mut upstream: tokio::sync::mpsc::Receiver<T>,
downstream: tokio::sync::broadcast::Sender<T>,
// TODO allow multiple downstreams + fanout
) {
// abort forwarder by closing the sender
let _donothing = tokio::spawn(async move {
let _private_handler = tokio::spawn(async move {
while let Some(value) = upstream.recv().await {
match downstream.send(value) {
Ok(n_subscribers) => {
Expand All @@ -26,11 +39,17 @@ pub fn spawn_plugger_mpcs_to_broadcast<T: Send + 'static>(
});
}


#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn plug_pattern() {
let (jh_task, message_channel) = tokio::sync::mpsc::channel::<u32>(1);
let broadcast_rx =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);
}

#[tokio::test]
async fn connect_broadcast_to_mpsc() {
solana_logger::setup_with_default("debug");
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use yellowstone_grpc_proto::geyser::{
};
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;

pub mod channel_plugger;
pub mod grpc_subscription;
pub mod grpc_subscription_autoreconnect_streams;
pub mod grpc_subscription_autoreconnect_tasks;
pub mod grpcmultiplex_fastestwins;
pub mod channel_plugger;
mod obfuscate;

#[derive(Clone, Debug)]
Expand Down

0 comments on commit b3808da

Please sign in to comment.