Skip to content

Commit

Permalink
Working on reconnect logic
Browse files Browse the repository at this point in the history
Initial prototype is working for GenericSocketBackend
TODO:
* implement for rest of the sockets
* implement outgoing queue for messages according to ZMQ spec

Related to zeromq#143
  • Loading branch information
Alexei-Kornienko authored and bemyak committed May 14, 2024
1 parent d3ed228 commit bdf5679
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 37 deletions.
21 changes: 21 additions & 0 deletions examples/async_helpers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Helper functions to be runtime agnostic
use futures::Future;

#[cfg(feature = "tokio-runtime")]
extern crate tokio;
Expand All @@ -22,3 +23,23 @@ pub async fn sleep(duration: std::time::Duration) {
pub async fn sleep(duration: std::time::Duration) {
async_std::task::sleep(duration).await
}

#[allow(unused)]
#[cfg(feature = "tokio-runtime")]
pub fn spawn<T>(future: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(future);
}

#[allow(unused)]
#[cfg(feature = "async-std-runtime")]
pub fn spawn<T>(future: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
async_std::spawn(future);
}
24 changes: 24 additions & 0 deletions examples/dealer_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
mod async_helpers;

use futures::StreamExt;
use std::{error::Error, time::Duration};
use zeromq::prelude::*;

#[async_helpers::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut client = zeromq::DealerSocket::new();
let mut monitor = client.monitor();
async_helpers::spawn(async move {
while let Some(event) = monitor.next().await {
dbg!(event);
}
});

client.connect("tcp://127.0.0.1:5559").await?;

loop {
let result = client.send("Test message".into()).await;
dbg!(result);
async_helpers::sleep(Duration::from_secs(1)).await;
}
}
17 changes: 17 additions & 0 deletions examples/router_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
mod async_helpers;

use std::{convert::TryFrom, error::Error};
use zeromq::{prelude::*, util::PeerIdentity, SocketOptions};

#[async_helpers::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut options = SocketOptions::default();
options.peer_identity(PeerIdentity::try_from(Vec::from("SomeCustomId")).unwrap());
let mut frontend = zeromq::RouterSocket::with_options(options);
frontend.bind("tcp://127.0.0.1:5559").await?;

loop {
let message = frontend.recv().await?;
dbg!(message);
}
}
66 changes: 49 additions & 17 deletions src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::codec::{FramedIo, Message, ZmqFramedRead, ZmqFramedWrite};
use crate::fair_queue::QueueInner;
use crate::util::PeerIdentity;
use crate::util::{self, PeerIdentity};
use crate::{
MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType, ZmqError, ZmqResult,
async_rt, Endpoint, MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType,
ZmqError, ZmqResult,
};

use async_trait::async_trait;
Expand All @@ -25,6 +26,7 @@ pub(crate) struct GenericSocketBackend {
socket_type: SocketType,
socket_options: SocketOptions,
pub(crate) socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
connect_endpoints: DashMap<PeerIdentity, Endpoint>,
}

impl GenericSocketBackend {
Expand All @@ -40,10 +42,14 @@ impl GenericSocketBackend {
socket_type,
socket_options: options,
socket_monitor: Mutex::new(None),
connect_endpoints: DashMap::new(),
}
}

pub(crate) async fn send_round_robin(&self, message: Message) -> ZmqResult<PeerIdentity> {
pub(crate) async fn send_round_robin(
self: &Arc<Self>,
message: Message,
) -> ZmqResult<PeerIdentity> {
// In normal scenario this will always be only 1 iteration
// There can be special case when peer has disconnected and his id is still in
// RR queue This happens because SegQueue don't have an api to delete
Expand Down Expand Up @@ -73,7 +79,7 @@ impl GenericSocketBackend {
Ok(next_peer_id)
}
Err(e) => {
self.peer_disconnected(&next_peer_id);
self.clone().peer_disconnected(&next_peer_id);
Err(e.into())
}
};
Expand Down Expand Up @@ -101,25 +107,51 @@ impl SocketBackend for GenericSocketBackend {

#[async_trait]
impl MultiPeerBackend for GenericSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
) {
let (recv_queue, send_queue) = io.into_parts();
self.peers.insert(peer_id.clone(), Peer { send_queue });
self.round_robin.push(peer_id.clone());
match &self.fair_queue_inner {
None => {}
Some(inner) => {
inner.lock().insert(peer_id.clone(), recv_queue);
}
};

if let Some(queue_inner) = &self.fair_queue_inner {
queue_inner.lock().insert(peer_id.clone(), recv_queue);
}

if let Some(e) = endpoint {
self.connect_endpoints.insert(peer_id.clone(), e);
}
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity) {
if let Some(monitor) = self.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone()));
}

self.peers.remove(peer_id);
match &self.fair_queue_inner {
None => {}
Some(inner) => {
inner.lock().remove(peer_id);
}
if let Some(inner) = &self.fair_queue_inner {
inner.lock().remove(peer_id);
}

let endpoint = match self.connect_endpoints.remove(peer_id) {
Some((_, e)) => e,
None => return,
};
let backend = self;

async_rt::task::spawn(async move {
let (socket, endpoint) = util::connect_forever(endpoint)
.await
.expect("Failed to connect");
let peer_id = util::peer_connected(socket, backend.clone(), Some(endpoint.clone()))
.await
.expect("Failed to handshake");
if let Some(monitor) = backend.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Connected(endpoint, peer_id));
}
});
}
}
21 changes: 14 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,13 @@ pub trait MultiPeerBackend: SocketBackend {
/// This should not be public..
/// Find a better way of doing this

async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo);
fn peer_disconnected(&self, peer_id: &PeerIdentity);
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
);
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity);
}

pub trait SocketBackend: Send + Sync {
Expand Down Expand Up @@ -235,7 +240,7 @@ pub trait Socket: Sized + Send {
async move {
let result = match result {
Ok((socket, endpoint)) => {
match util::peer_connected(socket, cloned_backend.clone()).await {
match util::peer_connected(socket, cloned_backend.clone(), None).await {
Ok(peer_id) => Ok((endpoint, peer_id)),
Err(e) => Err(e),
}
Expand Down Expand Up @@ -303,10 +308,12 @@ pub trait Socket: Sized + Send {
let endpoint = TryIntoEndpoint::try_into(endpoint)?;

let result = match util::connect_forever(endpoint).await {
Ok((socket, endpoint)) => match util::peer_connected(socket, backend).await {
Ok(peer_id) => Ok((endpoint, peer_id)),
Err(e) => Err(e),
},
Ok((socket, endpoint)) => {
match util::peer_connected(socket, backend, Some(endpoint.clone())).await {
Ok(peer_id) => Ok((endpoint, peer_id)),
Err(e) => Err(e),
}
}
Err(e) => Err(e),
};
match result {
Expand Down
11 changes: 8 additions & 3 deletions src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ impl SocketBackend for PubSocketBackend {

#[async_trait]
impl MultiPeerBackend for PubSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
) {
let (mut recv_queue, send_queue) = io.into_parts();
// TODO provide handling for recv_queue
let (sender, stop_receiver) = oneshot::channel();
Expand Down Expand Up @@ -143,7 +148,7 @@ impl MultiPeerBackend for PubSocketBackend {
});
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity) {
log::info!("Client disconnected {:?}", peer_id);
self.subscribers.remove(peer_id);
}
Expand Down Expand Up @@ -197,7 +202,7 @@ impl SocketSend for PubSocket {
}
}
for peer in dead_peers {
self.backend.peer_disconnected(&peer);
self.backend.clone().peer_disconnected(&peer);
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl SocketRecv for PullSocket {
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
self.backend.peer_disconnected(&peer_id);
self.backend.clone().peer_disconnected(&peer_id);
}
None => todo!(),
};
Expand Down
9 changes: 7 additions & 2 deletions src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ impl Socket for RepSocket {

#[async_trait]
impl MultiPeerBackend for RepSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
) {
let (recv_queue, send_queue) = io.into_parts();

self.peers.insert(
Expand All @@ -90,7 +95,7 @@ impl MultiPeerBackend for RepSocketBackend {
.insert(peer_id.clone(), recv_queue);
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity) {
if let Some(monitor) = self.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone()));
}
Expand Down
9 changes: 7 additions & 2 deletions src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,12 @@ impl Socket for ReqSocket {

#[async_trait]
impl MultiPeerBackend for ReqSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
) {
let (recv_queue, send_queue) = io.into_parts();
self.peers.insert(
peer_id.clone(),
Expand All @@ -143,7 +148,7 @@ impl MultiPeerBackend for ReqSocketBackend {
self.round_robin.push(peer_id.clone());
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity) {
self.peers.remove(peer_id);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl SocketRecv for RouterSocket {
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
self.backend.peer_disconnected(&peer_id);
self.backend.clone().peer_disconnected(&peer_id);
}
None => todo!(),
};
Expand Down
11 changes: 8 additions & 3 deletions src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ impl SocketBackend for SubSocketBackend {

#[async_trait]
impl MultiPeerBackend for SubSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
) {
let (recv_queue, mut send_queue) = io.into_parts();

let subs_msgs: Vec<ZmqMessage> = self
Expand All @@ -107,7 +112,7 @@ impl MultiPeerBackend for SubSocketBackend {
};
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity) {
self.peers.remove(peer_id);
}
}
Expand Down Expand Up @@ -193,7 +198,7 @@ impl SocketRecv for SubSocket {
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
self.backend.peer_disconnected(&peer_id);
self.backend.clone().peer_disconnected(&peer_id);
}
None => todo!(),
}
Expand Down
3 changes: 2 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ pub(crate) async fn ready_exchange(
pub(crate) async fn peer_connected(
mut raw_socket: FramedIo,
backend: Arc<dyn MultiPeerBackend>,
endpoint: Option<Endpoint>,
) -> ZmqResult<PeerIdentity> {
greet_exchange(&mut raw_socket).await?;
let mut props = None;
Expand All @@ -197,7 +198,7 @@ pub(crate) async fn peer_connected(
props = Some(connect_ops);
}
let peer_id = ready_exchange(&mut raw_socket, backend.socket_type(), props).await?;
backend.peer_connected(&peer_id, raw_socket).await;
backend.peer_connected(&peer_id, raw_socket, endpoint).await;
Ok(peer_id)
}

Expand Down

0 comments on commit bdf5679

Please sign in to comment.