Skip to content

Commit

Permalink
Initial version of socket monitor impl. Related to #103
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexei-Kornienko committed Dec 9, 2020
1 parent 8e75a13 commit 6879692
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 26 deletions.
9 changes: 8 additions & 1 deletion src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::codec::{FramedIo, Message, ZmqFramedRead, ZmqFramedWrite};
use crate::fair_queue::QueueInner;
use crate::util::PeerIdentity;
use crate::{MultiPeerBackend, SocketBackend, SocketType, ZmqError, ZmqResult};
use crate::{MultiPeerBackend, SocketBackend, SocketEvent, SocketType, ZmqError, ZmqResult};
use crossbeam::queue::SegQueue;
use dashmap::DashMap;
use futures::channel::mpsc;
use futures::SinkExt;
use parking_lot::Mutex;
use std::sync::Arc;
Expand All @@ -17,6 +18,7 @@ pub(crate) struct GenericSocketBackend {
fair_queue_inner: Option<Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>>,
pub(crate) round_robin: SegQueue<PeerIdentity>,
socket_type: SocketType,
pub(crate) socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
}

impl GenericSocketBackend {
Expand All @@ -29,6 +31,7 @@ impl GenericSocketBackend {
fair_queue_inner,
round_robin: SegQueue::new(),
socket_type,
socket_monitor: Mutex::new(None),
}
}

Expand Down Expand Up @@ -85,6 +88,10 @@ impl SocketBackend for GenericSocketBackend {
fn shutdown(&self) {
self.peers.clear();
}

fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}

impl MultiPeerBackend for GenericSocketBackend {
Expand Down
12 changes: 11 additions & 1 deletion src/dealer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ use crate::codec::{Message, ZmqFramedRead};
use crate::fair_queue::FairQueue;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketType, ZmqMessage, ZmqResult};
use crate::{
Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketType, ZmqMessage,
ZmqResult,
};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::StreamExt;
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
Expand Down Expand Up @@ -42,6 +46,12 @@ impl Socket for DealerSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle, RandomState> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

impl DealerSocket {
Expand Down
21 changes: 21 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ use util::PeerIdentity;
extern crate enum_primitive_derive;

use async_trait::async_trait;
use futures::channel::mpsc;
use futures_codec::FramedWrite;
use num_traits::ToPrimitive;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::{Debug, Display};
Expand Down Expand Up @@ -102,6 +104,18 @@ impl Display for SocketType {
}
}

#[derive(Debug)]
pub enum SocketEvent {
Connected,
ConnectDelayed,
ConnectRetried,
Accepted(Endpoint, PeerIdentity),
AcceptFailed(ZmqError),
Closed,
CloseFailed,
Disconnected(PeerIdentity),
}

