Merge pull request #102 from zeromq/alex/fair-queue-refactoring
Alex/fair queue refactoring
Alexei-Kornienko authored Dec 1, 2020
2 parents 106e10f + e33266d commit 8e75a13
Showing 5 changed files with 64 additions and 81 deletions.
2 changes: 1 addition & 1 deletion examples/socket_client.rs
@@ -15,7 +15,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
     for _ in 0..10u64 {
         socket.send("Hello".into()).await?;
         let repl: String = socket.recv().await?.try_into()?;
-        // dbg!(repl);
+        dbg!(repl);
     }
     Ok(())
 }
2 changes: 1 addition & 1 deletion examples/task_sink.rs
@@ -1,7 +1,7 @@
 use std::error::Error;
 use std::io::Write;
 use std::time::Instant;
-use zeromq::{BlockingRecv, BlockingSend, NonBlockingSend, Socket};
+use zeromq::{BlockingRecv, BlockingSend, Socket};
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
3 changes: 1 addition & 2 deletions src/codec/mod.rs
@@ -2,14 +2,13 @@ mod command
 mod error;
 mod framed;
 mod greeting;
-mod mechanism;
+pub(crate) mod mechanism;
 mod zmq_codec;
 
 pub(crate) use command::{ZmqCommand, ZmqCommandName};
 pub(crate) use error::{CodecError, CodecResult};
 pub(crate) use framed::{FrameableRead, FrameableWrite, FramedIo, ZmqFramedRead, ZmqFramedWrite};
 pub(crate) use greeting::{ZmqGreeting, ZmtpVersion};
-pub(crate) use mechanism::ZmqMechanism;
 pub(crate) use zmq_codec::ZmqCodec;
 
 use crate::message::ZmqMessage;
132 changes: 58 additions & 74 deletions src/fair_queue.rs
@@ -3,25 +3,24 @@ use futures::Stream;
 use parking_lot::Mutex;
 use std::cmp::Ordering;
 use std::collections::{BinaryHeap, HashMap};
+use std::hash::Hash;
 use std::pin::Pin;
 use std::sync::atomic;
 use std::sync::Arc;
 
-pub(crate) struct QueueInner<S, K> {
+pub(crate) struct QueueInner<S, K: Clone> {
     counter: atomic::AtomicUsize,
-    ready_queue: BinaryHeap<PriorityStream<S, K>>,
-    pending_streams: HashMap<usize, PriorityStream<S, K>>,
-    // See wake_by_ref for details
-    weak_up_processing: Option<usize>,
+    ready_queue: BinaryHeap<ReadyEvent<K>>,
+    streams: HashMap<K, Pin<Box<S>>>,
     waker: Option<Waker>,
 }
 
-impl<S, K> QueueInner<S, K> {
+impl<S, K: Clone + Eq + Hash> QueueInner<S, K> {
     pub fn insert(&mut self, k: K, s: S) {
-        self.ready_queue.push(PriorityStream {
+        self.streams.insert(k.clone(), Box::pin(s));
+        self.ready_queue.push(ReadyEvent {
             priority: self.counter.fetch_add(1, atomic::Ordering::Relaxed),
             key: k,
-            stream: Box::pin(s),
         });
         match &self.waker {
             Some(w) => w.wake_by_ref(),
@@ -30,136 +29,121 @@ impl<S, K> QueueInner<S, K> {
     }
 }
 
-pub struct FairQueue<S, K> {
+pub struct FairQueue<S, K: Clone> {
     block_on_no_clients: bool,
     inner: Arc<Mutex<QueueInner<S, K>>>,
 }
 
-struct StreamWaker<S, K> {
-    inner: Arc<Mutex<QueueInner<S, K>>>,
-    index: usize,
-}
-
-impl<S, K> ArcWake for StreamWaker<S, K>
-where
-    S: Send,
-    K: Send,
-{
-    fn wake_by_ref(arc_self: &Arc<Self>) {
-        let mut inner = arc_self.inner.lock();
-        match inner.pending_streams.remove(&arc_self.index) {
-            None => {
-                // This is a tricky part:
-                // some streams call the waker from inside their own poll_next.
-                // At that moment the stream is neither ready nor pending,
-                // so we leave its priority hanging for the moment.
-                // FairQueue::poll_next must account for this and put the
-                // stream back as ready, since it explicitly asked for it.
-                inner.weak_up_processing = Some(arc_self.index);
-            }
-            Some(s) => {
-                inner.ready_queue.push(s);
-            }
-        };
-        if let Some(waker) = inner.waker.take() {
-            waker.wake_by_ref();
-        }
-    }
-}
-
-struct PriorityStream<S, K> {
+#[derive(Clone)]
+struct ReadyEvent<K: Clone> {
     priority: usize,
     key: K,
-    stream: Pin<Box<S>>,
 }
 
-impl<S, K> PartialEq for PriorityStream<S, K> {
+impl<K: Clone> PartialEq for ReadyEvent<K> {
     fn eq(&self, other: &Self) -> bool {
         self.priority.eq(&other.priority)
     }
 }
-impl<S, K> Eq for PriorityStream<S, K> {}
+impl<K: Clone> Eq for ReadyEvent<K> {}
 
-impl<S, K> PartialOrd for PriorityStream<S, K> {
+impl<K: Clone> PartialOrd for ReadyEvent<K> {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         other.priority.partial_cmp(&self.priority)
     }
 }
-impl<S, K> Ord for PriorityStream<S, K> {
+impl<K: Clone> Ord for ReadyEvent<K> {
     fn cmp(&self, other: &Self) -> Ordering {
         other.priority.cmp(&self.priority)
     }
 }
 
+struct StreamWaker<S, K: Clone> {
+    inner: Arc<Mutex<QueueInner<S, K>>>,
+    event: ReadyEvent<K>,
+}
+
+impl<S, K> ArcWake for StreamWaker<S, K>
+where
+    S: Send,
+    K: Clone + Send + Sync,
+{
+    fn wake_by_ref(arc_self: &Arc<Self>) {
+        let mut inner = arc_self.inner.lock();
+        inner.ready_queue.push(arc_self.event.clone());
+        if let Some(waker) = inner.waker.take() {
+            waker.wake_by_ref();
+        }
+    }
+}
+
 impl<S, T, K> Stream for FairQueue<S, K>
 where
     T: Send,
     S: Stream<Item = T> + Send,
-    K: Unpin + Clone + Send,
+    K: Eq + Hash + Unpin + Clone + Send + Sync,
 {
     type Item = (K, T);
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let stream = self.get_mut();
+        let fair_queue = self.get_mut();
         loop {
-            let mut s = {
-                let mut inner = stream.inner.lock();
+            let (event, mut io_stream) = {
+                let mut inner = fair_queue.inner.lock();
                 inner.waker = Some(cx.waker().clone());
-                match inner.ready_queue.pop() {
+                let event = match inner.ready_queue.pop() {
                     Some(s) => s,
                     None => {
-                        return if !inner.pending_streams.is_empty() || stream.block_on_no_clients {
+                        return if !inner.streams.is_empty() || fair_queue.block_on_no_clients {
                             Poll::Pending
                         } else {
                             Poll::Ready(None)
                         }
                     }
+                };
+                match inner.streams.remove(&event.key) {
+                    Some(stream) => (event, stream),
+                    None => continue,
                 }
             };
 
             let waker = Arc::new(StreamWaker {
-                inner: stream.inner.clone(),
-                index: s.priority,
+                inner: fair_queue.inner.clone(),
+                event: event.clone(),
            });
             let waker_ref = futures::task::waker_ref(&waker);
             let mut cx = Context::from_waker(&waker_ref);
-            match s.stream.as_mut().poll_next(&mut cx) {
+            match io_stream.as_mut().poll_next(&mut cx) {
                 Poll::Ready(Some(res)) => {
-                    let item = Some((s.key.clone(), res));
-                    let mut inner = stream.inner.lock();
-                    s.priority = inner.counter.fetch_add(1, atomic::Ordering::Relaxed);
-                    inner.ready_queue.push(s);
-                    inner.weak_up_processing = None;
+                    let item = Some((event.key.clone(), res));
+                    let mut inner = fair_queue.inner.lock();
+                    let priority = inner.counter.fetch_add(1, atomic::Ordering::Relaxed);
+                    inner.ready_queue.push(ReadyEvent {
+                        priority,
+                        key: event.key.clone(),
+                    });
+                    inner.streams.insert(event.key, io_stream);
                     return Poll::Ready(item);
                 }
                 Poll::Ready(None) => continue,
                 Poll::Pending => {
-                    let mut inner = stream.inner.lock();
-                    match inner.weak_up_processing.take() {
-                        None => {
-                            inner.pending_streams.insert(s.priority, s);
-                        }
-                        Some(prio) => {
-                            assert_eq!(prio, s.priority);
-                            inner.ready_queue.push(s);
-                        }
-                    };
+                    let mut inner = fair_queue.inner.lock();
+                    inner.streams.insert(event.key, io_stream);
                     return Poll::Pending;
                 }
             }
         }
     }
 }
 
-impl<S, K> FairQueue<S, K> {
+impl<S, K: Clone> FairQueue<S, K> {
     pub fn new(block_on_no_clients: bool) -> Self {
         Self {
             block_on_no_clients,
             inner: Arc::new(Mutex::new(QueueInner {
                 counter: atomic::AtomicUsize::new(0),
                 ready_queue: BinaryHeap::new(),
-                pending_streams: HashMap::new(),
-                weak_up_processing: None,
+                streams: HashMap::new(),
                 waker: None,
             })),
         }
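
A note on the refactor, with an illustrative sketch. The old design stored the pinned stream inside each heap entry (PriorityStream) and had to shuffle entries between ready_queue and pending_streams, which is what forced the weak_up_processing special case. After this commit the heap holds only a small, cloneable ReadyEvent { priority, key }, the streams live in a HashMap keyed by K, and a waker simply clones its event back onto the heap. The standalone sketch below is not part of this commit or the crate's API — the names and data are illustrative, and the waker/Poll machinery is stripped out — but it shows the same scheduling discipline: pop the longest-waiting key, service one item, re-push the key with a fresh counter value.

use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};

// Key-plus-counter event, mirroring this commit's ReadyEvent: the heap
// never owns a stream, only the right to poll one next.
#[derive(Clone)]
struct ReadyEvent<K: Clone> {
    priority: usize,
    key: K,
}

impl<K: Clone> PartialEq for ReadyEvent<K> {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}
impl<K: Clone> Eq for ReadyEvent<K> {}

impl<K: Clone> PartialOrd for ReadyEvent<K> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<K: Clone> Ord for ReadyEvent<K> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed on purpose: BinaryHeap is a max-heap, so inverting the
        // comparison makes pop() return the smallest counter value, i.e.
        // the key that has waited longest.
        other.priority.cmp(&self.priority)
    }
}

fn main() {
    let mut counter = 0usize;
    let mut ready: BinaryHeap<ReadyEvent<&str>> = BinaryHeap::new();
    // Stand-ins for per-peer streams: queues of pending items.
    let mut streams: HashMap<&str, Vec<u32>> = HashMap::new();
    streams.insert("peer-a", vec![1, 2, 3]);
    streams.insert("peer-b", vec![10, 20, 30]);
    for &key in ["peer-a", "peer-b"].iter() {
        ready.push(ReadyEvent { priority: counter, key });
        counter += 1;
    }

    // Pop the longest-waiting key, take one item from its stream, then
    // re-push the key with a fresh (larger) counter: one item per turn.
    while let Some(event) = ready.pop() {
        let items = streams.get_mut(event.key).unwrap();
        if items.is_empty() {
            continue; // exhausted: the key simply drops out of rotation
        }
        println!("{} -> {}", event.key, items.remove(0));
        ready.push(ReadyEvent { priority: counter, key: event.key });
        counter += 1;
    }
}

Because the reversed Ord turns the standard max-heap into a min-heap on the counter, each serviced key goes to the back of the line, giving round-robin fairness across peers: the output alternates peer-a -> 1, peer-b -> 10, peer-a -> 2, peer-b -> 20, and so on. The same property is what lets the commit's StreamWaker stay trivial — waking is just one clone-and-push under the lock.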
6 changes: 3 additions & 3 deletions src/util.rs
@@ -1,5 +1,4 @@
-use crate::codec::CodecResult;
-use crate::codec::FramedIo;
+use crate::codec::{CodecResult, FramedIo};
 use crate::*;
 
 use bytes::Bytes;
@@ -187,6 +186,7 @@ pub(crate) async fn peer_connected(
 #[cfg(test)]
 pub(crate) mod tests {
     use super::*;
+    use crate::codec::mechanism::ZmqMechanism;
 
     pub async fn test_bind_to_unspecified_interface_helper(
         any: std::net::IpAddr,
@@ -248,7 +248,7 @@ pub(crate) mod tests {
 
     fn new_greeting(version: ZmtpVersion) -> CodecResult<Message> {
         Ok(Message::Greeting(ZmqGreeting {
-            version: version,
+            version,
             mechanism: ZmqMechanism::PLAIN,
             as_server: false,
         }))
