Skip to content

Commit

Permalink
Merge pull request #15 from vincentdephily/refactor_0.2
Browse files Browse the repository at this point in the history
API changes, unittest, and docs
  • Loading branch information
00imvj00 authored Oct 25, 2019
2 parents 86b064d + 7c77d35 commit 12be9d2
Show file tree
Hide file tree
Showing 15 changed files with 1,064 additions and 658 deletions.
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# 0.2 (unreleased)

This is a fairly large release with API fixes and improvements, bug fixes, and much better test
coverage and documentation.

## API changes

* We now return a dedicated error type instead of abusing `std::io::Error`.
* `PacketIdentifier` was renamed to `Pid`. It now avoids the illegal value 0, wraps around automatically, and can be hashed.
* `Publish.qos` and `Publish.pid` have been merged together, avoiding accidental illegal combinations.
* `Connect.password` and `Connect.will.payload` can now contain binary data.
* The `Protocol` enum doesn't carry extra data anymore.
* All public structs/enum/functions are now (re)exported from the crate root, and the rest has been made private.
* The letter-casing of packet types is more consistent.
* Packet subtypes can be converted to `Packet` using `.into()`.

## Other changes

* Much improved documentation. See it with `cargo doc --open`.
* More thorough unittesting, including exhaustive and random value ranges testing.
* Lots of corner-case bugfixes, particularly when decoding partial or corrupted data.
* The minimum rust version is now 1.32.
* Raised `mqttrs`'s bus factor to 2 ;)

# 0.1.4 (2019-09-16)

* Fix issue #8: Decoding an incomplete packet still consumes bytes from the buffer.
6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
cargo-features = ["edition"]

[package]
name = "mqttrs"
version = "0.1.4"
authors = ["00imvj00 <[email protected]>"]
authors = ["00imvj00 <[email protected]>",
"Vincent de Phily <[email protected]>"]
edition = "2018"
description = "mqttrs is encoding & decoding library for mqtt protocol, it can work with both sync as well as async apps"
homepage = "https://github.com/00imvj00/mqttrs"
repository = "https://github.com/00imvj00/mqttrs"
keywords = ["mqtt", "encoding", "decoding", "async", "async-mqtt"]
license = "Apache-2.0"


[dependencies]
bytes = "0.4"

Expand Down
65 changes: 45 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,45 @@
# Rust Mqtt Encoding & Decoding

### What is Mqtt?
MQTT is an ISO standard publish-subscribe-based messaging protocol. It works on top of the TCP/IP protocol.

### What is Rust?
Rust is a multi-paradigm systems programming language focused on safety, especially safe concurrency. Rust is syntactically similar to C++, but is designed to provide better memory safety while maintaining high performance.

### What is mqttrs?

It is library which can be used in any rust projects where you need to transform valid mqtt bytes buffer to Mqtt types and vice versa.

In short it is encoding/decoding library which you can use it in sync as well as async environment.

The way it works is, It will take byte buffer as input and then will try to read the header of the mqtt packet, if the packet is not completely received as it happens in async networking, the library function will return `None` and will not remove any bytes from buffer.

Once, the whole mqtt packet is received, mqttrs will convert the bytes into appropriate mqtt packet type and return as well as remove all bytes from the beginning which belongs to already received packet.

So, in this way, this library can be used for sync tcp streams as well as async streams like tokio tcp streams.

# Rust Mqtt Encoding & Decoding [![Crates.io](https://img.shields.io/crates/l/mqttrs)](LICENSE) [![Docs.rs](https://docs.rs/mqttrs/badge.svg)](https://docs.rs/mqttrs/*/mqttrs/)

