Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the "drain" feature for subscriptions and connections #1332

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

jsudano
Copy link

@jsudano jsudano commented Oct 23, 2024

Based on the proposal in #1325, implementing the functionality described in this NATS doc.

I would love suggestions for further tests. I tried adding tests for more complex timing/corner cases but they all wound up looking like exactly like the tests I have here

Copy link
Member

@Jarema Jarema left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this contribution!

This looks really good.
Some comments added.

@@ -773,6 +793,27 @@ impl ConnectionHandler {
Command::Flush { observer } => {
self.flush_observers.push(observer);
}
Command::Drain { sid } => {
let mut drain_sub = |sid: &u64, sub: &mut Subscription| {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would pass by value here:

Suggested change
let mut drain_sub = |sid: &u64, sub: &mut Subscription| {
let mut drain_sub = |sid: u64, sub: &mut Subscription| {

let mut drain_sub = |sid: &u64, sub: &mut Subscription| {
sub.is_draining = true;
self.connection.enqueue_write_op(&ClientOp::Unsubscribe {
sid: *sid,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which allows to remove the dereference here

Suggested change
sid: *sid,
sid: sid,


if let Some(sid) = sid {
if let Some(sub) = self.subscriptions.get_mut(&sid) {
drain_sub(&sid, sub);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and referencing here

} else {
// sid isn't set, so drain the whole client
self.is_draining = true;
for (sid, sub) in self.subscriptions.iter_mut() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for (sid, sub) in self.subscriptions.iter_mut() {
for (&sid, sub) in self.subscriptions.iter_mut() {

// The entire connection is draining. This means we flushed outgoing messages in the previous
// call to this fn, we handled any remaining messages from the server in the loop above, and
// all subs were drained, so drain is complete and we should exit instead of processing any
// further messages
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should add new Event variants - like Draining and Closed.

@@ -1251,6 +1293,48 @@ impl Subscriber {
.await?;
Ok(())
}

/// Unsubscribes immediately but leaves the stream open to allow any in-flight messages on the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe subscription instead of stream? Sounds more direct on what is affected.

/// delivered
///
/// # Examples
/// ```
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add no_run to examples - the demo server is not reliable enough :).

@Jarema
Copy link
Member

Jarema commented Oct 24, 2024

(CI was red because there was a typo in example. Should be fine after a rebase)

@Jarema
Copy link
Member

Jarema commented Oct 29, 2024

@jsudano ping, if you missed the review 🙂

@jsudano
Copy link
Author

jsudano commented Nov 6, 2024

Apologies for the delay, was away from the computer last week. Really appreciate the review, will get the comments addressed this week!

}
} else {
// sid isn't set, so drain the whole client
self.connector.events_tx.try_send(Event::Draining).ok();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assumed it was fine to ignore any errors here as they'd be pretty unlikely (the only scenario I could think of would be if the drain command was sent mid-shutdown, in which case it's fine to just ignore the error).

ExitReason::Closed => break,
ExitReason::Closed => {
//
self.connector.events_tx.try_send(Event::Closed).ok();
Copy link
Author

@jsudano jsudano Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the other comment, this is more-or-less the last thing we'd do during a drain-shutdown so it's probably fine to ignore errors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, however please either remove the // or add a comment there that explains why its ok.

Copy link
Member

@Jarema Jarema left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Just one leftover from I guess intended comment left.

ExitReason::Closed => break,
ExitReason::Closed => {
//
self.connector.events_tx.try_send(Event::Closed).ok();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, however please either remove the // or add a comment there that explains why its ok.

Copy link
Member

@Jarema Jarema left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry, missed the cargo fmt and linter errors.
After fixing them, it's good to merge ;).

@jsudano
Copy link
Author

jsudano commented Nov 14, 2024

Oops, hopefully that last commit got all of them (I ran the commands from the validation runs locally and they passed at least)

@Jarema
Copy link
Member

Jarema commented Nov 15, 2024

@jsudano Thanks!

The test is missing use futures_util::stream::stream::StreamExt;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants