Skip to content

Commit

Permalink
feat: 🌈 Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
fungiboletus committed Apr 5, 2024
1 parent a961209 commit f1d9045
Show file tree
Hide file tree
Showing 13 changed files with 817 additions and 26 deletions.
364 changes: 358 additions & 6 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,8 @@ opcua = { version = "0.12", default-features = false, features = [
] }
serde_bytes = "0.11.14"
sentry = { version = "0.32.2", features = ["anyhow", "tower"] }
rumqttc = { version = "0.24.0", features = ["url", "websocket"] }
url = "2.5.0"
rand = "0.8.5"
geobuf = "0.1.4"
protobuf = "3.0.2"
8 changes: 5 additions & 3 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use anyhow::Error;
use confique::Config;
use serde::Deserialize;
use serde_inline_default::serde_inline_default;
use std::{
net::IpAddr,
sync::{Arc, OnceLock},
};

use self::opcua::OpcuaConfig;
use self::{mqtt::MqttConfig, opcua::OpcuaConfig};
pub mod mqtt;
pub mod opcua;

#[derive(Debug, Config)]
Expand Down Expand Up @@ -43,6 +42,9 @@ pub struct SensAppConfig {

#[config(env = "SENSAPP_OPC_UA")]
pub opcua: Option<Vec<OpcuaConfig>>,

#[config(env = "SENSAPP_MQTT")]
pub mqtt: Option<Vec<MqttConfig>>,
}

impl SensAppConfig {
Expand Down
19 changes: 19 additions & 0 deletions src/config/mqtt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use serde::Deserialize;
use serde_inline_default::serde_inline_default;

#[serde_inline_default]
#[derive(Debug, Deserialize, Clone)]
pub struct MqttConfig {
pub url: String,
pub client_id: Option<String>,
pub username: Option<String>,
pub password: Option<String>,

#[serde_inline_default(30)]
pub keep_alive_seconds: u64,
// pub topics: Vec<String>,
// pub qos: u8,
// pub retain: bool,
// pub clean_session: bool,
// pub keep_alive: u16,
}
48 changes: 42 additions & 6 deletions src/config/opcua.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use opcua::client;
use opcua::client::prelude::{
ClientBuilder, ClientEndpoint, ClientUserToken, ANONYMOUS_USER_TOKEN_ID,
};
use opcua::types::{MessageSecurityMode, UserTokenPolicy};
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer};
use serde::Deserialize;
use serde_bytes::ByteBuf;
use serde_inline_default::serde_inline_default;
use std::fmt;
use uuid::Uuid;

#[derive(Debug, Deserialize, Clone)]
pub struct OpcuaUserTokenConfig {
Expand Down Expand Up @@ -70,6 +65,43 @@ impl From<OpcuaIdentifier> for opcua::types::Identifier {
}
}

#[serde_inline_default]
#[derive(Debug, Deserialize, Clone)]
pub struct OpcuaAutoDiscovery {
#[serde_inline_default(true)]
pub enabled: bool,

// Start with the root node by default
pub start_node: Option<OpcuaIdentifier>,

// List of nodes to exclude from the discovery.
#[serde_inline_default(Vec::new())]
pub excluded_nodes: Vec<OpcuaIdentifier>,

// Maximum discovery depth
#[serde_inline_default(32)]
pub max_depth: usize,

// Maximum number of nodes to discover.
#[serde_inline_default(1024)]
pub max_nodes: usize,

// Regular expression to filter out nodes based on their browse name.
pub node_browse_name_exclude_regex: Option<String>,

// Regular expression to select variables based on their node id identifier.
// If it's not a string, it uses the string representation of the identifier.
pub variable_identifier_include_regex: Option<String>,

// Allow browsing accross namespaces
#[serde_inline_default(false)]
pub discover_across_namespaces: bool,

// Filter out variables that have sub nodes.
#[serde_inline_default(true)]
pub skip_variables_with_children: bool,
}

