From 782dda6877304b42206f185eadc70296ec88f3bc Mon Sep 17 00:00:00 2001 From: GunnarMorrigan <13799935+GunnarMorrigan@users.noreply.github.com> Date: Thu, 28 Nov 2024 00:00:48 +0100 Subject: [PATCH] added tokio and smol tcp examples again --- Cargo.toml | 1 + examples/tcp/Cargo.toml | 19 ++++++++++++++++++ examples/tcp/src/smol.rs | 42 +++++++++++++++++++++++++++++++++++++++ examples/tcp/src/tokio.rs | 40 +++++++++++++++++++++++++++++++++++++ 4 files changed, 102 insertions(+) create mode 100644 examples/tcp/Cargo.toml create mode 100644 examples/tcp/src/smol.rs create mode 100644 examples/tcp/src/tokio.rs diff --git a/Cargo.toml b/Cargo.toml index a62cdfe..10dc1aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,4 +3,5 @@ members = [ "mqrstt", "fuzz", + "examples/tcp" ] \ No newline at end of file diff --git a/examples/tcp/Cargo.toml b/examples/tcp/Cargo.toml new file mode 100644 index 0000000..78834ec --- /dev/null +++ b/examples/tcp/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "tcp" +version = "0.1.0" +edition = "2021" + +[dependencies] +smol = { version = "2" } + +tokio = { version = "1", features = ["full"] } + +mqrstt = { path = "../../mqrstt", features = ["logs"] } + +[[bin]] +name = "tokio" +path = "src/tokio.rs" + +[[bin]] +name = "smol" +path = "src/smol.rs" diff --git a/examples/tcp/src/smol.rs b/examples/tcp/src/smol.rs new file mode 100644 index 0000000..592d880 --- /dev/null +++ b/examples/tcp/src/smol.rs @@ -0,0 +1,42 @@ +use mqrstt::AsyncEventHandler; + +pub struct Handler { + byte_count: u64, +} + +impl AsyncEventHandler for Handler { + fn handle(&mut self, incoming_packet: mqrstt::packets::Packet) -> impl std::future::Future + Send + Sync { + async move { + if let mqrstt::packets::Packet::Publish(publish) = incoming_packet { + self.byte_count += publish.payload.len() as u64; + } + } + } +} + +fn main() { + smol::block_on(async { + let hostname = "broker.emqx.io:1883"; + + let mut handler = Handler { byte_count: 0 }; + + let stream = smol::net::TcpStream::connect(hostname).await.unwrap(); + let (mut network, client) = mqrstt::NetworkBuilder::new_from_client_id("TestClientABCDEFG").smol_network(); + + network.connect(stream, &mut handler).await.unwrap(); + smol::Timer::after(std::time::Duration::from_secs(5)).await; + + client.subscribe("testtopic/#").await.unwrap(); + + smol::spawn(async move { + network.run(&mut handler).await.unwrap(); + + dbg!(handler.byte_count); + }) + .detach(); + + smol::Timer::after(std::time::Duration::from_secs(60)).await; + client.disconnect().await.unwrap(); + smol::Timer::after(std::time::Duration::from_secs(1)).await; + }); +} diff --git a/examples/tcp/src/tokio.rs b/examples/tcp/src/tokio.rs new file mode 100644 index 0000000..1e9693b --- /dev/null +++ b/examples/tcp/src/tokio.rs @@ -0,0 +1,40 @@ +use mqrstt::AsyncEventHandler; + +pub struct Handler { + byte_count: u64, +} + +impl AsyncEventHandler for Handler { + fn handle(&mut self, incoming_packet: mqrstt::packets::Packet) -> impl std::future::Future + Send + Sync { + async move { + if let mqrstt::packets::Packet::Publish(publish) = incoming_packet { + self.byte_count += publish.payload.len() as u64; + } + } + } +} + +#[tokio::main] +async fn main() { + let hostname = "broker.emqx.io:1883"; + + let mut handler = Handler { byte_count: 0 }; + + let stream = tokio::net::TcpStream::connect(hostname).await.unwrap(); + let (mut network, client) = mqrstt::NetworkBuilder::new_from_client_id("TestClientABCDEFG").tokio_network(); + + network.connect(stream, &mut handler).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + client.subscribe("testtopic/#").await.unwrap(); + + tokio::spawn(async move { + network.run(&mut handler).await.unwrap(); + + dbg!(handler.byte_count); + }); + + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + client.disconnect().await.unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; +}