pub trait MultiPeerBackend: SocketBackend {
/// This should not be public..
/// Find a better way of doing this
Expand All @@ -113,6 +127,7 @@ pub trait MultiPeerBackend: SocketBackend {
pub trait SocketBackend: Send + Sync {
fn socket_type(&self) -> SocketType;
fn shutdown(&self);
fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>>;
}

#[async_trait]
Expand Down Expand Up @@ -195,6 +210,12 @@ pub trait Socket: Sized + Send {
Ok(())
}

/// Creates and setups new socket monitor
///
/// Subsequent calls to this method each create a new monitor channel.
/// Sender side of previous one is dropped.
fn monitor(&mut self) -> mpsc::Receiver<SocketEvent>;

// TODO: async fn connections(&self) -> ?

/// Disconnects from the given endpoint, blocking until finished.
Expand Down
19 changes: 17 additions & 2 deletions src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use crate::error::ZmqResult;
use crate::message::*;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{BlockingSend, MultiPeerBackend, Socket, SocketBackend, SocketType, ZmqError};
use futures::channel::oneshot;
use crate::{
BlockingSend, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketType, ZmqError,
};
use futures::channel::{mpsc, oneshot};

use async_trait::async_trait;
use dashmap::DashMap;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::pin::Pin;
Expand All @@ -22,6 +25,7 @@ pub(crate) struct Subscriber {

pub(crate) struct PubSocketBackend {
subscribers: DashMap<PeerIdentity, Subscriber>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
}

impl PubSocketBackend {
Expand Down Expand Up @@ -81,6 +85,10 @@ impl SocketBackend for PubSocketBackend {
fn shutdown(&self) {
self.subscribers.clear();
}

fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}

impl MultiPeerBackend for PubSocketBackend {
Expand Down Expand Up @@ -184,6 +192,7 @@ impl Socket for PubSocket {
Self {
backend: Arc::new(PubSocketBackend {
subscribers: DashMap::new(),
socket_monitor: Mutex::new(None),
}),
binds: HashMap::new(),
}
Expand All @@ -196,6 +205,12 @@ impl Socket for PubSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

#[cfg(test)]
Expand Down
12 changes: 11 additions & 1 deletion src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ use crate::codec::{Message, ZmqFramedRead};
use crate::fair_queue::FairQueue;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{BlockingRecv, Endpoint, MultiPeerBackend, Socket, SocketType, ZmqMessage, ZmqResult};
use crate::{
BlockingRecv, Endpoint, MultiPeerBackend, Socket, SocketEvent, SocketType, ZmqMessage,
ZmqResult,
};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::StreamExt;
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
Expand Down Expand Up @@ -37,6 +41,12 @@ impl Socket for PullSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle, RandomState> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

#[async_trait]
Expand Down
11 changes: 9 additions & 2 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use crate::backend::GenericSocketBackend;
use crate::codec::Message;
use crate::transport::AcceptStopHandle;
use crate::{
BlockingSend, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketType, ZmqMessage,
ZmqResult,
BlockingSend, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketType,
ZmqMessage, ZmqResult,
};
use async_trait::async_trait;
use futures::channel::mpsc;
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -36,6 +37,12 @@ impl Socket for PushSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle, RandomState> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

#[async_trait]
Expand Down
15 changes: 15 additions & 0 deletions src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct RepPeer {
struct RepSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, RepPeer>,
fair_queue_inner: Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
}

pub struct RepSocket {
Expand All @@ -44,6 +45,7 @@ impl Socket for RepSocket {
backend: Arc::new(RepSocketBackend {
peers: DashMap::new(),
fair_queue_inner: fair_queue.inner(),
socket_monitor: Mutex::new(None),
}),
current_request: None,
fair_queue,
Expand All @@ -58,6 +60,12 @@ impl Socket for RepSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

impl MultiPeerBackend for RepSocketBackend {
Expand All @@ -77,6 +85,9 @@ impl MultiPeerBackend for RepSocketBackend {
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
if let Some(monitor) = self.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone()));
}
self.peers.remove(peer_id);
}
}
Expand All @@ -89,6 +100,10 @@ impl SocketBackend for RepSocketBackend {
fn shutdown(&self) {
self.peers.clear();
}

fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}

#[async_trait]
Expand Down
21 changes: 12 additions & 9 deletions src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ use crate::{SocketType, ZmqResult};
use async_trait::async_trait;
use crossbeam::queue::SegQueue;
use dashmap::DashMap;
use futures::lock::Mutex;
use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;

struct ReqSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, Peer>,
pub(crate) round_robin: SegQueue<PeerIdentity>,
pub(crate) current_request_peer_id: Mutex<Option<PeerIdentity>>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
}

pub struct ReqSocket {
Expand Down Expand Up @@ -64,11 +63,6 @@ impl BlockingSend for ReqSocket {
message,
];
peer.send_queue.send(Message::Multipart(frames)).await?;
self.backend
.current_request_peer_id
.lock()
.await
.replace(next_peer_id.clone());
self.current_request = Some(next_peer_id);
return Ok(());
}
Expand Down Expand Up @@ -110,7 +104,7 @@ impl Socket for ReqSocket {
backend: Arc::new(ReqSocketBackend {
peers: DashMap::new(),
round_robin: SegQueue::new(),
current_request_peer_id: Mutex::new(None),
socket_monitor: Mutex::new(None),
}),
current_request: None,
binds: HashMap::new(),
Expand All @@ -124,6 +118,12 @@ impl Socket for ReqSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

impl MultiPeerBackend for ReqSocketBackend {
Expand Down Expand Up @@ -151,7 +151,10 @@ impl SocketBackend for ReqSocketBackend {
}

fn shutdown(&self) {
println!("Shutting down req backend");
self.peers.clear();
}

fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}
9 changes: 8 additions & 1 deletion src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use crate::fair_queue::FairQueue;
use crate::message::*;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{MultiPeerBackend, SocketType};
use crate::{MultiPeerBackend, SocketEvent, SocketType};
use crate::{Socket, SocketBackend};
use futures::channel::mpsc;
use futures::SinkExt;

pub struct RouterSocket {
Expand Down Expand Up @@ -49,6 +50,12 @@ impl Socket for RouterSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

impl RouterSocket {
Expand Down
9 changes: 8 additions & 1 deletion src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use crate::error::ZmqResult;
use crate::message::*;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{BlockingRecv, MultiPeerBackend, Socket, SocketBackend, SocketType};
use crate::{BlockingRecv, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketType};

use crate::backend::GenericSocketBackend;
use crate::fair_queue::FairQueue;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -74,6 +75,12 @@ impl Socket for SubSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

#[async_trait]
Expand Down
Loading

0 comments on commit 6879692

Please sign in to comment.