#[serde_inline_default]
#[derive(Debug, Deserialize, Clone)]
pub struct OpcuaSubscriptionConfig {
Expand All @@ -80,8 +112,12 @@ pub struct OpcuaSubscriptionConfig {
pub name_prefix: Option<String>,

// The subscription identifiers
#[serde_inline_default(Vec::new())]
pub identifiers: Vec<OpcuaIdentifier>,

// Autodiscovery feature
pub autodiscovery: Option<OpcuaAutoDiscovery>,

// Publishing interval
#[serde_inline_default(1000.0f64)]
pub publishing_interval: f64,
Expand Down
1 change: 1 addition & 0 deletions src/ingestors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod http;
pub mod mqtt;
pub mod opcua;
3 changes: 3 additions & 0 deletions src/ingestors/mqtt/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod mqtt_client;

pub use mqtt_client::mqtt_client;
119 changes: 119 additions & 0 deletions src/ingestors/mqtt/mqtt_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use crate::{bus::EventBus, config::mqtt::MqttConfig};
use anyhow::{bail, Context, Result};
use rand::distributions::Alphanumeric;
use rand::Rng;
use rumqttc::{AsyncClient, MqttOptions, Transport};
use std::{sync::Arc, time::Duration};

fn random_client_id() -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(12)
.map(char::from)
.collect()
}

fn get_client_id(config: &MqttConfig) -> String {
config.client_id.clone().unwrap_or_else(|| {
let mut client_id = "sensapp-".to_string();
client_id.push_str(&random_client_id());
client_id
})
}

fn make_websocket_client_options(config: &MqttConfig, tls: bool) -> Result<MqttOptions> {
let url = &config.url;
let parsed_url =
url::Url::parse(url).context(format!("Failed to parse MQTT WS URL: {}", url))?;

let port = parsed_url.port().unwrap_or(8000);

let mut mqtt_options = MqttOptions::new(get_client_id(config), url, port);

if tls {
mqtt_options.set_transport(Transport::wss_with_default_config());
} else {
mqtt_options.set_transport(Transport::Ws);
}

configure_mqtt_options(config, mqtt_options)
}

fn make_client_options(config: &MqttConfig) -> Result<MqttOptions> {
let ws = config.url.starts_with("ws://");
let ws_tls = config.url.starts_with("wss://");
if ws || ws_tls {
return make_websocket_client_options(config, ws_tls);
}

let mut parsed_url = url::Url::parse(&config.url)
.with_context(|| format!("Failed to parse MQTT URL: {}", config.url))?;

// The rumqttc crate requires the client_id as a GET parameter in the URL.
// I don't like it so it can be passed as a normal argument. However it still
// must be passed in the URL as the client_id is not mutable.
let has_client_id = parsed_url.query_pairs().any(|(key, _)| key == "client_id");
let url = if has_client_id {
if config.client_id.is_some() {
bail!("client_id is not allowed in `url` when it is set in MqttConfig");
}
config.url.clone()
} else {
let mut queries = parsed_url.query_pairs_mut();
queries.append_pair("client_id", &get_client_id(config));
queries.finish().to_string()
};

let mqtt_options = MqttOptions::parse_url(url).context("Failed to parse MQTT URL")?;

configure_mqtt_options(config, mqtt_options)
}

fn configure_mqtt_options(
config: &MqttConfig,
mut mqtt_options: MqttOptions,
) -> Result<MqttOptions> {
mqtt_options.set_keep_alive(Duration::from_secs(config.keep_alive_seconds));

if let Some(username) = &config.username.clone() {
let password = config.password.clone().unwrap_or_default();
mqtt_options.set_credentials(username, password);
}

Ok(mqtt_options)
}

pub async fn mqtt_client(config: MqttConfig, event_bus: Arc<EventBus>) -> Result<()> {
let mqtt_options = make_client_options(&config)?;

let (client, mut event_loop) = AsyncClient::new(mqtt_options, 16);

client
.subscribe("GAIA/AIS", rumqttc::QoS::AtLeastOnce)
.await?;

loop {
let notification = event_loop.poll().await;
match notification {
Ok(rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish))) => {
let topic = publish.topic;
let payload = publish.payload;
println!("Received message on topic: {}", topic);
println!("Payload: {:?}", payload);
let mut geobuf = geobuf::geobuf_pb::Data::new();
use protobuf::Message;
geobuf.merge_from_bytes(&payload); //.unwrap();
match geobuf::decode::Decoder::decode(&geobuf).unwrap() {
serde_json::Value::Object(geojson) => {
println!("GeoJSON: {:?}", geojson);
}
_ => {}
}
}
Ok(_) => {}
Err(e) => {
bail!("MQTT client error: {:?}", e);
}
}
}
}
7 changes: 4 additions & 3 deletions src/ingestors/opcua/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod client;
mod utils;
mod opcua_browser;
mod opcua_client;
mod opcua_utils;

pub use client::opcua_client;
pub use opcua_client::opcua_client;
Loading

0 comments on commit f1d9045

Please sign in to comment.