Skip to content

Commit

Permalink
recovery: move detect acked packets logic to epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
vkrasnov authored and ghedo committed May 7, 2024
1 parent 2a83344 commit 70b67f9
Showing 1 changed file with 131 additions and 142 deletions.
273 changes: 131 additions & 142 deletions quiche/src/recovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::time::Instant;
use std::collections::VecDeque;

use crate::packet::Epoch;
use crate::ranges::RangeSet;
use crate::Config;
use crate::Result;

Expand Down Expand Up @@ -95,6 +96,14 @@ struct RecoveryEpoch {
lost_frames: Vec<frame::Frame>,
}

struct AckedDetectionResult {
acked_bytes: usize,
spurious_losses: usize,
spurious_pkt_thresh: Option<u64>,
has_ack_eliciting: bool,
has_in_flight_spurious_loss: bool,
}

struct LossDetectionResult {
largest_lost_pkt: Option<Sent>,
lost_packets: usize,
Expand All @@ -103,6 +112,96 @@ struct LossDetectionResult {
}

impl RecoveryEpoch {
fn detect_and_remove_acked_packets(
&mut self, now: Instant, acked: &RangeSet, newly_acked: &mut Vec<Acked>,
rtt_stats: &RttStats, trace_id: &str,
) -> AckedDetectionResult {
newly_acked.clear();

let mut acked_bytes = 0;
let mut spurious_losses = 0;
let mut spurious_pkt_thresh = None;
let mut has_ack_eliciting = false;
let mut has_in_flight_spurious_loss = false;

let largest_acked = self.largest_acked_packet.unwrap();

for ack in acked.iter() {
// Because packets always have incrementing numbers, they are always
// in sorted order.
let start = if self
.sent_packets
.front()
.filter(|e| e.pkt_num >= ack.start)
.is_some()
{
// Usually it will be the first packet.
0
} else {
self.sent_packets
.binary_search_by_key(&ack.start, |p| p.pkt_num)
.unwrap_or_else(|e| e)
};

for unacked in self.sent_packets.range_mut(start..) {
if unacked.pkt_num >= ack.end {
break;
}

if unacked.time_acked.is_some() {
// Already acked.
} else if unacked.time_lost.is_some() {
// An acked packet was already declared lost.
spurious_losses += 1;
spurious_pkt_thresh
.get_or_insert(largest_acked - unacked.pkt_num + 1);
unacked.time_acked = Some(now);

if unacked.in_flight {
has_in_flight_spurious_loss = true;
}
} else {
if unacked.in_flight {
self.in_flight_count -= 1;
acked_bytes += unacked.size;
}

newly_acked.push(Acked {
pkt_num: unacked.pkt_num,
time_sent: unacked.time_sent,
size: unacked.size,

rtt: now.saturating_duration_since(unacked.time_sent),
delivered: unacked.delivered,
delivered_time: unacked.delivered_time,
first_sent_time: unacked.first_sent_time,
is_app_limited: unacked.is_app_limited,
tx_in_flight: unacked.tx_in_flight,
lost: unacked.lost,
});

trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);

self.acked_frames
.extend(std::mem::take(&mut unacked.frames));

has_ack_eliciting |= unacked.ack_eliciting;
unacked.time_acked = Some(now);
}
}
}

self.drain_acked_and_lost_packets(now - rtt_stats.rtt());

AckedDetectionResult {
acked_bytes,
spurious_losses,
spurious_pkt_thresh,
has_ack_eliciting,
has_in_flight_spurious_loss,
}
}

fn detect_lost_packets(
&mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
trace_id: &str, epoch: Epoch,
Expand Down Expand Up @@ -561,162 +660,54 @@ impl Recovery {
) -> Result<(usize, usize)> {
let largest_acked = ranges.last().unwrap();

// While quiche used to consider ACK frames acknowledging packet numbers
// larger than the largest sent one as invalid, this is not true anymore
// if we consider a single packet number space and multiple paths. The
// simplest example is the case where the host sends a probing packet on
// a validating path, then receives an acknowledgment for that packet on
// the active one.

// Update the largest acked packet.
let largest_acked = self.epochs[epoch]
.largest_acked_packet
.unwrap_or(0)
.max(largest_acked);

self.epochs[epoch]
.largest_acked_packet
.replace(largest_acked);

let mut has_ack_eliciting = false;

let mut largest_newly_acked_pkt_num = 0;
let mut largest_newly_acked_sent_time = now;

let mut undo_cwnd = false;

let max_rtt = cmp::max(self.rtt_stats.latest_rtt, self.rtt_stats.rtt());
self.epochs[epoch].largest_acked_packet = Some(largest_acked);

// Detect and mark acked packets, without removing them from the sent
// packets list.
for r in ranges.iter() {
let lowest_acked_in_block = r.start;
let largest_acked_in_block = r.end - 1;

let first_unacked = if self.epochs[epoch]
.sent_packets
.front()
.map(|p| p.pkt_num >= lowest_acked_in_block)
.unwrap_or(true)
{
// In the happy case the first sent packet is the first to be
// acked, so optimize for that case.
0
} else {
// If it is not the first packet, try to find it using binary
// search.
self.epochs[epoch]
.sent_packets
.binary_search_by_key(&lowest_acked_in_block, |e| e.pkt_num)
.unwrap_or_else(|i| i)
};

let epoch = &mut self.epochs[epoch];

let unacked_iter = epoch.sent_packets.range_mut(first_unacked..)
// Skip packets that follow the largest acked packet in the block.
.take_while(|p| p.pkt_num <= largest_acked_in_block)
// Skip packets that have already been acked or lost.
.filter(|p| p.time_acked.is_none());

for unacked in unacked_iter {
unacked.time_acked = Some(now);

// Check if acked packet was already declared lost.
if unacked.time_lost.is_some() {
// Calculate new packet reordering threshold.
let pkt_thresh = epoch.largest_acked_packet.unwrap_or(0) -
unacked.pkt_num +
1;
let pkt_thresh = cmp::min(MAX_PACKET_THRESHOLD, pkt_thresh);

self.pkt_thresh = cmp::max(self.pkt_thresh, pkt_thresh);

// Calculate new time reordering threshold.
let loss_delay = max_rtt.mul_f64(self.time_thresh);

// unacked.time_sent can be in the future due to
// pacing.
if now.saturating_duration_since(unacked.time_sent) >
loss_delay
{
// TODO: do time threshold update
self.time_thresh = 5_f64 / 4_f64;
}

if unacked.in_flight {
undo_cwnd = true;
}

self.lost_spurious_count += 1;
continue;
}

if unacked.ack_eliciting {
has_ack_eliciting = true;
}

largest_newly_acked_pkt_num = unacked.pkt_num;
largest_newly_acked_sent_time = unacked.time_sent;

epoch.acked_frames.extend(unacked.frames.drain(..));

if unacked.in_flight {
epoch.in_flight_count -= 1;
}

newly_acked.push(Acked {
pkt_num: unacked.pkt_num,

time_sent: unacked.time_sent,

size: unacked.size,

rtt: now.saturating_duration_since(unacked.time_sent),

delivered: unacked.delivered,

delivered_time: unacked.delivered_time,

first_sent_time: unacked.first_sent_time,

is_app_limited: unacked.is_app_limited,

tx_in_flight: unacked.tx_in_flight,

lost: unacked.lost,
});
let AckedDetectionResult {
acked_bytes,
spurious_losses,
spurious_pkt_thresh,
has_ack_eliciting,
has_in_flight_spurious_loss,
} = self.epochs[epoch].detect_and_remove_acked_packets(
now,
ranges,
newly_acked,
&self.rtt_stats,
trace_id,
);

trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
}
self.lost_spurious_count += spurious_losses;
if let Some(thresh) = spurious_pkt_thresh {
self.pkt_thresh =
self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
}

// Undo congestion window update.
if undo_cwnd {
if has_in_flight_spurious_loss {
(self.cc_ops.rollback)(self);
}

if newly_acked.is_empty() {
return Ok((0, 0));
}

if largest_newly_acked_pkt_num == largest_acked && has_ack_eliciting {
// The packet's sent time could be in the future if pacing is used
// and the network has a very short RTT.
let latest_rtt =
now.saturating_duration_since(largest_newly_acked_sent_time);

let ack_delay = Duration::from_micros(ack_delay);

// Don't update srtt if rtt is zero.
if !latest_rtt.is_zero() {
self.rtt_stats.update_rtt(
latest_rtt,
ack_delay,
now,
handshake_status.completed,
);
}
// Check if largest packet is newly acked.
let largest_newly_acked = newly_acked.last().unwrap();

if largest_newly_acked.pkt_num == largest_acked && has_ack_eliciting {
let latest_rtt = now - largest_newly_acked.time_sent;
self.rtt_stats.update_rtt(
latest_rtt,
Duration::from_micros(ack_delay),
now,
handshake_status.completed,
);
}

// Detect and mark lost packets without removing them from the sent
Expand All @@ -725,6 +716,8 @@ impl Recovery {

self.on_packets_acked(newly_acked, now);

self.bytes_in_flight -= acked_bytes;

self.pto_count = 0;

self.set_loss_detection_timer(handshake_status, now);
Expand Down Expand Up @@ -1044,11 +1037,9 @@ impl Recovery {
}

fn on_packets_acked(&mut self, acked: &mut Vec<Acked>, now: Instant) {
let mut newly_acked_bytes = 0;
// Update delivery rate sample per acked packet.
for pkt in acked.iter() {
self.delivery_rate.update_rate_sample(pkt, now);
newly_acked_bytes += pkt.size;
}

// Fill in a rate sample.
Expand All @@ -1057,8 +1048,6 @@ impl Recovery {

// Call congestion control hooks.
(self.cc_ops.on_packets_acked)(self, acked, now);

self.bytes_in_flight -= newly_acked_bytes;
}

fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
Expand Down

0 comments on commit 70b67f9

Please sign in to comment.