Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix large volume test #7

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

Fraser999
Copy link
Contributor

This PR fixes the failing run_large_volume_test_with_default_values_10_channels.

The root cause of the failure was a deadlock in the test itself, but during investigations a couple of other potential issues in the production code were identified and addressed. Each has been separated into an individual commit so it can be reverted if required.

Closes #4575.

Copy link
Contributor

@marc-casperlabs marc-casperlabs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got some questions around the wait queue processing, fearing it might become too expensive, can you shed some light on that? :)

src/rpc.rs Show resolved Hide resolved
src/io.rs Outdated
@@ -719,6 +721,16 @@ where

/// Handles a new item to send out that arrived through the incoming channel.
fn handle_incoming_item(&mut self, item: QueuedItem) -> Result<(), LocalProtocolViolation> {
// Process the wait queue to avoid this new item "jumping the queue".
match &item {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1dff662 seems to add starvation protection, i.e. newer data cannot consistently get in front of existing data. Was this behavior observed to be problematic?

My core issue with this is that if we process the wait queue each time anyway, it might be better to not even check if we can bypass it and just put everything in the wait queue every time. However, processing the wait queue is expensive, especially if the previously mentioned change is made. Queuing messages will then result in quadratic complexity!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was problematic in that Alice's requests started timing out, as ones in the wait queue weren't processed since newer ones kept getting preferential treatment.

I did consider just dumping everything in the wait queue, however I had the same reservation as you about the cost (and it also seemed to be somewhat abusing the intent of the wait queue - it would at least need renamed for clarity I think if we did that).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least add QueuedItem::is_request :)

This may be less of an issue if the "new" WaitQueue (see above) is added.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least add QueuedItem::is_request :)

I can do, but tbh I don't see why we'd want that or where we'd use it?

@@ -835,11 +849,6 @@ where
self.wait_queue[channel.get() as usize].push_back(item);
} else {
self.send_to_ready_queue(item)?;

// No need to look further if we have saturated the channel.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did 0253804#diff-76866598ce8fd16261a27ac58a84b2825e6e77fc37c163a6afa60f0f4477e569L852-L856 fix an issue? The code was supposed to bring down the potential $O(n^2)$
total complexity of processing the queue
times. What's the case that triggers this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't an issue exposed via a test. Rather I thought it was a bug while following the logic during debugging.

The issue is that the wait queue can have not only requests but responses, so it would be wrong to exit early in the case where a bunch of responses could have been moved out of the wait queue.

As an aside, I wonder if it would be worthwhile creating a new enum just for the wait queue, similar to QueuedItem but with only Request and Response variants?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an aside, I wonder if it would be worthwhile creating a new enum just for the wait queue, similar to QueuedItem but with only Request and Response variants?

My guess is that the problem is likely best solved with two separate queues, one for requests and one for large messages (although I am not entirely sure yet how to handle the case where a message is both large and a request). Alternatively, we should keep some sort of state to ensure it can distinguish these cases quickly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the code being as-is in this PR, I'm still uncomfortable with the situation. Imagine queuing single-frame messages at a very high rate. Once we have saturated the ready queue, they will all go into the wait queue, and every call will process the now-growing entire queue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could consider adding this:

struct WaitSubQueue {
    single_frame: VecDeque<QueuedItem>,
    multi_frame: VecDeque<QueuedItem>,
}

struct WaitQueue {
    requests: WaitSubQueue,
    other: Vec<QueuedItem>,
    prefer_request: bool,
}

impl WaitSubQueue {
    #[inline(always)]
    fn next_item(&mut self, allow_multi_frame: bool) -> Option<QueuedItem> {
        if allow_multi_frame && !self.multi_frame.is_empty() {
            self.multi_frame.pop_front()
        } else {
            self.singe_frame.pop_front()
        }
    }
}

impl WaitQueue {
    pub fn next_item(
        &mut self,
        request_allowed: bool,
        multiframe_allowed: bool,
    ) -> Option<QueuedItem> {
        if request_allowed {
            self.next_item_allowing_request(multiframe_allowed)
        } else {
            self.other.next_item()
        }
    }

    /// Returns the next item, assuming a request is allowed.
    // Note: This function is separate out for readability.
    #[inline(always)]
    fn next_item_allowing_request(&mut self, multiframe_allowed: bool) {
        let candidate = if prefer_request {
            self.requests
                .next_item(multiframe_allowed)
                .or_else(|| self.other.next_item(multiframe_allowed))
        } else {
            self.other
                .next_item(multiframe_allowed)
                .or_else(|| self.requests.next_item(multiframe_allowed))
        }?;

        // Alternate, to prevent starvation is receiver is procesing at a rate
        // that matches our production rate. This essentially subdivides the
        // channel into request/non-request subchannels.
        self.prefer_request = !candidate.is_request();
        Some(candidate)
    }
}

Since the logic gets more complex, it would be wise to separate it out. This is just a sketch, at least some comments would need to be filled in.

The key idea is to know what kind of item we can produce next by checking the state of our multiframe sends and request limits, then use the separated queue to optimize. This reorders items that weren't reordered before by separating the queues.

src/protocol.rs Show resolved Hide resolved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Fix Juliet test
2 participants