Skip to content

Commit

Permalink
Merge pull request #149 from dimitar-petrov/master
Browse files Browse the repository at this point in the history
feat: add support for reverse binding between PUB and SUB
  • Loading branch information
Alexei-Kornienko authored Dec 6, 2021
2 parents be72709 + 6d75caf commit 9a24dd6
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 25 deletions.
27 changes: 27 additions & 0 deletions examples/forwarder_device.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
mod async_helpers;
use std::error::Error;
use zeromq::prelude::*;

#[async_helpers::main]
async fn main() -> Result<(), Box<dyn Error>> {
println!("Start forwarder");
let mut frontend = zeromq::SubSocket::new();
frontend.bind("tcp://127.0.0.1:30001").await?;

let mut backend = zeromq::PubSocket::new();
backend.bind("tcp://127.0.0.1:30002").await?;

frontend.subscribe("").await?;

let forward = async move {
loop {
let message = frontend.recv().await.unwrap();
println!("passing message: {:?}", message);
backend.send(message).await.unwrap();
}
};

forward.await;

Ok(())
}
4 changes: 3 additions & 1 deletion src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::util::PeerIdentity;
use crate::{
MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType, ZmqError, ZmqResult,
};
use async_trait::async_trait;
use crossbeam::queue::SegQueue;
use dashmap::DashMap;
use futures::channel::mpsc;
Expand Down Expand Up @@ -97,8 +98,9 @@ impl SocketBackend for GenericSocketBackend {
}
}

#[async_trait]
impl MultiPeerBackend for GenericSocketBackend {
fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
let (recv_queue, send_queue) = io.into_parts();
self.peers.insert(peer_id.clone(), Peer { send_queue });
self.round_robin.push(peer_id.clone());
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ impl Default for SocketOptions {
}
}

#[async_trait]
pub trait MultiPeerBackend: SocketBackend {
/// This should not be public..
/// Find a better way of doing this

fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo);
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo);
fn peer_disconnected(&self, peer_id: &PeerIdentity);
}

Expand Down
3 changes: 2 additions & 1 deletion src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ impl SocketBackend for PubSocketBackend {
}
}

#[async_trait]
impl MultiPeerBackend for PubSocketBackend {
fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
let (mut recv_queue, send_queue) = io.into_parts();
// TODO provide handling for recv_queue
let (sender, stop_receiver) = oneshot::channel();
Expand Down
3 changes: 2 additions & 1 deletion src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ impl Socket for RepSocket {
}
}

#[async_trait]
impl MultiPeerBackend for RepSocketBackend {
fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
let (recv_queue, send_queue) = io.into_parts();

self.peers.insert(
Expand Down
3 changes: 2 additions & 1 deletion src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ impl Socket for ReqSocket {
}
}

#[async_trait]
impl MultiPeerBackend for ReqSocketBackend {
fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
let (recv_queue, send_queue) = io.into_parts();
self.peers.insert(
peer_id.clone(),
Expand Down
135 changes: 116 additions & 19 deletions src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,114 @@ use crate::{
MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions, SocketRecv, SocketType,
};

use crate::backend::GenericSocketBackend;
use crate::backend::Peer;
use crate::fair_queue::FairQueue;
use crate::fair_queue::QueueInner;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use crossbeam::queue::SegQueue;
use dashmap::DashMap;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

pub enum SubBackendMsgType {
UNSUBSCRIBE = 0,
SUBSCRIBE = 1,
}

pub(crate) struct SubSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, Peer>,
fair_queue_inner: Option<Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>>,
pub(crate) round_robin: SegQueue<PeerIdentity>,
socket_type: SocketType,
socket_options: SocketOptions,
pub(crate) socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
subs: Mutex<HashSet<String>>,
}

impl SubSocketBackend {
pub(crate) fn with_options(
fair_queue_inner: Option<Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>>,
socket_type: SocketType,
options: SocketOptions,
) -> Self {
Self {
peers: DashMap::new(),
fair_queue_inner,
round_robin: SegQueue::new(),
socket_type,
socket_options: options,
socket_monitor: Mutex::new(None),
subs: Mutex::new(HashSet::new()),
}
}

pub fn create_subs_message(subscription: &str, msg_type: SubBackendMsgType) -> ZmqMessage {
let mut buf = BytesMut::with_capacity(subscription.len() + 1);
buf.put_u8(msg_type as u8);
buf.extend_from_slice(subscription.as_bytes());

buf.freeze().into()
}
}

impl SocketBackend for SubSocketBackend {
fn socket_type(&self) -> SocketType {
self.socket_type
}

fn socket_options(&self) -> &SocketOptions {
&self.socket_options
}

fn shutdown(&self) {
self.peers.clear();
}

fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}

#[async_trait]
impl MultiPeerBackend for SubSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
let (recv_queue, mut send_queue) = io.into_parts();

let subs_msgs: Vec<ZmqMessage> = self
.subs
.lock()
.iter()
.map(|x| SubSocketBackend::create_subs_message(x, SubBackendMsgType::SUBSCRIBE))
.collect();

for message in subs_msgs.iter() {
send_queue
.send(Message::Message(message.clone()))
.await
.unwrap();
}

self.peers.insert(peer_id.clone(), Peer { send_queue });
self.round_robin.push(peer_id.clone());
match &self.fair_queue_inner {
None => {}
Some(inner) => {
inner.lock().insert(peer_id.clone(), recv_queue);
}
};
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
self.peers.remove(peer_id);
}
}

