Skip to content

Commit

Permalink
cloudevents#9 Encoders for MQTT
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Nov 9, 2020
1 parent 609c036 commit 5d446f4
Show file tree
Hide file tree
Showing 8 changed files with 714 additions and 1 deletion.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ members = [
"cloudevents-sdk-actix-web",
"cloudevents-sdk-reqwest",
"cloudevents-sdk-rdkafka",
"cloudevents-sdk-warp"
"cloudevents-sdk-warp",
"cloudevents-sdk-mqtt"
]
exclude = [
"example-projects/actix-web-example",
"example-projects/reqwest-wasm-example",
"example-projects/rdkafka-example",
"example-projects/warp-example",
"example-projects/mqtt-example"
]
18 changes: 18 additions & 0 deletions cloudevents-sdk-mqtt/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "cloudevents-sdk-mqtt"
version = "0.2.0"
authors = ["Francesco Guardiani <[email protected]>"]
license-file = "../LICENSE"
edition = "2018"
description = "CloudEvents official Rust SDK - Mqtt integration"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
cloudevents-sdk = { version = "0.2.0", path = ".." }
lazy_static = "1.4.0"
paho-mqtt = { path = "../../paho.mqtt.rust" }
chrono = { version = "^0.4", features = ["serde"] }

[dev-dependencies]
serde_json = "^1.0"
35 changes: 35 additions & 0 deletions cloudevents-sdk-mqtt/src/headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use cloudevents::event::SpecVersion;
use lazy_static::lazy_static;
use std::collections::HashMap;

macro_rules! attribute_name_to_header {
($attribute:expr) => {
format!("ce_{}", $attribute)
};
}

fn attributes_to_headers(it: impl Iterator<Item = &'static str>) -> HashMap<&'static str, String> {
it.map(|s| {
if s == "datacontenttype" {
(s, String::from("content-type"))
} else {
(s, attribute_name_to_header!(s))
}
})
.collect()
}

lazy_static! {
pub(crate) static ref ATTRIBUTES_TO_MQTT_HEADERS: HashMap<&'static str, String> =
attributes_to_headers(SpecVersion::all_attribute_names());
}

pub(crate) static SPEC_VERSION_HEADER: &'static str = "ce_specversion";
pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudevents+json";
pub(crate) static CONTENT_TYPE: &'static str = "content-type";

pub enum MqttVersion {
V3_1,
V3_1_1,
V5,
}
14 changes: 14 additions & 0 deletions cloudevents-sdk-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! This library provides Mqtt protocol bindings for CloudEvents
//! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\
#[macro_use]
mod headers;
mod mqtt_producer_record;
mod mqtt_consumer_record;

pub use mqtt_consumer_record::record_to_event;
pub use mqtt_consumer_record::ConsumerMessageDeserializer;
pub use mqtt_consumer_record::MessageExt;

pub use mqtt_producer_record::MessageBuilderExt;
pub use mqtt_producer_record::MessageRecord;
pub use headers::MqttVersion;
225 changes: 225 additions & 0 deletions cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
use crate::headers;
use cloudevents::event::SpecVersion;
use cloudevents::message::{Result, BinarySerializer, BinaryDeserializer, MessageAttributeValue,
MessageDeserializer, Encoding, StructuredSerializer, StructuredDeserializer};
use cloudevents::{message, Event};
use paho_mqtt::{Message, PropertyCode};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::str;

pub struct ConsumerMessageDeserializer {
pub(crate) headers: HashMap<String, Vec<u8>>,
pub(crate) payload: Option<Vec<u8>>,
}

impl ConsumerMessageDeserializer {
fn get_mqtt_headers(message: &Message) -> Result<HashMap<String, Vec<u8>>> {
let mut hm = HashMap::new();
let prop_iterator = message.properties().iter(PropertyCode::UserProperty);

for property in prop_iterator {
let header = property.get_string_pair().unwrap();
hm.insert(header.0.to_string(), Vec::from(header.1));
}

Ok(hm)
}

pub fn new(message: &Message) -> Result<ConsumerMessageDeserializer> {
Ok(ConsumerMessageDeserializer {
headers: Self::get_mqtt_headers(message)?,
payload: Some(message.payload()).map(|s| Vec::from(s)),
})
}
}

impl BinaryDeserializer for ConsumerMessageDeserializer {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {})
}

let spec_version = SpecVersion::try_from(
str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
.map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})?,
)?;

visitor = visitor.set_spec_version(spec_version.clone())?;

let attributes = spec_version.attribute_names();

if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?),
)?
}

for (hn, hv) in self
.headers
.into_iter()
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
{
let name = &hn["ce_".len()..];

if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?),
)?
}
}

if self.payload != None {
visitor.end_with_data(self.payload.unwrap())
} else {
visitor.end()
}
}
}

impl StructuredDeserializer for ConsumerMessageDeserializer {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
visitor.set_structured_event(self.payload.unwrap())
}
}

impl MessageDeserializer for ConsumerMessageDeserializer {
fn encoding(&self) -> Encoding {
match (
self.headers
.get("content-type")
.map(|s| String::from_utf8(s.to_vec()).ok())
.flatten()
.map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
.unwrap_or(false),
self.headers.get(headers::SPEC_VERSION_HEADER),
) {
(true, _) => Encoding::STRUCTURED,
(_, Some(_)) => Encoding::BINARY,
_ => Encoding::UNKNOWN,
}
}
}

pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result<Event> {
match version {
headers::MqttVersion::V5 => BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?),
headers::MqttVersion::V3_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?),
headers::MqttVersion::V3_1_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?),
}
}

pub trait MessageExt {
fn to_event(&self, version: headers::MqttVersion) -> Result<Event>;
}

impl MessageExt for Message {
fn to_event(&self, version: headers::MqttVersion) -> Result<Event> {
record_to_event(self, version)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::mqtt_producer_record::MessageRecord;

use chrono::Utc;
use cloudevents::{EventBuilder, EventBuilderV10};
use crate::MessageBuilderExt;
use serde_json::json;
use cloudevents::event::Data;

#[test]
fn test_binary_record() {
let time = Utc::now();

let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.time(time)
.source("http://localhost")
.data("application/json",
Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes()))
.extension("someint", "10")
.build()
.unwrap();

let message_record = MessageRecord::from_event(
EventBuilderV10::new()
.id("0001")
.ty("example.test")
.time(time)
.source("http://localhost")
.extension("someint", "10")
.data("application/json", json!({"hello": "world"}))
.build()
.unwrap(),
headers::MqttVersion::V5,
)
.unwrap();

let msg = MessageBuilder::new()
.topic("test")
.message_record(&message_record)
.qos(1)
.finalize();

assert_eq!(msg.to_event(headers::MqttVersion::V5).unwrap(), expected)
}

#[test]
fn test_structured_record() {
let j = json!({"hello": "world"});

let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/cloudevents+json", j.clone())
.extension("someint", "10")
.build()
.unwrap();

let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/cloudevents+json", j.clone())
.extension("someint", "10")
.build()
.unwrap();

let serialized_event =
StructuredDeserializer::deserialize_structured(input, MessageRecord::new()).unwrap();

let msg = MessageBuilder::new()
.topic("test")
.message_record(&serialized_event)
.qos(1)
.finalize();

assert_eq!(msg.to_event(headers::MqttVersion::V3_1_1).unwrap(), expected)
}
}
Loading

0 comments on commit 5d446f4

Please sign in to comment.