From b126dc1750cd238cc51f2fea8a44abac2df51b04 Mon Sep 17 00:00:00 2001
From: Antoine Pultier <45740+fungiboletus@users.noreply.github.com>
Date: Tue, 20 Aug 2024 11:01:46 +0200
Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8C=88=20more=20CRUD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 Cargo.lock                                    |   9 ++
 Cargo.toml                                    |   3 +
 src/crud/list_cursor.rs                       | 113 ++++++++++++++++++
 src/crud/mod.rs                               |   2 +
 src/crud/viewmodel/mod.rs                     |   1 +
 src/crud/viewmodel/sensor_viewmodel.rs        |  33 +++++
 src/datamodel/sensor_type.rs                  |  27 +++--
 src/ingestors/http/crud.rs                    |  60 ++++++++--
 src/main.rs                                   |   1 +
 src/storage/bigquery/mod.rs                   |  11 +-
 src/storage/duckdb/duckdb_crud.rs             |  84 +++++++++++++
 .../duckdb/migrations/20240223133248_init.sql |  17 ++-
 src/storage/duckdb/mod.rs                     |  12 +-
 .../migrations/20240110123122_init.sql        |  17 ++-
 src/storage/postgresql/mod.rs                 |   1 +
 src/storage/postgresql/postgresql.rs          |  23 +++-
 src/storage/postgresql/postgresql_crud.rs     |  89 ++++++++++++++
 src/storage/rrdcached/mod.rs                  |  21 ++--
 .../sqlite/migrations/20240110093153_init.sql |  19 ++-
 src/storage/sqlite/mod.rs                     |   1 +
 src/storage/sqlite/sqlite.rs                  |   7 +-
 src/storage/sqlite/sqlite_crud.rs             |  58 +++++++++
 src/storage/storage.rs                        |   8 +-
 .../migrations/20240223133248_init.sql        |  15 ++-
 src/storage/timescaledb/timescaledb.rs        |  10 +-
 25 files changed, 595 insertions(+), 47 deletions(-)
 create mode 100644 src/crud/list_cursor.rs
 create mode 100644 src/crud/mod.rs
 create mode 100644 src/crud/viewmodel/mod.rs
 create mode 100644 src/crud/viewmodel/sensor_viewmodel.rs
 create mode 100644 src/storage/duckdb/duckdb_crud.rs
 create mode 100644 src/storage/postgresql/postgresql_crud.rs
 create mode 100644 src/storage/sqlite/sqlite_crud.rs

diff --git a/Cargo.lock b/Cargo.lock
index 674e871..b6c8487 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4122,6 +4122,12 @@ dependencies = [
  "serde_derive",
 ]
 
+[[package]]
+name = "rot13"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "944f240972701806a97995e54c5b74c14a62d8423b26f71759386300749e91db"
+
 [[package]]
 name = "rrdcached-client"
 version = "0.1.3"
@@ -4387,6 +4393,7 @@ dependencies = [
  "async-broadcast",
  "async-trait",
  "axum",
+ "base64 0.22.1",
  "big-decimal-byte-string-encoder",
  "bigdecimal",
  "blake3",
@@ -4416,6 +4423,7 @@ dependencies = [
  "prost",
  "rand",
  "regex",
+ "rot13",
  "rrdcached-client",
  "rumqttc",
  "rust_decimal",
@@ -4430,6 +4438,7 @@ dependencies = [
  "smallvec 1.13.2",
  "snap",
  "sqlx",
+ "time",
  "tokio",
  "tokio-stream",
  "tokio-util",
diff --git a/Cargo.toml b/Cargo.toml
index e7af594..61c81c9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -91,3 +91,6 @@ utoipa = { version = "4.2", features = ["axum_extras"] }
 utoipa-scalar = { version = "0.1", features = ["axum"] }
 rrdcached-client = "0.1"
 rustls = "0.23"
+base64 = "0.22"
+rot13 = "0.1"
+time = "0.3"
+ */ +use anyhow::{bail, Result}; + +#[derive(Debug)] +pub struct ListCursor { + pub next_created_at: String, + pub next_uuid: String, +} + +impl ListCursor { + pub fn new(next_created_at: String, next_uuid: String) -> Self { + Self { + next_created_at, + next_uuid, + } + } + + /// Parse a list cursor from a string. + /// + /// The string must be in the format: + /// - last_created_at + /// - last_uuid + /// + /// The string is then rot13 and base64+url decoded. + pub fn parse(cursor: &str) -> Result { + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; + let base64_data = rot13::rot13(cursor); + let data = URL_SAFE_NO_PAD.decode(base64_data)?; + let data_string = String::from_utf8(data)?; + // split on the separator + let parts: Vec<&str> = data_string.split('\u{001F}').collect(); + if parts.len() != 2 { + bail!("Invalid cursor: must contain two parts"); + } + Ok(Self { + next_created_at: parts[0].to_string(), + next_uuid: parts[1].to_string(), + }) + } + + /// Convert the list cursor to a string. + /// + /// The string is then rot13 and base64+url encoded. + #[allow(clippy::inherent_to_string)] + pub fn to_string(&self) -> String { + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; + let data_string = format!("{}\u{001F}{}", self.next_created_at, self.next_uuid); + let data = data_string.as_bytes(); + let base64_data = URL_SAFE_NO_PAD.encode(data); + rot13::rot13(&base64_data) + } +} + +impl Default for ListCursor { + fn default() -> Self { + Self::new( + "-1".to_string(), + "00000000-0000-0000-0000-000000000000".to_string(), + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_list_cursor() { + let cursor = ListCursor::new( + "2023-01-01T00:00:00Z".to_string(), + "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa".to_string(), + ); + let string = cursor.to_string(); + + assert_eq!( + string, + "ZwNlZl0jZF0jZIDjZQbjZQbjZSbsLJSuLJSuLJRgLJSuLF1uLJSuYJSuLJRgLJSuLJSuLJSuLJSu" + ); + + let parsed = ListCursor::parse(&string).unwrap(); + assert_eq!(parsed.next_created_at, "2023-01-01T00:00:00Z"); + assert_eq!(parsed.next_uuid, "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"); + } + + #[test] + fn test_list_cursor_default() { + let cursor = ListCursor::default(); + assert_eq!(cursor.next_created_at, "-1"); + assert_eq!(cursor.next_uuid, "00000000-0000-0000-0000-000000000000"); + } + + #[test] + fn test_parsing_failures() { + assert!(ListCursor::parse("").is_err()); + assert!( + ListCursor::parse("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\u{001F}") + .is_err() + ); + assert!(ListCursor::parse("aaa\u{001F}aa\u{001F}aa").is_err()); + } +} diff --git a/src/crud/mod.rs b/src/crud/mod.rs new file mode 100644 index 0000000..4aae928 --- /dev/null +++ b/src/crud/mod.rs @@ -0,0 +1,2 @@ +pub mod list_cursor; +pub mod viewmodel; diff --git a/src/crud/viewmodel/mod.rs b/src/crud/viewmodel/mod.rs new file mode 100644 index 0000000..57c23bb --- /dev/null +++ b/src/crud/viewmodel/mod.rs @@ -0,0 +1 @@ +pub mod sensor_viewmodel; diff --git a/src/crud/viewmodel/sensor_viewmodel.rs b/src/crud/viewmodel/sensor_viewmodel.rs new file mode 100644 index 0000000..0eb5edd --- /dev/null +++ b/src/crud/viewmodel/sensor_viewmodel.rs @@ -0,0 +1,33 @@ +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize)] +pub struct SensorViewModel { + pub uuid: Uuid, + pub name: String, + pub created_at: Option, + pub sensor_type: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub unit: Option, 
diff --git a/src/crud/mod.rs b/src/crud/mod.rs
new file mode 100644
index 0000000..4aae928
--- /dev/null
+++ b/src/crud/mod.rs
@@ -0,0 +1,2 @@
+pub mod list_cursor;
+pub mod viewmodel;
diff --git a/src/crud/viewmodel/mod.rs b/src/crud/viewmodel/mod.rs
new file mode 100644
index 0000000..57c23bb
--- /dev/null
+++ b/src/crud/viewmodel/mod.rs
@@ -0,0 +1 @@
+pub mod sensor_viewmodel;
diff --git a/src/crud/viewmodel/sensor_viewmodel.rs b/src/crud/viewmodel/sensor_viewmodel.rs
new file mode 100644
index 0000000..0eb5edd
--- /dev/null
+++ b/src/crud/viewmodel/sensor_viewmodel.rs
@@ -0,0 +1,33 @@
+use std::collections::BTreeMap;
+
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SensorViewModel {
+    pub uuid: Uuid,
+    pub name: String,
+    pub created_at: Option<String>,
+    pub sensor_type: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub unit: Option<String>,
+    pub labels: BTreeMap<String, String>,
+}
+
+// From Sensor model to SensorViewModel
+impl From<crate::datamodel::Sensor> for SensorViewModel {
+    fn from(sensor: crate::datamodel::Sensor) -> Self {
+        Self {
+            uuid: sensor.uuid,
+            name: sensor.name,
+            created_at: None, // non-view Sensors do not have a created_at field
+            sensor_type: sensor.sensor_type.to_string(),
+            unit: sensor.unit.map(|unit| unit.name),
+            labels: sensor
+                .labels
+                .iter()
+                .map(|(key, value)| (key.clone(), value.clone()))
+                .collect(),
+        }
+    }
+}
diff --git a/src/datamodel/sensor_type.rs b/src/datamodel/sensor_type.rs
index b65f4d5..4f00566 100644
--- a/src/datamodel/sensor_type.rs
+++ b/src/datamodel/sensor_type.rs
@@ -15,19 +15,20 @@ pub enum SensorType {
     Blob = 80,
 }
 
-// Implement to_string() for SensorType
-impl ToString for SensorType {
-    fn to_string(&self) -> String {
-        match self {
-            SensorType::Integer => "Integer".to_string(),
-            SensorType::Numeric => "Numeric".to_string(),
-            SensorType::Float => "Float".to_string(),
-            SensorType::String => "String".to_string(),
-            SensorType::Boolean => "Boolean".to_string(),
-            SensorType::Location => "Location".to_string(),
-            SensorType::Json => "JSON".to_string(),
-            SensorType::Blob => "Blob".to_string(),
-        }
+// Implement Display for SensorType, which also provides to_string()
+impl std::fmt::Display for SensorType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let string = match self {
+            SensorType::Integer => "Integer",
+            SensorType::Numeric => "Numeric",
+            SensorType::Float => "Float",
+            SensorType::String => "String",
+            SensorType::Boolean => "Boolean",
+            SensorType::Location => "Location",
+            SensorType::Json => "JSON",
+            SensorType::Blob => "Blob",
+        };
+        write!(f, "{}", string)
     }
 }
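Note: with `skip_serializing_if`, a sensor without a unit serialises without a `unit` key at all, while `created_at: None` still appears as `null`. A small sketch, assuming the `SensorViewModel` from this patch is in scope and using made-up field values:

```rust
use std::collections::BTreeMap;
use uuid::Uuid;

use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel;

fn main() {
    // Hypothetical values, for illustration only
    let sensor = SensorViewModel {
        uuid: Uuid::nil(),
        name: "temperature".to_string(),
        created_at: None,
        sensor_type: "Float".to_string(),
        unit: None, // skipped entirely in the JSON output
        labels: BTreeMap::new(),
    };
    // Prints:
    // {"uuid":"00000000-0000-0000-0000-000000000000","name":"temperature",
    //  "created_at":null,"sensor_type":"Float","labels":{}}
    println!("{}", serde_json::to_string(&sensor).unwrap());
}
```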
diff --git a/src/ingestors/http/crud.rs b/src/ingestors/http/crud.rs
index 699a7ce..45059f0 100644
--- a/src/ingestors/http/crud.rs
+++ b/src/ingestors/http/crud.rs
@@ -1,20 +1,66 @@
+use crate::crud::list_cursor::ListCursor;
+use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel;
 use crate::ingestors::http::app_error::AppError;
 use crate::ingestors::http::state::HttpServerState;
-use axum::extract::State;
+use anyhow::Result;
+use axum::extract::{Query, State};
 use axum::Json;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Deserialize)]
+pub struct ListSensorsQuery {
+    pub cursor: Option<String>,
+    pub limit: Option<usize>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct ListSensorsResponse {
+    pub sensors: Vec<SensorViewModel>,
+    pub cursor: Option<String>,
+}
 
 /// List all the sensors.
 #[utoipa::path(
     get,
-    path = "/sensors",
+    path = "/api/v1/sensors",
     tag = "SensApp",
     responses(
-        (status = 200, description = "List of sensors", body = Vec<String>)
-    )
+        (status = 200, description = "List of sensors", body = Vec<SensorViewModel>)
+    ),
+    params(
+        ("cursor" = Option<String>, Query, description = "Cursor to start listing from"),
+        ("limit" = Option<usize>, Query, description = "Limit the number of sensors to return, 1000 by default", maximum = 100_000, minimum = 1),
+    ),
 )]
 pub async fn list_sensors(
     State(state): State<HttpServerState>,
-) -> Result<Json<Vec<String>>, AppError> {
-    let sensors = state.storage.list_sensors().await?;
-    Ok(Json(sensors))
+    Query(query): Query<ListSensorsQuery>,
+) -> Result<Json<ListSensorsResponse>, AppError> {
+    let cursor = query
+        .cursor
+        .map(|cursor| ListCursor::parse(&cursor))
+        .unwrap_or_else(|| Ok(ListCursor::default()))
+        .map_err(AppError::BadRequest)?;
+
+    let limit = query.limit.unwrap_or(1000);
+    if limit == 0 || limit > 100_000 {
+        return Err(AppError::BadRequest(anyhow::anyhow!(
+            "Limit must be between 1 and 100,000"
+        )));
+    }
+
+    let (sensors, next_cursor) =
+        state
+            .storage
+            .list_sensors(cursor, limit)
+            .await
+            .map_err(|error| {
+                eprintln!("Failed to list sensors: {:?}", error);
+                AppError::InternalServerError(error)
+            })?;
+
+    Ok(Json(ListSensorsResponse {
+        sensors,
+        cursor: next_cursor.map(|cursor| cursor.to_string()),
+    }))
 }
diff --git a/src/main.rs b/src/main.rs
index 263f5ae..6d2897d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -12,6 +12,7 @@ use tracing::event;
 use tracing::Level;
 mod bus;
 mod config;
+mod crud;
 mod datamodel;
 mod importers;
 mod infer;
diff --git a/src/storage/bigquery/mod.rs b/src/storage/bigquery/mod.rs
index d0553e8..5b8ba1f 100644
--- a/src/storage/bigquery/mod.rs
+++ b/src/storage/bigquery/mod.rs
@@ -1,4 +1,7 @@
-use crate::storage::storage::StorageInstance;
+use crate::{
+    crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel},
+    storage::storage::StorageInstance,
+};
 use anyhow::{bail, Result};
 use async_trait::async_trait;
 use bigquery_publishers::{
@@ -233,7 +236,11 @@ impl StorageInstance for BigQueryStorage {
         Ok(())
     }
 
-    async fn list_sensors(&self) -> Result<Vec<String>> {
+    async fn list_sensors(
+        &self,
+        _cursor: ListCursor,
+        _limit: usize,
+    ) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
         unimplemented!();
     }
 }
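Note: a client is expected to follow the returned cursor until it comes back as `null`. A sketch of such a loop; the `reqwest` dependency, the port, and the minimal response binding are illustrative assumptions, not part of this patch:

```rust
use serde::Deserialize;

// Only the fields the loop needs from ListSensorsResponse
#[derive(Deserialize)]
struct Page {
    sensors: Vec<serde_json::Value>,
    cursor: Option<String>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = reqwest::Client::new();
    let mut cursor: Option<String> = None;
    loop {
        let mut request = client
            .get("http://localhost:3000/api/v1/sensors")
            .query(&[("limit", "1000")]);
        if let Some(c) = &cursor {
            request = request.query(&[("cursor", c.as_str())]);
        }
        let page: Page = request.send().await?.json().await?;
        println!("got {} sensors", page.sensors.len());
        match page.cursor {
            Some(next) => cursor = Some(next), // more pages to fetch
            None => break,                     // no cursor: last page
        }
    }
    Ok(())
}
```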
diff --git a/src/storage/duckdb/duckdb_crud.rs b/src/storage/duckdb/duckdb_crud.rs
new file mode 100644
index 0000000..002fb42
--- /dev/null
+++ b/src/storage/duckdb/duckdb_crud.rs
@@ -0,0 +1,84 @@
+use std::{
+    collections::BTreeMap,
+    sync::Arc,
+};
+
+use anyhow::Result;
+use duckdb::{params, CachedStatement, Connection};
+use tokio::sync::Mutex;
+
+use crate::{
+    crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel},
+    datamodel::{sensapp_datetime::SensAppDateTimeExt, SensAppDateTime},
+};
+
+pub async fn list_sensors(
+    connection: Arc<Mutex<Connection>>,
+    cursor: ListCursor,
+    limit: usize,
+) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
+    // We fetch limit + 1 rows to know whether there is a next page
+    let query_limit = limit as i64 + 1;
+
+    let connection = connection.lock().await;
+
+    let cursor_next_created_at = cursor.next_created_at.parse::<i64>()?;
+    let cursor_next_datetime =
+        SensAppDateTime::from_unix_microseconds_i64(cursor_next_created_at).to_rfc3339();
+
+    // Cast the uuid to varchar, because casting the parameter to uuid
+    // fails with the following error:
+    // Assertion failed: (GetType() == other.GetType()), function Copy, file base_statistics.cpp, line 220.
+    let mut select_stmt: CachedStatement = connection.prepare_cached(
+        r#"
+        SELECT uuid, name, created_at, type, unit, labels
+        FROM sensor_labels_view
+        WHERE (created_at, CAST(uuid AS VARCHAR)) >= (CAST(? AS TIMESTAMP), ?)
+        ORDER BY created_at ASC, uuid ASC
+        LIMIT ?
+        "#,
+    )?;
+
+    let mut rows = select_stmt
+        .query(params![cursor_next_datetime, cursor.next_uuid, query_limit])
+        .map_err(|e| {
+            eprintln!("Failed to execute query: {}", e);
+            e
+        })?;
+
+    let mut sensors_views: Vec<SensorViewModel> = Vec::with_capacity(limit);
+    let mut cursor: Option<ListCursor> = None;
+
+    while let Some(row) = rows.next()? {
+        let uuid_string: String = row.get(0)?;
+        let uuid = uuid::Uuid::parse_str(&uuid_string)?;
+        let created_at: i64 = row.get(2)?;
+
+        // Once we have read `limit` rows, the extra row becomes the cursor
+        if sensors_views.len() == limit {
+            cursor = Some(ListCursor::new(created_at.to_string(), uuid.to_string()));
+            break;
+        }
+
+        let name: String = row.get(1)?;
+        let sensor_type: String = row.get(3)?;
+        let unit: Option<String> = row.get(4)?;
+        let labels_string: String = row.get(5)?;
+        let labels: BTreeMap<String, String> = serde_json::from_str(&labels_string)?;
+
+        let created_at_datetime = SensAppDateTime::from_unix_microseconds_i64(created_at);
+        let created_at_rfc3339 = created_at_datetime.to_rfc3339();
+
+        let sensor_view_model = SensorViewModel {
+            uuid,
+            name,
+            created_at: Some(created_at_rfc3339),
+            sensor_type,
+            unit,
+            labels,
+        };
+        sensors_views.push(sensor_view_model);
+    }
+
+    Ok((sensors_views, cursor))
+}
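Note: all three backends rely on the same keyset-pagination trick: fetch `limit + 1` rows ordered by `(created_at, uuid)`, return at most `limit`, and turn the extra row, if any, into the cursor for the next page. The trick distilled into a standalone sketch:

```rust
/// Split `limit + 1` fetched rows into a page and an optional next-page key.
fn split_page<T>(mut rows: Vec<T>, limit: usize) -> (Vec<T>, Option<T>) {
    if rows.len() > limit {
        // The extra row is the first row of the next page: it becomes
        // the cursor and is dropped from the returned page.
        let next = rows.pop();
        (rows, next)
    } else {
        // Fewer rows than requested: this is the last page.
        (rows, None)
    }
}

fn main() {
    let (page, next) = split_page(vec![1, 2, 3], 2);
    assert_eq!(page, vec![1, 2]);
    assert_eq!(next, Some(3));

    let (page, next) = split_page(vec![1, 2], 2);
    assert_eq!(page, vec![1, 2]);
    assert_eq!(next, None);
}
```

The `>=` comparison in the queries means the cursor row itself is included, so the extra row really is the first row of the following page.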
diff --git a/src/storage/duckdb/migrations/20240223133248_init.sql b/src/storage/duckdb/migrations/20240223133248_init.sql
index 72a2b5e..a11c115 100644
--- a/src/storage/duckdb/migrations/20240223133248_init.sql
+++ b/src/storage/duckdb/migrations/20240223133248_init.sql
@@ -43,7 +43,10 @@ CREATE TABLE IF NOT EXISTS labels (
     PRIMARY KEY (sensor_id, name),
     FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id),
     FOREIGN KEY (name) REFERENCES labels_name_dictionary(id),
-    FOREIGN KEY (description) REFERENCES labels_description_dictionary(id)
+    FOREIGN KEY (description) REFERENCES labels_description_dictionary(id),
+
+    -- Unique constraint on (sensor_id, name)
+    UNIQUE (sensor_id, name)
 );
 
 -- Create the 'strings_values_dictionary' table
@@ -124,3 +127,15 @@ CREATE INDEX IF NOT EXISTS idx_units_name ON units (name);
 CREATE INDEX IF NOT EXISTS idx_labels_name_dictionary_name ON labels_name_dictionary (name);
 CREATE INDEX IF NOT EXISTS idx_labels_description_dictionary_description ON labels_description_dictionary (description);
 CREATE INDEX IF NOT EXISTS idx_strings_values_dictionary_value ON strings_values_dictionary (value);
+
+CREATE VIEW IF NOT EXISTS sensor_labels_view AS
+SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name AS unit, json_group_object(
+    labels_name_dictionary."name", labels_description_dictionary."description"
+) AS labels
+FROM sensors
+LEFT JOIN units ON sensors.unit = units.id
+LEFT JOIN labels ON sensors.sensor_id = labels.sensor_id
+LEFT JOIN labels_name_dictionary ON labels."name" = labels_name_dictionary."id"
+LEFT JOIN labels_description_dictionary ON labels.description = labels_description_dictionary.id
+GROUP BY sensors."sensor_id", sensors.uuid, sensors.created_at, sensors."name", type, units.name
+ORDER BY sensors.created_at ASC, sensors.uuid ASC;
diff --git a/src/storage/duckdb/mod.rs b/src/storage/duckdb/mod.rs
index fd4241e..372ee42 100644
--- a/src/storage/duckdb/mod.rs
+++ b/src/storage/duckdb/mod.rs
@@ -1,9 +1,12 @@
+use crate::crud::list_cursor::ListCursor;
+use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel;
 use crate::datamodel::batch::{Batch, SingleSensorBatch};
 use crate::datamodel::TypedSamples;
 use anyhow::{bail, Context, Result};
 use async_broadcast::Sender;
 use async_trait::async_trait;
 use duckdb::Connection;
+use duckdb_crud::list_sensors;
 use duckdb_publishers::*;
 use duckdb_utilities::get_sensor_id_or_create_sensor;
 use std::sync::Arc;
@@ -14,6 +17,7 @@ use tokio::time::timeout;
 
 use super::storage::StorageInstance;
 
+mod duckdb_crud;
 mod duckdb_publishers;
 mod duckdb_utilities;
 
@@ -92,8 +96,12 @@ impl StorageInstance for DuckDBStorage {
         Ok(())
     }
 
-    async fn list_sensors(&self) -> Result<Vec<String>> {
-        unimplemented!();
+    async fn list_sensors(
+        &self,
+        cursor: ListCursor,
+        limit: usize,
+    ) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
+        list_sensors(self.connection.clone(), cursor, limit).await
     }
 }
 
diff --git a/src/storage/postgresql/migrations/20240110123122_init.sql b/src/storage/postgresql/migrations/20240110123122_init.sql
index a58643a..b8b04b7 100644
--- a/src/storage/postgresql/migrations/20240110123122_init.sql
+++ b/src/storage/postgresql/migrations/20240110123122_init.sql
@@ -37,7 +37,10 @@ CREATE TABLE labels (
     PRIMARY KEY (sensor_id, name),
     FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id),
     FOREIGN KEY (name) REFERENCES labels_name_dictionary(id),
-    FOREIGN KEY (description) REFERENCES labels_description_dictionary(id)
+    FOREIGN KEY (description) REFERENCES labels_description_dictionary(id),
+
+    -- Unique constraint on (sensor_id, name)
+    UNIQUE (sensor_id, name)
 );
 
 -- Create the 'strings_values_dictionary' table
@@ -128,3 +131,15 @@ CREATE INDEX index_boolean_values ON boolean_values USING brin (sensor_id, timestamp_ms) WITH (pages_per_range = 32);
 CREATE INDEX index_location_values ON location_values USING brin (sensor_id, timestamp_ms) WITH (pages_per_range = 32);
 CREATE INDEX index_json_values ON json_values USING brin (sensor_id, timestamp_ms) WITH (pages_per_range = 32);
 CREATE INDEX index_blob_values ON blob_values USING brin (sensor_id, timestamp_ms) WITH (pages_per_range = 32);
+
+CREATE VIEW sensor_labels_view AS
+SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name AS unit, jsonb_object_agg(
+    labels_name_dictionary."name", labels_description_dictionary."description"
+) AS labels
+FROM sensors
+LEFT JOIN units ON sensors.unit = units.id
+LEFT JOIN labels ON sensors.sensor_id = labels.sensor_id
+LEFT JOIN labels_name_dictionary ON labels."name" = labels_name_dictionary."id"
+LEFT JOIN labels_description_dictionary ON labels.description = labels_description_dictionary.id
+GROUP BY sensors."sensor_id", sensors.uuid, sensors.created_at, sensors."name", type, units.name
+ORDER BY sensors.created_at ASC, sensors.uuid ASC;
diff --git a/src/storage/postgresql/mod.rs b/src/storage/postgresql/mod.rs
index 812664a..495fdbe 100644
--- a/src/storage/postgresql/mod.rs
+++ b/src/storage/postgresql/mod.rs
@@ -1,4 +1,5 @@
 pub mod postgresql;
+pub mod postgresql_crud;
 pub mod postgresql_publishers;
 pub mod postgresql_utilities;
 
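Note: `json_group_object` (DuckDB, SQLite) and `jsonb_object_agg` (PostgreSQL) collapse the one-row-per-label join into a single JSON object per sensor, which the Rust side then parses into a `BTreeMap`. A minimal sketch of that shape, with hypothetical label names and values:

```rust
use std::collections::BTreeMap;

fn main() -> anyhow::Result<()> {
    // What a row of sensor_labels_view's `labels` column looks like
    // (made-up labels, for illustration only)
    let labels_json = r#"{"room":"kitchen","floor":"2"}"#;
    let labels: BTreeMap<String, String> = serde_json::from_str(labels_json)?;
    assert_eq!(labels.get("room").map(String::as_str), Some("kitchen"));
    Ok(())
}
```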
diff --git a/src/storage/postgresql/postgresql.rs b/src/storage/postgresql/postgresql.rs
index 09f090c..fe26842 100644
--- a/src/storage/postgresql/postgresql.rs
+++ b/src/storage/postgresql/postgresql.rs
@@ -1,11 +1,15 @@
+use super::postgresql_crud::list_sensors;
 use super::{
     super::storage::StorageInstance, postgresql_publishers::*,
     postgresql_utilities::get_sensor_id_or_create_sensor,
 };
+use crate::crud::list_cursor::ListCursor;
+use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel;
 use crate::datamodel::{batch::Batch, TypedSamples};
 use anyhow::{Context, Result};
 use async_broadcast::Sender;
 use async_trait::async_trait;
+use sqlx::postgres::PgPoolOptions;
 use sqlx::{postgres::PgConnectOptions, PgPool};
 use std::time::Duration;
 use std::{str::FromStr, sync::Arc};
@@ -21,10 +25,19 @@ impl PostgresStorage {
         let connect_options = PgConnectOptions::from_str(connection_string)
             .context("Failed to create postgres connection options")?;
 
-        let pool = PgPool::connect_with(connect_options)
+        let pool = PgPoolOptions::new()
+            .after_connect(|connection, _metadata| {
+                Box::pin(async move {
+                    use sqlx::Executor;
+                    // Make sure every connection of the pool uses UTC
+                    connection.execute("set time zone UTC;").await?;
+                    Ok(())
+                })
+            })
+            .connect_with(connect_options)
             .await
             .context("Failed to create postgres pool")?;
 
         Ok(Self { pool })
     }
 }
@@ -64,8 +77,12 @@ impl StorageInstance for PostgresStorage {
         Ok(())
     }
 
-    async fn list_sensors(&self) -> Result<Vec<String>> {
-        unimplemented!();
+    async fn list_sensors(
+        &self,
+        cursor: ListCursor,
+        limit: usize,
+    ) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
+        list_sensors(&self.pool, cursor, limit).await
     }
 }
 
diff --git a/src/storage/postgresql/postgresql_crud.rs b/src/storage/postgresql/postgresql_crud.rs
new file mode 100644
index 0000000..ca97614
--- /dev/null
+++ b/src/storage/postgresql/postgresql_crud.rs
@@ -0,0 +1,89 @@
+use std::collections::BTreeMap;
+
+use anyhow::anyhow;
+use anyhow::Result;
+use sqlx::types::time::PrimitiveDateTime;
+use sqlx::PgPool;
+use sqlx::Row;
+
+use crate::crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel};
+
+pub async fn list_sensors(
+    pool: &PgPool,
+    cursor: ListCursor,
+    limit: usize,
+) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
+    // We fetch limit + 1 rows to know whether there is a next page
+    let query_limit = limit as i64 + 1;
+
+    let cursor_next_created_at_timestamp = cursor.next_created_at.parse::<i128>()?;
+    let cursor_next_created_at_offset_datetime =
+        ::time::OffsetDateTime::from_unix_timestamp_nanos(cursor_next_created_at_timestamp)?;
+    let cursor_uuid = uuid::Uuid::parse_str(&cursor.next_uuid)?;
+
+    let query = sqlx::query(
+        r#"
+        SELECT uuid, name, created_at, type, unit, labels
+        FROM sensor_labels_view
+        WHERE (created_at, uuid) >= ($1, $2)
+        ORDER BY created_at ASC, uuid ASC
+        LIMIT $3
+        "#,
+    )
+    .bind(cursor_next_created_at_offset_datetime)
+    .bind(cursor_uuid)
+    .bind(query_limit);
+
+    let mut connection = pool.acquire().await?;
+    let mut records = query.fetch_all(&mut *connection).await?;
+
+    // Check whether there is a next page
+    let next_cursor = if records.len() == limit + 1 {
+        let last = records.pop().unwrap();
+        let last_created_at_datetime: PrimitiveDateTime = last.get("created_at");
+        let last_created_at_timestamp: i128 =
+            last_created_at_datetime.assume_utc().unix_timestamp_nanos();
+        let last_uuid: uuid::Uuid = last.get("uuid");
+        Some(ListCursor::new(
+            last_created_at_timestamp.to_string(),
+            last_uuid.to_string(),
+        ))
+    } else {
+        None
+    };
+
+    let sensors_views = records
+        .into_iter()
+        .map(|record| {
+            let uuid: uuid::Uuid = record.get("uuid");
+            let labels_json: serde_json::Value = record.get("labels");
+            // labels is a JSON object that needs to be parsed and transformed into a map
+            let labels: BTreeMap<String, String> = labels_json
+                .as_object()
+                .ok_or_else(|| anyhow!("labels_json is not an object"))?
+                .into_iter()
+                .map(|(k, v)| (k.to_string(), v.as_str().unwrap_or("").to_string()))
+                .collect();
+            let name: String = record.get("name");
+            let created_at_datetime: PrimitiveDateTime = record.get("created_at");
+
+            let created_at_rfc3339: String = created_at_datetime
+                .assume_utc()
+                .format(&::time::format_description::well_known::Rfc3339)?;
+
+            let sensor_type: String = record.get("type");
+            let unit: Option<String> = record.get("unit");
+
+            Ok(SensorViewModel {
+                uuid,
+                name,
+                created_at: Some(created_at_rfc3339),
+                sensor_type,
+                unit,
+                labels,
+            })
+        })
+        .collect::<Result<Vec<SensorViewModel>>>()?;
+
+    Ok((sensors_views, next_cursor))
+}
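Note: the PostgreSQL cursor stores `created_at` as a stringified unix-nanosecond count round-tripped through the `time` crate, whereas the DuckDB implementation uses microseconds, so cursors are not portable across backends. A sketch of the nanosecond round trip with `time` 0.3:

```rust
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

fn main() -> anyhow::Result<()> {
    // The cursor carries created_at as a stringified i128 nanosecond count
    let nanos: i128 = "1672531200000000000".parse()?; // 2023-01-01T00:00:00Z
    let datetime = OffsetDateTime::from_unix_timestamp_nanos(nanos)?;
    assert_eq!(datetime.format(&Rfc3339)?, "2023-01-01T00:00:00Z");
    // And back: the next cursor re-stringifies the nanosecond count
    assert_eq!(datetime.unix_timestamp_nanos(), nanos);
    Ok(())
}
```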
diff --git a/src/storage/rrdcached/mod.rs b/src/storage/rrdcached/mod.rs
index 9089f9e..935a8f4 100644
--- a/src/storage/rrdcached/mod.rs
+++ b/src/storage/rrdcached/mod.rs
@@ -1,4 +1,5 @@
 use crate::{
+    crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel},
     datamodel::{Sensor, SensorType, TypedSamples},
     storage::storage::StorageInstance,
 };
@@ -277,6 +278,7 @@ impl StorageInstance for RrdCachedStorage {
                 .collect::<Vec<_>>();
         }
         if !sensors_to_create.is_empty() {
+            eprintln!("Creating sensors with min timestamp: {}", min_timestamp);
             self.create_sensors(&sensors_to_create, min_timestamp as u64 - 10)
                 .await?;
         }
@@ -287,14 +289,11 @@
             Ok(_) => {}
             Err(e) => {
                 println!("Failed to batch update: {:?}", e);
-                match e {
-                    RRDCachedClientError::BatchUpdateErrorResponse(string, errors) => {
-                        println!("Batch update error response: {:?}", string);
-                        for error in errors {
-                            println!("Batch update error: {:?}", error);
-                        }
+                if let RRDCachedClientError::BatchUpdateErrorResponse(string, errors) = e {
+                    eprintln!("Batch update error response: {:?}", string);
+                    for error in errors {
+                        eprintln!("Batch update error: {:?}", error);
                     }
-                    _ => {}
                 }
             }
         }
@@ -323,7 +322,11 @@
         Ok(())
     }
 
-    async fn list_sensors(&self) -> Result<Vec<String>> {
-        unimplemented!();
+    async fn list_sensors(
+        &self,
+        _cursor: ListCursor,
+        _limit: usize,
+    ) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
+        Err(anyhow::anyhow!("rrdcached doesn't support listing sensors"))
     }
 }
diff --git a/src/storage/sqlite/migrations/20240110093153_init.sql b/src/storage/sqlite/migrations/20240110093153_init.sql
index c600ee2..5cb22cf 100644
--- a/src/storage/sqlite/migrations/20240110093153_init.sql
+++ b/src/storage/sqlite/migrations/20240110093153_init.sql
@@ -22,9 +22,10 @@ CREATE TABLE labels (
     name INTEGER NOT NULL, -- ID for the name in the dictionary, cannot be null
     description INTEGER, -- ID for the description in the dictionary (optional)
     PRIMARY KEY (sensor_id, name),
-    FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table
-    FOREIGN KEY (name) REFERENCES labels_name_dictionary(id) -- Foreign key to 'labels_name_dictionary'
-    FOREIGN KEY (description) REFERENCES labels_description_dictionary(id) -- Foreign key to 'labels_description_dictionary'
+    FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id), -- Foreign key to 'sensors' table
+    FOREIGN KEY (name) REFERENCES labels_name_dictionary(id), -- Foreign key to 'labels_name_dictionary'
+    FOREIGN KEY (description) REFERENCES labels_description_dictionary(id), -- Foreign key to 'labels_description_dictionary'
+    UNIQUE (sensor_id, name)
 ) STRICT;
 
 -- Create the 'labels_name_dictionary' table
@@ -127,3 +128,15 @@ CREATE INDEX index_boolean_values ON boolean_values(sensor_id, timestamp_ms);
 CREATE INDEX index_location_values ON location_values(sensor_id, timestamp_ms);
 CREATE INDEX index_json_values ON json_values(sensor_id, timestamp_ms);
 CREATE INDEX index_blob_values ON blob_values(sensor_id, timestamp_ms);
+
+CREATE VIEW sensor_labels_view AS
+SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name AS unit, json_group_object(
+    labels_name_dictionary."name", labels_description_dictionary."description"
+) AS labels
+FROM sensors
+LEFT JOIN units ON sensors.unit = units.id
+LEFT JOIN labels ON sensors.sensor_id = labels.sensor_id
+LEFT JOIN labels_name_dictionary ON labels."name" = labels_name_dictionary."id"
+LEFT JOIN labels_description_dictionary ON labels.description = labels_description_dictionary.id
+GROUP BY sensors."sensor_id", sensors.uuid, sensors.created_at, sensors."name", type, units.name
+ORDER BY sensors.created_at ASC, sensors.uuid ASC;
diff --git a/src/storage/sqlite/mod.rs b/src/storage/sqlite/mod.rs
index d2ce26a..5db68af 100644
--- a/src/storage/sqlite/mod.rs
+++ b/src/storage/sqlite/mod.rs
@@ -1,4 +1,5 @@
 pub mod sqlite;
+pub mod sqlite_crud;
 pub mod sqlite_publishers;
 pub mod sqlite_utilities;
 
diff --git a/src/storage/sqlite/sqlite.rs b/src/storage/sqlite/sqlite.rs
index f89e058..7c8e0e9 100644
--- a/src/storage/sqlite/sqlite.rs
+++ b/src/storage/sqlite/sqlite.rs
@@ -1,5 +1,8 @@
+use super::sqlite_crud::list_sensors;
 use super::sqlite_publishers::*;
 use super::sqlite_utilities::get_sensor_id_or_create_sensor;
+use crate::crud::list_cursor::ListCursor;
+use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel;
 use crate::datamodel::batch::{Batch, SingleSensorBatch};
 use crate::datamodel::TypedSamples;
 use crate::storage::storage::StorageInstance;
@@ -78,8 +81,8 @@ impl StorageInstance for SqliteStorage {
         Ok(())
     }
 
-    async fn list_sensors(&self) -> Result<Vec<String>> {
-        unimplemented!();
+    async fn list_sensors(&self, cursor: ListCursor, limit: usize) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
+        list_sensors(&self.pool, cursor, limit).await
     }
 }
 
diff --git a/src/storage/sqlite/sqlite_crud.rs b/src/storage/sqlite/sqlite_crud.rs
new file mode 100644
index 0000000..8167470
--- /dev/null
+++ b/src/storage/sqlite/sqlite_crud.rs
@@ -0,0 +1,58 @@
+use std::collections::BTreeMap;
+
+use anyhow::Result;
+use sqlx::SqlitePool;
+
+use crate::crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel};
+
+pub async fn list_sensors(
+    pool: &SqlitePool,
+    cursor: ListCursor,
+    limit: usize,
+) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
+    // We fetch limit + 1 rows to know whether there is a next page
+    let query_limit = limit as i64 + 1;
+    let query = sqlx::query!(
+        r#"
+        SELECT uuid, name, created_at, type, unit, labels
+        FROM sensor_labels_view
+        WHERE (created_at, uuid) >= (?, ?)
+        ORDER BY created_at ASC, uuid ASC
+        LIMIT ?
+        "#,
+        cursor.next_created_at,
+        cursor.next_uuid,
+        query_limit,
+    );
+
+    let mut connection = pool.acquire().await?;
+    let mut records = query.fetch_all(&mut *connection).await?;
+
+    // Check whether there is a next page
+    let next_cursor = if records.len() == limit + 1 {
+        let last = records.pop().unwrap();
+        let last_created_at = last.created_at.to_string();
+        Some(ListCursor::new(last_created_at, last.uuid))
+    } else {
+        None
+    };
+
+    let sensors_views = records
+        .into_iter()
+        .map(|record| {
+            // labels is a JSON object that needs to be parsed into a map
+            let labels: BTreeMap<String, String> = serde_json::from_str(&record.labels)?;
+            Ok(SensorViewModel {
+                uuid: uuid::Uuid::parse_str(&record.uuid)?,
+                name: record.name,
+                // TODO: parse created_at
+                created_at: Some(record.created_at.to_string()),
+                sensor_type: record.r#type,
+                unit: record.unit,
+                labels,
+            })
+        })
+        .collect::<Result<Vec<SensorViewModel>>>()?;
+
+    Ok((sensors_views, next_cursor))
+}
+ "#, + cursor.next_created_at, + cursor.next_uuid, + query_limit, + ); + + let mut connection = pool.acquire().await?; + let mut records = query.fetch_all(&mut *connection).await?; + + // check if there is a next page + let next_cursor = if records.len() == limit + 1 { + let last = records.pop().unwrap(); + let last_created_at = last.created_at.to_string(); + Some(ListCursor::new(last_created_at, last.uuid)) + } else { + None + }; + + let sensors_views = records + .into_iter() + .map(|record| { + // labels is a json object that need to be parsed and transformed into a HashMap + let labels: BTreeMap = serde_json::from_str(&record.labels)?; + Ok(SensorViewModel { + uuid: uuid::Uuid::parse_str(&record.uuid)?, + name: record.name, + // TODO: parse created_at + created_at: Some(record.created_at.to_string()), + sensor_type: record.r#type, + unit: record.unit, + labels, + }) + }) + .collect::>>()?; + + Ok((sensors_views, next_cursor)) +} diff --git a/src/storage/storage.rs b/src/storage/storage.rs index 5d801c4..f366365 100644 --- a/src/storage/storage.rs +++ b/src/storage/storage.rs @@ -2,6 +2,8 @@ use anyhow::Result; use async_trait::async_trait; use std::fmt::Debug; +use crate::crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel}; + #[async_trait] pub trait StorageInstance: Send + Sync + Debug { async fn create_or_migrate(&self) -> Result<()>; @@ -13,5 +15,9 @@ pub trait StorageInstance: Send + Sync + Debug { async fn sync(&self, sync_sender: async_broadcast::Sender<()>) -> Result<()>; async fn vacuum(&self) -> Result<()>; - async fn list_sensors(&self) -> Result>; + async fn list_sensors( + &self, + cursor: ListCursor, + limit: usize, + ) -> Result<(Vec, Option)>; } diff --git a/src/storage/timescaledb/migrations/20240223133248_init.sql b/src/storage/timescaledb/migrations/20240223133248_init.sql index 0dfb93f..9e7e5e2 100644 --- a/src/storage/timescaledb/migrations/20240223133248_init.sql +++ b/src/storage/timescaledb/migrations/20240223133248_init.sql @@ -37,7 +37,8 @@ CREATE TABLE labels ( PRIMARY KEY (sensor_id, name), FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id), FOREIGN KEY (name) REFERENCES labels_name_dictionary(id), - FOREIGN KEY (description) REFERENCES labels_description_dictionary(id) + FOREIGN KEY (description) REFERENCES labels_description_dictionary(id), + UNIQUE (sensor_id, name) ); -- Create the 'strings_values_dictionary' table @@ -193,3 +194,15 @@ ALTER TABLE blob_values SET ( ); SELECT add_compression_policy('blob_values', INTERVAL '7 days'); SELECT add_dimension('blob_values', by_hash('sensor_id', 2)); + +CREATE VIEW sensor_labels_view AS +SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name as unit, jsonb_object_agg( + labels_name_dictionary."name",labels_description_dictionary."description" +) AS labels +FROM sensors +LEFT JOIN units on sensors.unit = units.id +LEFT JOIN Labels on sensors.sensor_id = labels.sensor_id +LEFT JOIN labels_name_dictionary on labels."name" = labels_name_dictionary."id" +LEFT JOIN labels_description_dictionary on labels.description = labels_description_dictionary.id +GROUP BY sensors."sensor_id", sensors.uuid, sensors.created_at, sensors."name", type, units.name +ORDER BY sensors.created_at ASC, sensors.uuid ASC; diff --git a/src/storage/timescaledb/timescaledb.rs b/src/storage/timescaledb/timescaledb.rs index 32c22b2..2d63bbf 100644 --- a/src/storage/timescaledb/timescaledb.rs +++ b/src/storage/timescaledb/timescaledb.rs @@ -2,6 +2,8 @@ use super::{ super::storage::StorageInstance, 
diff --git a/src/storage/timescaledb/migrations/20240223133248_init.sql b/src/storage/timescaledb/migrations/20240223133248_init.sql
index 0dfb93f..9e7e5e2 100644
--- a/src/storage/timescaledb/migrations/20240223133248_init.sql
+++ b/src/storage/timescaledb/migrations/20240223133248_init.sql
@@ -37,7 +37,8 @@ CREATE TABLE labels (
     PRIMARY KEY (sensor_id, name),
     FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id),
     FOREIGN KEY (name) REFERENCES labels_name_dictionary(id),
-    FOREIGN KEY (description) REFERENCES labels_description_dictionary(id)
+    FOREIGN KEY (description) REFERENCES labels_description_dictionary(id),
+    UNIQUE (sensor_id, name)
 );
 
 -- Create the 'strings_values_dictionary' table
@@ -193,3 +194,15 @@ ALTER TABLE blob_values SET (
 );
 SELECT add_compression_policy('blob_values', INTERVAL '7 days');
 SELECT add_dimension('blob_values', by_hash('sensor_id', 2));
+
+CREATE VIEW sensor_labels_view AS
+SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name AS unit, jsonb_object_agg(
+    labels_name_dictionary."name", labels_description_dictionary."description"
+) AS labels
+FROM sensors
+LEFT JOIN units ON sensors.unit = units.id
+LEFT JOIN labels ON sensors.sensor_id = labels.sensor_id
+LEFT JOIN labels_name_dictionary ON labels."name" = labels_name_dictionary."id"
+LEFT JOIN labels_description_dictionary ON labels.description = labels_description_dictionary.id
+GROUP BY sensors."sensor_id", sensors.uuid, sensors.created_at, sensors."name", type, units.name
+ORDER BY sensors.created_at ASC, sensors.uuid ASC;
diff --git a/src/storage/timescaledb/timescaledb.rs b/src/storage/timescaledb/timescaledb.rs
index 32c22b2..2d63bbf 100644
--- a/src/storage/timescaledb/timescaledb.rs
+++ b/src/storage/timescaledb/timescaledb.rs
@@ -2,6 +2,8 @@ use super::{
     super::storage::StorageInstance, timescaledb_publishers::*,
     timescaledb_utilities::get_sensor_id_or_create_sensor,
 };
+use crate::crud::list_cursor::ListCursor;
+use crate::crud::viewmodel::sensor_viewmodel::SensorViewModel;
 use crate::datamodel::{batch::Batch, TypedSamples};
 use anyhow::{Context, Result};
 use async_broadcast::Sender;
@@ -64,8 +66,12 @@ impl StorageInstance for TimeScaleDBStorage {
         Ok(())
     }
 
-    async fn list_sensors(&self) -> Result<Vec<String>> {
-        unimplemented!();
+    async fn list_sensors(
+        &self,
+        cursor: ListCursor,
+        limit: usize,
+    ) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
+        super::super::postgresql::postgresql_crud::list_sensors(&self.pool, cursor, limit).await
     }
 }