pub struct SubSocket {
backend: Arc<GenericSocketBackend>,
backend: Arc<SubSocketBackend>,
fair_queue: FairQueue<ZmqFramedRead, PeerIdentity>,
binds: HashMap<Endpoint, AcceptStopHandle>,
}
Expand All @@ -31,24 +128,24 @@ impl Drop for SubSocket {

impl SubSocket {
pub async fn subscribe(&mut self, subscription: &str) -> ZmqResult<()> {
let mut buf = BytesMut::with_capacity(subscription.len() + 1);
buf.put_u8(1);
buf.extend_from_slice(subscription.as_bytes());
// let message = format!("\0x1{}", subscription);
let message: ZmqMessage = ZmqMessage::from(buf.freeze());
for mut peer in self.backend.peers.iter_mut() {
peer.send_queue
.send(Message::Message(message.clone()))
.await?;
}
Ok(())
self.backend.subs.lock().insert(subscription.to_string());
self.process_subs(subscription, SubBackendMsgType::SUBSCRIBE)
.await
}

pub async fn unsubscribe(&mut self, subscription: &str) -> ZmqResult<()> {
let mut buf = BytesMut::with_capacity(subscription.len() + 1);
buf.put_u8(0);
buf.extend_from_slice(subscription.as_bytes());
let message = ZmqMessage::from(buf.freeze());
self.backend.subs.lock().remove(subscription);
self.process_subs(subscription, SubBackendMsgType::UNSUBSCRIBE)
.await
}

async fn process_subs(
&mut self,
subscription: &str,
msg_type: SubBackendMsgType,
) -> ZmqResult<()> {
let message: ZmqMessage = SubSocketBackend::create_subs_message(subscription, msg_type);

for mut peer in self.backend.peers.iter_mut() {
peer.send_queue
.send(Message::Message(message.clone()))
Expand All @@ -63,7 +160,7 @@ impl Socket for SubSocket {
fn with_options(options: SocketOptions) -> Self {
let fair_queue = FairQueue::new(true);
Self {
backend: Arc::new(GenericSocketBackend::with_options(
backend: Arc::new(SubSocketBackend::with_options(
Some(fair_queue.inner()),
SocketType::SUB,
options,
Expand Down
2 changes: 1 addition & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub(crate) async fn peer_connected(
props = Some(connect_ops);
}
let peer_id = ready_exchange(&mut raw_socket, backend.socket_type(), props).await?;
backend.peer_connected(&peer_id, raw_socket);
backend.peer_connected(&peer_id, raw_socket).await;
Ok(peer_id)
}

Expand Down

0 comments on commit 9a24dd6

Please sign in to comment.