`Mqttrs` is a [Rust](https://www.rust-lang.org/) crate (library) to write [MQTT
protocol](https://mqtt.org/) clients and servers.

It is a codec-only library with [very few dependencies](Cargo.toml) and a [straightworward and
composable API](https://docs.rs/mqttrs/*/mqttrs/), usable with rust's standard library or with async
frameworks like [tokio](https://tokio.rs/).

`Mqttrs` currently requires [Rust >= 1.32](https://www.rust-lang.org/learn/get-started) and supports
[MQTT 3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html). Support for [MQTT
5](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html) is planned for a future version.

## Usage

Add `mqttrs = "0.2"` to your `Cargo.toml`.

```rust
use mqttrs::*;
use bytes::BytesMut;

// Allocate write buffer.
let mut buf = BytesMut::with_capacity(1024);

// Encode an MQTT Connect packet.
let pkt = Packet::Connect(Connect { protocol: Protocol::MQTT311,
keep_alive: 30,
client_id: "doc_client".into(),
clean_session: true,
last_will: None,
username: None,
password: None });
assert!(encode(&pkt, &mut buf).is_ok());
assert_eq!(&buf[14..], "doc_client".as_bytes());
let mut encoded = buf.clone();

// Decode one packet. The buffer will advance to the next packet.
assert_eq!(Ok(Some(pkt)), decode(&mut buf));

// Example decode failures.
let mut incomplete = encoded.split_to(10);
assert_eq!(Ok(None), decode(&mut incomplete));
let mut garbage = BytesMut::from(vec![0u8,0,0,0]);
assert_eq!(Err(Error::InvalidHeader), decode(&mut garbage));
```
81 changes: 52 additions & 29 deletions src/codec_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::*;
use bytes::BufMut;
use bytes::BytesMut;
use proptest::{bool, collection::vec, num::*, prelude::*};

Expand All @@ -13,6 +14,11 @@ prop_compose! {
QoS::from_u8(qos).unwrap()
}
}
prop_compose! {
fn stg_pid()(pid in 1..std::u16::MAX) -> Pid {
Pid::try_from(pid).unwrap()
}
}
prop_compose! {
fn stg_subtopic()(topic_path in stg_topic(), qos in stg_qos()) -> SubscribeTopic {
SubscribeTopic { topic_path, qos }
Expand Down Expand Up @@ -40,13 +46,13 @@ prop_compose! {
clean_session in bool::ANY,
username in stg_optstr(),
password in stg_optstr()) -> Packet {
Packet::Connect(Connect { protocol: Protocol::MQTT(4),
Packet::Connect(Connect { protocol: Protocol::MQTT311,
keep_alive,
client_id,
clean_session,
last_will: None,
username,
password })
password: password.map(|p| p.as_bytes().to_vec()) })
}
}
prop_compose! {
Expand All @@ -57,67 +63,70 @@ prop_compose! {
}
prop_compose! {
fn stg_publish()(dup in bool::ANY,
qos in 0u8..3,
pid in u16::ANY,
qos in stg_qos(),
pid in stg_pid(),
retain in bool::ANY,
topic_name in stg_topic(),
payload in vec(0u8..255u8, 1..300)) -> Packet {
Packet::Publish(Publish{dup,
qos: QoS::from_u8(qos).unwrap(),
qospid: match qos {
QoS::AtMostOnce => QosPid::AtMostOnce,
QoS::AtLeastOnce => QosPid::AtLeastOnce(pid),
QoS::ExactlyOnce => QosPid::ExactlyOnce(pid),
},
retain,
topic_name,
pid: if qos == 0 { None } else { Some(PacketIdentifier(pid)) },
payload})
}
}
prop_compose! {
fn stg_puback()(pid in u16::ANY) -> Packet {
Packet::Puback(PacketIdentifier(pid))
fn stg_puback()(pid in stg_pid()) -> Packet {
Packet::Puback(pid)
}
}
prop_compose! {
fn stg_pubrec()(pid in u16::ANY) -> Packet {
Packet::Pubrec(PacketIdentifier(pid))
fn stg_pubrec()(pid in stg_pid()) -> Packet {
Packet::Pubrec(pid)
}
}
prop_compose! {
fn stg_pubrel()(pid in u16::ANY) -> Packet {
Packet::Puback(PacketIdentifier(pid))
fn stg_pubrel()(pid in stg_pid()) -> Packet {
Packet::Pubrel(pid)
}
}
prop_compose! {
fn stg_pubcomp()(pid in u16::ANY) -> Packet {
Packet::PubComp(PacketIdentifier(pid))
fn stg_pubcomp()(pid in stg_pid()) -> Packet {
Packet::Pubcomp(pid)
}
}
prop_compose! {
fn stg_subscribe()(pid in u16::ANY, topics in vec(stg_subtopic(), 0..20)) -> Packet {
Packet::Subscribe(Subscribe{pid: PacketIdentifier(pid), topics})
fn stg_subscribe()(pid in stg_pid(), topics in vec(stg_subtopic(), 0..20)) -> Packet {
Packet::Subscribe(Subscribe{pid: pid, topics})
}
}
prop_compose! {
fn stg_suback()(pid in u16::ANY, return_codes in vec(stg_subretcode(), 0..300)) -> Packet {
Packet::SubAck(Suback{pid: PacketIdentifier(pid), return_codes})
fn stg_suback()(pid in stg_pid(), return_codes in vec(stg_subretcode(), 0..300)) -> Packet {
Packet::Suback(Suback{pid: pid, return_codes})
}
}
prop_compose! {
fn stg_unsubscribe()(pid in u16::ANY, topics in vec(stg_topic(), 0..20)) -> Packet {
Packet::UnSubscribe(Unsubscribe{pid:PacketIdentifier(pid), topics})
fn stg_unsubscribe()(pid in stg_pid(), topics in vec(stg_topic(), 0..20)) -> Packet {
Packet::Unsubscribe(Unsubscribe{pid:pid, topics})
}
}
prop_compose! {
fn stg_unsuback()(pid in u16::ANY) -> Packet {
Packet::UnSubAck(PacketIdentifier(pid))
fn stg_unsuback()(pid in stg_pid()) -> Packet {
Packet::Unsuback(pid)
}
}
prop_compose! {
fn stg_pingreq()(_ in bool::ANY) -> Packet {
Packet::PingReq
Packet::Pingreq
}
}
prop_compose! {
fn stg_pingresp()(_ in bool::ANY) -> Packet {
Packet::PingResp
Packet::Pingresp
}
}
prop_compose! {
Expand All @@ -136,14 +145,14 @@ macro_rules! impl_proptests {
fn $name(pkt in $stg()) {
// Encode the packet
let mut buf = BytesMut::with_capacity(10240);
let res = encoder::encode(&pkt.clone(), &mut buf);
let res = encode(&pkt, &mut buf);
prop_assert!(res.is_ok(), "encode({:?}) -> {:?}", pkt, res);
prop_assert!(buf.len() >= 2, "buffer too small: {:?}", buf); //PING is 2 bytes
prop_assert!(buf[0] >> 4 > 0 && buf[0] >> 4 < 16, "bad packet type {:?}", buf);

// Check that decoding returns the original
let mut encoded = buf.clone();
let decoded = decoder::decode(&mut buf);
let encoded = buf.clone();
let decoded = decode(&mut buf);
let ok = match &decoded {
Ok(Some(p)) if *p == pkt => true,
_other => false,
Expand All @@ -152,9 +161,23 @@ macro_rules! impl_proptests {
prop_assert!(buf.is_empty(), "Buffer not empty: {:?}", buf);

// Check that decoding a partial packet returns Ok(None)
encoded.split_off(encoded.len() - 1);
let decoded = decoder::decode(&mut encoded).unwrap();
let decoded = decode(&mut encoded.clone().split_off(encoded.len() - 1)).unwrap();
prop_assert!(decoded.is_none(), "partial decode {:?} -> {:?}", encoded, decoded);

// Check that encoding into a small buffer fails cleanly
buf.clear();
buf.split_off(encoded.len());
prop_assert!(encoded.len() == buf.remaining_mut() && buf.is_empty(),
"Wrong buffer init1 {}/{}/{}", encoded.len(), buf.remaining_mut(), buf.is_empty());
prop_assert!(encode(&pkt, &mut buf).is_ok(), "exact buffer capacity {}", buf.capacity());
for l in (0..encoded.len()).rev() {
buf.clear();
buf.split_to(1);
prop_assert!(l == buf.remaining_mut() && buf.is_empty(),
"Wrong buffer init2 {}/{}/{}", l, buf.remaining_mut(), buf.is_empty());
prop_assert_eq!(Err(Error::WriteZero), encode(&pkt, &mut buf),
"small buffer capacity {}/{}", buf.capacity(), encoded.len());
}
}
}
};
Expand Down
Loading

0 comments on commit 12be9d2

Please sign in to comment.