Skip to content

Commit

Permalink
feat: 🌈 Work in progress
Browse files Browse the repository at this point in the history
And RRDcached is there…
  • Loading branch information
fungiboletus committed Aug 13, 2024
1 parent 999e54a commit 6669308
Show file tree
Hide file tree
Showing 22 changed files with 730 additions and 329 deletions.
357 changes: 143 additions & 214 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ config = "0.14"
serde = "1.0"
confique = "0.2"
byte-unit = "5.1"
prost = "0.12"
#prost = "0.12"
prost = "0.13"
snap = "1.1"
hex = "0.4"
blake3 = "1.5"
Expand All @@ -76,11 +77,17 @@ rand = "0.8"
#protobuf = "3.0.2"
lapin = "2.3"
futures-lite = "2.3"
gcp-bigquery-client = { git = "https://github.com/lquerel/gcp-bigquery-client.git", rev = "107a5557df6336933f6b0bcf330aa91fe6ca866a" }
#gcp-bigquery-client = { git = "https://github.com/lquerel/gcp-bigquery-client.git", rev = "107a5557df6336933f6b0bcf330aa91fe6ca866a" }
gcp-bigquery-client = { version = "0.23.0" }
#prost = { version = "0.12", features = ["derive"] }
#enum_delegate = "0.2"
sinteflake = { version = "0.1", features = ["async"] }
tonic = "0.11"
#tonic = "0.11"
tonic = "0.12"
bigdecimal = "0.4.5"
big-decimal-byte-string-encoder = "0.1.0"
clru = "0.6"
utoipa = { version = "4.2", features = ["axum_extras"] }
utoipa-scalar = { version = "0.1", features = ["axum"] }
rrdcached-client = "0.1"
rustls = "0.23"
2 changes: 1 addition & 1 deletion src/datamodel/sensapp_vec.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use smallvec::SmallVec;

pub type SensAppVec<T> = SmallVec<[T; 1]>;
pub type SensAppVec<T> = SmallVec<[T; 4]>;

pub type SensAppLabels = SmallVec<[(String, String); 8]>;
20 changes: 12 additions & 8 deletions src/datamodel/sensor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::{anyhow, Error};
use cached::proc_macro::cached;
use once_cell::sync::OnceCell;
use smallvec::SmallVec;
use std::fmt;
use std::sync::Arc;
use uuid::Uuid;

Expand All @@ -17,20 +18,23 @@ pub struct Sensor {
pub labels: SensAppLabels,
}

impl ToString for Sensor {
fn to_string(&self) -> String {
let mut s = format!(
impl fmt::Display for Sensor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Sensor {{ uuid: {}, name: {}, sensor_type: {:?}",
self.uuid, self.name, self.sensor_type
);
)?;

if let Some(unit) = &self.unit {
s.push_str(&format!(", unit: {}", unit));
write!(f, ", unit: {}", unit)?;
}

if !self.labels.is_empty() {
s.push_str(&format!(", labels: {:?}", self.labels));
write!(f, ", labels: {:?}", self.labels)?;
}
s.push_str(" }");
s

write!(f, " }}")
}
}

Expand Down
20 changes: 20 additions & 0 deletions src/ingestors/http/crud.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use crate::ingestors::http::app_error::AppError;
use crate::ingestors::http::state::HttpServerState;
use axum::extract::State;
use axum::Json;

/// List all the sensors.
#[utoipa::path(
get,
path = "/sensors",
tag = "SensApp",
responses(
(status = 200, description = "List of sensors", body = Vec<String>)
)
)]
pub async fn list_sensors(
State(state): State<HttpServerState>,
) -> Result<Json<Vec<String>>, AppError> {
let sensors = state.storage.list_sensors().await?;
Ok(Json(sensors))
}
28 changes: 28 additions & 0 deletions src/ingestors/http/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,32 @@ impl FromStr for Precision {
}
}

