Skip to content

Commit

Permalink
apps: add GSO support for server on Linux
Browse files Browse the repository at this point in the history
When GSO can be enabled (on a supported linux platform),
now quiche-server sends multiple packets using sendmsg()
with GSO.

For non-supported platforms, it should work same as before.
  • Loading branch information
Junho Choi authored and ghedo committed Aug 26, 2022
1 parent 9214311 commit 3131c0d
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 94 deletions.
165 changes: 72 additions & 93 deletions apps/src/bin/quiche-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
#[macro_use]
extern crate log;

use std::net;

use std::io;

use std::net;

use std::io::prelude::*;

use std::collections::HashMap;
Expand All @@ -45,13 +45,15 @@ use quiche_apps::args::*;

use quiche_apps::common::*;

const MAX_BUF_SIZE: usize = 65535;
use quiche_apps::sendto::*;

const MAX_BUF_SIZE: usize = 65507;

const MAX_DATAGRAM_SIZE: usize = 1350;

fn main() {
let mut buf = [0; MAX_BUF_SIZE];
let mut out = [0; MAX_DATAGRAM_SIZE];
let mut out = [0; MAX_BUF_SIZE];
let mut pacing = false;

env_logger::builder()
Expand Down Expand Up @@ -87,6 +89,11 @@ fn main() {
.register(&mut socket, mio::Token(0), mio::Interest::READABLE)
.unwrap();

let max_datagram_size = MAX_DATAGRAM_SIZE;
let enable_gso = detect_gso(&socket, max_datagram_size);

trace!("GSO detected: {}", enable_gso);

// Create the configuration for the QUIC connections.
let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();

Expand All @@ -96,8 +103,8 @@ fn main() {
config.set_application_protos(&conn_args.alpns).unwrap();

config.set_max_idle_timeout(conn_args.idle_timeout);
config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_max_recv_udp_payload_size(max_datagram_size);
config.set_max_send_udp_payload_size(max_datagram_size);
config.set_initial_max_data(conn_args.max_data);
config.set_initial_max_stream_data_bidi_local(conn_args.max_stream_data);
config.set_initial_max_stream_data_bidi_remote(conn_args.max_stream_data);
Expand Down Expand Up @@ -366,7 +373,9 @@ fn main() {
partial_responses: HashMap::new(),
siduck_conn: None,
app_proto_selected: false,
bytes_sent: 0,
max_datagram_size,
loss_rate: 0.0,
max_send_burst: MAX_BUF_SIZE,
};

clients.insert(client_id, client);
Expand Down Expand Up @@ -450,6 +459,10 @@ fn main() {

client.app_proto_selected = true;
}

// Update max_datagram_size after connection established.
client.max_datagram_size =
client.conn.max_send_udp_payload_size();
}

if client.http_conn.is_some() {
Expand Down Expand Up @@ -515,12 +528,29 @@ fn main() {
// packets to be sent.
continue_write = false;
for client in clients.values_mut() {
let max_send_burst = client.conn.send_quantum().min(MAX_BUF_SIZE) /
MAX_DATAGRAM_SIZE *
MAX_DATAGRAM_SIZE;
// Reduce max_send_burst by 25% if loss is increasing more than 0.1%.
let loss_rate =
client.conn.stats().lost as f64 / client.conn.stats().sent as f64;
if loss_rate > client.loss_rate + 0.001 {
client.max_send_burst = client.max_send_burst / 4 * 3;
// Minimun bound of 10xMSS.
client.max_send_burst =
client.max_send_burst.max(client.max_datagram_size * 10);
client.loss_rate = loss_rate;
}

loop {
let (write, send_info) = match client.conn.send(&mut out) {
let max_send_burst =
client.conn.send_quantum().min(client.max_send_burst) /
client.max_datagram_size *
client.max_datagram_size;
let mut total_write = 0;
let mut dst_info = None;

while total_write < max_send_burst {
let (write, send_info) = match client
.conn
.send(&mut out[total_write..max_send_burst])
{
Ok(v) => v,

Err(quiche::Error::Done) => {
Expand All @@ -536,33 +566,42 @@ fn main() {
},
};

// TODO: coalesce packets.
if let Err(e) =
send_to(&socket, &out[..write], &send_info, pacing)
{
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!("send() would block");
break;
}
total_write += write;

panic!("send() failed: {:?}", e);
}
// Use the first packet time to send, not the last.
let _ = dst_info.get_or_insert(send_info);

trace!("{} written {} bytes", client.conn.trace_id(), write);
if write < client.max_datagram_size {
break;
}
}

// limit write bursting
client.bytes_sent += write;
if total_write == 0 || dst_info.is_none() {
break;
}

if client.bytes_sent >= max_send_burst {
trace!(
"{} pause writing at {}",
client.conn.trace_id(),
client.bytes_sent
);
client.bytes_sent = 0;
continue_write = true;
if let Err(e) = send_to(
&socket,
&out[..total_write],
&dst_info.unwrap(),
client.max_datagram_size,
pacing,
enable_gso,
) {
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!("send() would block");
break;
}

panic!("send_to() failed: {:?}", e);
}

trace!("{} written {} bytes", client.conn.trace_id(), total_write);

if total_write >= max_send_burst {
trace!("{} pause writing", client.conn.trace_id(),);
continue_write = true;
break;
}
}

Expand Down Expand Up @@ -741,63 +780,3 @@ fn set_txtime_sockopt(_: &mio::net::UdpSocket) -> io::Result<()> {
"Not supported on this platform",
))
}

/// Send outgoing UDP packet to kernel using sendmsg syscall
///
/// sendmsg syscall also includes the time the packet needs to be
/// sent by the kernel in msghdr.
///
/// Note that sendmsg syscall is used only on linux platforms.
#[cfg(target_os = "linux")]
fn send_to(
sock: &mio::net::UdpSocket, send_buf: &[u8], send_info: &quiche::SendInfo,
pacing: bool,
) -> io::Result<usize> {
use nix::sys::socket::sendmsg;
use nix::sys::socket::ControlMessage;
use nix::sys::socket::MsgFlags;
use nix::sys::socket::SockaddrStorage;
use std::io::IoSlice;
use std::os::unix::io::AsRawFd;

if !pacing {
return sock.send_to(send_buf, send_info.to);
}

let nanos_per_sec: u64 = 1_000_000_000;
let sockfd = sock.as_raw_fd();
let len = send_buf.len();
let iov = [IoSlice::new(&send_buf[..len])];

let mut time_spec = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};

unsafe {
std::ptr::copy_nonoverlapping(
&send_info.at as *const _ as *const libc::timespec,
&mut time_spec,
1,
)
};

let send_time =
time_spec.tv_sec as u64 * nanos_per_sec + time_spec.tv_nsec as u64;

let cmsg = ControlMessage::TxTime(&send_time);
let addr = SockaddrStorage::from(send_info.to);

match sendmsg(sockfd, &iov, &[cmsg], MsgFlags::empty(), Some(&addr)) {
Ok(written) => Ok(written),
Err(e) => Err(e.into()),
}
}

#[cfg(not(target_os = "linux"))]
fn send_to(
sock: &mio::net::UdpSocket, send_buf: &[u8], send_info: &quiche::SendInfo,
_: bool,
) -> io::Result<usize> {
sock.send_to(send_buf, send_info.to)
}
6 changes: 5 additions & 1 deletion apps/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ pub struct Client {

pub partial_responses: std::collections::HashMap<u64, PartialResponse>,

pub bytes_sent: usize,
pub max_datagram_size: usize,

pub loss_rate: f64,

pub max_send_burst: usize,
}

pub type ClientIdMap = HashMap<ConnectionId<'static>, ClientId>;
Expand Down
1 change: 1 addition & 0 deletions apps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ extern crate log;
pub mod args;
pub mod client;
pub mod common;
pub mod sendto;
Loading

0 comments on commit 3131c0d

Please sign in to comment.