Skip to content

Commit

Permalink
read only one packet at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Dec 20, 2023
1 parent 08f25cc commit 38e611e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 39 deletions.
33 changes: 15 additions & 18 deletions src/tokio/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,27 +145,24 @@ where
tokio::select! {
res = stream.read_bytes() => {
res?;
match stream.parse_messages(incoming_packet_buffer).await {
let packet = match stream.parse_message().await {
Err(ReadBytes::Err(err)) => return Err(err),
Err(ReadBytes::InsufficientBytes(_)) => return Ok(NetworkStatus::Active),
Ok(_) => (),
}

for packet in incoming_packet_buffer.drain(0..){
use Packet::*;
match packet{
PingResp => {
handler.handle(packet).await;
*await_pingresp = None;
},
Disconnect(_) => {
Ok(packet) => packet,
};

match packet{
Packet::PingResp => {
handler.handle(packet).await;
*await_pingresp = None;
},
Packet::Disconnect(_) => {
handler.handle(packet).await;
return Ok(NetworkStatus::IncomingDisconnect);
}
packet => {
if mqtt_handler.handle_incoming_packet(&packet, outgoing_packet_buffer)?{
handler.handle(packet).await;
return Ok(NetworkStatus::IncomingDisconnect);
}
packet => {
if mqtt_handler.handle_incoming_packet(&packet, outgoing_packet_buffer)?{
handler.handle(packet).await;
}
}
}
}
Expand Down
32 changes: 11 additions & 21 deletions src/tokio/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,22 @@ pub struct Stream<S> {
}

impl<S> Stream<S> {
pub async fn parse_messages(&mut self, incoming_packet_buffer: &mut Vec<Packet>) -> Result<(), ReadBytes<ConnectionError>> {
loop {
if self.read_buffer.is_empty() {
return Ok(());
}
let (header, header_length) = FixedHeader::read_fixed_header(self.read_buffer.iter())?;
pub async fn parse_message(&mut self) -> Result<Packet, ReadBytes<ConnectionError>> {
let (header, header_length) = FixedHeader::read_fixed_header(self.read_buffer.iter())?;

if header.remaining_length + header_length > self.read_buffer.len() {
return Err(ReadBytes::InsufficientBytes(header.remaining_length - self.read_buffer.len()));
}
if header.remaining_length + header_length > self.read_buffer.len() {
return Err(ReadBytes::InsufficientBytes(header.remaining_length - self.read_buffer.len()));
}

self.read_buffer.advance(header_length);
self.read_buffer.advance(header_length);

let buf = self.read_buffer.split_to(header.remaining_length);
let read_packet = Packet::read(header, buf.into())?;
let buf = self.read_buffer.split_to(header.remaining_length);
let read_packet = Packet::read(header, buf.into())?;

#[cfg(feature = "logs")]
trace!("Read packet from network {}", read_packet);

let packet_type = read_packet.packet_type();
incoming_packet_buffer.push(read_packet);
#[cfg(feature = "logs")]
trace!("Read packet from network {}", read_packet);

if packet_type == PacketType::Disconnect {
return Ok(());
}
}
Ok(read_packet)
}
}

Expand Down

0 comments on commit 38e611e

Please sign in to comment.