/// InfluxDB Compatible Write API.
///
/// Allows you to write data from InfluxDB or Telegraf to SensApp.
/// [More information.](https://github.com/SINTEF/sensapp/blob/main/docs/INFLUX_DB.md)
#[utoipa::path(
post,
path = "/api/v2/write",
tag = "InfluxDB",
request_body(
content = String,
content_type = "text/plain",
description = "InfluxDB Line Protocol endpoint. [Reference](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/).",
example = "cpu,host=A,region=west usage_system=64.2 1590488773254420000"
),
params(
("bucket" = String, Query, description = "Bucket name", example = "sensapp"),
("org" = Option<String>, Query, description = "Organization name", example = "sensapp"),
("org_id" = Option<String>, Query, description = "Organization ID"),
("precision" = Option<String>, Query, description = "Precision of the timestamps. One of ns, us, ms, s"),
),
responses(
(status = 204, description = "No Content"),
(status = 400, description = "Bad Request", body = AppError),
(status = 500, description = "Internal Server Error", body = AppError),
)
)]
#[debug_handler]
pub async fn publish_influxdb(
State(state): State<HttpServerState>,
Expand Down Expand Up @@ -258,6 +284,7 @@ pub async fn publish_influxdb(
#[cfg(test)]
mod tests {
use crate::bus::{self, message};
use crate::storage::sqlite::SqliteStorage;

use super::*;
use flate2::write::GzEncoder;
Expand Down Expand Up @@ -318,6 +345,7 @@ mod tests {
let state = State(HttpServerState {
name: Arc::new("influxdb test".to_string()),
event_bus: event_bus.clone(),
storage: Arc::new(SqliteStorage::connect("sqlite::memory:").await.unwrap()),
});
let headers = HeaderMap::new();
let query = Query(InfluxDBQueryParams {
Expand Down
1 change: 1 addition & 0 deletions src/ingestors/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod app_error;
pub mod crud;
pub mod influxdb;
pub mod prometheus;
pub mod server;
Expand Down
25 changes: 25 additions & 0 deletions src/ingestors/http/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,31 @@ fn verify_headers(headers: &HeaderMap) -> Result<(), AppError> {
Ok(())
}

/// Prometheus Remote Write API.
///
/// Allows you to write data from Prometheus to SensApp.
///
/// It follows the [Prometheus Remote Write specification](https://prometheus.io/docs/concepts/remote_write_spec/).
#[utoipa::path(
post,
path = "/api/v1/prometheus_remote_write",
tag = "Prometheus",
request_body(
content = Bytes,
content_type = "application/x-protobuf",
description = "Prometheus Remote Write endpoint. [Reference](https://prometheus.io/docs/concepts/remote_write_spec/)",
),
params(
("content-encoding" = String, Header, format = "snappy", description = "Content encoding, must be snappy"),
("content-type" = String, Header, format = "application/x-protobuf", description = "Content type, must be application/x-protobuf"),
("x-prometheus-remote-write-version" = String, Header, format = "0.1.0", description = "Prometheus Remote Write version, must be 0.1.0"),
),
responses(
(status = 204, description = "No Content"),
(status = 400, description = "Bad Request", body = AppError),
(status = 500, description = "Internal Server Error", body = AppError),
)
)]
#[debug_handler]
pub async fn publish_prometheus(
State(state): State<HttpServerState>,
Expand Down
49 changes: 45 additions & 4 deletions src/ingestors/http/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::app_error::AppError;
use super::crud::list_sensors;
use super::influxdb::publish_influxdb;
use super::prometheus::publish_prometheus;
use super::state::HttpServerState;
Expand All @@ -9,6 +10,9 @@ use axum::extract::DefaultBodyLimit;
use axum::extract::Request;
//use axum::extract::Multipart;
//use axum::extract::Path;
use crate::ingestors::http::crud::__path_list_sensors;
use crate::ingestors::http::influxdb::__path_publish_influxdb;
use crate::ingestors::http::prometheus::__path_publish_prometheus;
use axum::extract::State;
use axum::http::header;
use axum::http::StatusCode;
Expand All @@ -29,6 +33,19 @@ use tower::ServiceBuilder;
use tower_http::trace;
use tower_http::{timeout::TimeoutLayer, trace::TraceLayer, ServiceBuilderExt};
use tracing::Level;
use utoipa::OpenApi;
use utoipa_scalar::{Scalar, Servable as ScalarServable};

#[derive(OpenApi)]
#[openapi(
tags(
(name = "SensApp", description = "SensApp API"),
(name = "InfluxDB", description = "InfluxDB Write API"),
(name = "Prometheus", description = "Prometheus Remote Write API"),
),
paths(frontpage, list_sensors, publish_influxdb, publish_prometheus),
)]
struct ApiDoc;

pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Result<()> {
let config = config::get()?;
Expand Down Expand Up @@ -60,7 +77,9 @@ pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Res

// Create our application with a single route
let app = Router::new()
.route("/", get(handler))
.route("/", get(frontpage))
//.route("/api-docs/openapi.json", get(openapi))
.merge(Scalar::with_url("/docs", ApiDoc::openapi()))
.route(
"/publish",
post(publish_handler).layer(max_body_layer.clone()),
Expand All @@ -73,6 +92,8 @@ pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Res
"/sensors/:sensor_name_or_uuid/publish_multipart",
post(publish_multipart).layer(max_body_layer.clone()),
)
// Boring Sensor CRUD
.route("/sensors", get(list_sensors))
// InfluxDB Write API
.route(
"/api/v2/write",
Expand Down Expand Up @@ -102,11 +123,30 @@ async fn shutdown_signal() {
.expect("failed to install shutdown CTRL+C signal handler");
}

async fn handler(State(state): State<HttpServerState>) -> Result<Json<String>, AppError> {
#[utoipa::path(
get,
path = "/",
tag = "SensApp",
responses(
(status = 200, description = "SensApp Frontpage", body = String)
)
)]
async fn frontpage(State(state): State<HttpServerState>) -> Result<Json<String>, AppError> {
let name: String = (*state.name).clone();
Ok(Json(name))
}

// #[utoipa::path(
// get,
// path = "/api-docs/openapi.json",
// responses(
// (status = 200, description = "OpenAPI JSON", body = ApiDoc)
// )
// )]
// async fn openapi() -> Json<utoipa::openapi::OpenApi> {
// Json(ApiDoc::openapi())
// }

async fn publish_csv(
State(state): State<HttpServerState>,
//Path(sensor_name_or_uuid): Path<String>,
Expand Down Expand Up @@ -162,15 +202,16 @@ mod tests {
use tower::ServiceExt;

use super::*;
use crate::bus::EventBus;
use crate::{bus::EventBus, storage::sqlite::SqliteStorage};

#[tokio::test]
async fn test_handler() {
let state = HttpServerState {
name: Arc::new("hello world".to_string()),
event_bus: Arc::new(EventBus::init("test".to_string())),
storage: Arc::new(SqliteStorage::connect("sqlite::memory:").await.unwrap()),
};
let app = Router::new().route("/", get(handler)).with_state(state);
let app = Router::new().route("/", get(frontpage)).with_state(state);
let request = Request::builder().uri("/").body(Body::empty()).unwrap();

let response = app.oneshot(request).await.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/ingestors/http/state.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::bus::EventBus;
use crate::{bus::EventBus, storage::storage::StorageInstance};
use std::sync::Arc;

#[derive(Clone, Debug)]
pub struct HttpServerState {
pub name: Arc<String>,
pub event_bus: Arc<EventBus>,
pub storage: Arc<dyn StorageInstance>,
}
4 changes: 2 additions & 2 deletions src/ingestors/mqtt/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ pub async fn mqtt_client(config: MqttConfig, event_bus: Arc<EventBus>) -> Result
let payload = publish.payload;
println!("Received message on topic: {}", topic);
println!("Payload: {:?}", payload);
let mut geobuf = geobuf::geobuf_pb::Data::new();
/*let mut geobuf = geobuf::geobuf_pb::Data::new();
use protobuf::Message;
geobuf.merge_from_bytes(&payload); //.unwrap();
match geobuf::decode::Decoder::decode(&geobuf).unwrap() {
serde_json::Value::Object(geojson) => {
println!("GeoJSON: {:?}", geojson);
}
_ => {}
}
}*/
}
Ok(_) => {}
Err(e) => {
Expand Down
Loading

0 comments on commit 6669308

Please sign in to comment.