Skip to content

Commit

Permalink
quinn: introduce wake_stream() helper
Browse files Browse the repository at this point in the history
  • Loading branch information
djc authored and Ralith committed Jun 23, 2024
1 parent 0273e0a commit 70f5194
Showing 1 changed file with 11 additions and 21 deletions.
32 changes: 11 additions & 21 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1061,11 +1061,7 @@ impl State {
ConnectionLost { reason } => {
self.terminate(reason, shared);
}
Stream(StreamEvent::Writable { id }) => {
if let Some(writer) = self.blocked_writers.remove(&id) {
writer.wake();
}
}
Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
shared.stream_incoming[Dir::Uni as usize].notify_waiters();
}
Expand All @@ -1078,27 +1074,15 @@ impl State {
DatagramsUnblocked => {
shared.datagrams_unblocked.notify_waiters();
}
Stream(StreamEvent::Readable { id }) => {
if let Some(reader) = self.blocked_readers.remove(&id) {
reader.wake();
}
}
Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
Stream(StreamEvent::Available { dir }) => {
// Might mean any number of streams are ready, so we wake up everyone
shared.stream_budget_available[dir as usize].notify_waiters();
}
Stream(StreamEvent::Finished { id }) => {
if let Some(stopped) = self.stopped.remove(&id) {
stopped.wake();
}
}
Stream(StreamEvent::Finished { id }) => wake_stream(id, &mut self.stopped),
Stream(StreamEvent::Stopped { id, .. }) => {
if let Some(stopped) = self.stopped.remove(&id) {
stopped.wake();
}
if let Some(writer) = self.blocked_writers.remove(&id) {
writer.wake();
}
wake_stream(id, &mut self.stopped);
wake_stream(id, &mut self.blocked_writers);
}
}
}
Expand Down Expand Up @@ -1222,6 +1206,12 @@ impl fmt::Debug for State {
}
}

fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
if let Some(waker) = wakers.remove(&stream_id) {
waker.wake();
}
}

fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
wakers.drain().for_each(|(_, waker)| waker.wake())
}
Expand Down

0 comments on commit 70f5194

Please sign in to comment.