Fix large volume test #7

Open · wants to merge 8 commits into base: main
130 changes: 47 additions & 83 deletions src/io.rs
@@ -24,6 +24,8 @@
//! It should also be kept around even if no requests are sent, as dropping it is used to signal the
//! [`IoCore`] to close the connection.

mod wait_queue;

use std::{
collections::VecDeque,
fmt::{self, Display, Formatter},
@@ -49,12 +51,13 @@ use tokio::{
use crate::{
header::Header,
protocol::{
payload_is_multi_frame, CompletedRead, FrameIter, JulietProtocol, LocalProtocolViolation,
OutgoingFrame, OutgoingMessage, ProtocolBuilder,
CompletedRead, FrameIter, JulietProtocol, LocalProtocolViolation, OutgoingFrame,
OutgoingMessage, ProtocolBuilder,
},
util::PayloadFormat,
ChannelId, Id, Outcome,
};
use wait_queue::{PushOutcome, WaitQueue};

/// Maximum number of bytes to pre-allocate in buffers.
const MAX_ALLOC: usize = 32 * 1024; // 32 KiB
@@ -269,7 +272,7 @@ pub struct IoCore<const N: usize, R, W> {
/// Frames waiting to be sent.
ready_queue: VecDeque<FrameIter>,
/// Messages that are not yet ready to be sent.
wait_queue: [VecDeque<QueuedItem>; N],
wait_queue: [WaitQueue; N],
/// Receiver for new messages to be queued.
receiver: UnboundedReceiver<QueuedItem>,
/// Mapping for outgoing requests, mapping internal IDs to public ones.
@@ -559,27 +562,29 @@
let header_sent = frame_sent.header();

// If we finished the active multi frame send, clear it.
let mut cleared_multi_frame = false;
if was_final {
let channel_idx = header_sent.channel().get() as usize;
if let Some(ref active_multi_frame) =
self.active_multi_frame[channel_idx] {
if header_sent == *active_multi_frame {
self.active_multi_frame[channel_idx] = None;
cleared_multi_frame = true;
}
}
}
};

if header_sent.is_error() {
// We finished sending an error frame, time to exit.
return Err(CoreError::RemoteProtocolViolation(header_sent));
}

// TODO: We should restrict the dirty-queue processing here a little bit
// (only check when completing a multi-frame message).
// A message has completed sending, process the wait queue in case we have
// to start sending a multi-frame message like a response that was delayed
// only because of the one-multi-frame-per-channel restriction.
self.process_wait_queue(header_sent.channel())?;
if cleared_multi_frame {
self.process_wait_queue(header_sent.channel())?;
}
} else {
#[cfg(feature = "tracing")]
tracing::error!("current frame should not disappear");
@@ -719,17 +724,34 @@

/// Handles a new item to send out that arrived through the incoming channel.
fn handle_incoming_item(&mut self, item: QueuedItem) -> Result<(), LocalProtocolViolation> {
// Check if the item is sendable immediately.
if let Some(channel) = item_should_wait(&item, &self.juliet, &self.active_multi_frame)? {
#[cfg(feature = "tracing")]
tracing::debug!(%item, "postponing send");
self.wait_queue[channel.get() as usize].push_back(item);
return Ok(());
}
let channel = match &item {
QueuedItem::Request { channel, .. } | QueuedItem::Response { channel, .. } => *channel,
QueuedItem::RequestCancellation { .. }
| QueuedItem::ResponseCancellation { .. }
| QueuedItem::Error { .. } => {
// These variants are always sent immediately.
#[cfg(feature = "tracing")]
tracing::debug!(%item, "ready to send");
return self.send_to_ready_queue(item);
}
};

#[cfg(feature = "tracing")]
tracing::debug!(%item, "ready to send");
self.send_to_ready_queue(item)
// Process the wait queue to avoid this new item "jumping the queue".
self.process_wait_queue(channel)?;

// Add the item to the wait queue, or send if the wait queue returns the item.
match self.wait_queue[channel.get() as usize].try_push_back(
item,
&self.juliet,
&self.active_multi_frame,
)? {
PushOutcome::Pushed => Ok(()),
PushOutcome::NotPushed(ready_item) => {
#[cfg(feature = "tracing")]
tracing::debug!(item = %ready_item, "ready to send");
self.send_to_ready_queue(ready_item)
}
}
}

/// Sends an item directly to the ready queue, causing it to be sent out eventually.
@@ -745,6 +767,7 @@
let id = msg.header().id();
self.request_map.insert(io_id, (channel, id));
if msg.is_multi_frame(self.juliet.max_frame_size()) {
debug_assert!(self.active_multi_frame[channel.get() as usize].is_none());
self.active_multi_frame[channel.get() as usize] = Some(msg.header());
}
self.ready_queue.push_back(msg.frames());
@@ -771,6 +794,7 @@
} => {
if let Some(msg) = self.juliet.create_response(channel, id, payload)? {
if msg.is_multi_frame(self.juliet.max_frame_size()) {
debug_assert!(self.active_multi_frame[channel.get() as usize].is_none());
self.active_multi_frame[channel.get() as usize] = Some(msg.header());
}
self.ready_queue.push_back(msg.frames())
@@ -827,77 +851,17 @@

/// Process the wait queue of a given channel, promoting messages that are ready to be sent.
fn process_wait_queue(&mut self, channel: ChannelId) -> Result<(), LocalProtocolViolation> {
let mut remaining = self.wait_queue[channel.get() as usize].len();

while let Some(item) = self.wait_queue[channel.get() as usize].pop_front() {
if item_should_wait(&item, &self.juliet, &self.active_multi_frame)?.is_some() {
// Put it right back into the queue.
self.wait_queue[channel.get() as usize].push_back(item);
} else {
self.send_to_ready_queue(item)?;

// No need to look further if we have saturated the channel.
Contributor:

Did 0253804#diff-76866598ce8fd16261a27ac58a84b2825e6e77fc37c163a6afa60f0f4477e569L852-L856 fix an issue? The code was supposed to bring down the potential $O(n^2)$ total complexity of processing the queue $n$ times. What's the case that triggers this?

Contributor (Author):

It wasn't an issue exposed via a test. Rather, I thought it was a bug while following the logic during debugging.

The issue is that the wait queue can hold not only requests but also responses, so it would be wrong to exit early when a batch of responses could still have been moved out of the wait queue.

As an aside, I wonder if it would be worthwhile creating a new enum just for the wait queue, similar to QueuedItem but with only Request and Response variants?
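
For illustration, here is a minimal sketch of what such a wait-queue-only type could look like; the name `WaitQueueItem`, the field layout, and the import paths are assumptions made for this example, not something this PR defines:

```rust
use bytes::Bytes;

use crate::{io::IoId, ChannelId, Id};

/// Hypothetical wait-queue-only item: cancellations and errors never wait, so
/// only `Request` and `Response` variants exist. The fields shown are an
/// illustrative guess and do not claim to match `QueuedItem` exactly.
enum WaitQueueItem {
    Request {
        channel: ChannelId,
        io_id: IoId,
        payload: Option<Bytes>,
    },
    Response {
        channel: ChannelId,
        id: Id,
        payload: Option<Bytes>,
    },
}

impl WaitQueueItem {
    /// Returns `true` if the item is a request, e.g. for request-limit checks.
    fn is_request(&self) -> bool {
        matches!(self, WaitQueueItem::Request { .. })
    }
}
```

The main cost would be converting back into a `QueuedItem` once an item is promoted to the ready queue.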

Contributor:

> As an aside, I wonder if it would be worthwhile creating a new enum just for the wait queue, similar to QueuedItem but with only Request and Response variants?

My guess is that the problem is likely best solved with two separate queues, one for requests and one for large messages (although I am not entirely sure yet how to handle the case where a message is both large and a request). Alternatively, we should keep some sort of state so that the queue can distinguish these cases quickly.

Contributor:

With the code as it stands in this PR, I'm still uncomfortable with the situation. Imagine queueing single-frame messages at a very high rate: once we have saturated the ready queue, they will all go into the wait queue, and every call will process the entire, now-growing queue.
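
To put a rough number on that (my arithmetic, not stated in the thread): if the ready queue stays saturated while $n$ single-frame messages arrive, the $i$-th arrival re-scans the $i-1$ items already waiting, so the total work is $\sum_{i=1}^{n}(i-1) = n(n-1)/2 = O(n^2)$ scan steps, even though no scan promotes anything.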

Contributor:

We could consider adding this:

struct WaitSubQueue {
    single_frame: VecDeque<QueuedItem>,
    multi_frame: VecDeque<QueuedItem>,
}

struct WaitQueue {
    requests: WaitSubQueue,
    other: WaitSubQueue,
    prefer_request: bool,
}

impl WaitSubQueue {
    #[inline(always)]
    fn next_item(&mut self, allow_multi_frame: bool) -> Option<QueuedItem> {
        if allow_multi_frame && !self.multi_frame.is_empty() {
            self.multi_frame.pop_front()
        } else {
            self.single_frame.pop_front()
        }
    }
}

impl WaitQueue {
    pub fn next_item(
        &mut self,
        request_allowed: bool,
        multiframe_allowed: bool,
    ) -> Option<QueuedItem> {
        if request_allowed {
            self.next_item_allowing_request(multiframe_allowed)
        } else {
            self.other.next_item(multiframe_allowed)
        }
    }

    /// Returns the next item, assuming a request is allowed.
    // Note: This function is separated out for readability.
    #[inline(always)]
    fn next_item_allowing_request(&mut self, multiframe_allowed: bool) -> Option<QueuedItem> {
        let candidate = if self.prefer_request {
            self.requests
                .next_item(multiframe_allowed)
                .or_else(|| self.other.next_item(multiframe_allowed))
        } else {
            self.other
                .next_item(multiframe_allowed)
                .or_else(|| self.requests.next_item(multiframe_allowed))
        }?;

        // Alternate, to prevent starvation if the receiver is processing at a rate
        // that matches our production rate. This essentially subdivides the
        // channel into request/non-request subchannels.
        self.prefer_request = !candidate.is_request();
        Some(candidate)
    }
}

Since the logic gets more complex, it would be wise to separate it out. This is just a sketch; at least some of the comments would still need to be filled in.

The key idea is to know what kind of item we can produce next by checking the state of our multi-frame sends and request limits, then use the separated queues to make that lookup cheap. Note that separating the queues reorders items that were not reordered before.
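
To connect this back to the diff above, here is a hedged sketch of how `process_wait_queue` might drive such a `WaitQueue`; it assumes the two flags are derived from `allowed_to_send_request` and the `active_multi_frame` table, as the surrounding code already does, and it is not the implementation in this PR:

```rust
/// Sketch only: `process_wait_queue` written against the two-flag
/// `WaitQueue::next_item` proposed above. Helper names mirror this PR.
fn process_wait_queue(&mut self, channel: ChannelId) -> Result<(), LocalProtocolViolation> {
    loop {
        // Recompute the gating conditions on every iteration, since promoting
        // an item may start a multi-frame send or consume a request slot.
        let request_allowed = self.juliet.allowed_to_send_request(channel)?;
        let multiframe_allowed =
            self.active_multi_frame[channel.get() as usize].is_none();

        match self.wait_queue[channel.get() as usize]
            .next_item(request_allowed, multiframe_allowed)
        {
            Some(item) => self.send_to_ready_queue(item)?,
            None => return Ok(()),
        }
    }
}
```

The only structural difference from the version in this diff is where the gating logic lives: here the caller computes the flags, while in the PR the wait queue inspects the protocol state itself.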

if !self.juliet.allowed_to_send_request(channel)? {
break;
}
}

// Ensure we do not loop endlessly if we cannot find anything.
remaining -= 1;
if remaining == 0 {
break;
}
while let Some(item) = self.wait_queue[channel.get() as usize].next_item(
channel,
&self.juliet,
&self.active_multi_frame,
)? {
self.send_to_ready_queue(item)?;
}

Ok(())
}
}

/// Determines whether an item is ready to be moved from the wait queue to the ready queue.
///
/// Returns `None` if the item does not need to wait. Otherwise, the item's channel ID is returned.
fn item_should_wait<const N: usize>(
item: &QueuedItem,
juliet: &JulietProtocol<N>,
active_multi_frame: &[Option<Header>; N],
) -> Result<Option<ChannelId>, LocalProtocolViolation> {
let (payload, channel) = match item {
QueuedItem::Request {
channel, payload, ..
} => {
// Check if we cannot schedule due to the message exceeding the request limit.
if !juliet.allowed_to_send_request(*channel)? {
return Ok(Some(*channel));
}

(payload, channel)
}
QueuedItem::Response {
channel, payload, ..
} => (payload, channel),

// Other messages are always ready.
QueuedItem::RequestCancellation { .. }
| QueuedItem::ResponseCancellation { .. }
| QueuedItem::Error { .. } => return Ok(None),
};

let active_multi_frame = active_multi_frame[channel.get() as usize];

// Check if we cannot schedule due to the message being multi-frame and there being a
// multi-frame send in progress:
if active_multi_frame.is_some() {
if let Some(payload) = payload {
if payload_is_multi_frame(juliet.max_frame_size(), payload.len()) {
return Ok(Some(*channel));
}
}
}

// Otherwise, this should be a legitimate add to the run queue.
Ok(None)
}

/// A handle to the input queue to the [`IoCore`] that allows sending requests and responses.
///
/// The handle is roughly three pointers in size and can be cloned at will. Dropping the last handle