Skip to content

Commit

Permalink
Addressing comments: emit Event::Draining and Event::Closed; pass `si…
Browse files Browse the repository at this point in the history
…d` by value in closure; update doc comments
  • Loading branch information
jsudano authored and jsudano committed Nov 14, 2024
1 parent bcc995b commit 712580f
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,11 @@ impl ConnectionHandler {
};
debug!("reconnected");
}
ExitReason::Closed => break,
ExitReason::Closed => {
//
self.connector.events_tx.try_send(Event::Closed).ok();
break;
}
ExitReason::ReconnectRequested => {
debug!("reconnect requested");
// Should be ok to ingore error, as that means we are not in connected state.
Expand Down Expand Up @@ -794,22 +798,21 @@ impl ConnectionHandler {
self.flush_observers.push(observer);
}
Command::Drain { sid } => {
let mut drain_sub = |sid: &u64, sub: &mut Subscription| {
let mut drain_sub = |sid: u64, sub: &mut Subscription| {
sub.is_draining = true;
self.connection.enqueue_write_op(&ClientOp::Unsubscribe {
sid: *sid,
max: None,
});
self.connection
.enqueue_write_op(&ClientOp::Unsubscribe { sid, max: None });
};

if let Some(sid) = sid {
if let Some(sub) = self.subscriptions.get_mut(&sid) {
drain_sub(&sid, sub);
drain_sub(sid, sub);
}
} else {
// sid isn't set, so drain the whole client
self.connector.events_tx.try_send(Event::Draining).ok();
self.is_draining = true;
for (sid, sub) in self.subscriptions.iter_mut() {
for (&sid, sub) in self.subscriptions.iter_mut() {
drain_sub(sid, sub);
}
}
Expand Down Expand Up @@ -1068,6 +1071,8 @@ pub enum Event {
Connected,
Disconnected,
LameDuckMode,
Draining,
Closed,
SlowConsumer(u64),
ServerError(ServerError),
ClientError(ClientError),
Expand All @@ -1079,6 +1084,8 @@ impl fmt::Display for Event {
Event::Connected => write!(f, "connected"),
Event::Disconnected => write!(f, "disconnected"),
Event::LameDuckMode => write!(f, "lame duck mode detected"),
Event::Draining => write!(f, "draining"),
Event::Closed => write!(f, "closed"),
Event::SlowConsumer(sid) => write!(f, "slow consumers for subscription {sid}"),
Event::ServerError(err) => write!(f, "server error: {err}"),
Event::ClientError(err) => write!(f, "client error: {err}"),
Expand Down Expand Up @@ -1294,12 +1301,12 @@ impl Subscriber {
Ok(())
}

/// Unsubscribes immediately but leaves the stream open to allow any in-flight messages on the
/// subscription to be delivered. The stream will be closed after any remaining messages are
/// delivered
/// Unsubscribes immediately but leaves the subscription open to allow any in-flight messages
/// on the subscription to be delivered. The stream will be closed after any remaining messages
/// are delivered
///
/// # Examples
/// ```
/// ```no_run
/// # use futures::StreamExt;
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
Expand Down

0 comments on commit 712580f

Please sign in to comment.