diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index efd6d01..447cf2c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,5 +20,8 @@ jobs: steps: - uses: actions/checkout@v3 - run: rustup update ${{ matrix.toolchain }} && rustup default ${{ matrix.toolchain }} + - run: cargo install sqlx-cli --no-default-features --features sqlite + - run: cargo sqlx prepare + - run: cargo clippy --verbose - run: cargo build --verbose - run: cargo test --verbose diff --git a/.gitignore b/.gitignore index 94c7b4e..47d8ad9 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,8 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb + +# For code coverage +coverage/ + +.sqlx/ diff --git a/.vscode/extensions.json b/.vscode/extensions.json index e4d2481..1a1f06c 100644 --- a/.vscode/extensions.json +++ b/.vscode/extensions.json @@ -3,7 +3,7 @@ "matklad.rust-analyzer", "serayuzgur.crates", "EditorConfig.EditorConfig", - "bungcip.better-toml", + "tamasfe.even-better-toml", "vadimcn.vscode-lldb", "usernamehw.errorlens" ], diff --git a/Cargo.toml b/Cargo.toml index d34821f..9c7f5c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,36 @@ [package] name = "sensapp" -version = "0.1.0" +version = "0.2.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1.0" +#async-stream = "0.3" +async-trait = "0.1" +axum = { version = "0.7" } +#axum-streams = { version = "0.12", features = ["json", "csv", "text"] } +#bytes = "1.5" +futures = "0.3" +#futures-util = { version = "0.3", features = ["io"] } +#http-body = "1.0" +#http-body-util = "0.1" +polars = { version = "0.36" } +sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] } +tokio = { version = "1.35", features = ["full"] } +tokio-stream = { version = "0.1", features = ["io-util"] } +tokio-util = "0.7" +tower = { version = "0.4", features = ["full"] } +tower-http = { version = "0.5", features = ["full"] } +tracing = { version = "0.1" } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +uuid = "1.6" +csv-async = "1.2" +rust_decimal = "1.33.1" +geo = "0.27.0" +async-broadcast = "0.6.0" +cached = { version = "0.47.0", features = ["async", "tokio", "async-trait"] } +nom = "7.1" +sindit-senml = "0.2.0" +serde_json = "1.0" diff --git a/README.md b/README.md index d11a545..2057e72 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ It enables the handling of small time series data of the edge efficiently to lar - **Flexible Time Series DataBase Storage**: Supports various time-series databases like SQLite, PostgreSQL (with optional TimeScaleDB plugin), and ClickHouse, with the potential to extend support to other databases in the future. - **Data Lake Storage**: Supports Parquet files over S3 compatible object stores for long-term time-series data storage. -- **Multiple Data Ingestion Protocols**: Easy data ingestion via HTTP REST API, MQTT, AMQP, KAFKA, and NATS. +- **Multiple Data Ingestion Protocols**: Easy data ingestion via HTTP REST API, MQTT, AMQP, KAFKA, OPCUA, and NATS. - **Compatibility with Existing Pipelines**: Offers Prometheus Remote Write and InfluxDB line format support for seamless integration into existing sensor data pipelines. - **Data formats**: Supports various data formats like JSON, CSV, Parquet, or SenML. @@ -26,6 +26,15 @@ Check the [ARCHITECTURE.md](docs/ARCHITECTURE.md) file for more details. 
SensApp is developed using Rust, a language known for its performance, memory safety, and annoying borrow checker. SensApp used to be written in Scala, but the new author prefers Rust. +It is not only the language: the extensive, high-quality open-source ecosystem also makes Rust a great choice for SensApp: + +* [Tokio](https://tokio.rs/) asynchronous runtime +* [Serde](https://serde.rs/) serialization framework +* [Axum](https://github.com/tokio-rs/axum) web framework +* [SQLx](https://github.com/launchbadge/sqlx) database driver +* [Polars](https://pola.rs) data frame library +* *and many more…* + ## Contributing We appreciate your interest in contributing to SensApp! Contributing is as simple as submitting an issue or a merge/pull request. Please read the [CONTRIBUTING.md](CONTRIBUTING.md) file for more details. diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..37d8bd7 --- /dev/null +++ b/build.rs @@ -0,0 +1,5 @@ +fn main() { + // trigger recompilation when a new migration is added + println!("cargo:rerun-if-changed=src/storage/postgresql/migrations"); + println!("cargo:rerun-if-changed=src/storage/sqlite/migrations"); +} diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 8c3bf14..4091e7d 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -137,3 +137,7 @@ SensApp should acknowledge the persistence of the incoming data once the storage The publisher should favour the message queue ingestion pipeline if resilience is a concern. The storage backend and the message queue should be resilient. + +## Internal Software Architecture + +Internally, SensApp uses a message bus and queues to communicate between components. This is inspired by Node.js, Apache NiFi, and the Reactive Manifesto. diff --git a/docs/DATAMODEL.md b/docs/DATAMODEL.md index f82c8d2..878e2a7 100644 --- a/docs/DATAMODEL.md +++ b/docs/DATAMODEL.md @@ -15,7 +15,8 @@ SensApp distinguises between: - **Numeric** values, which are decimal numbers, that shouldn't be approximate values. This is not supported by SQLite 3, it was during the SQLite4 experimental project, but is supported by PostGreSQL and ClickHouse. - **String** values, which are UTF-8 encoded strings. - **Boolean** values, which are true or false. - - **Localisation** values, which are latitude and longitude coordinates, with an optional altitude. We consider earth as the center of the universe. _Do not_ use this type for space projects, and rely on multiple sensors instead. + - **Location** values, which are latitude and longitude coordinates, with an optional altitude. We consider earth as the center of the universe. _Do not_ use this type for space projects, and rely on multiple sensors instead. + - **JSON** values, which are JSON objects. This is not a recommended type, but it can be very convenient for storing complex data. ```mermaid @@ -93,11 +94,11 @@ erDiagram SENSORS ||--o{ INTEGER_VALUES : "" SENSORS ||--o{ NUMERIC_VALUES : "" SENSORS ||--o{ FLOAT_VALUES : "" - SENSORS ||--o{ LOCALISATIONS : "" + SENSORS ||--o{ LOCATION_VALUES : "" SENSORS ||--o{ BOOLEAN_VALUES : "" %% Localisations are common enough to be part of the core data model - LOCALISATIONS { + LOCATION_VALUES { UUID sensor DateTime datetime Float latitude @@ -105,6 +106,14 @@ erDiagram } ``` +## Virtual Composite Sensors + +SensApp can compose sensors together.
For example, if you have one sensor that measures temperature and another that measures humidity, you can create a virtual sensor that consists of both. + +This can be useful to simplify the data model and the queries. Composite sensors can also be represented as materialised views in the database, which can improve read performance. + +The time-series data of virtual sensors is joined on the timestamp, using a configurable window size, for example every second, minute, or day… It is possible to have a composite sensor consisting of only one sensor, to enable resampling. + ## Optimisations and Compression SensApp does not attempt to do optimisation and compression on the time series data itself. This is left to the storage backend. PostGreSQL with the TimeScaleDB extension, ClickHouse, or Parquet files will optimise and compress the data pretty well. @@ -155,3 +164,7 @@ In practice, we expect sensors to not generate unique distinct strings all the t ## Geolocalisation and Coordinates Systems In the current version, the geolocalised data doesn't really mind the coordinate system used. The data is likely going to use WGS84, but it could be ETRS89 or something else. It's up to the publisher and the consumer to agree on the coordinate system used, for now. + +## Timestamps are in microseconds + +We use microsecond timestamps, as they provide a good compromise between precision and storage size. Some time-series databases go down to nanoseconds, but the minimum and maximum timestamps representable with 64-bit integers then become too close in time. It should be possible to have historical data and prediction data in SensApp. We haven't identified use cases that would require nanosecond precision in our research. People with such use cases should consider patching SensApp or using another solution. diff --git a/src/bus/event_bus.rs b/src/bus/event_bus.rs new file mode 100644 index 0000000..18cc7ae --- /dev/null +++ b/src/bus/event_bus.rs @@ -0,0 +1,73 @@ +use super::message::{Message, PublishMessage}; +use crate::datamodel::batch::Batch; +use anyhow::Result; +use std::sync::Arc; + +#[derive(Debug)] +pub struct EventBus { + pub name: String, + //main_bus_sender: mpsc::Sender<Message>, + //main_bus_receiver: mpsc::Receiver<Message>, + //pub main_bus_sender: async_channel::Sender<Message>, + //pub main_bus_receiver: async_channel::Receiver<Message>, + //pub main_bus_sender: tokio::sync::broadcast::Sender<Message>, + //pub main_bus_receiver: tokio::sync::broadcast::Receiver<Message>, + pub main_bus_sender: async_broadcast::Sender<Message>, + pub main_bus_receiver: async_broadcast::InactiveReceiver<Message>, +} + +impl EventBus { + // Create a new event bus. + // Please note that the receiver is inactive by default as it may be cloned many times. + // Consider using .activate() or .activate_cloned() to activate it. + pub fn init(name: String) -> Self { + // let (tx, rx) = mpsc::channel(10); + //let (s, _) = tokio::sync::broadcast::channel::<Message>(1000); + //let (s, r) = async_broadcast::broadcast(128); + let (s, r) = async_broadcast::broadcast(128); + let r = r.deactivate(); + Self { + name, + main_bus_sender: s, + main_bus_receiver: r, + } + } + + async fn broadcast(&self, message: Message) -> Result<()> { + //self.main_bus_sender.send(event).await?; + //self.main_bus_sender.send(event)?; + self.main_bus_sender.broadcast(message).await?; + Ok(()) + } + + pub async fn publish(&self, batch: Batch) -> Result<async_broadcast::InactiveReceiver<()>> { + // We create a new broadcast channel to receive the sync message. + // It can technically have multiple emitters and multiple receivers.
+ // In most cases, it should be a one-to-one relationship, but + // it could be possible to have multiple storage backends and a single + // receiver that waits for the first one to sync, or all. + let (sync_sender, sync_receiver) = async_broadcast::broadcast(1); + let sync_receiver = sync_receiver.deactivate(); + + self.broadcast(Message::Publish(PublishMessage { + batch: Arc::new(batch), + sync_sender, + sync_receiver: sync_receiver.clone(), + })) + .await?; + + Ok(sync_receiver) + } + + // receive + /*pub async fn receive_one(&mut self) -> Result<Message> { + self.main_bus_receiver + .recv() + .await + .map_err(|e| anyhow::anyhow!("Failed to receive event: {}", e)) + }*/ +} + +pub fn init_event_bus() -> Arc<EventBus> { + Arc::new(EventBus::init("SensApp".to_string())) +} diff --git a/src/bus/message.rs b/src/bus/message.rs new file mode 100644 index 0000000..69f46bc --- /dev/null +++ b/src/bus/message.rs @@ -0,0 +1,18 @@ +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub enum Message { + Publish(PublishMessage), +} + +#[derive(Debug, Clone)] +pub struct PublishMessage { + pub batch: Arc<Batch>, + // A request sync message is sent to ask the storage backends + // to sync. This is done to ensure that the data is persisted. + // + // However, some storage backends may not support syncing, or may lie about + // syncing. This is also true for some storage hardware nowadays. + pub sync_sender: async_broadcast::Sender<()>, + pub sync_receiver: async_broadcast::InactiveReceiver<()>, +} diff --git a/src/bus/mod.rs b/src/bus/mod.rs new file mode 100644 index 0000000..da11a48 --- /dev/null +++ b/src/bus/mod.rs @@ -0,0 +1,4 @@ +pub mod event_bus; +pub mod message; +pub mod utils; +pub use event_bus::EventBus; diff --git a/src/bus/utils.rs b/src/bus/utils.rs new file mode 100644 index 0000000..6c77cbb --- /dev/null +++ b/src/bus/utils.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use anyhow::Result; +use async_broadcast::{InactiveReceiver, Receiver, Sender}; +use tokio::sync::Mutex; + +#[derive(Debug)] +pub struct WaitForAll { + nb_started: Arc<Mutex<usize>>, + nb_finished: Arc<Mutex<usize>>, + finished_sender: Sender<()>, + finished_receiver: InactiveReceiver<()>, +} + +impl WaitForAll { + pub fn new() -> Self { + let (s, r) = async_broadcast::broadcast(1); + Self { + nb_started: Arc::new(Mutex::new(0)), + nb_finished: Arc::new(Mutex::new(0)), + finished_sender: s, + finished_receiver: r.deactivate(), + } + } + + pub async fn add(&mut self, mut receiver: Receiver<()>) { + { + let mut nb_started = self.nb_started.lock().await; + *nb_started += 1; + } + let nb_finished_clone = self.nb_finished.clone(); + let finished_sender_clone = self.finished_sender.clone(); + + tokio::spawn(async move { + let _ = receiver.recv().await; + { + let mut nb_finished = nb_finished_clone.lock().await; + *nb_finished += 1; + } + if !finished_sender_clone.is_closed() && finished_sender_clone.receiver_count() > 0 { + let _ = finished_sender_clone.broadcast(()).await; + } + }); + } + + pub async fn wait(&mut self) -> Result<()> { + // If already finished, return immediately + if *self.nb_started.lock().await == *self.nb_finished.lock().await { + return Ok(()); + } + + let mut receiver = self.finished_receiver.activate_cloned(); + receiver.recv().await?; + + Ok(()) + } +} diff --git a/src/datamodel/batch.rs b/src/datamodel/batch.rs new file mode 100644 index 0000000..c2046b3 --- /dev/null +++ b/src/datamodel/batch.rs @@ -0,0 +1,25 @@ +use std::sync::Arc; + +#[derive(Debug)] +pub struct Sample<V> { + pub timestamp_ms: i64, + pub value: V, +} + +#[derive(Debug)] +pub enum 
TypedSamples { + Integer(Vec>), + Numeric(Vec>), + Float(Vec>), + String(Vec>), + Boolean(Vec>), + Location(Vec>), + Blob(Vec>>), +} + +#[derive(Debug)] +pub struct Batch { + pub sensor_uuid: uuid::Uuid, + pub sensor_name: String, + pub samples: Arc, +} diff --git a/src/datamodel/mod.rs b/src/datamodel/mod.rs new file mode 100644 index 0000000..2983736 --- /dev/null +++ b/src/datamodel/mod.rs @@ -0,0 +1,110 @@ +/*use serde::{Deserialize, Serialize}; +use uuid::Uuid;*/ + +pub mod batch; + +pub enum SensorType { + Integer, + Numeric, + Float, + String, + Boolean, + Location, + JSON, + Blob, +} + +// Implement to_string() for SensorType +impl ToString for SensorType { + fn to_string(&self) -> String { + match self { + SensorType::Integer => "Integer".to_string(), + SensorType::Numeric => "Numeric".to_string(), + SensorType::Float => "Float".to_string(), + SensorType::String => "String".to_string(), + SensorType::Boolean => "Boolean".to_string(), + SensorType::Location => "Location".to_string(), + SensorType::JSON => "JSON".to_string(), + SensorType::Blob => "Blob".to_string(), + } + } +} + +/* +// Units +#[derive(Serialize, Deserialize, Debug)] +struct Unit { + id: i32, + name: String, + description: Option, +} + +// Sensors +#[derive(Serialize, Deserialize, Debug)] +struct Sensor { + sensor_id: i32, + uuid: Uuid, + name: String, + sensor_type: String, // Represent 'type' as 'sensor_type' to avoid keyword conflict + unit_id: Option, +} + +// Labels +#[derive(Serialize, Deserialize, Debug)] +struct Label { + sensor_id: i32, + named: i32, + description: Option, +} + +// LabelsNameDictionary +#[derive(Serialize, Deserialize, Debug)] +struct LabelsNameDictionary { + id: i32, + name: String, +} + +// LabelsDescriptionDictionary +#[derive(Serialize, Deserialize, Debug)] +struct LabelsDescriptionDictionary { + id: i32, + description: String, +} + +// StringsValuesDictionary +#[derive(Serialize, Deserialize, Debug)] +struct StringsValuesDictionary { + id: i32, + value: String, +} + +// Sample model (for different value types) +#[derive(Serialize, Deserialize, Debug)] +struct Sample { + datetime: i64, + value: T, +} + +// Higher-level model containing a list of samples and the sensor UUID +#[derive(Serialize, Deserialize, Debug)] +struct SensorData { + sensor_uuid: Uuid, + samples: Vec>, +} + +// Values tables (Integer, Numeric, Float, String, Boolean) +type IntegerValue = Sample; +type NumericValue = Sample; // Assuming 'Numeric' is represented as f64 +type FloatValue = Sample; +type StringValue = Sample; // Reference to StringsValuesDictionary ID +type BooleanValue = Sample; + +// Localisations +#[derive(Serialize, Deserialize, Debug)] +struct Localisation { + sensor_id: i32, + datetime: i64, + latitude: f32, + longitude: f32, +} +*/ diff --git a/src/http/mod.rs b/src/http/mod.rs new file mode 100644 index 0000000..708117c --- /dev/null +++ b/src/http/mod.rs @@ -0,0 +1,2 @@ +pub mod server; +pub mod state; diff --git a/src/http/server.rs b/src/http/server.rs new file mode 100644 index 0000000..a247730 --- /dev/null +++ b/src/http/server.rs @@ -0,0 +1,157 @@ +use anyhow::Result; +use axum::extract::DefaultBodyLimit; +use axum::extract::State; +use axum::http::header; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::response::Response; +use axum::routing::get; +use axum::routing::post; +use axum::Json; +use axum::Router; +use futures::io::BufReader; +use futures::stream::StreamExt; +use futures::TryStreamExt; +use polars::prelude::*; +use sqlx::any; +use std::io; +use 
std::io::Cursor; +use std::net::IpAddr; +use std::net::Ipv4Addr; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::bytes::Bytes; +use tower::ServiceBuilder; +use tower_http::trace; +use tower_http::{timeout::TimeoutLayer, trace::TraceLayer, ServiceBuilderExt}; +use tracing::Level; + +use super::state::HttpServerState; +use crate::importers::csv::publish_csv_async; + +// Anyhow error handling with axum +// https://github.com/tokio-rs/axum/blob/d3112a40d55f123bc5e65f995e2068e245f12055/examples/anyhow-error-response/src/main.rs +struct AppError(anyhow::Error); +impl IntoResponse for AppError { + fn into_response(self) -> Response { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Something went wrong: {}", self.0), + ) + .into_response() + } +} +impl From for AppError +where + E: Into, +{ + fn from(err: E) -> Self { + Self(err.into()) + } +} + +pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Result<()> { + // Initialize tracing + tracing_subscriber::fmt() + .with_target(false) + .compact() + .init(); + + // List of headers that shouldn't be logged + let sensitive_headers: Arc<[_]> = vec![header::AUTHORIZATION, header::COOKIE].into(); + + // Middleware creation + let middleware = ServiceBuilder::new() + .sensitive_request_headers(sensitive_headers.clone()) + .layer( + TraceLayer::new_for_http() + .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO)) + .on_response(trace::DefaultOnResponse::new().level(Level::INFO)), + ) + .sensitive_response_headers(sensitive_headers) + .layer(TimeoutLayer::new(Duration::from_secs(10))) + .compression() + .into_inner(); + + // Create our application with a single route + let app = Router::new() + .route("/", get(handler)) + .route( + "/publish", + post(publish_handler).layer(DefaultBodyLimit::max(1024 * 1024 * 1024)), + ) + //.route("/publish_stream", post(publish_stream_handler)) + .route("/publish_csv", post(publish_csv)) + .route("/fail", get(test_fail)) + .layer(middleware) + .with_state(state); + + // Run our application + let listener = tokio::net::TcpListener::bind(address).await?; + axum::serve(listener, app).await?; + + Ok(()) +} + +async fn handler(State(state): State) -> Result, AppError> { + //EVENT_BUS.get().unwrap().publish(42).await.unwrap(); + //state.event_bus.publish(42).await.unwrap(); + // let mut sync_receiver = state.event_bus.sync_request().await?; + // sync_receiver.recv().await?; + + Ok(Json(state.name)) +} + +async fn publish_csv( + State(state): State, + body: axum::body::Body, +) -> Result { + let stream = body.into_data_stream(); + let stream = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err)); + let reader = stream.into_async_read(); + //let reader = BufReader::new(stream.into_async_read()); + // csv_async already uses a BufReader internally + let csv_reader = csv_async::AsyncReaderBuilder::new() + .has_headers(true) + .delimiter(b';') + .create_reader(reader); + + publish_csv_async(csv_reader, 100, state.event_bus.clone()).await?; + + Ok("ok".to_string()) +} + +fn try_thing() -> Result<()> { + anyhow::bail!("Test error"); +} + +async fn test_fail() -> Result { + // wait 3s + tokio::time::sleep(Duration::from_secs(3)).await; + try_thing()?; + Ok("ok".to_string()) +} + +async fn publish_handler(bytes: Bytes) -> Result, (StatusCode, String)> { + let cursor = Cursor::new(bytes); + let df_result = CsvReader::new(cursor) + .with_separator(b';') + //.infer_schema(Some(128)) + //.with_dtypes( + 
.has_header(true) + .finish(); + + // print the schema + let schema = df_result.as_ref().unwrap().schema(); + println!("{:?}", schema); + + match df_result { + Ok(df) => Ok(Json(format!("Number of rows: {}", df.height()))), + Err(_) => Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "Error reading CSV".to_string(), + )), + } +} diff --git a/src/http/state.rs b/src/http/state.rs new file mode 100644 index 0000000..9f184a5 --- /dev/null +++ b/src/http/state.rs @@ -0,0 +1,8 @@ +use std::sync::Arc; +use crate::bus::EventBus; + +#[derive(Clone, Debug)] +pub struct HttpServerState { + pub name: String, + pub event_bus: Arc, +} diff --git a/src/importers/csv.rs b/src/importers/csv.rs new file mode 100644 index 0000000..d44c499 --- /dev/null +++ b/src/importers/csv.rs @@ -0,0 +1,65 @@ +use crate::{ + bus::{utils::WaitForAll, EventBus}, + datamodel::batch::{Batch, Sample, TypedSamples}, +}; +use anyhow::Result; +use csv_async::AsyncReader; +use futures::{io, StreamExt}; +use std::sync::Arc; + +pub async fn publish_csv_async( + mut csv_reader: AsyncReader, + batch_size: usize, + event_bus: Arc, +) -> Result<()> { + println!("{:?}", csv_reader.has_headers()); + println!("{:?}", csv_reader.headers().await.unwrap()); + let mut records = csv_reader.records(); + + let mut current_samples: Vec> = vec![]; + + let mut toto = WaitForAll::new(); + + let mut i = 0; + + println!("Reading CSV"); + while let Some(record) = records.next().await { + let record = record.unwrap(); + //println!("{:?}", record); + + current_samples.push(Sample { + timestamp_ms: 0, + value: i, + }); + + i += 1; + + if current_samples.len() >= batch_size { + let batch = Batch { + sensor_uuid: uuid::Uuid::from_bytes([0; 16]), + sensor_name: "test".to_string(), + samples: Arc::new(TypedSamples::Integer(current_samples)), + }; + let sync_receiver = event_bus.publish(batch).await?; + //sync_receiver.activate().recv().await?; + current_samples = vec![]; + toto.add(sync_receiver.activate()).await; + } + } + + if !current_samples.is_empty() { + let batch = Batch { + sensor_uuid: uuid::Uuid::from_bytes([0; 16]), + sensor_name: "test".to_string(), + samples: Arc::new(TypedSamples::Integer(current_samples)), + }; + let sync_receiver = event_bus.publish(batch).await?; + toto.add(sync_receiver.activate()).await; + } + + // Wololo ?? 
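+ // Block until every batch published above has been acknowledged by the storage backend.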
+ toto.wait().await?; + + println!("Done reading CSV"); + Ok(()) +} diff --git a/src/importers/lazy_csv.rs b/src/importers/lazy_csv.rs new file mode 100644 index 0000000..7672422 --- /dev/null +++ b/src/importers/lazy_csv.rs @@ -0,0 +1,12 @@ +use crate::bus::EventBus; +use anyhow::Result; +use futures::io; +use std::sync::Arc; + +pub async fn publish_csv_async( + mut async_reader: R, + batch_size: usize, + event_bus: Arc, +) -> Result<()> { + Ok(()) +} diff --git a/src/importers/mod.rs b/src/importers/mod.rs new file mode 100644 index 0000000..d4996f0 --- /dev/null +++ b/src/importers/mod.rs @@ -0,0 +1 @@ +pub mod csv; diff --git a/src/infer/columns.rs b/src/infer/columns.rs new file mode 100644 index 0000000..02832ba --- /dev/null +++ b/src/infer/columns.rs @@ -0,0 +1,304 @@ +use super::infer::*; +use std::{str::FromStr, sync::Arc}; + +#[derive(Debug, Clone, PartialEq)] +pub enum InferedColumn { + Integer(Vec), + Numeric(Vec), + Float(Vec), + String(Vec), + Boolean(Vec), + JSON(Vec>), + //DateTime(Vec), +} + +pub fn infer_column(column: Vec, trim: bool, numeric: bool) -> InferedColumn { + // select the right infer method + let infer_method = if trim { + if numeric { + infer_type_with_trim_and_numeric + } else { + infer_type_with_trim + } + } else if numeric { + infer_type_with_numeric + } else { + infer_type + }; + + let infered_column = column + .iter() + .map(|value| infer_method(value)) + .collect::>(); + + let mut has_integers = false; + let mut has_numeric = false; + let mut has_floats = false; + let mut has_string = false; + let mut has_boolean = false; + let mut has_json = false; + + for infered_value in infered_column.iter() { + match infered_value { + Ok((_, InferedValue::Integer(_))) => has_integers = true, + Ok((_, InferedValue::Numeric(_))) => has_numeric = true, + Ok((_, InferedValue::Float(_))) => has_floats = true, + Ok((_, InferedValue::String(_))) => has_string = true, + Ok((_, InferedValue::JSON(_))) => has_json = true, + Ok((_, InferedValue::Boolean(_))) => has_boolean = true, + _ => panic!("Failed to infer column"), + } + } + + // If we have at least a string, everything is a string + if has_string { + // We can return the column as is + return InferedColumn::String(column); + } + + if has_json { + return InferedColumn::JSON( + infered_column + .iter() + .map(|value| match value { + Ok((_, InferedValue::JSON(value))) => value.clone(), + // Convert the other types to JSON, to be nice + Ok((_, InferedValue::Integer(value))) => { + Arc::new(serde_json::Value::from(*value)) + } + Ok((_, InferedValue::Float(value))) => { + Arc::new(serde_json::Value::from(*value)) + } + Ok((_, InferedValue::Boolean(value))) => { + Arc::new(serde_json::Value::from(*value)) + } + _ => unreachable!("We should have only JSON compatible types at this point"), + }) + .collect::>(), + ); + } + + // If we have booleans + if has_boolean { + // If we don't have only booleans, we use string instead + if has_integers || has_numeric || has_floats { + return InferedColumn::String(column); + } + return InferedColumn::Boolean( + infered_column + .iter() + .map(|value| match value { + Ok((_, InferedValue::Boolean(value))) => *value, + _ => unreachable!("We should have only booleans at this point"), + }) + .collect::>(), + ); + } + + // If we have numerics + if has_numeric { + return InferedColumn::Numeric( + infered_column + .iter() + .map(|value| match value { + Ok((_, InferedValue::Numeric(value))) => *value, + _ => unreachable!("We should have only numerics"), + }) + .collect::>(), + ); + } + + // If we 
have floats, integers are also floats + if has_floats { + return InferedColumn::Float( + infered_column + .iter() + .map(|value| match value { + Ok((_, InferedValue::Float(value))) => *value, + Ok((_, InferedValue::Integer(value))) => *value as f64, + _ => unreachable!("We should have only floats and integers at this point"), + }) + .collect::>(), + ); + } + + // If we have only integers + if has_integers { + return InferedColumn::Integer( + infered_column + .iter() + .map(|value| match value { + Ok((_, InferedValue::Integer(value))) => *value, + _ => unreachable!("We should have only integers at this point"), + }) + .collect::>(), + ); + } + + unreachable!("failed to infer column"); +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + + #[test] + fn test_infer_column_integers() { + let column = vec![ + "1".to_string(), + "2".to_string(), + "3".to_string(), + "4".to_string(), + ]; + let infered_column = infer_column(column, false, false); + assert_eq!(infered_column, InferedColumn::Integer(vec![1, 2, 3, 4])); + } + + #[test] + fn test_infer_column_floats() { + let column = vec![ + "1.1".to_string(), + "2.2".to_string(), + "3.3".to_string(), + "4.4".to_string(), + ]; + let infered_column = infer_column(column, false, false); + assert_eq!( + infered_column, + InferedColumn::Float(vec![1.1, 2.2, 3.3, 4.4]) + ); + + // now with a mix of integers and floats + let column = vec![ + "1.1".to_string(), + "2".to_string(), + "3.3".to_string(), + "4".to_string(), + ]; + let infered_column = infer_column(column, false, false); + assert_eq!( + infered_column, + InferedColumn::Float(vec![1.1, 2.0, 3.3, 4.0]) + ); + } + + #[test] + fn test_infer_column_numeric() { + let column = vec![ + "1".to_string(), + "2.2".to_string(), + "3.3".to_string(), + "4.4".to_string(), + "78953678389071".to_string(), + ]; + let infered_column = infer_column(column, false, true); + assert_eq!( + infered_column, + InferedColumn::Numeric(vec![ + rust_decimal::Decimal::from_str("1").unwrap(), + rust_decimal::Decimal::from_str("2.2").unwrap(), + rust_decimal::Decimal::from_str("3.3").unwrap(), + rust_decimal::Decimal::from_str("4.4").unwrap(), + rust_decimal::Decimal::from_str("78953678389071").unwrap(), + ]) + ); + } + + #[test] + fn test_infer_column_bool() { + let column = vec![ + " true ".to_string(), + "false".to_string(), + "TRUE".to_string(), + "FALSE\n".to_string(), + ]; + let infered_column = infer_column(column, true, false); + assert_eq!( + infered_column, + InferedColumn::Boolean(vec![true, false, true, false]) + ); + } + + #[test] + fn test_boolean_fallback_to_string() { + let column = vec![" true ".to_string(), "false".to_string(), "42".to_string()]; + let infered_column = infer_column(column, true, true); + assert_eq!( + infered_column, + InferedColumn::String(vec![ + " true ".to_string(), + "false".to_string(), + "42".to_string(), + ]) + ); + } + + #[test] + fn test_infer_column_string() { + let column = vec![ + "abcd".to_string(), + "efgh".to_string(), + " . ".to_string(), + "42".to_string(), + "true".to_string(), + ]; + + let infered_column = infer_column(column, true, false); + assert_eq!( + infered_column, + InferedColumn::String(vec![ + "abcd".to_string(), + "efgh".to_string(), + " . 
".to_string(), + "42".to_string(), + "true".to_string(), + ]) + ); + } + + #[test] + fn test_infer_column_json() { + let column = vec![ + r#"{"a": 1}"#.to_string(), + r#"[{"b": 2}]"#.to_string(), + r#"{"c": true}"#.to_string(), + r#"{"d": "{\"test\":true}"}"#.to_string(), + ]; + + let infered_column = infer_column(column, true, false); + assert_eq!( + infered_column, + InferedColumn::JSON(vec![ + Arc::new(json!({"a": 1})), + Arc::new(json!([{"b": 2}])), + Arc::new(json!({"c": true})), + Arc::new(json!({"d": "{\"test\":true}"})), + ]) + ); + } + + #[test] + fn test_fallback_json() { + let column = vec![ + r#"{"a": 1}"#.to_string(), + r#"[{"b": 2}]"#.to_string(), + "42".to_string(), + "42.83".to_string(), + "true".to_string(), + ]; + + let infered_column = infer_column(column, true, false); + assert_eq!( + infered_column, + InferedColumn::JSON(vec![ + Arc::new(json!({"a": 1})), + Arc::new(json!([{"b": 2}])), + Arc::new(json!(42)), + Arc::new(json!(42.83)), + Arc::new(json!(true)), + ]) + ); + } +} diff --git a/src/infer/infer.rs b/src/infer/infer.rs new file mode 100644 index 0000000..847e9df --- /dev/null +++ b/src/infer/infer.rs @@ -0,0 +1,454 @@ +use std::{str::FromStr, sync::Arc}; + +use nom::{ + branch::alt, + bytes::complete::tag_no_case, + character::complete::{i64, multispace0}, + combinator::{eof, map}, + number::complete::double, + number::complete::recognize_float_or_exceptions, + sequence::{delimited, terminated}, + Err, IResult, +}; +use rust_decimal::Decimal; + +#[derive(Debug, Clone, PartialEq)] +pub enum InferedValue { + Integer(i64), + Numeric(rust_decimal::Decimal), + Float(f64), + String(String), + Boolean(bool), + JSON(Arc), + //Location, + //Blob(Vec), + // todo: booleans and dates and timestamps +} + +pub fn parse_integer(data: &str) -> IResult<&str, InferedValue> { + map(i64, |i| InferedValue::Integer(i))(data) +} + +pub fn parse_float(data: &str) -> IResult<&str, InferedValue> { + // We use the "double" parser from nom, that returns a f64. + // The parser named "float" from nom returns a f32. + map(double, |f| InferedValue::Float(f))(data) +} + +pub fn parse_numeric(data: &str) -> IResult<&str, InferedValue> { + // We try to recognize the float using nom, but then we parse it with rust_decimal. + let (i, s) = recognize_float_or_exceptions(data)?; + match Decimal::from_str(s) { + Ok(d) => Ok((i, InferedValue::Numeric(d))), + Err(_) => Err(Err::Error(nom::error::Error::new( + data, + // If the number cannot be parsed, we return an error understandable by nom. + nom::error::ErrorKind::Fail, + ))), + } +} + +pub fn parse_string(data: &str) -> IResult<&str, InferedValue> { + // placeholder, accepts anything + Ok(("", InferedValue::String(data.to_string()))) +} + +pub fn parse_boolean(data: &str) -> IResult<&str, InferedValue> { + // placeholder, accepts anything + map( + alt((tag_no_case("true"), tag_no_case("false"))), + |s: &str| InferedValue::Boolean(s.to_lowercase() == "true"), + )(data) +} + +fn is_likely_json(data: &str) -> bool { + // Not done using nom because it's not the right tool. 
+ (data.starts_with('{') && data.ends_with('}')) || (data.starts_with('[') && data.ends_with(']')) +} + +pub fn parse_json(data: &str) -> IResult<&str, InferedValue> { + if is_likely_json(data) { + serde_json::from_str(data) + .map(|val| ("", InferedValue::JSON(val))) + .map_err(|_| Err::Error(nom::error::Error::new(data, nom::error::ErrorKind::Fail))) + } else { + Err(Err::Error(nom::error::Error::new( + data, + nom::error::ErrorKind::Fail, + ))) + } +} + +pub fn infer_type(data: &str) -> IResult<&str, InferedValue> { + alt(( + terminated(parse_integer, eof), + terminated(parse_float, eof), + terminated(parse_boolean, eof), + terminated(parse_json, eof), + terminated(parse_string, eof), + ))(data) +} + +pub fn infer_type_with_trim(data: &str) -> IResult<&str, InferedValue> { + alt(( + terminated(delimited(multispace0, parse_integer, multispace0), eof), + terminated(delimited(multispace0, parse_float, multispace0), eof), + terminated(delimited(multispace0, parse_boolean, multispace0), eof), + terminated(delimited(multispace0, parse_json, multispace0), eof), + // We don't trim strings, as they can contain whitespace. + terminated(parse_string, eof), + ))(data) +} + +pub fn infer_type_with_numeric(data: &str) -> IResult<&str, InferedValue> { + alt(( + terminated(parse_numeric, eof), + terminated(parse_boolean, eof), + terminated(parse_json, eof), + terminated(parse_string, eof), + ))(data) +} + +pub fn infer_type_with_trim_and_numeric(data: &str) -> IResult<&str, InferedValue> { + alt(( + terminated(delimited(multispace0, parse_numeric, multispace0), eof), + terminated(delimited(multispace0, parse_boolean, multispace0), eof), + terminated(delimited(multispace0, parse_json, multispace0), eof), + // We don't trim strings, as they can contain whitespace. + terminated(parse_string, eof), + ))(data) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_integer() { + assert_eq!(parse_integer("42"), Ok(("", InferedValue::Integer(42)))); + assert_eq!(parse_integer("-42"), Ok(("", InferedValue::Integer(-42)))); + assert_eq!(parse_integer("0"), Ok(("", InferedValue::Integer(0)))); + assert_eq!( + parse_integer("123456789"), + Ok(("", InferedValue::Integer(123456789))) + ); + assert_eq!( + parse_integer("123456789123456789123456789"), + Err(Err::Error(nom::error::Error::new( + "123456789123456789123456789", + nom::error::ErrorKind::Digit + ))) + ); + } + + #[test] + fn test_parse_float() { + assert_eq!(parse_float("42"), Ok(("", InferedValue::Float(42.0)))); + assert_eq!(parse_float("-42"), Ok(("", InferedValue::Float(-42.0)))); + assert_eq!(parse_float("0"), Ok(("", InferedValue::Float(0.0)))); + assert_eq!(parse_float("42.0"), Ok(("", InferedValue::Float(42.0)))); + assert_eq!(parse_float("-42.0"), Ok(("", InferedValue::Float(-42.0)))); + assert_eq!(parse_float("0.0"), Ok(("", InferedValue::Float(0.0)))); + assert_eq!(parse_float("42.0\n"), Ok(("\n", InferedValue::Float(42.0)))); + + // unprecise IEEE 754 floats + assert_eq!( + parse_float("12345678901.12345678901"), + // Notice that it's not the same number. 
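+ // f64 only keeps about 15-16 significant decimal digits, so the parsed value is rounded.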
+ Ok(("", InferedValue::Float(12345678901.123457))) + ); + + assert_eq!( + parse_integer("123456789123456789123456789.123456789"), + Err(Err::Error(nom::error::Error::new( + "123456789123456789123456789.123456789", + nom::error::ErrorKind::Digit + ))) + ); + } + + #[test] + fn test_parse_numeric() { + assert_eq!( + parse_numeric("42"), + Ok(("", InferedValue::Numeric(Decimal::new(42, 0)))) + ); + assert_eq!( + parse_numeric("-42"), + Ok(("", InferedValue::Numeric(Decimal::new(-42, 0)))) + ); + assert_eq!( + parse_numeric("0"), + Ok(("", InferedValue::Numeric(Decimal::new(0, 0)))) + ); + assert_eq!( + parse_numeric("42.0"), + Ok(("", InferedValue::Numeric(Decimal::new(42, 0)))) + ); + assert_eq!( + parse_numeric("-42.0"), + Ok(("", InferedValue::Numeric(Decimal::new(-42, 0)))) + ); + assert_eq!( + parse_numeric("0.0"), + Ok(("", InferedValue::Numeric(Decimal::new(0, 1)))) + ); + + // unprecise IEEE 754 floats are gone + assert_eq!( + parse_numeric("12345678901.12345678901"), + Ok(( + "", + InferedValue::Numeric(Decimal::from_str("12345678901.12345678901").unwrap()) + )) + ); + + // Now large numbers are accepted + assert_eq!( + parse_numeric("123456789123456789123456789.123456789"), + Ok(( + "", + InferedValue::Numeric( + Decimal::from_str("123456789123456789123456789.123456789").unwrap() + ) + )) + ); + assert_eq!( + parse_numeric("123456789123456789123456789"), + Ok(( + "", + InferedValue::Numeric(Decimal::from_str("123456789123456789123456789").unwrap()) + )) + ); + assert_eq!( + parse_numeric("123456789123456789123456789123456789"), + Err(Err::Error(nom::error::Error::new( + "123456789123456789123456789123456789", + nom::error::ErrorKind::Fail + ))) + ); + + if let Ok((_, InferedValue::Numeric(d))) = parse_numeric("123456789123456789123456789") { + assert_eq!(d.to_string(), "123456789123456789123456789"); + } else { + panic!("Not a numeric"); + } + } + + #[test] + // Very simple test, as only the edge cases need to be tested. 
+ fn test_parse_string() { + assert_eq!( + parse_string(""), + Ok(("", InferedValue::String("".to_string()))) + ); + assert_eq!( + parse_string("abcd"), + Ok(("", InferedValue::String("abcd".to_string()))) + ); + assert_eq!( + parse_string("abcd\n"), + Ok(("", InferedValue::String("abcd\n".to_string()))) + ); + } + + #[test] + fn test_parse_boolean() { + assert_eq!(parse_boolean("true"), Ok(("", InferedValue::Boolean(true)))); + assert_eq!( + parse_boolean("false"), + Ok(("", InferedValue::Boolean(false))) + ); + assert_eq!(parse_boolean("TRUE"), Ok(("", InferedValue::Boolean(true)))); + assert_eq!( + parse_boolean("FALSE"), + Ok(("", InferedValue::Boolean(false))) + ); + assert_eq!(parse_boolean("True"), Ok(("", InferedValue::Boolean(true)))); + assert_eq!( + parse_boolean("False"), + Ok(("", InferedValue::Boolean(false))) + ); + assert_eq!( + parse_boolean("abcd"), + Err(Err::Error(nom::error::Error::new( + "abcd", + nom::error::ErrorKind::Tag + ))) + ); + } + + #[test] + fn test_is_like_json() { + assert!(is_likely_json("{}")); + assert!(is_likely_json("[]")); + assert!(is_likely_json("{\n}")); + assert!(is_likely_json("[{\"a\": 1}]")); + assert!(is_likely_json("[{\"a\": 1}, {\"b\": 2}]")); + assert!(!is_likely_json("[]\n")); + assert!(!is_likely_json("42")); + assert!(!is_likely_json("abcd")); + assert!(!is_likely_json("\"abcd\"")); + } + + #[test] + fn test_parse_json() { + assert_eq!( + parse_json("{}"), + Ok(("", InferedValue::JSON(Arc::new(serde_json::json!({}))))) + ); + assert_eq!( + parse_json("[]"), + Ok(("", InferedValue::JSON(Arc::new(serde_json::json!([]))))) + ); + assert_eq!( + parse_json("[{\"a\": 1}]"), + Ok(( + "", + InferedValue::JSON(Arc::new(serde_json::json!([{"a": 1}]))) + )) + ); + assert_eq!( + parse_json("[{\"a\": 1}, {\"b\": 2}]"), + Ok(( + "", + InferedValue::JSON(Arc::new(serde_json::json!([{"a": 1}, {"b": 2}]))) + )) + ); + assert_eq!( + parse_json("[{\"a\": 1}, {\"b\": 2}]\n\n"), + Err(Err::Error(nom::error::Error::new( + "[{\"a\": 1}, {\"b\": 2}]\n\n", + nom::error::ErrorKind::Fail + ))) + ); + assert_eq!( + parse_json("abcd"), + Err(Err::Error(nom::error::Error::new( + "abcd", + nom::error::ErrorKind::Fail + ))) + ); + } + + #[test] + fn test_infer_type() { + assert_eq!(infer_type("42"), Ok(("", InferedValue::Integer(42)))); + assert_eq!(infer_type("-42"), Ok(("", InferedValue::Integer(-42)))); + assert_eq!(infer_type("0"), Ok(("", InferedValue::Integer(0)))); + assert_eq!(infer_type("42.0"), Ok(("", InferedValue::Float(42.0)))); + assert_eq!(infer_type("-42.0"), Ok(("", InferedValue::Float(-42.0)))); + assert_eq!(infer_type("0.0"), Ok(("", InferedValue::Float(0.0)))); + assert_eq!( + infer_type("42.0\n"), + Ok(("", InferedValue::String("42.0\n".to_string()))) + ); + assert_eq!( + infer_type("12345678901.12345678901"), + Ok(("", InferedValue::Float(12345678901.123457))) + ); + assert_eq!( + infer_type("abcd"), + Ok(("", InferedValue::String("abcd".to_string()))) + ); + assert_eq!( + infer_type("{}"), + Ok(("", InferedValue::JSON(Arc::new(serde_json::json!({}))))) + ); + assert_eq!( + infer_type("[{\"a\": 1}]"), + Ok(( + "", + InferedValue::JSON(Arc::new(serde_json::json!([{"a": 1}]))) + )) + ); + } + + #[test] + fn test_infer_type_with_trim() { + assert_eq!( + infer_type_with_trim(" 42 "), + Ok(("", InferedValue::Integer(42))) + ); + assert_eq!( + infer_type_with_trim(" -42 "), + Ok(("", InferedValue::Integer(-42))) + ); + assert_eq!( + infer_type_with_trim("-42.23"), + Ok(("", InferedValue::Float(-42.23))) + ); + // only whitespace + assert_eq!( + 
infer_type_with_trim(" \n"), + Ok(("", InferedValue::String(" \n".to_string()))) + ); + // strings contain the whitespace + assert_eq!( + infer_type_with_trim(" abcd\n"), + Ok(("", InferedValue::String(" abcd\n".to_string()))) + ); + } + + #[test] + fn test_infer_type_with_numeric() { + assert_eq!( + infer_type_with_numeric("42"), + Ok(("", InferedValue::Numeric(Decimal::new(42, 0)))) + ); + assert_eq!( + infer_type_with_numeric("-123456789123456789123456789.123456789"), + Ok(( + "", + InferedValue::Numeric( + Decimal::from_str("-123456789123456789123456789.123456789").unwrap() + ) + )) + ); + assert_eq!( + infer_type_with_numeric("abcd a\n\n "), + Ok(("", InferedValue::String("abcd a\n\n ".to_string()))) + ); + assert_eq!( + infer_type_with_numeric("FALSE"), + Ok(("", InferedValue::Boolean(false))) + ); + assert_eq!( + infer_type_with_numeric("{}"), + Ok(("", InferedValue::JSON(Arc::new(serde_json::json!({}))))) + ); + assert_eq!( + infer_type_with_numeric(" 42.12 "), + Ok(("", InferedValue::String(" 42.12 ".to_string()))) + ); + } + #[test] + fn test_infer_type_with_trim_and_numeric() { + assert_eq!( + infer_type_with_trim_and_numeric(" 42 "), + Ok(("", InferedValue::Numeric(Decimal::new(42, 0)))) + ); + assert_eq!( + infer_type_with_trim_and_numeric(" -42 "), + Ok(("", InferedValue::Numeric(Decimal::new(-42, 0)))) + ); + assert_eq!( + infer_type_with_trim_and_numeric("-42.23"), + Ok(("", InferedValue::Numeric(Decimal::new(-4223, 2)))) + ); + // only whitespace + assert_eq!( + infer_type_with_trim_and_numeric(" \n"), + Ok(("", InferedValue::String(" \n".to_string()))) + ); + // strings contain the whitespace + assert_eq!( + infer_type_with_trim_and_numeric(" abcd\n"), + Ok(("", InferedValue::String(" abcd\n".to_string()))) + ); + assert_eq!( + infer_type_with_trim_and_numeric(" 42.12 "), + Ok(("", InferedValue::Numeric(Decimal::new(4212, 2)))) + ); + } +} diff --git a/src/infer/mod.rs b/src/infer/mod.rs new file mode 100644 index 0000000..d017a80 --- /dev/null +++ b/src/infer/mod.rs @@ -0,0 +1,3 @@ +mod columns; +mod infer; +//pub use infer::infer; diff --git a/src/main.rs b/src/main.rs index e7a11a9..2a2a361 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,171 @@ -fn main() { +use crate::bus::message; +use crate::datamodel::batch::Batch; +use crate::http::server::run_http_server; +use crate::http::state::HttpServerState; +use axum::extract::DefaultBodyLimit; +use axum::http::header; +use axum::http::StatusCode; +use axum::routing::get; +use axum::routing::post; +use axum::Json; +use axum::Router; +use futures::stream::StreamExt; +use futures::TryStreamExt; +use polars::prelude::*; +use std::io; +use std::io::Cursor; +use std::net::Ipv4Addr; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use storage::sqlite::sqlite::SqliteStorage; +use storage::storage::GenericStorage; +use tokio::sync::OnceCell; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::bytes::Bytes; +use tower::ServiceBuilder; +use tower_http::trace; +use tower_http::{timeout::TimeoutLayer, trace::TraceLayer, ServiceBuilderExt}; +use tracing::event; +use tracing::Level; +mod bus; +mod datamodel; +mod http; +mod importers; +mod infer; +mod storage; + +#[tokio::main] +async fn main() { + /*let (tx, rx) = tokio::sync::mpsc::channel(100); // Channel with buffer size 100 + + // Simulate event emitter + tokio::spawn(async move { + for i in 1..=1004 { + tx.send(i).await.unwrap(); // Send events + } + }); + + // Create a stream from the receiver and buffer it in chunks + use 
futures::stream::StreamExt; + + //let mut buffered_stream = rx.chunks(10); // Buffer size of 10 + let mut buffered_stream = ReceiverStream::new(rx).chunks(10); // Chunk size of 10 + + // Process chunks of events + while let Some(events) = buffered_stream.next().await { + // `events` is a Vec containing a chunk of events + println!("Handling chunk of events: {:?}", events); + }*/ + + let sqlite_storage = SqliteStorage::connect("sqlite:test.db") + .await + .expect("Failed to connect to SQLite"); + + sqlite_storage + .create_or_migrate() + .await + .expect("Failed to create or migrate database"); + println!("Hello, world!"); + + /*let event_bus = event_bus::EVENT_BUS + .get_or_init(|| event_bus::init_event_bus()) + .await; + */ + + let event_bus = bus::event_bus::init_event_bus(); + let mut wololo = event_bus.main_bus_receiver.activate_cloned(); + let mut wololo2 = event_bus.main_bus_receiver.activate_cloned(); + + // spawn a task that prints the events to stdout + tokio::spawn(async move { + while let Some(message) = wololo.recv().await.ok() { + //println!("Received event a: {:?}", message); + + use crate::storage::storage::StorageInstance; + let toto: &dyn StorageInstance = &sqlite_storage; + + match message { + message::Message::Publish(message::PublishMessage { + batch, + sync_receiver: _, + sync_sender, + }) => { + toto.publish(batch, sync_sender).await; + //println!("Published batch"); + //sync_receiver.activate().recv().await.unwrap(); + } /*message::Message::SyncRequest(message::RequestSyncMessage { sender }) => { + println!("Received sync request"); + toto.sync().await.unwrap(); + sender.broadcast(()).await.unwrap(); + }*/ + } + } + }); + tokio::spawn(async move { + while let Some(event) = wololo2.recv().await.ok() { + //println!("Received event b: {:?}", event); + } + }); + + run_http_server( + HttpServerState { + name: "SensApp".to_string(), + event_bus: event_bus, + }, + SocketAddr::from((Ipv4Addr::LOCALHOST, 3000)), + ) + .await + .expect("Failed to run HTTP server"); +} + +async fn handler() -> &'static str { + "Hello, world!" 
+} + +async fn publish_stream_handler(body: axum::body::Body) -> Result { + let mut count = 0usize; + let mut stream = body.into_data_stream(); + + loop { + let chunk = stream.try_next().await; + match chunk { + Ok(bytes) => match bytes { + Some(bytes) => count += bytes.into_iter().filter(|b| *b == b'\n').count(), + None => break, + }, + Err(_) => { + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "Error reading body".to_string(), + )) + } + } + } + + Ok(count.to_string()) +} + +async fn publish_csv(body: axum::body::Body) -> Result { + let stream = body.into_data_stream(); + let stream = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err)); + let reader = stream.into_async_read(); + let mut csv_reader = csv_async::AsyncReaderBuilder::new() + .has_headers(true) + .delimiter(b';') + .create_reader(reader); + + println!("{:?}", csv_reader.has_headers()); + println!("{:?}", csv_reader.headers().await.unwrap()); + let mut records = csv_reader.records(); + + println!("Reading CSV"); + while let Some(record) = records.next().await { + let record = record.unwrap(); + println!("{:?}", record); + } + println!("Done reading CSV"); + + Ok("ok".to_string()) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 0000000..a6b35bb --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1,3 @@ +pub mod postgresql; +pub mod sqlite; +pub mod storage; diff --git a/src/storage/postgresql/migrations/20240110123122_init.sql b/src/storage/postgresql/migrations/20240110123122_init.sql new file mode 100644 index 0000000..6208935 --- /dev/null +++ b/src/storage/postgresql/migrations/20240110123122_init.sql @@ -0,0 +1,6 @@ +-- Add migration script here +CREATE TABLE IF NOT EXISTS sensors ( + sensor_id INTEGER PRIMARY KEY AUTOINCREMENT, --toto + name TEXT NOT NULL UNIQUE, + description TEXT +); diff --git a/src/storage/postgresql/mod.rs b/src/storage/postgresql/mod.rs new file mode 100644 index 0000000..cc668e3 --- /dev/null +++ b/src/storage/postgresql/mod.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; + +use super::storage::{GenericStorage, SensorData, StorageInstance}; +use crate::datamodel::batch::Batch; +use anyhow::Result; +use async_broadcast::Sender; +use async_trait::async_trait; + +#[derive(Debug)] +pub struct PostgresStorage {} + +#[async_trait] +impl GenericStorage for PostgresStorage { + type StorageInstance = Self; + + async fn connect(connection_string: &str) -> Result { + Ok(Self::StorageInstance {}) + } + async fn create_or_migrate(&self) -> Result<()> { + Ok(()) + } + async fn publish_batch(&self, batch: crate::datamodel::batch::Batch) -> Result<()> { + Ok(()) + } +} + +#[async_trait] +impl StorageInstance for PostgresStorage { + async fn create_sensor(&self, sensor_data: &SensorData) -> Result<()> { + // Implement sensor creation logic here + Ok(()) + } + async fn publish(&self, batch: Arc, sync_sender: Sender<()>) -> Result<()> { + // Implement batch publishing logic here + Ok(()) + } + + async fn sync(&self, sync_sender: Sender<()>) -> Result<()> { + // Implement sync logic here + Ok(()) + } +} diff --git a/src/storage/sqlite/migrations/20240110093153_init.sql b/src/storage/sqlite/migrations/20240110093153_init.sql new file mode 100644 index 0000000..a7bff85 --- /dev/null +++ b/src/storage/sqlite/migrations/20240110093153_init.sql @@ -0,0 +1,93 @@ +-- Create the 'units' table +CREATE TABLE units ( + id INTEGER PRIMARY KEY AUTOINCREMENT, -- Auto-incrementing primary key + name TEXT NOT NULL UNIQUE, -- Unique name, cannot be null + description TEXT -- Optional 
description +); + +-- Create the 'sensors' table with both UUID and auto-incrementing sensor_id +CREATE TABLE sensors ( + sensor_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Auto-incrementing integer for relationships + uuid TEXT NOT NULL UNIQUE, -- UUID as text for unique sensor identification, cannot be null + name TEXT NOT NULL, -- Name of the sensor, cannot be null + type TEXT NOT NULL, -- Type of the sensor (e.g., integer, float, etc.), cannot be null + unit INTEGER, -- References 'units' (optional) + FOREIGN KEY (unit) REFERENCES units(id) -- Foreign key to 'units' table +); + +-- Create the 'labels' table +CREATE TABLE labels ( + sensor_id INTEGER NOT NULL, -- References 'sensors' (sensor_id), cannot be null + named INTEGER NOT NULL, -- ID for the name in the dictionary, cannot be null + description INTEGER, -- ID for the description in the dictionary (optional) + PRIMARY KEY (sensor_id, named), + FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table +); + +-- Create the 'labels_name_dictionary' table +CREATE TABLE labels_name_dictionary ( + id INTEGER PRIMARY KEY AUTOINCREMENT, -- Auto-incrementing primary key + name TEXT NOT NULL UNIQUE -- Unique name for label, cannot be null +); + +-- Create the 'labels_description_dictionary' table +CREATE TABLE labels_description_dictionary ( + id INTEGER PRIMARY KEY AUTOINCREMENT, -- Auto-incrementing primary key + description TEXT NOT NULL UNIQUE -- Unique description for label, cannot be null +); + +-- Create the 'strings_values_dictionary' table +CREATE TABLE strings_values_dictionary ( + id INTEGER PRIMARY KEY AUTOINCREMENT, -- Auto-incrementing primary key + value TEXT NOT NULL UNIQUE -- Unique text value, cannot be null +); + +-- Create the 'integer_values' table +CREATE TABLE integer_values ( + sensor_id INTEGER NOT NULL, -- References 'sensors' (sensor_id), cannot be null + timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null + value INTEGER NOT NULL, -- Integer value, cannot be null + FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table +); + +-- Create the 'numeric_values' table +CREATE TABLE numeric_values ( + sensor_id INTEGER NOT NULL, -- References 'sensors' (sensor_id), cannot be null + timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null + value NUMERIC NOT NULL, -- Numeric value, cannot be null + FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table +); + +-- Create the 'float_values' table +CREATE TABLE float_values ( + sensor_id INTEGER NOT NULL, -- References 'sensors' (sensor_id), cannot be null + timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null + value REAL NOT NULL, -- Real (float) value, cannot be null + FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table +); + +-- Create the 'string_values' table +CREATE TABLE string_values ( + sensor_id INTEGER NOT NULL, -- References 'sensors' (sensor_id), cannot be null + timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null + value INTEGER NOT NULL, -- References 'strings_values_dictionary', cannot be null + FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id), -- Foreign key to 'sensors' table + FOREIGN KEY (value) REFERENCES strings_values_dictionary(id) -- Foreign key to 'strings_values_dictionary' +); + +-- Create the 'boolean_values' table +CREATE TABLE boolean_values ( + sensor_id INTEGER NOT NULL, -- References 'sensors' 
(sensor_id), cannot be null + timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null + value BOOLEAN NOT NULL, -- Boolean value, cannot be null + FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table +); + +-- Create the 'localisations' table +CREATE TABLE localisations ( + sensor_id INTEGER NOT NULL, -- References 'sensors' (sensor_id), cannot be null + timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null + latitude REAL NOT NULL, -- Latitude value, cannot be null + longitude REAL NOT NULL, -- Longitude value, cannot be null + FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table +); diff --git a/src/storage/sqlite/mod.rs b/src/storage/sqlite/mod.rs new file mode 100644 index 0000000..0c23c5a --- /dev/null +++ b/src/storage/sqlite/mod.rs @@ -0,0 +1,4 @@ +pub mod sqlite; + +// rexport SqliteStorage +pub use sqlite::SqliteStorage; diff --git a/src/storage/sqlite/sqlite.rs b/src/storage/sqlite/sqlite.rs new file mode 100644 index 0000000..3ed2db7 --- /dev/null +++ b/src/storage/sqlite/sqlite.rs @@ -0,0 +1,180 @@ +use crate::datamodel::batch::{Batch, Sample, TypedSamples}; +use crate::datamodel::SensorType; +use crate::storage::storage::{GenericStorage, SensorData, StorageInstance}; +use anyhow::{Context, Result}; +use async_broadcast::Sender; +use async_trait::async_trait; +use cached::proc_macro::once; +use sqlx::{prelude::*, Sqlite, Transaction}; +use sqlx::{sqlite::SqliteConnectOptions, SqlitePool}; +use std::default; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::timeout; +use uuid::Uuid; + +// SQLite implementation +#[derive(Debug)] +pub struct SqliteStorage { + pool: SqlitePool, +} + +#[async_trait] +impl GenericStorage for SqliteStorage { + type StorageInstance = Self; + + async fn connect(connection_string: &str) -> Result { + let connect_options = SqliteConnectOptions::from_str(connection_string) + .context("Failed to create sqlite connection options")? + // Create the database file if it doesn't exist + .create_if_missing(true) + // The Wall mode should perform better for SensApp + .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) + // Foreign keys have a performance impact, they are disabled by default + // in SQLite, but we want to make sure they stay disabled. 
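+ // Together, WAL journaling and disabled foreign key checks favour ingestion throughput over integrity checks.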
+ .foreign_keys(false); + + let pool = sqlx::SqlitePool::connect_with(connect_options) + .await + .context("Failed to create sqlite pool")?; + + Ok(SqliteStorage { pool }) + } + + async fn create_or_migrate(&self) -> Result<()> { + // Implement schema creation or migration logic here + sqlx::migrate!("src/storage/sqlite/migrations") + .run(&self.pool) + .await + .context("Failed to migrate database")?; + + Ok(()) + } + + async fn publish_batch(&self, batch: crate::datamodel::batch::Batch) -> Result<()> { + // Implement batch publishing logic here + Ok(()) + } +} + +#[async_trait] +impl StorageInstance for SqliteStorage { + async fn create_sensor(&self, sensor_data: &SensorData) -> Result<()> { + // Implement sensor creation logic here + Ok(()) + } + async fn publish(&self, batch: Arc, sync_sender: Sender<()>) -> Result<()> { + // Implement batch publishing logic here + + // start transaction + match batch.samples.as_ref() { + TypedSamples::Integer(samples) => { + let sensor_id = self + .get_sensor_id( + batch.sensor_uuid, + batch.sensor_name.clone(), + SensorType::Integer, + ) + .await?; + self.publish_integer_values(sensor_id, samples).await?; + } + _ => { + return Err(anyhow::anyhow!( + "Unsupported sample type: {:?}", + batch.samples + )) + } + } + // finish transaction + + self.sync(sync_sender).await?; + + Ok(()) + } + + async fn sync(&self, sync_sender: Sender<()>) -> Result<()> { + // Implement sync logic here + //println!("Syncing"); + //println!("Receiver count: {}", sync_sender.receiver_count()); + if sync_sender.receiver_count() > 0 && !sync_sender.is_closed() { + let _ = timeout(Duration::from_secs(5), sync_sender.broadcast(())).await?; + } + Ok(()) + } +} + +#[once(time = 120, result = true, sync_writes = true)] +async fn once_get_sensor_id( + pool: &SqlitePool, + sensor_uuid: Uuid, + sensor_name: String, + sensor_type: SensorType, +) -> Result { + println!("aaah"); + let uuid_string = sensor_uuid.to_string(); + let sensor_id = sqlx::query!( + r#" + SELECT sensor_id FROM sensors WHERE uuid = ? + "#, + uuid_string + ) + .fetch_optional(pool) + .await? + .map(|row| row.sensor_id); + + // If the sensor exists, it's returned + if let Some(Some(sensor_id)) = sensor_id { + return Ok(sensor_id); + } + + let sensor_type_string = sensor_type.to_string(); + + let create_sensor_query = sqlx::query!( + r#" + INSERT INTO sensors (uuid, name, type) + VALUES (?, ?, ?) + "#, + uuid_string, + sensor_name, + sensor_type_string + ); + + // Execute the query + let sensor_id = create_sensor_query.execute(pool).await?.last_insert_rowid(); + + Ok(sensor_id) +} + +impl SqliteStorage { + async fn get_sensor_id( + &self, + sensor_uuid: Uuid, + sensor_name: String, + sensor_type: SensorType, + ) -> Result { + once_get_sensor_id(&self.pool, sensor_uuid, sensor_name, sensor_type).await + } + + async fn publish_integer_values( + &self, + sensor_id: i64, + values: &Vec>, + ) -> Result<()> { + let mut transaction = self.pool.begin().await?; + for value in values { + let lol = sqlx::query!( + r#" + INSERT INTO integer_values (sensor_id, timestamp_ms, value) + VALUES (?, ?, ?) 
+ "#, + sensor_id, + value.timestamp_ms, + value.value + ); + transaction.execute(lol).await?; + } + transaction.commit().await?; + Ok(()) + } +} diff --git a/src/storage/storage.rs b/src/storage/storage.rs new file mode 100644 index 0000000..92f160d --- /dev/null +++ b/src/storage/storage.rs @@ -0,0 +1,58 @@ +use anyhow::Result; +use async_broadcast::Sender; +use async_trait::async_trait; +use std::sync::Arc; + +use crate::datamodel::batch::Batch; + +#[async_trait] +pub trait GenericStorage { + type StorageInstance: StorageInstance + Sync + Send; + + async fn connect(connection_string: &str) -> Result; + async fn create_or_migrate(&self) -> Result<()>; + async fn publish_batch(&self, batch: Batch) -> Result<()>; +} + +#[async_trait] +pub trait StorageInstance { + async fn create_sensor(&self, sensor_data: &SensorData) -> Result<()>; + async fn publish(&self, batch: Arc, sync_sender: Sender<()>) -> Result<()>; + async fn sync(&self, sync_sender: Sender<()>) -> Result<()>; +} + +pub struct SensorData { + // Define sensor data structure here +} + +pub struct SensorSample { + // time series sample + pub timestamp_ms: i64, + pub value: f64, +} + +#[derive(Debug)] +enum GenericStorages { + Sqlite(crate::storage::sqlite::SqliteStorage), + Postgres(crate::storage::postgresql::PostgresStorage), +} + +#[derive(Debug)] +pub struct Storage { + generic_storage: GenericStorages, +} + +impl Storage { + pub async fn publish_batch(&self, batch: Batch) -> Result<()> { + match self.generic_storage { + GenericStorages::Sqlite(ref sqlite_storage) => { + sqlite_storage.publish_batch(batch).await? + } + GenericStorages::Postgres(ref postgres_storage) => { + postgres_storage.publish_batch(batch).await? + } + } + // Implement batch publishing logic here + Ok(()) + } +}