Skip to content

Commit

Permalink
added tokio and smol tcp examples again
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Nov 27, 2024
1 parent 78def56 commit 782dda6
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
members = [
"mqrstt",
"fuzz",
"examples/tcp"
]
19 changes: 19 additions & 0 deletions examples/tcp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "tcp"
version = "0.1.0"
edition = "2021"

[dependencies]
smol = { version = "2" }

tokio = { version = "1", features = ["full"] }

mqrstt = { path = "../../mqrstt", features = ["logs"] }

[[bin]]
name = "tokio"
path = "src/tokio.rs"

[[bin]]
name = "smol"
path = "src/smol.rs"
42 changes: 42 additions & 0 deletions examples/tcp/src/smol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use mqrstt::AsyncEventHandler;

pub struct Handler {
byte_count: u64,
}

impl AsyncEventHandler for Handler {
fn handle(&mut self, incoming_packet: mqrstt::packets::Packet) -> impl std::future::Future<Output = ()> + Send + Sync {
async move {
if let mqrstt::packets::Packet::Publish(publish) = incoming_packet {
self.byte_count += publish.payload.len() as u64;
}
}
}
}

fn main() {
smol::block_on(async {
let hostname = "broker.emqx.io:1883";

let mut handler = Handler { byte_count: 0 };

let stream = smol::net::TcpStream::connect(hostname).await.unwrap();
let (mut network, client) = mqrstt::NetworkBuilder::new_from_client_id("TestClientABCDEFG").smol_network();

network.connect(stream, &mut handler).await.unwrap();
smol::Timer::after(std::time::Duration::from_secs(5)).await;

client.subscribe("testtopic/#").await.unwrap();

smol::spawn(async move {
network.run(&mut handler).await.unwrap();

dbg!(handler.byte_count);
})
.detach();

smol::Timer::after(std::time::Duration::from_secs(60)).await;
client.disconnect().await.unwrap();
smol::Timer::after(std::time::Duration::from_secs(1)).await;
});
}
40 changes: 40 additions & 0 deletions examples/tcp/src/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use mqrstt::AsyncEventHandler;

pub struct Handler {
byte_count: u64,
}

impl AsyncEventHandler for Handler {
fn handle(&mut self, incoming_packet: mqrstt::packets::Packet) -> impl std::future::Future<Output = ()> + Send + Sync {
async move {
if let mqrstt::packets::Packet::Publish(publish) = incoming_packet {
self.byte_count += publish.payload.len() as u64;
}
}
}
}

#[tokio::main]
async fn main() {
let hostname = "broker.emqx.io:1883";

let mut handler = Handler { byte_count: 0 };

let stream = tokio::net::TcpStream::connect(hostname).await.unwrap();
let (mut network, client) = mqrstt::NetworkBuilder::new_from_client_id("TestClientABCDEFG").tokio_network();

network.connect(stream, &mut handler).await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

client.subscribe("testtopic/#").await.unwrap();

tokio::spawn(async move {
network.run(&mut handler).await.unwrap();

dbg!(handler.byte_count);
});

tokio::time::sleep(std::time::Duration::from_secs(60)).await;
client.disconnect().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

0 comments on commit 782dda6

Please sign in to comment.