Skip to content

Commit

Permalink
Return handler from reader after it is done
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Jan 2, 2024
1 parent d5eb571 commit d003d71
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 45 deletions.
46 changes: 6 additions & 40 deletions benches/benchmarks/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,6 @@ use crate::benchmarks::test_handlers::{PingPong, SimpleDelay};

use super::fill_stuff;

struct ReadWriteTester<'a> {
read: Cursor<&'a [u8]>,
write: Vec<u8>,
}

impl<'a> ReadWriteTester<'a> {
pub fn new(read: &'a [u8]) -> Self {
Self {
read: Cursor::new(read),
write: Vec::new(),
}
}
}

impl<'a> tokio::io::AsyncRead for ReadWriteTester<'a> {
fn poll_read(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>) -> std::task::Poll<std::io::Result<()>> {
tokio::io::AsyncRead::poll_read(std::pin::Pin::new(&mut self.read), cx, buf)
}
}

impl<'a> tokio::io::AsyncWrite for ReadWriteTester<'a> {
fn poll_write(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, _buf: &[u8]) -> std::task::Poll<Result<usize, std::io::Error>> {
todo!()
}

fn poll_flush(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), std::io::Error>> {
todo!()
}

fn poll_shutdown(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), std::io::Error>> {
todo!()
}
}

fn tokio_setup() -> (TcpStream, std::net::TcpStream, SocketAddr) {
let mut buffer = BytesMut::new();

Expand Down Expand Up @@ -93,7 +59,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {

let (read_res, write_res) = tokio::join!(read_handle, write_handle);
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
let (read_res, handler) = read_res.unwrap();
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert_eq!(read_res, NetworkStatus::IncomingDisconnect);
Expand All @@ -119,7 +85,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {

let (read_res, write_res) = futures::join!(read_handle, write_handle);
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
let (read_res, handler) = read_res.unwrap();
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert_eq!(read_res, NetworkStatus::IncomingDisconnect);
Expand Down Expand Up @@ -157,7 +123,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {

let (read_res, write_res) = tokio::join!(read_handle, write_handle);
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
let (read_res, handler) = read_res.unwrap();
assert!(read_res.is_ok());
assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect);

Expand Down Expand Up @@ -190,7 +156,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {

let (read_res, write_res) = tokio::join!(read_handle, write_handle);
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
let (read_res, handler) = read_res.unwrap();
assert!(read_res.is_ok());
assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect);

Expand Down Expand Up @@ -218,7 +184,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {

let (read_res, write_res) = futures::join!(read_handle, write_handle);
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
let (read_res, handler) = read_res.unwrap();
assert!(read_res.is_ok());
assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect);
assert_eq!(102, num_packets_received.load(std::sync::atomic::Ordering::SeqCst));
Expand Down Expand Up @@ -256,7 +222,7 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {

let (read_res, write_res) = futures::join!(read_handle, write_handle);
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
let (read_res, handler) = read_res.unwrap();
assert!(read_res.is_ok());
assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect);

Expand Down
2 changes: 1 addition & 1 deletion src/event_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub mod example_handlers{
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
let max_len = payload.len().min(10);
// let max_len = payload.len().min(10);
// let a = &payload[0..max_len];
if payload.to_lowercase().contains("ping") {
self.client.publish(p.topic.clone(), p.qos, p.retain, Bytes::from_static(b"pong")).await.unwrap();
Expand Down
8 changes: 4 additions & 4 deletions src/tokio/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,20 +277,20 @@ where
N: HandlerExt<H>,
S: tokio::io::AsyncReadExt + Sized + Unpin + Send + 'static,
{
/// Runs the read half with a [`AsyncEventHandlerMut`].
/// Runs the read half of the mqtt connection.
/// Continuously loops until disconnect or error.
///
/// # Return
/// - Ok(None) in the case that the write task requested shutdown.
/// - Ok(Some(reason)) in the case that this task initiates a shutdown.
/// - Err in the case of IO, or protocol errors.
pub async fn run(mut self) -> Result<NetworkStatus, ConnectionError> {
pub async fn run(mut self) -> (Result<NetworkStatus, ConnectionError>, H) {
let ret = self.read().await;
self.run_signal.store(false, std::sync::atomic::Ordering::Release);
while let Some(_) = self.join_set.join_next().await {
()
}
ret
(ret, self.handler)
}
async fn read(&mut self) -> Result<NetworkStatus, ConnectionError> {
while self.run_signal.load(std::sync::atomic::Ordering::Acquire) {
Expand Down Expand Up @@ -364,7 +364,7 @@ impl<S> NetworkWriter<S>
where
S: tokio::io::AsyncWriteExt + Sized + Unpin,
{
/// Runs the write half of the concurrent read & write tokio client
/// Runs the read half of the mqtt connection.
/// Continuously loops until disconnect or error.
///
/// # Return
Expand Down

0 comments on commit d003d71

Please sign in to comment.