From 1882fb4883ef79d30404ec2de08a379fa8fb610a Mon Sep 17 00:00:00 2001 From: GunnarMorrigan <13799935+GunnarMorrigan@users.noreply.github.com> Date: Wed, 31 Jan 2024 19:50:13 +0100 Subject: [PATCH] added simple sync tcp example for v0.2.2 --- examples/sync_tcp_v0.2.2/Cargo.toml | 9 ++++ examples/sync_tcp_v0.2.2/src/main.rs | 68 ++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 examples/sync_tcp_v0.2.2/Cargo.toml create mode 100644 examples/sync_tcp_v0.2.2/src/main.rs diff --git a/examples/sync_tcp_v0.2.2/Cargo.toml b/examples/sync_tcp_v0.2.2/Cargo.toml new file mode 100644 index 0000000..f05e80e --- /dev/null +++ b/examples/sync_tcp_v0.2.2/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "sync_tcp_v0_2_2" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +mqrstt = { version = "0.2.2", default-features = false, features = ["sync"]} diff --git a/examples/sync_tcp_v0.2.2/src/main.rs b/examples/sync_tcp_v0.2.2/src/main.rs new file mode 100644 index 0000000..6075df8 --- /dev/null +++ b/examples/sync_tcp_v0.2.2/src/main.rs @@ -0,0 +1,68 @@ +use std::time::Duration; + +use mqrstt::{ + new_sync, packets::{self, Packet}, sync::NetworkStatus, ConnectOptions, EventHandler, MqttClient +}; + +pub struct PingPong { + pub client: MqttClient, +} + +impl EventHandler for PingPong { + // Handlers only get INCOMING packets. This can change later. + fn handle(&mut self, event: packets::Packet) -> () { + match event { + Packet::Publish(p) => { + if let Ok(payload) = String::from_utf8(p.payload.to_vec()) { + if payload.to_lowercase().contains("ping") { + self.client.publish_blocking(p.topic.clone(), p.qos, p.retain, "pong").unwrap(); + println!("Received Ping, Send pong!"); + } + } + } + Packet::ConnAck(_) => { + println!("Connected!") + } + _ => (), + } + } +} + +fn main() { + let client_id = "SyncTls_MQrsTT_Example".to_string(); + let options = ConnectOptions::new(client_id); + + let address = "broker.emqx.io"; + let port = 1883; + + let (mut network, client) = new_sync(options); + + let stream = std::net::TcpStream::connect((address, port)).unwrap(); + stream.set_nonblocking(true).unwrap(); + + let mut pingpong = PingPong { client: client.clone() }; + + network.connect(stream, &mut pingpong).unwrap(); + + client.subscribe_blocking("mqrstt").unwrap(); + + let thread = std::thread::spawn(move || { + loop { + match network.poll(&mut pingpong) { + // The client is active but there is no data to be read + Ok(NetworkStatus::ActivePending) => std::thread::sleep(Duration::from_millis(100)), + // The client is active and there is data to be read + Ok(NetworkStatus::ActiveReady) => continue, + // The rest is an error + otherwise => return otherwise, + }; + } + }); + + std::thread::sleep(std::time::Duration::from_secs(30)); + client.disconnect_blocking().unwrap(); + + // Unwrap possible join errors on the thread. + let n = thread.join().unwrap(); + assert!(n.is_ok()); +}