From 77705fa9f7d943991a36b419bf5c0ed53fb99196 Mon Sep 17 00:00:00 2001 From: Jiaqi Gao Date: Wed, 30 Oct 2024 05:31:12 -0400 Subject: [PATCH] vsock: add buffer space management `buf_alloc` and `fwd_cnt` in packet header are used for buffer space management of stream sockets. If peer has insufficient buffer space, the sender waits until new packets are returned and checks `buf_alloc` and `fwd_cnt` again. `VIRTIO_VSOCK_OP_CREDIT_REQUEST` packet is used to query how much buffer space is available. `VIRTIO_VSOCK_OP_CREDIT_UPDATE` replies the query and it can also be sent without previous `VIRTIO_VSOCK_OP_CREDIT_REQUEST`. This patch tracks the `fwd_cnt` and `buf_alloc` to calculate the free space of peer. The credite request and update packets are also handled. Signed-off-by: Jiaqi Gao --- src/devices/vsock/src/stream.rs | 134 +++++++++++++++++++++++--------- 1 file changed, 96 insertions(+), 38 deletions(-) diff --git a/src/devices/vsock/src/stream.rs b/src/devices/vsock/src/stream.rs index 05c59c3b..64c556ba 100644 --- a/src/devices/vsock/src/stream.rs +++ b/src/devices/vsock/src/stream.rs @@ -87,6 +87,9 @@ pub struct VsockStream { addr: VsockAddrPair, data_queue: VecDeque>, rx_cnt: u32, + tx_cnt: u32, + peer_fwd_cnt: u32, + peer_buf_alloc: u32, } impl Read for VsockStream { @@ -124,6 +127,9 @@ impl VsockStream { }, data_queue: VecDeque::new(), rx_cnt: 0, + tx_cnt: 0, + peer_fwd_cnt: 0, + peer_buf_alloc: 0, }) } @@ -201,6 +207,9 @@ impl VsockStream { }, data_queue: VecDeque::new(), rx_cnt: 0, + tx_cnt: 0, + peer_fwd_cnt: packet.fwd_cnt(), + peer_buf_alloc: packet.buf_alloc(), }; add_stream_to_connection_map(&new_stream); @@ -254,6 +263,8 @@ impl VsockStream { && packet.src_cid() == self.addr.remote.cid() as u64 { self.state = State::Establised; + self.peer_buf_alloc = packet.buf_alloc(); + self.peer_fwd_cnt = packet.fwd_cnt(); Ok(()) } else { Err(VsockError::REFUSED) @@ -297,6 +308,11 @@ impl VsockStream { if state != State::Establised { return Err(VsockError::Illegal); } + + while self.has_free_space() == 0 { + self.recv_packet_connected()?; + } + let mut header_buf = [0u8; HEADER_LEN]; let mut packet = Packet::new_unchecked(&mut header_buf[..]); packet.set_src_cid(self.addr.local.cid() as u64); @@ -315,6 +331,7 @@ impl VsockStream { .ok_or(VsockError::DeviceNotAvailable)? .transport .enqueue(self, packet.as_ref(), buf, DEFAULT_TIMEOUT)?; + self.tx_cnt += buf.len() as u32; Ok(buf.len()) } @@ -325,44 +342,8 @@ impl VsockStream { return Err(VsockError::Illegal); } - if self.data_queue.is_empty() { - let recv = VSOCK_DEVICE - .lock() - .get_mut() - .ok_or(VsockError::DeviceNotAvailable)? - .transport - .dequeue(self, DEFAULT_TIMEOUT)?; - let packet = Packet::new_checked(recv.as_slice())?; - - if packet.op() == field::OP_SHUTDOWN { - self.shutdown()?; - return Ok(0); - } - if packet.op() == field::OP_RST { - self.reset()?; - return Err(VsockError::Illegal); - } - if packet.op() != field::OP_RW { - return Err(VsockError::Illegal); - } - - if packet.data_len() > 0 { - let mut recv = VSOCK_DEVICE - .lock() - .get_mut() - .ok_or(VsockError::DeviceNotAvailable)? - .transport - .dequeue(self, DEFAULT_TIMEOUT)?; - - self.rx_cnt += packet.data_len(); - if packet.data_len() as usize <= recv.len() { - recv.truncate(packet.data_len() as usize); - } else { - return Err(VsockError::Illegal); - } - - self.data_queue.push_back(recv); - } + while self.data_queue.is_empty() { + self.recv_packet_connected()?; } let mut recvd = 0; @@ -424,6 +405,83 @@ impl VsockStream { } } + fn recv_packet_connected(&mut self) -> Result<()> { + let recv = VSOCK_DEVICE + .lock() + .get_mut() + .ok_or(VsockError::DeviceNotAvailable)? + .transport + .dequeue(self, DEFAULT_TIMEOUT)?; + let packet = Packet::new_checked(recv.as_slice())?; + + self.peer_buf_alloc = packet.buf_alloc(); + self.peer_fwd_cnt = packet.fwd_cnt(); + match packet.op() { + field::OP_SHUTDOWN => { + self.shutdown()?; + } + field::OP_RST => { + self.reset()?; + return Err(VsockError::Illegal); + } + field::OP_RW => { + if packet.data_len() > 0 { + let mut recv = VSOCK_DEVICE + .lock() + .get_mut() + .ok_or(VsockError::DeviceNotAvailable)? + .transport + .dequeue(self, DEFAULT_TIMEOUT)?; + + self.rx_cnt += packet.data_len(); + if packet.data_len() as usize <= recv.len() { + recv.truncate(packet.data_len() as usize); + } else { + return Err(VsockError::Illegal); + } + + self.data_queue.push_back(recv); + } + } + field::OP_CREDIT_UPDATE => { + self.peer_fwd_cnt = packet.fwd_cnt(); + self.peer_buf_alloc = packet.buf_alloc(); + } + field::OP_CREDIT_REQUEST => { + self.send_credit_update()?; + } + _ => return Err(VsockError::Illegal), + } + Ok(()) + } + + fn send_credit_update(&self) -> Result<()> { + let mut header_buf = [0u8; HEADER_LEN]; + let mut packet = Packet::new_unchecked(&mut header_buf[..]); + packet.set_src_cid(self.addr.local.cid() as u64); + packet.set_dst_cid(self.addr.remote.cid() as u64); + packet.set_src_port(self.addr.local.port()); + packet.set_dst_port(self.addr.remote.port()); + packet.set_type(field::TYPE_STREAM); + packet.set_op(field::OP_CREDIT_UPDATE); + packet.set_data_len(0); + packet.set_flags(0); + packet.set_fwd_cnt(self.rx_cnt); + packet.set_buf_alloc(VSOCK_BUF_ALLOC); + let _ = VSOCK_DEVICE + .lock() + .get_mut() + .ok_or(VsockError::DeviceNotAvailable)? + .transport + .enqueue(self, packet.as_ref(), &[], DEFAULT_TIMEOUT)?; + Ok(()) + } + + fn has_free_space(&self) -> u32 { + self.peer_buf_alloc + .saturating_sub(self.tx_cnt.saturating_sub(self.peer_fwd_cnt)) + } + pub(crate) fn addr(&self) -> VsockAddrPair { self.addr }