Skip to content

Commit

Permalink
Added basic SocketOptions struct
Browse files Browse the repository at this point in the history
This is related to #116.

Currently it's only enables user to set custom peer-identity if needed
and passes options all around. Later we can add other options to this
struct.

Some minor changes to ReadyCommand properties were required cause
ZMQ_IDENTITY could be any byte vec and not guaranteed to be valid String
  • Loading branch information
Alexei-Kornienko committed Mar 6, 2021
1 parent 511240e commit 31e5d3d
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 33 deletions.
26 changes: 26 additions & 0 deletions examples/socket_client_with_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
mod async_helpers;

use std::convert::TryFrom;
use std::error::Error;
use zeromq::util::PeerIdentity;
use zeromq::{Socket, SocketOptions, SocketRecv, SocketSend};

#[async_helpers::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut options = SocketOptions::default();
options.peer_identity(PeerIdentity::try_from(Vec::from("SomeCustomId")).unwrap());

let mut socket = zeromq::ReqSocket::with_options(options);
socket
.connect("tcp://127.0.0.1:5555")
.await
.expect("Failed to connect");
println!("Connected to server");

for _ in 0..10u64 {
socket.send("Hello".into()).await?;
let repl = socket.recv().await?;
dbg!(repl);
}
Ok(())
}
13 changes: 11 additions & 2 deletions src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::codec::{FramedIo, Message, ZmqFramedRead, ZmqFramedWrite};
use crate::fair_queue::QueueInner;
use crate::util::PeerIdentity;
use crate::{MultiPeerBackend, SocketBackend, SocketEvent, SocketType, ZmqError, ZmqResult};
use crate::{
MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType, ZmqError, ZmqResult,
};
use crossbeam::queue::SegQueue;
use dashmap::DashMap;
use futures::channel::mpsc;
Expand All @@ -18,19 +20,22 @@ pub(crate) struct GenericSocketBackend {
fair_queue_inner: Option<Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>>,
pub(crate) round_robin: SegQueue<PeerIdentity>,
socket_type: SocketType,
socket_options: SocketOptions,
pub(crate) socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
}

impl GenericSocketBackend {
pub(crate) fn new(
pub(crate) fn with_options(
fair_queue_inner: Option<Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>>,
socket_type: SocketType,
options: SocketOptions,
) -> Self {
Self {
peers: DashMap::new(),
fair_queue_inner,
round_robin: SegQueue::new(),
socket_type,
socket_options: options,
socket_monitor: Mutex::new(None),
}
}
Expand Down Expand Up @@ -79,6 +84,10 @@ impl SocketBackend for GenericSocketBackend {
self.socket_type
}

fn socket_options(&self) -> &SocketOptions {
&self.socket_options
}

fn shutdown(&self) {
self.peers.clear();
}
Expand Down
19 changes: 14 additions & 5 deletions src/codec/command.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::error::CodecError;
use crate::SocketType;

use bytes::{Buf, BufMut, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::collections::HashMap;
use std::convert::TryFrom;

Expand All @@ -21,18 +21,28 @@ impl From<ZmqCommandName> for String {
#[derive(Debug, Clone)]
pub struct ZmqCommand {
pub name: ZmqCommandName,
pub properties: HashMap<String, String>,
pub properties: HashMap<String, Bytes>,
}

impl ZmqCommand {
pub fn ready(socket: SocketType) -> Self {
let mut properties = HashMap::new();
properties.insert("Socket-Type".into(), format!("{}", socket));
properties.insert("Socket-Type".into(), format!("{}", socket).into());
Self {
name: ZmqCommandName::READY,
properties,
}
}

pub fn add_prop(&mut self, name: String, value: Bytes) -> &mut Self {
self.properties.insert(name, value);
self
}

pub fn add_properties(&mut self, map: HashMap<String, Bytes>) -> &mut Self {
self.properties.extend(map);
self
}
}

impl TryFrom<BytesMut> for ZmqCommand {
Expand All @@ -55,8 +65,7 @@ impl TryFrom<BytesMut> for ZmqCommand {
let property = unsafe { String::from_utf8_unchecked(buf.split_to(prop_len).to_vec()) };

let prop_val_len = buf.get_u32() as usize;
let prop_value =
unsafe { String::from_utf8_unchecked(buf.split_to(prop_val_len).to_vec()) };
let prop_value = buf.split_to(prop_val_len).freeze();
properties.insert(property, prop_value);
}
Ok(Self {
Expand Down
9 changes: 5 additions & 4 deletions src/dealer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use crate::fair_queue::FairQueue;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{
CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketRecv,
SocketSend, SocketType, ZmqMessage, ZmqResult,
CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions,
SocketRecv, SocketSend, SocketType, ZmqMessage, ZmqResult,
};
use async_trait::async_trait;
use futures::channel::mpsc;
Expand All @@ -28,12 +28,13 @@ impl Drop for DealerSocket {

#[async_trait]
impl Socket for DealerSocket {
fn new() -> Self {
fn with_options(options: SocketOptions) -> Self {
let fair_queue = FairQueue::new(true);
Self {
backend: Arc::new(GenericSocketBackend::new(
backend: Arc::new(GenericSocketBackend::with_options(
Some(fair_queue.inner()),
SocketType::DEALER,
options,
)),
fair_queue,
binds: HashMap::new(),
Expand Down
24 changes: 23 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,23 @@ pub enum SocketEvent {
Disconnected(PeerIdentity),
}

pub struct SocketOptions {
pub(crate) peer_id: Option<PeerIdentity>,
}

impl SocketOptions {
pub fn peer_identity(&mut self, peer_id: PeerIdentity) -> &mut Self {
self.peer_id = Some(peer_id);
self
}
}

impl Default for SocketOptions {
fn default() -> Self {
Self { peer_id: None }
}
}

pub trait MultiPeerBackend: SocketBackend {
/// This should not be public..
/// Find a better way of doing this
Expand All @@ -135,6 +152,7 @@ pub trait MultiPeerBackend: SocketBackend {

pub trait SocketBackend: Send + Sync {
fn socket_type(&self) -> SocketType;
fn socket_options(&self) -> &SocketOptions;
fn shutdown(&self);
fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>>;
}
Expand All @@ -155,7 +173,11 @@ pub trait CaptureSocket: SocketSend {}

#[async_trait]
pub trait Socket: Sized + Send {
fn new() -> Self;
fn new() -> Self {
Self::with_options(SocketOptions::default())
}

fn with_options(options: SocketOptions) -> Self;

fn backend(&self) -> Arc<dyn MultiPeerBackend>;

Expand Down
10 changes: 8 additions & 2 deletions src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::error::ZmqResult;
use crate::message::*;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{async_rt, CaptureSocket};
use crate::{async_rt, CaptureSocket, SocketOptions};
use crate::{
MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketSend, SocketType, ZmqError,
};
Expand All @@ -28,6 +28,7 @@ pub(crate) struct Subscriber {
pub(crate) struct PubSocketBackend {
subscribers: DashMap<PeerIdentity, Subscriber>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
socket_options: SocketOptions,
}

impl PubSocketBackend {
Expand Down Expand Up @@ -85,6 +86,10 @@ impl SocketBackend for PubSocketBackend {
SocketType::PUB
}

fn socket_options(&self) -> &SocketOptions {
&self.socket_options
}

fn shutdown(&self) {
self.subscribers.clear();
}
Expand Down Expand Up @@ -194,11 +199,12 @@ impl CaptureSocket for PubSocket {}

#[async_trait]
impl Socket for PubSocket {
fn new() -> Self {
fn with_options(options: SocketOptions) -> Self {
Self {
backend: Arc::new(PubSocketBackend {
subscribers: DashMap::new(),
socket_monitor: Mutex::new(None),
socket_options: options,
}),
binds: HashMap::new(),
}
Expand Down
8 changes: 5 additions & 3 deletions src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use crate::fair_queue::FairQueue;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{
Endpoint, MultiPeerBackend, Socket, SocketEvent, SocketRecv, SocketType, ZmqMessage, ZmqResult,
Endpoint, MultiPeerBackend, Socket, SocketEvent, SocketOptions, SocketRecv, SocketType,
ZmqMessage, ZmqResult,
};
use async_trait::async_trait;
use futures::channel::mpsc;
Expand All @@ -21,12 +22,13 @@ pub struct PullSocket {

#[async_trait]
impl Socket for PullSocket {
fn new() -> Self {
fn with_options(options: SocketOptions) -> Self {
let fair_queue = FairQueue::new(true);
Self {
backend: Arc::new(GenericSocketBackend::new(
backend: Arc::new(GenericSocketBackend::with_options(
Some(fair_queue.inner()),
SocketType::PULL,
options,
)),
fair_queue,
binds: HashMap::new(),
Expand Down
12 changes: 8 additions & 4 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::backend::GenericSocketBackend;
use crate::codec::Message;
use crate::transport::AcceptStopHandle;
use crate::{
CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketSend,
SocketType, ZmqMessage, ZmqResult,
CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions,
SocketSend, SocketType, ZmqMessage, ZmqResult,
};
use async_trait::async_trait;
use futures::channel::mpsc;
Expand All @@ -24,9 +24,13 @@ impl Drop for PushSocket {

#[async_trait]
impl Socket for PushSocket {
fn new() -> Self {
fn with_options(options: SocketOptions) -> Self {
Self {
backend: Arc::new(GenericSocketBackend::new(None, SocketType::PUSH)),
backend: Arc::new(GenericSocketBackend::with_options(
None,
SocketType::PUSH,
options,
)),
binds: HashMap::new(),
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct RepSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, RepPeer>,
fair_queue_inner: Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
socket_options: SocketOptions,
}

pub struct RepSocket {
Expand All @@ -40,13 +41,14 @@ impl Drop for RepSocket {

#[async_trait]
impl Socket for RepSocket {
fn new() -> Self {
fn with_options(options: SocketOptions) -> Self {
let fair_queue = FairQueue::new(true);
Self {
backend: Arc::new(RepSocketBackend {
peers: DashMap::new(),
fair_queue_inner: fair_queue.inner(),
socket_monitor: Mutex::new(None),
socket_options: options,
}),
current_request: None,
fair_queue,
Expand Down Expand Up @@ -98,6 +100,10 @@ impl SocketBackend for RepSocketBackend {
SocketType::REP
}

fn socket_options(&self) -> &SocketOptions {
&self.socket_options
}

fn shutdown(&self) {
self.peers.clear();
}
Expand Down
8 changes: 7 additions & 1 deletion src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ struct ReqSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, Peer>,
pub(crate) round_robin: SegQueue<PeerIdentity>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
socket_options: SocketOptions,
}

pub struct ReqSocket {
Expand Down Expand Up @@ -97,12 +98,13 @@ impl SocketRecv for ReqSocket {

#[async_trait]
impl Socket for ReqSocket {
fn new() -> Self {
fn with_options(options: SocketOptions) -> Self {
Self {
backend: Arc::new(ReqSocketBackend {
peers: DashMap::new(),
round_robin: SegQueue::new(),
socket_monitor: Mutex::new(None),
socket_options: options,
}),
current_request: None,
binds: HashMap::new(),
Expand Down Expand Up @@ -148,6 +150,10 @@ impl SocketBackend for ReqSocketBackend {
SocketType::REQ
}

fn socket_options(&self) -> &SocketOptions {
&self.socket_options
}

fn shutdown(&self) {
self.peers.clear();
}
Expand Down
7 changes: 4 additions & 3 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::fair_queue::FairQueue;
use crate::message::*;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{MultiPeerBackend, SocketEvent, SocketRecv, SocketSend, SocketType};
use crate::{MultiPeerBackend, SocketEvent, SocketOptions, SocketRecv, SocketSend, SocketType};
use crate::{Socket, SocketBackend};
use futures::channel::mpsc;
use futures::SinkExt;
Expand All @@ -31,12 +31,13 @@ impl Drop for RouterSocket {

#[async_trait]
impl Socket for RouterSocket {
fn new() -> Self {
fn with_options(options: SocketOptions) -> Self {
let fair_queue = FairQueue::new(true);
Self {
backend: Arc::new(GenericSocketBackend::new(
backend: Arc::new(GenericSocketBackend::with_options(
Some(fair_queue.inner()),
SocketType::ROUTER,
options,
)),
binds: HashMap::new(),
fair_queue,
Expand Down
Loading

0 comments on commit 31e5d3d

Please sign in to comment.