From d003d715b0008e61ebd9c1d5a0831f23bfc81749 Mon Sep 17 00:00:00 2001 From: GunnarMorrigan <13799935+GunnarMorrigan@users.noreply.github.com> Date: Tue, 2 Jan 2024 11:22:32 +0100 Subject: [PATCH] Return handler from reader after it is done --- benches/benchmarks/tokio.rs | 46 +++++-------------------------------- src/event_handlers.rs | 2 +- src/tokio/network.rs | 8 +++---- 3 files changed, 11 insertions(+), 45 deletions(-) diff --git a/benches/benchmarks/tokio.rs b/benches/benchmarks/tokio.rs index 2feb8de..e463bc7 100644 --- a/benches/benchmarks/tokio.rs +++ b/benches/benchmarks/tokio.rs @@ -13,40 +13,6 @@ use crate::benchmarks::test_handlers::{PingPong, SimpleDelay}; use super::fill_stuff; -struct ReadWriteTester<'a> { - read: Cursor<&'a [u8]>, - write: Vec, -} - -impl<'a> ReadWriteTester<'a> { - pub fn new(read: &'a [u8]) -> Self { - Self { - read: Cursor::new(read), - write: Vec::new(), - } - } -} - -impl<'a> tokio::io::AsyncRead for ReadWriteTester<'a> { - fn poll_read(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>) -> std::task::Poll> { - tokio::io::AsyncRead::poll_read(std::pin::Pin::new(&mut self.read), cx, buf) - } -} - -impl<'a> tokio::io::AsyncWrite for ReadWriteTester<'a> { - fn poll_write(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, _buf: &[u8]) -> std::task::Poll> { - todo!() - } - - fn poll_flush(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll> { - todo!() - } - - fn poll_shutdown(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll> { - todo!() - } -} - fn tokio_setup() -> (TcpStream, std::net::TcpStream, SocketAddr) { let mut buffer = BytesMut::new(); @@ -93,7 +59,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) { let (read_res, write_res) = tokio::join!(read_handle, write_handle); assert!(read_res.is_ok()); - let read_res = read_res.unwrap(); + let (read_res, handler) = read_res.unwrap(); assert!(read_res.is_ok()); let read_res = read_res.unwrap(); assert_eq!(read_res, NetworkStatus::IncomingDisconnect); @@ -119,7 +85,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) { let (read_res, write_res) = futures::join!(read_handle, write_handle); assert!(read_res.is_ok()); - let read_res = read_res.unwrap(); + let (read_res, handler) = read_res.unwrap(); assert!(read_res.is_ok()); let read_res = read_res.unwrap(); assert_eq!(read_res, NetworkStatus::IncomingDisconnect); @@ -157,7 +123,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) { let (read_res, write_res) = tokio::join!(read_handle, write_handle); assert!(read_res.is_ok()); - let read_res = read_res.unwrap(); + let (read_res, handler) = read_res.unwrap(); assert!(read_res.is_ok()); assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect); @@ -190,7 +156,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) { let (read_res, write_res) = tokio::join!(read_handle, write_handle); assert!(read_res.is_ok()); - let read_res = read_res.unwrap(); + let (read_res, handler) = read_res.unwrap(); assert!(read_res.is_ok()); assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect); @@ -218,7 +184,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) { let (read_res, write_res) = futures::join!(read_handle, write_handle); assert!(read_res.is_ok()); - let read_res = read_res.unwrap(); + let (read_res, handler) = read_res.unwrap(); assert!(read_res.is_ok()); assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect); assert_eq!(102, num_packets_received.load(std::sync::atomic::Ordering::SeqCst)); @@ -256,7 +222,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) { let (read_res, write_res) = futures::join!(read_handle, write_handle); assert!(read_res.is_ok()); - let read_res = read_res.unwrap(); + let (read_res, handler) = read_res.unwrap(); assert!(read_res.is_ok()); assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect); diff --git a/src/event_handlers.rs b/src/event_handlers.rs index 44212de..9ca100f 100644 --- a/src/event_handlers.rs +++ b/src/event_handlers.rs @@ -122,7 +122,7 @@ pub mod example_handlers{ match event { Packet::Publish(p) => { if let Ok(payload) = String::from_utf8(p.payload.to_vec()) { - let max_len = payload.len().min(10); + // let max_len = payload.len().min(10); // let a = &payload[0..max_len]; if payload.to_lowercase().contains("ping") { self.client.publish(p.topic.clone(), p.qos, p.retain, Bytes::from_static(b"pong")).await.unwrap(); diff --git a/src/tokio/network.rs b/src/tokio/network.rs index 335db17..5f25047 100644 --- a/src/tokio/network.rs +++ b/src/tokio/network.rs @@ -277,20 +277,20 @@ where N: HandlerExt, S: tokio::io::AsyncReadExt + Sized + Unpin + Send + 'static, { - /// Runs the read half with a [`AsyncEventHandlerMut`]. + /// Runs the read half of the mqtt connection. /// Continuously loops until disconnect or error. /// /// # Return /// - Ok(None) in the case that the write task requested shutdown. /// - Ok(Some(reason)) in the case that this task initiates a shutdown. /// - Err in the case of IO, or protocol errors. - pub async fn run(mut self) -> Result { + pub async fn run(mut self) -> (Result, H) { let ret = self.read().await; self.run_signal.store(false, std::sync::atomic::Ordering::Release); while let Some(_) = self.join_set.join_next().await { () } - ret + (ret, self.handler) } async fn read(&mut self) -> Result { while self.run_signal.load(std::sync::atomic::Ordering::Acquire) { @@ -364,7 +364,7 @@ impl NetworkWriter where S: tokio::io::AsyncWriteExt + Sized + Unpin, { - /// Runs the write half of the concurrent read & write tokio client + /// Runs the read half of the mqtt connection. /// Continuously loops until disconnect or error. /// /// # Return