Commit

feat: ✨ Work in progress
Sensapp in rust work in progress. This may change a lot.
fungiboletus committed Jan 16, 2024
1 parent cb8d608 commit 05e1f9a
Showing 31 changed files with 1,921 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -20,5 +20,8 @@ jobs:
steps:
- uses: actions/checkout@v3
- run: rustup update ${{ matrix.toolchain }} && rustup default ${{ matrix.toolchain }}
- run: cargo install sqlx-cli --no-default-features --features sqlite
- run: cargo sqlx prepare
- run: cargo clippy --verbose
- run: cargo build --verbose
- run: cargo test --verbose
5 changes: 5 additions & 0 deletions .gitignore
@@ -15,3 +15,8 @@ Cargo.lock

# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb

# For code coverage
coverage/

.sqlx/
2 changes: 1 addition & 1 deletion .vscode/extensions.json
@@ -3,7 +3,7 @@
"matklad.rust-analyzer",
"serayuzgur.crates",
"EditorConfig.EditorConfig",
"bungcip.better-toml",
"tamasfe.even-better-toml",
"vadimcn.vscode-lldb",
"usernamehw.errorlens"
],
30 changes: 29 additions & 1 deletion Cargo.toml
@@ -1,8 +1,36 @@
[package]
name = "sensapp"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
#async-stream = "0.3"
async-trait = "0.1"
axum = { version = "0.7" }
#axum-streams = { version = "0.12", features = ["json", "csv", "text"] }
#bytes = "1.5"
futures = "0.3"
#futures-util = { version = "0.3", features = ["io"] }
#http-body = "1.0"
#http-body-util = "0.1"
polars = { version = "0.36" }
sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] }
tokio = { version = "1.35", features = ["full"] }
tokio-stream = { version = "0.1", features = ["io-util"] }
tokio-util = "0.7"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.5", features = ["full"] }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = "1.6"
csv-async = "1.2"
rust_decimal = "1.33.1"
geo = "0.27.0"
async-broadcast = "0.6.0"
cached = { version = "0.47.0", features = ["async", "tokio", "async-trait"] }
nom = "7.1"
sindit-senml = "0.2.0"
serde_json = "1.0"
11 changes: 10 additions & 1 deletion README.md
@@ -8,7 +8,7 @@ It enables the handling of small time series data of the edge efficiently to lar

- **Flexible Time Series DataBase Storage**: Supports various time-series databases like SQLite, PostgreSQL (with optional TimeScaleDB plugin), and ClickHouse, with the potential to extend support to other databases in the future.
- **Data Lake Storage**: Supports Parquet files over S3 compatible object stores for long-term time-series data storage.
- **Multiple Data Ingestion Protocols**: Easy data ingestion via HTTP REST API, MQTT, AMQP, KAFKA, and NATS.
- **Multiple Data Ingestion Protocols**: Easy data ingestion via HTTP REST API, MQTT, AMQP, KAFKA, OPCUA, and NATS.
- **Compatibility with Existing Pipelines**: Offers Prometheus Remote Write and InfluxDB line format support for seamless integration into existing sensor data pipelines.
- **Data formats**: Supports various data formats like JSON, CSV, Parquet, or SenML.

@@ -26,6 +26,15 @@ Check the [ARCHITECTURE.md](docs/ARCHITECTURE.md) file for more details.

SensApp is developed using Rust, a language known for its performance, memory safety, and annoying borrow checker. SensApp used to be written in Scala, but the new author prefers Rust.

It is not only the language: the extensive, high-quality open-source ecosystem also makes Rust a great choice for SensApp:

* [Tokio](https://tokio.rs/) asynchronous runtime
* [Serde](https://serde.rs/) serialization framework
* [Axum](https://github.com/tokio-rs/axum) web framework
* [SQLx](https://github.com/launchbadge/sqlx) database driver
* [Polars](https://pola.rs) data frame library
* *and many more…*

## Contributing

We appreciate your interest in contributing to SensApp! Contributing is as simple as submitting an issue or a merge/pull request. Please read the [CONTRIBUTING.md](CONTRIBUTING.md) file for more details.
5 changes: 5 additions & 0 deletions build.rs
@@ -0,0 +1,5 @@
fn main() {
// trigger recompilation when a new migration is added
println!("cargo:rerun-if-changed=src/storage/postgresql/migrations");
println!("cargo:rerun-if-changed=src/storage/sqlite/migrations");
}
4 changes: 4 additions & 0 deletions docs/ARCHITECTURE.md
@@ -137,3 +137,7 @@ SensApp should acknowledge the persistence of the incoming data once the storage
The publisher should favour the message queue ingestion pipeline if resilience is a concern.

The storage backend and the message queue should be resilient.

## Internal Software Architecture

Internally, SensApp uses a message bus and queues to communicate between components. This design is inspired by Node.js, Apache NiFi, and the Reactive Manifesto.
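The pattern can be sketched with standard-library channels: one component publishes batches on a bus, another consumes them and acknowledges persistence. This is a minimal, synchronous sketch of the idea only; the component names and the `String` batches are illustrative, and the real implementation uses async channels.

```rust
use std::sync::mpsc;
use std::thread;

// A minimal, synchronous sketch of the internal bus pattern:
// an ingestion component publishes batches on a channel, and a
// storage component consumes them and acknowledges persistence.
fn run_pipeline(batches: Vec<String>) -> Vec<String> {
    let (bus_tx, bus_rx) = mpsc::channel::<String>();
    let (ack_tx, ack_rx) = mpsc::channel::<String>();

    // Storage component: consumes from the bus, "persists", then acks.
    let storage = thread::spawn(move || {
        for batch in bus_rx {
            ack_tx.send(format!("persisted:{batch}")).unwrap();
        }
    });

    // Ingestion component: publishes every batch on the bus.
    let n = batches.len();
    for b in batches {
        bus_tx.send(b).unwrap();
    }
    drop(bus_tx); // close the bus so the storage thread terminates

    let acks: Vec<String> = ack_rx.iter().take(n).collect();
    storage.join().unwrap();
    acks
}

fn main() {
    let acks = run_pipeline(vec!["t=1".into(), "t=2".into()]);
    println!("{acks:?}");
}
```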
19 changes: 16 additions & 3 deletions docs/DATAMODEL.md
@@ -15,7 +15,8 @@ SensApp distinguishes between:
- **Numeric** values, which are exact decimal numbers that must not be approximated. SQLite 3 does not support them (the experimental SQLite4 project did), while PostgreSQL and ClickHouse do.
- **String** values, which are UTF-8 encoded strings.
- **Boolean** values, which are true or false.
- **Localisation** values, which are latitude and longitude coordinates, with an optional altitude. We consider earth as the center of the universe. _Do not_ use this type for space projects, and rely on multiple sensors instead.
- **Location** values, which are latitude and longitude coordinates, with an optional altitude. We consider earth as the center of the universe. _Do not_ use this type for space projects, and rely on multiple sensors instead.
- **JSON** values, which are JSON objects. This is not a recommended type, but it can be very convenient for storing complex data.


```mermaid
@@ -93,18 +94,26 @@ erDiagram
SENSORS ||--o{ INTEGER_VALUES : ""
SENSORS ||--o{ NUMERIC_VALUES : ""
SENSORS ||--o{ FLOAT_VALUES : ""
SENSORS ||--o{ LOCALISATIONS : ""
SENSORS ||--o{ LOCATION_VALUES : ""
SENSORS ||--o{ BOOLEAN_VALUES : ""
%% Locations are common enough to be part of the core data model
LOCALISATIONS {
LOCATION_VALUES {
UUID sensor
DateTime datetime
Float latitude
Float longitude
}
```

## Virtual Composite Sensors

SensApp can compose sensors together. For example, if one sensor measures temperature and another measures humidity, you can create a virtual sensor that combines both.

This can be useful to simplify the data model and the queries. Composite sensors can also be represented as materialised views in the database, which can improve read performance.

Virtual sensor time-series data is joined on the timestamp, using a configurable window size, for example every second, minute, or day. A composite sensor may also consist of a single sensor, to enable resampling.
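The windowed join described above can be sketched in plain Rust. This is an illustrative sketch, not SensApp's actual API: the `(timestamp, value)` tuples, the inner-join policy, and the window size are all assumptions made for the example.

```rust
use std::collections::BTreeMap;

// Sketch: join two sensors' samples into a composite sensor by
// bucketing timestamps into a fixed window (here in microseconds,
// matching SensApp's timestamp resolution).
fn join_windows(
    a: &[(i64, f64)], // (timestamp_us, value) for sensor A
    b: &[(i64, f64)], // (timestamp_us, value) for sensor B
    window_us: i64,
) -> Vec<(i64, f64, f64)> {
    let bucket = |ts: i64| ts - ts.rem_euclid(window_us);
    let mut map: BTreeMap<i64, (Option<f64>, Option<f64>)> = BTreeMap::new();
    for &(ts, v) in a {
        map.entry(bucket(ts)).or_insert((None, None)).0 = Some(v);
    }
    for &(ts, v) in b {
        map.entry(bucket(ts)).or_insert((None, None)).1 = Some(v);
    }
    // Keep only windows where both sensors have a sample (inner join).
    map.into_iter()
        .filter_map(|(w, (va, vb))| Some((w, va?, vb?)))
        .collect()
}

fn main() {
    let temp = [(1_000_000, 21.5), (2_000_000, 21.7)];
    let hum = [(1_200_000, 0.45), (3_000_000, 0.50)];
    // 1-second windows: only the first second has both samples.
    println!("{:?}", join_windows(&temp, &hum, 1_000_000));
}
```

A real implementation would also have to pick a policy for windows where one sensor is missing (inner join, as here, or carry-forward/outer join).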

## Optimisations and Compression

SensApp does not attempt optimisation or compression of the time series data itself. This is left to the storage backend. PostgreSQL with the TimeScaleDB extension, ClickHouse, or Parquet files will optimise and compress the data pretty well.
@@ -155,3 +164,7 @@ In practice, we expect sensors to not generate unique distinct strings all the t
## Geolocalisation and Coordinates Systems

In the current version, the geolocated data is agnostic to the coordinate system used. The data is likely to use WGS84, but it could be ETRS89 or something else. For now, it is up to the publisher and the consumer to agree on the coordinate system.

## Timestamps are in Microseconds

We use microsecond timestamps, which provide a good compromise between precision and storage size. Some time-series databases go down to nanoseconds, but with 64-bit integers the minimum and maximum representable timestamps then become too close together in time. SensApp should be able to store both historical data and prediction data. We have not identified use cases requiring nanosecond precision in our research; people with such use cases should consider patching SensApp or using another solution.
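The trade-off is easy to check with a back-of-the-envelope computation over signed 64-bit timestamps:

```rust
// Back-of-the-envelope check: with signed 64-bit timestamps,
// what range (in years) does each resolution cover around the epoch?
fn range_years(ticks_per_second: i64) -> f64 {
    let seconds = i64::MAX as f64 / ticks_per_second as f64;
    seconds / (365.25 * 24.0 * 3600.0)
}

fn main() {
    // Microseconds: roughly ±292,000 years around 1970.
    println!("microseconds: ±{:.0} years", range_years(1_000_000));
    // Nanoseconds: roughly ±292 years, i.e. about 1678–2262 only.
    println!("nanoseconds:  ±{:.0} years", range_years(1_000_000_000));
}
```

The nanosecond limit of roughly 1678–2262 is exactly why historical and long-horizon prediction data would not fit.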
73 changes: 73 additions & 0 deletions src/bus/event_bus.rs
@@ -0,0 +1,73 @@
use super::message::{Message, PublishMessage};
use crate::datamodel::batch::Batch;
use anyhow::Result;
use std::sync::Arc;

#[derive(Debug)]
pub struct EventBus {
pub name: String,
//main_bus_sender: mpsc::Sender<u8>,
//main_bus_receiver: mpsc::Receiver<u8>,
//pub main_bus_sender: async_channel::Sender<u8>,
//pub main_bus_receiver: async_channel::Receiver<u8>,
//pub main_bus_sender: tokio::sync::broadcast::Sender<u8>,
//pub main_bus_receiver: tokio::sync::broadcast::Receiver<u8>,
pub main_bus_sender: async_broadcast::Sender<Message>,
pub main_bus_receiver: async_broadcast::InactiveReceiver<Message>,
}

impl EventBus {
// Create a new event bus.
// Please note that the receiver is inactive by default as it may be cloned many times.
// Consider using .activate() or .activate_cloned() to activate it.
pub fn init(name: String) -> Self {
// let (tx, rx) = mpsc::channel(10);
//let (s, _) = tokio::sync::broadcast::channel::<u8>(1000);
//let (s, r) = async_broadcast::broadcast(128);
let (s, r) = async_broadcast::broadcast(128);
let r = r.deactivate();
Self {
name,
main_bus_sender: s,
main_bus_receiver: r,
}
}

async fn broadcast(&self, message: Message) -> Result<()> {
//self.main_bus_sender.send(event).await?;
//self.main_bus_sender.send(event)?;
self.main_bus_sender.broadcast(message).await?;
Ok(())
}

pub async fn publish(&self, batch: Batch) -> Result<async_broadcast::InactiveReceiver<()>> {
// We create a new broadcast channel to receive the sync message.
// It can technically have multiple emitters and multiple receivers.
// In most cases, it should be a one to one relationship, but
// it could be possible to have multiple storage backends and a single
// receiver that waits for the first one to sync, or all.
let (sync_sender, sync_receiver) = async_broadcast::broadcast(1);
let sync_receiver = sync_receiver.deactivate();

self.broadcast(Message::Publish(PublishMessage {
batch: Arc::new(batch),
sync_sender,
sync_receiver: sync_receiver.clone(),
}))
.await?;

Ok(sync_receiver)
}

// receive
/*pub async fn receive_one(&mut self) -> Result<u8> {
self.main_bus_receiver
.recv()
.await
.map_err(|e| anyhow::anyhow!("Failed to receive event: {}", e))
}*/
}

pub fn init_event_bus() -> Arc<EventBus> {
Arc::new(EventBus::init("SensApp".to_string()))
}
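The publish/sync handshake above — each published batch carrying its own acknowledgement channel — can be illustrated with standard-library channels. This is a simplified analogue, not the actual API: the real code uses `async_broadcast` channels and `Batch` values, while this sketch uses `mpsc` and `String` batches.

```rust
use std::sync::mpsc;
use std::thread;

// Stdlib sketch of the publish/sync pattern: each published batch
// carries a one-shot "sync" channel, and the storage backend signals
// on it once the data is persisted.
struct Publish {
    batch: String,
    sync_sender: mpsc::Sender<()>,
}

fn publish(bus: &mpsc::Sender<Publish>, batch: String) -> mpsc::Receiver<()> {
    let (sync_sender, sync_receiver) = mpsc::channel();
    bus.send(Publish { batch, sync_sender }).unwrap();
    sync_receiver
}

fn main() {
    let (bus_tx, bus_rx) = mpsc::channel::<Publish>();

    // Storage backend: persist, then acknowledge on the per-batch channel.
    thread::spawn(move || {
        for msg in bus_rx {
            let _persisted = msg.batch; // pretend to write it somewhere
            let _ = msg.sync_sender.send(());
        }
    });

    let ack = publish(&bus_tx, "batch-1".into());
    ack.recv().unwrap(); // blocks until the backend confirmed persistence
    println!("batch-1 persisted");
}
```

The per-batch channel is what lets `publish` return before persistence while still letting the caller await a durability guarantee later.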
18 changes: 18 additions & 0 deletions src/bus/message.rs
@@ -0,0 +1,18 @@
use std::sync::Arc;

#[derive(Debug, Clone)]
pub enum Message {
Publish(PublishMessage),
}

#[derive(Debug, Clone)]
pub struct PublishMessage {
pub batch: Arc<crate::datamodel::batch::Batch>,
// A request sync message is sent to ask the storage backends
// to sync. This is done to ensure that the data is persisted.
//
// However, some storage backends may not support syncing, or may lie
// about syncing. This is also true for some storage hardware nowadays.
pub sync_sender: async_broadcast::Sender<()>,
pub sync_receiver: async_broadcast::InactiveReceiver<()>,
}
4 changes: 4 additions & 0 deletions src/bus/mod.rs
@@ -0,0 +1,4 @@
pub mod event_bus;
pub mod message;
pub mod utils;
pub use event_bus::EventBus;
57 changes: 57 additions & 0 deletions src/bus/utils.rs
@@ -0,0 +1,57 @@
use std::sync::Arc;

use anyhow::Result;
use async_broadcast::{InactiveReceiver, Receiver, Sender};
use tokio::sync::Mutex;

#[derive(Debug)]
pub struct WaitForAll {
nb_started: Arc<Mutex<usize>>,
nb_finished: Arc<Mutex<usize>>,
finished_sender: Sender<()>,
finished_receiver: InactiveReceiver<()>,
}

impl WaitForAll {
pub fn new() -> Self {
let (s, r) = async_broadcast::broadcast(1);
Self {
nb_started: Arc::new(Mutex::new(0)),
nb_finished: Arc::new(Mutex::new(0)),
finished_sender: s,
finished_receiver: r.deactivate(),
}
}

pub async fn add(&mut self, mut receiver: Receiver<()>) {
{
let mut nb_started = self.nb_started.lock().await;
*nb_started += 1;
}
let nb_finished_clone = self.nb_finished.clone();
let finished_sender_clone = self.finished_sender.clone();

tokio::spawn(async move {
let _ = receiver.recv().await;
{
let mut nb_finished = nb_finished_clone.lock().await;
*nb_finished += 1;
}
if !finished_sender_clone.is_closed() && finished_sender_clone.receiver_count() > 0 {
let _ = finished_sender_clone.broadcast(()).await;
}
});
}

pub async fn wait(&mut self) -> Result<()> {
// If already finished, return immediately
if *self.nb_started.lock().await == *self.nb_finished.lock().await {
return Ok(());
}

let mut receiver = self.finished_receiver.activate_cloned();
receiver.recv().await?;

Ok(())
}
}
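The semantics `WaitForAll` provides — block until every registered completion signal has fired — can be sketched with standard-library threads and a channel whose closure marks "all done". This is an illustrative analogue only: the real type tracks `async_broadcast` receivers, while here each task reports on a shared `mpsc` channel.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Stdlib sketch of WaitForAll's semantics: spawn n tasks, then block
// until every one of them has signalled completion.
fn wait_for_all(n_tasks: usize) -> usize {
    let (done_tx, done_rx) = mpsc::channel::<usize>();
    for id in 0..n_tasks {
        let tx = done_tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(5 * id as u64));
            tx.send(id).unwrap();
        });
    }
    // Drop the original sender so the channel closes once every
    // task's clone has been dropped, i.e. once all tasks finished.
    drop(done_tx);
    done_rx.iter().count() // blocks until all n_tasks signals arrived
}

fn main() {
    let finished = wait_for_all(4);
    println!("all {finished} tasks finished");
}
```

Note the same subtlety the real `wait()` handles: if all tasks have already finished, the waiter must not block forever, which the closed channel guarantees here.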
25 changes: 25 additions & 0 deletions src/datamodel/batch.rs
@@ -0,0 +1,25 @@
use std::sync::Arc;

#[derive(Debug)]
pub struct Sample<V> {
pub timestamp_ms: i64,
pub value: V,
}

#[derive(Debug)]
pub enum TypedSamples {
Integer(Vec<Sample<i64>>),
Numeric(Vec<Sample<rust_decimal::Decimal>>),
Float(Vec<Sample<f64>>),
String(Vec<Sample<String>>),
Boolean(Vec<Sample<bool>>),
Location(Vec<Sample<geo::Point>>),
Blob(Vec<Sample<Vec<u8>>>),
}

#[derive(Debug)]
pub struct Batch {
pub sensor_uuid: uuid::Uuid,
pub sensor_name: String,
pub samples: Arc<TypedSamples>,
}