Skip to content

Commit

Permalink
final things before 0.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Nov 27, 2024
1 parent 782dda6 commit 1ce35da
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 161 deletions.
84 changes: 18 additions & 66 deletions mqrstt/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,56 +46,6 @@ impl MqttClient {
max_packet_size,
}
}

/// This function is only here for you to use during testing of for example your handler
/// For a simple client look at [`MqttClient::test_client`]
#[cfg(feature = "test")]
pub fn test_custom_client(available_packet_ids_r: Receiver<u16>, to_network_s: Sender<Packet>, max_packet_size: usize) -> Self {
Self {
available_packet_ids_r,
to_network_s,
max_packet_size,
}
}

/// This function is only here for you to use during testing of for example your handler
/// For control over the input of this type look at [`MqttClient::test_custom_client`]
///
/// The returned values should not be dropped otherwise the client won't be able to operate normally.
///
/// # Example
/// ```ignore
/// let (
/// client, // An instance of this client
/// ids, // Allows you to indicate which packet IDs have become available again.
/// network_receiver // Messages send through the `client` will be dispatched through this channel
/// ) = MqttClient::test_client();
///
/// // perform testing
///
/// // Make sure to not drop these before the test is done!
/// std::hint::black_box((ids, network_receiver));
/// ```
#[cfg(feature = "test")]
pub fn test_client() -> (Self, crate::available_packet_ids::AvailablePacketIds, Receiver<Packet>) {
use async_channel::unbounded;

use crate::{available_packet_ids::AvailablePacketIds, util::constants::MAXIMUM_PACKET_SIZE};

let (available_packet_ids, available_packet_ids_r) = AvailablePacketIds::new(u16::MAX);

let (s, r) = unbounded();

(
Self {
available_packet_ids_r,
to_network_s: s,
max_packet_size: MAXIMUM_PACKET_SIZE as usize,
},
available_packet_ids,
r,
)
}
}

/// Async functions to perform MQTT operations
Expand All @@ -106,7 +56,7 @@ impl MqttClient {
///
/// # Examples
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("example_id").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("example_id").smol_network();
/// # smol::block_on(async {
///
/// use mqrstt::packets::QoS;
Expand Down Expand Up @@ -140,6 +90,8 @@ impl MqttClient {

sub.validate(self.max_packet_size)?;
self.to_network_s.send(Packet::Subscribe(sub)).await.map_err(|_| ClientError::NoNetworkChannel)?;
#[cfg(feature = "logs")]
info!("Send to network: Subscribe with ID {:?}", pkid);
Ok(())
}

Expand All @@ -150,7 +102,7 @@ impl MqttClient {
///
/// # Examples
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("example_id").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("example_id").smol_network();
/// # smol::block_on(async {
///
/// use mqrstt::packets::QoS;
Expand Down Expand Up @@ -210,7 +162,7 @@ impl MqttClient {
///
/// # Examples
/// ```
/// # let (_, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (_, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// use mqrstt::packets::QoS;
Expand Down Expand Up @@ -264,7 +216,7 @@ impl MqttClient {
///
/// # Examples
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// use mqrstt::packets::QoS;
Expand Down Expand Up @@ -334,7 +286,7 @@ impl MqttClient {
/// # Examples
///
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// // Unsubscribe from a single topic specified as a string:
Expand Down Expand Up @@ -381,7 +333,7 @@ impl MqttClient {
/// # Examples
///
/// ```
/// # let (_, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (_, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// use mqrstt::packets::UnsubscribeProperties;
Expand Down Expand Up @@ -450,7 +402,7 @@ impl MqttClient {
/// # Example
///
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// mqtt_client.disconnect().await.unwrap();
Expand All @@ -476,7 +428,7 @@ impl MqttClient {
/// # Example
///
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// use mqrstt::packets::DisconnectProperties;
Expand Down Expand Up @@ -512,7 +464,7 @@ impl MqttClient {
///
/// # Examples
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
/// use mqrstt::packets::QoS;
/// use mqrstt::packets::{SubscriptionOptions, RetainHandling};
Expand Down Expand Up @@ -556,7 +508,7 @@ impl MqttClient {
/// This function blocks until the packet is queued for transmission
/// # Examples
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
/// use mqrstt::packets::QoS;
/// use mqrstt::packets::{SubscribeProperties, SubscriptionOptions, RetainHandling};
Expand Down Expand Up @@ -616,7 +568,7 @@ impl MqttClient {
/// This function blocks until the packet is queued for transmission
/// # Examples
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// use mqrstt::packets::QoS;
Expand Down Expand Up @@ -672,7 +624,7 @@ impl MqttClient {
///
/// # Examples
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// use mqrstt::packets::QoS;
Expand Down Expand Up @@ -742,7 +694,7 @@ impl MqttClient {
///
/// # Examples
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// // Unsubscribe from a single topic specified as a string:
Expand Down Expand Up @@ -790,7 +742,7 @@ impl MqttClient {
/// # Examples
///
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// use mqrstt::packets::UnsubscribeProperties;
Expand Down Expand Up @@ -851,7 +803,7 @@ impl MqttClient {
/// # Example
///
/// ```
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// mqtt_client.disconnect_blocking().unwrap();
Expand All @@ -877,7 +829,7 @@ impl MqttClient {
///
/// ```
///
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_sequential_network();
/// # let (network, mqtt_client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>::new_from_client_id("Example").smol_network();
/// # smol::block_on(async {
///
/// use mqrstt::packets::DisconnectProperties;
Expand Down
44 changes: 6 additions & 38 deletions mqrstt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
//! // To reconnect after a disconnect or error
//! let (mut network, client) = NetworkBuilder
//! ::new_from_client_id("mqrsttSmolExample")
//! .smol_sequential_network();
//! .smol_network();
//! let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883))
//! .await
//! .unwrap();
Expand Down Expand Up @@ -236,9 +236,9 @@ where
/// ```
/// let (mut network, client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>
/// ::new_from_client_id("ExampleClient")
/// .smol_sequential_network();
/// .smol_network();
/// ```
pub fn smol_sequential_network(self) -> (smol::Network<H, S>, MqttClient) {
pub fn smol_network(self) -> (smol::Network<H, S>, MqttClient) {
let (to_network_s, to_network_r) = async_channel::bounded(CHANNEL_SIZE);

let (apkids, apkids_r) = available_packet_ids::AvailablePacketIds::new(self.options.send_maximum());
Expand All @@ -253,38 +253,6 @@ where
}
}

#[cfg(feature = "todo")]
/// Creates a new [`sync::Network<S>`] and [`MqttClient`] that can be connected to a broker.
/// S should implement [`std::io::Read`] and [`std::io::Write`].
/// Additionally, S should be made non_blocking otherwise it will not progress.
///
/// # Example
///
/// ```
/// use mqrstt::ConnectOptions;
///
/// let options = ConnectOptions::new("ExampleClient");
/// let (network, client) = mqrstt::new_sync::<std::net::TcpStream>(options);
/// ```
pub fn new_sync<S>(options: ConnectOptions) -> (sync::Network<S>, MqttClient)
where
S: std::io::Read + std::io::Write + Sized + Unpin,
{
use available_packet_ids::AvailablePacketIds;

let (to_network_s, to_network_r) = async_channel::bounded(100);

let (apkids, apkids_r) = AvailablePacketIds::new(options.send_maximum());

let max_packet_size = options.maximum_packet_size();

let client = MqttClient::new(apkids_r, to_network_s, max_packet_size);

let network = sync::Network::new(options, to_network_r, apkids);

(network, client)
}

#[cfg(test)]
fn random_chars() -> String {
rand::Rng::sample_iter(rand::thread_rng(), &rand::distributions::Alphanumeric).take(7).map(char::from).collect()
Expand All @@ -310,7 +278,7 @@ mod smol_lib_test {
let address = "broker.emqx.io";
let port = 1883;

let (mut network, client) = NetworkBuilder::new_from_options(options).smol_sequential_network();
let (mut network, client) = NetworkBuilder::new_from_options(options).smol_network();

let stream = smol::net::TcpStream::connect((address, port)).await.unwrap();
let mut pingpong = PingPong::new(client.clone());
Expand Down Expand Up @@ -347,7 +315,7 @@ mod smol_lib_test {
let address = "broker.emqx.io";
let port = 1883;

let (mut network, client) = NetworkBuilder::new_from_options(options).smol_sequential_network();
let (mut network, client) = NetworkBuilder::new_from_options(options).smol_network();
let stream = smol::net::TcpStream::connect((address, port)).await.unwrap();

let mut pingresp = crate::example_handlers::PingResp::new(client.clone());
Expand Down Expand Up @@ -393,7 +361,7 @@ mod smol_lib_test {

let (n, _) = futures::join!(
async {
let (mut network, client) = NetworkBuilder::new_from_options(options).smol_sequential_network();
let (mut network, client) = NetworkBuilder::new_from_options(options).smol_network();
let stream = smol::net::TcpStream::connect((address, port)).await.unwrap();
let mut pingresp = crate::example_handlers::PingResp::new(client.clone());
network.connect(stream, &mut pingresp).await
Expand Down
12 changes: 4 additions & 8 deletions mqrstt/src/state_handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::available_packet_ids::AvailablePacketIds;
use crate::connect_options::ConnectOptions;
use crate::error::HandlerError;
use crate::packets::{PubRecReasonCode, PubAckReasonCode, ConnAckReasonCode};
use crate::packets::PubComp;
use crate::packets::PubRec;
use crate::packets::PubRel;
Expand All @@ -12,6 +11,7 @@ use crate::packets::Subscribe;
use crate::packets::UnsubAck;
use crate::packets::Unsubscribe;
use crate::packets::{ConnAck, Disconnect};
use crate::packets::{ConnAckReasonCode, PubAckReasonCode, PubRecReasonCode};
use crate::packets::{Packet, PacketType};
use crate::packets::{PubAck, PubAckProperties};
use crate::state::State;
Expand Down Expand Up @@ -188,6 +188,7 @@ impl StateHandler {
_a => {
#[cfg(test)]
unreachable!("Was given unexpected packet {:?} ", _a);
#[cfg(not(test))]
Ok(())
}
}
Expand Down Expand Up @@ -247,13 +248,8 @@ mod handler_tests {
use crate::{
available_packet_ids::AvailablePacketIds,
packets::{
Packet,
PubComp, PubCompProperties, PubCompReasonCode,
PubRec, PubRecProperties, PubRecReasonCode,
PubRel, PubRelProperties, PubRelReasonCode,
QoS,
SubAck, SubAckProperties, SubAckReasonCode,
UnsubAck, UnsubAckProperties, UnsubAckReasonCode
Packet, PubComp, PubCompProperties, PubCompReasonCode, PubRec, PubRecProperties, PubRecReasonCode, PubRel, PubRelProperties, PubRelReasonCode, QoS, SubAck, SubAckProperties,
SubAckReasonCode, UnsubAck, UnsubAckProperties, UnsubAckReasonCode,
},
tests::test_packets::{create_connack_packet, create_puback_packet, create_publish_packet, create_subscribe_packet, create_unsubscribe_packet},
ConnectOptions, StateHandler,
Expand Down
Loading

0 comments on commit 1ce35da

Please sign in to comment.