From 6669308802bfc116cc9f3db6c12b403685d42f2a Mon Sep 17 00:00:00 2001 From: Antoine Pultier <45740+fungiboletus@users.noreply.github.com> Date: Tue, 13 Aug 2024 16:27:25 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8C=88=20Work=20in=20progress?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit And RRDcached is thereā€¦ --- Cargo.lock | 357 ++++++++++--------------- Cargo.toml | 13 +- src/datamodel/sensapp_vec.rs | 2 +- src/datamodel/sensor.rs | 20 +- src/ingestors/http/crud.rs | 20 ++ src/ingestors/http/influxdb.rs | 28 ++ src/ingestors/http/mod.rs | 1 + src/ingestors/http/prometheus.rs | 25 ++ src/ingestors/http/server.rs | 49 +++- src/ingestors/http/state.rs | 3 +- src/ingestors/mqtt/mqtt_client.rs | 4 +- src/main.rs | 129 +++++---- src/storage/bigquery/mod.rs | 13 + src/storage/duckdb/duckdb_utilities.rs | 2 - src/storage/duckdb/mod.rs | 6 +- src/storage/mod.rs | 1 + src/storage/postgresql/postgresql.rs | 4 + src/storage/rrdcached/mod.rs | 329 +++++++++++++++++++++++ src/storage/sqlite/sqlite.rs | 4 + src/storage/storage.rs | 37 +-- src/storage/storage_factory.rs | 4 +- src/storage/timescaledb/timescaledb.rs | 8 + 22 files changed, 730 insertions(+), 329 deletions(-) create mode 100644 src/ingestors/http/crud.rs create mode 100644 src/storage/rrdcached/mod.rs diff --git a/Cargo.lock b/Cargo.lock index f57b8b9..fb51cd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -621,9 +621,9 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" -version = "0.1.80" +version = "0.1.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", @@ -713,34 +713,6 @@ dependencies = [ "paste", ] -[[package]] -name = "axum" -version = "0.6.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" -dependencies = [ - "async-trait", - "axum-core 0.3.4", - "bitflags 1.3.2", - "bytes", - "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.30", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding 2.3.1", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper 0.1.2", - "tower", - "tower-layer", - "tower-service", -] - [[package]] name = "axum" version = "0.7.5" @@ -748,14 +720,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", - "axum-core 0.4.3", + "axum-core", "axum-macros", "bytes", "futures-util", - "http 1.1.0", - "http-body 1.0.0", + "http", + "http-body", "http-body-util", - "hyper 1.3.1", + "hyper", "hyper-util", "itoa", "matchit", @@ -777,23 +749,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "axum-core" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "mime", - "rustversion", - "tower-layer", - "tower-service", -] - [[package]] name = "axum-core" version = "0.4.3" @@ -803,8 +758,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.1.0", - "http-body 1.0.0", + "http", + "http-body", "http-body-util", "mime", "pin-project-lite", @@ -2165,14 +2120,15 @@ dependencies = [ [[package]] name = "gcp-bigquery-client" -version = "0.22.0" -source = "git+https://github.com/lquerel/gcp-bigquery-client.git?rev=107a5557df6336933f6b0bcf330aa91fe6ca866a#107a5557df6336933f6b0bcf330aa91fe6ca866a" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51480b6aca9d7997b8575b7e8b68441a847235673cdd739ae576bbfc708dbd3d" dependencies = [ "async-stream", "async-trait", "dyn-clone", "flate2", - "hyper 1.3.1", + "hyper", "hyper-util", "log", "prost", @@ -2273,25 +2229,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" -[[package]] -name = "h2" -version = "0.3.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.12", - "indexmap 2.2.6", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "h2" version = "0.4.5" @@ -2303,7 +2240,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http 1.1.0", + "http", "indexmap 2.2.6", "slab", "tokio", @@ -2482,17 +2419,6 @@ dependencies = [ "windows", ] -[[package]] -name = "http" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "http" version = "1.1.0" @@ -2504,17 +2430,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http-body" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" -dependencies = [ - "bytes", - "http 0.2.12", - "pin-project-lite", -] - [[package]] name = "http-body" version = "1.0.0" @@ -2522,7 +2437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http 1.1.0", + "http", ] [[package]] @@ -2533,8 +2448,8 @@ checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", "futures-util", - "http 1.1.0", - "http-body 1.0.0", + "http", + "http-body", "pin-project-lite", ] @@ -2574,40 +2489,16 @@ dependencies = [ [[package]] name = "hyper" -version = "0.14.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "h2 0.3.26", - "http 0.2.12", - "http-body 0.4.6", - "httparse", - "httpdate", - "itoa", - "pin-project-lite", - "socket2 0.5.7", - "tokio", - "tower-service", - "tracing", - "want", -] - -[[package]] -name = "hyper" -version = "1.3.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.5", - "http 1.1.0", - "http-body 1.0.0", + "h2", + "http", + "http-body", "httparse", "httpdate", "itoa", @@ -2624,8 +2515,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", - "http 1.1.0", - "hyper 1.3.1", + "http", + "hyper", "hyper-util", "rustls 0.23.10", "rustls-native-certs", @@ -2638,14 +2529,15 @@ dependencies = [ [[package]] name = "hyper-timeout" -version = "0.4.1" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 0.14.30", + "hyper", + "hyper-util", "pin-project-lite", "tokio", - "tokio-io-timeout", + "tower-service", ] [[package]] @@ -2656,7 +2548,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.3.1", + "hyper", "hyper-util", "native-tls", "tokio", @@ -2666,16 +2558,16 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.5" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.1.0", - "http-body 1.0.0", - "hyper 1.3.1", + "http", + "http-body", + "hyper", "pin-project-lite", "socket2 0.5.7", "tokio", @@ -2752,6 +2644,7 @@ checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", "hashbrown 0.14.5", + "serde", ] [[package]] @@ -3078,9 +2971,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.21" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "lz4" @@ -3181,13 +3074,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ + "hermit-abi 0.3.9", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -3205,7 +3099,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http 1.1.0", + "http", "httparse", "memchr", "mime", @@ -3393,16 +3287,6 @@ dependencies = [ "libm", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi 0.3.9", - "libc", -] - [[package]] name = "num_threads" version = "0.1.7" @@ -4341,6 +4225,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", + "syn 1.0.109", "version_check", ] @@ -4366,9 +4251,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" dependencies = [ "bytes", "prost-derive", @@ -4376,9 +4261,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", "heck 0.5.0", @@ -4399,9 +4284,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", "itertools 0.12.1", @@ -4412,9 +4297,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" dependencies = [ "prost", ] @@ -4720,10 +4605,10 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http 1.1.0", - "http-body 1.0.0", + "http", + "http-body", "http-body-util", - "hyper 1.3.1", + "hyper", "hyper-rustls", "hyper-tls", "hyper-util", @@ -4817,6 +4702,17 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "rrdcached-client" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8320934d3dc3e2f1d236912fe45a381cad9648d21f3975181393ffe8f2eac4b" +dependencies = [ + "nom", + "thiserror", + "tokio", +] + [[package]] name = "rsa" version = "0.9.6" @@ -4858,7 +4754,7 @@ dependencies = [ "bytes", "flume", "futures-util", - "http 1.1.0", + "http", "log", "rustls-native-certs", "rustls-pemfile", @@ -5126,7 +5022,7 @@ dependencies = [ "anyhow", "async-broadcast", "async-trait", - "axum 0.7.5", + "axum", "big-decimal-byte-string-encoder", "bigdecimal", "blake3", @@ -5157,8 +5053,10 @@ dependencies = [ "prost", "rand", "regex", + "rrdcached-client", "rumqttc", "rust_decimal", + "rustls 0.23.10", "sentry", "serde", "serde-inline-default", @@ -5179,6 +5077,8 @@ dependencies = [ "tracing-subscriber", "url 2.5.2", "urlencoding", + "utoipa", + "utoipa-scalar", "uuid", ] @@ -5322,9 +5222,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.203" +version = "1.0.207" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" +checksum = "5665e14a49a4ea1b91029ba7d3bca9f299e1f7cfa194388ccc20f14743e784f2" dependencies = [ "serde_derive", ] @@ -5351,9 +5251,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.203" +version = "1.0.207" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" +checksum = "6aea2634c86b0e8ef2cfdc0c340baede54ec27b1e46febd7f80dffb2aa44a00e" dependencies = [ "proc-macro2", "quote", @@ -5362,11 +5262,12 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.118" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d947f6b3163d8857ea16c4fa0dd4840d52f3041039a85decd46867eb1abef2e4" +checksum = "66ad62847a56b3dba58cc891acd13884b9c61138d330c0d7b6181713d4fce38d" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] @@ -6089,18 +5990,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", @@ -6176,38 +6077,27 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.0" +version = "1.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.5.7", "tokio-macros", - "windows-sys 0.48.0", -] - -[[package]] -name = "tokio-io-timeout" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" -dependencies = [ - "pin-project-lite", - "tokio", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -6326,28 +6216,30 @@ dependencies = [ [[package]] name = "tonic" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" dependencies = [ "async-stream", "async-trait", - "axum 0.6.20", - "base64 0.21.7", + "axum", + "base64 0.22.1", "bytes", - "h2 0.3.26", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.30", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", "hyper-timeout", + "hyper-util", "percent-encoding 2.3.1", "pin-project", "prost", "rustls-native-certs", "rustls-pemfile", - "rustls-pki-types", + "socket2 0.5.7", "tokio", - "tokio-rustls 0.25.0", + "tokio-rustls 0.26.0", "tokio-stream", "tower", "tower-layer", @@ -6357,9 +6249,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" +checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964" dependencies = [ "prettyplease", "proc-macro2", @@ -6401,8 +6293,8 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http 1.1.0", - "http-body 1.0.0", + "http", + "http-body", "http-body-util", "http-range-header", "httpdate", @@ -6509,7 +6401,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http 1.1.0", + "http", "httparse", "log", "rand", @@ -6677,6 +6569,43 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "utoipa" +version = "4.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5afb1a60e207dca502682537fefcfd9921e71d0b83e9576060f09abc6efab23" +dependencies = [ + "indexmap 2.2.6", + "serde", + "serde_json", + "utoipa-gen", +] + +[[package]] +name = "utoipa-gen" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bf0e16c02bc4bf5322ab65f10ab1149bdbcaa782cba66dc7057370a3f8190be" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "regex", + "syn 2.0.68", +] + +[[package]] +name = "utoipa-scalar" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3ab4b7269d14d93626b0bfedf212f1b0995cb7d13d35daba21d579511e7fae8" +dependencies = [ + "axum", + "serde", + "serde_json", + "utoipa", +] + [[package]] name = "uuid" version = "1.9.1" @@ -7159,9 +7088,9 @@ dependencies = [ "async-trait", "base64 0.22.1", "futures", - "http 1.1.0", + "http", "http-body-util", - "hyper 1.3.1", + "hyper", "hyper-rustls", "hyper-util", "log", diff --git a/Cargo.toml b/Cargo.toml index 79492f2..1d06377 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,8 @@ config = "0.14" serde = "1.0" confique = "0.2" byte-unit = "5.1" -prost = "0.12" +#prost = "0.12" +prost = "0.13" snap = "1.1" hex = "0.4" blake3 = "1.5" @@ -76,11 +77,17 @@ rand = "0.8" #protobuf = "3.0.2" lapin = "2.3" futures-lite = "2.3" -gcp-bigquery-client = { git = "https://github.com/lquerel/gcp-bigquery-client.git", rev = "107a5557df6336933f6b0bcf330aa91fe6ca866a" } +#gcp-bigquery-client = { git = "https://github.com/lquerel/gcp-bigquery-client.git", rev = "107a5557df6336933f6b0bcf330aa91fe6ca866a" } +gcp-bigquery-client = { version = "0.23.0" } #prost = { version = "0.12", features = ["derive"] } #enum_delegate = "0.2" sinteflake = { version = "0.1", features = ["async"] } -tonic = "0.11" +#tonic = "0.11" +tonic = "0.12" bigdecimal = "0.4.5" big-decimal-byte-string-encoder = "0.1.0" clru = "0.6" +utoipa = { version = "4.2", features = ["axum_extras"] } +utoipa-scalar = { version = "0.1", features = ["axum"] } +rrdcached-client = "0.1" +rustls = "0.23" diff --git a/src/datamodel/sensapp_vec.rs b/src/datamodel/sensapp_vec.rs index d4b551d..163a9a4 100644 --- a/src/datamodel/sensapp_vec.rs +++ b/src/datamodel/sensapp_vec.rs @@ -1,5 +1,5 @@ use smallvec::SmallVec; -pub type SensAppVec = SmallVec<[T; 1]>; +pub type SensAppVec = SmallVec<[T; 4]>; pub type SensAppLabels = SmallVec<[(String, String); 8]>; diff --git a/src/datamodel/sensor.rs b/src/datamodel/sensor.rs index 6872675..e198eda 100644 --- a/src/datamodel/sensor.rs +++ b/src/datamodel/sensor.rs @@ -5,6 +5,7 @@ use anyhow::{anyhow, Error}; use cached::proc_macro::cached; use once_cell::sync::OnceCell; use smallvec::SmallVec; +use std::fmt; use std::sync::Arc; use uuid::Uuid; @@ -17,20 +18,23 @@ pub struct Sensor { pub labels: SensAppLabels, } -impl ToString for Sensor { - fn to_string(&self) -> String { - let mut s = format!( +impl fmt::Display for Sensor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, "Sensor {{ uuid: {}, name: {}, sensor_type: {:?}", self.uuid, self.name, self.sensor_type - ); + )?; + if let Some(unit) = &self.unit { - s.push_str(&format!(", unit: {}", unit)); + write!(f, ", unit: {}", unit)?; } + if !self.labels.is_empty() { - s.push_str(&format!(", labels: {:?}", self.labels)); + write!(f, ", labels: {:?}", self.labels)?; } - s.push_str(" }"); - s + + write!(f, " }}") } } diff --git a/src/ingestors/http/crud.rs b/src/ingestors/http/crud.rs new file mode 100644 index 0000000..699a7ce --- /dev/null +++ b/src/ingestors/http/crud.rs @@ -0,0 +1,20 @@ +use crate::ingestors::http::app_error::AppError; +use crate::ingestors::http::state::HttpServerState; +use axum::extract::State; +use axum::Json; + +/// List all the sensors. +#[utoipa::path( + get, + path = "/sensors", + tag = "SensApp", + responses( + (status = 200, description = "List of sensors", body = Vec) + ) +)] +pub async fn list_sensors( + State(state): State, +) -> Result>, AppError> { + let sensors = state.storage.list_sensors().await?; + Ok(Json(sensors)) +} diff --git a/src/ingestors/http/influxdb.rs b/src/ingestors/http/influxdb.rs index 09acc06..ec9fdb1 100644 --- a/src/ingestors/http/influxdb.rs +++ b/src/ingestors/http/influxdb.rs @@ -119,6 +119,32 @@ impl FromStr for Precision { } } +/// InfluxDB Compatible Write API. +/// +/// Allows you to write data from InfluxDB or Telegraf to SensApp. +/// [More information.](https://github.com/SINTEF/sensapp/blob/main/docs/INFLUX_DB.md) +#[utoipa::path( + post, + path = "/api/v2/write", + tag = "InfluxDB", + request_body( + content = String, + content_type = "text/plain", + description = "InfluxDB Line Protocol endpoint. [Reference](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/).", + example = "cpu,host=A,region=west usage_system=64.2 1590488773254420000" + ), + params( + ("bucket" = String, Query, description = "Bucket name", example = "sensapp"), + ("org" = Option, Query, description = "Organization name", example = "sensapp"), + ("org_id" = Option, Query, description = "Organization ID"), + ("precision" = Option, Query, description = "Precision of the timestamps. One of ns, us, ms, s"), + ), + responses( + (status = 204, description = "No Content"), + (status = 400, description = "Bad Request", body = AppError), + (status = 500, description = "Internal Server Error", body = AppError), + ) +)] #[debug_handler] pub async fn publish_influxdb( State(state): State, @@ -258,6 +284,7 @@ pub async fn publish_influxdb( #[cfg(test)] mod tests { use crate::bus::{self, message}; + use crate::storage::sqlite::SqliteStorage; use super::*; use flate2::write::GzEncoder; @@ -318,6 +345,7 @@ mod tests { let state = State(HttpServerState { name: Arc::new("influxdb test".to_string()), event_bus: event_bus.clone(), + storage: Arc::new(SqliteStorage::connect("sqlite::memory:").await.unwrap()), }); let headers = HeaderMap::new(); let query = Query(InfluxDBQueryParams { diff --git a/src/ingestors/http/mod.rs b/src/ingestors/http/mod.rs index 30bd13e..1b81320 100644 --- a/src/ingestors/http/mod.rs +++ b/src/ingestors/http/mod.rs @@ -1,4 +1,5 @@ pub mod app_error; +pub mod crud; pub mod influxdb; pub mod prometheus; pub mod server; diff --git a/src/ingestors/http/prometheus.rs b/src/ingestors/http/prometheus.rs index 48cb5db..6d51dd7 100644 --- a/src/ingestors/http/prometheus.rs +++ b/src/ingestors/http/prometheus.rs @@ -73,6 +73,31 @@ fn verify_headers(headers: &HeaderMap) -> Result<(), AppError> { Ok(()) } +/// Prometheus Remote Write API. +/// +/// Allows you to write data from Prometheus to SensApp. +/// +/// It follows the [Prometheus Remote Write specification](https://prometheus.io/docs/concepts/remote_write_spec/). +#[utoipa::path( + post, + path = "/api/v1/prometheus_remote_write", + tag = "Prometheus", + request_body( + content = Bytes, + content_type = "application/x-protobuf", + description = "Prometheus Remote Write endpoint. [Reference](https://prometheus.io/docs/concepts/remote_write_spec/)", + ), + params( + ("content-encoding" = String, Header, format = "snappy", description = "Content encoding, must be snappy"), + ("content-type" = String, Header, format = "application/x-protobuf", description = "Content type, must be application/x-protobuf"), + ("x-prometheus-remote-write-version" = String, Header, format = "0.1.0", description = "Prometheus Remote Write version, must be 0.1.0"), + ), + responses( + (status = 204, description = "No Content"), + (status = 400, description = "Bad Request", body = AppError), + (status = 500, description = "Internal Server Error", body = AppError), + ) +)] #[debug_handler] pub async fn publish_prometheus( State(state): State, diff --git a/src/ingestors/http/server.rs b/src/ingestors/http/server.rs index a4f741e..637e879 100644 --- a/src/ingestors/http/server.rs +++ b/src/ingestors/http/server.rs @@ -1,4 +1,5 @@ use super::app_error::AppError; +use super::crud::list_sensors; use super::influxdb::publish_influxdb; use super::prometheus::publish_prometheus; use super::state::HttpServerState; @@ -9,6 +10,9 @@ use axum::extract::DefaultBodyLimit; use axum::extract::Request; //use axum::extract::Multipart; //use axum::extract::Path; +use crate::ingestors::http::crud::__path_list_sensors; +use crate::ingestors::http::influxdb::__path_publish_influxdb; +use crate::ingestors::http::prometheus::__path_publish_prometheus; use axum::extract::State; use axum::http::header; use axum::http::StatusCode; @@ -29,6 +33,19 @@ use tower::ServiceBuilder; use tower_http::trace; use tower_http::{timeout::TimeoutLayer, trace::TraceLayer, ServiceBuilderExt}; use tracing::Level; +use utoipa::OpenApi; +use utoipa_scalar::{Scalar, Servable as ScalarServable}; + +#[derive(OpenApi)] +#[openapi( + tags( + (name = "SensApp", description = "SensApp API"), + (name = "InfluxDB", description = "InfluxDB Write API"), + (name = "Prometheus", description = "Prometheus Remote Write API"), + ), + paths(frontpage, list_sensors, publish_influxdb, publish_prometheus), +)] +struct ApiDoc; pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Result<()> { let config = config::get()?; @@ -60,7 +77,9 @@ pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Res // Create our application with a single route let app = Router::new() - .route("/", get(handler)) + .route("/", get(frontpage)) + //.route("/api-docs/openapi.json", get(openapi)) + .merge(Scalar::with_url("/docs", ApiDoc::openapi())) .route( "/publish", post(publish_handler).layer(max_body_layer.clone()), @@ -73,6 +92,8 @@ pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Res "/sensors/:sensor_name_or_uuid/publish_multipart", post(publish_multipart).layer(max_body_layer.clone()), ) + // Boring Sensor CRUD + .route("/sensors", get(list_sensors)) // InfluxDB Write API .route( "/api/v2/write", @@ -102,11 +123,30 @@ async fn shutdown_signal() { .expect("failed to install shutdown CTRL+C signal handler"); } -async fn handler(State(state): State) -> Result, AppError> { +#[utoipa::path( + get, + path = "/", + tag = "SensApp", + responses( + (status = 200, description = "SensApp Frontpage", body = String) + ) +)] +async fn frontpage(State(state): State) -> Result, AppError> { let name: String = (*state.name).clone(); Ok(Json(name)) } +// #[utoipa::path( +// get, +// path = "/api-docs/openapi.json", +// responses( +// (status = 200, description = "OpenAPI JSON", body = ApiDoc) +// ) +// )] +// async fn openapi() -> Json { +// Json(ApiDoc::openapi()) +// } + async fn publish_csv( State(state): State, //Path(sensor_name_or_uuid): Path, @@ -162,15 +202,16 @@ mod tests { use tower::ServiceExt; use super::*; - use crate::bus::EventBus; + use crate::{bus::EventBus, storage::sqlite::SqliteStorage}; #[tokio::test] async fn test_handler() { let state = HttpServerState { name: Arc::new("hello world".to_string()), event_bus: Arc::new(EventBus::init("test".to_string())), + storage: Arc::new(SqliteStorage::connect("sqlite::memory:").await.unwrap()), }; - let app = Router::new().route("/", get(handler)).with_state(state); + let app = Router::new().route("/", get(frontpage)).with_state(state); let request = Request::builder().uri("/").body(Body::empty()).unwrap(); let response = app.oneshot(request).await.unwrap(); diff --git a/src/ingestors/http/state.rs b/src/ingestors/http/state.rs index f76badf..359644b 100644 --- a/src/ingestors/http/state.rs +++ b/src/ingestors/http/state.rs @@ -1,8 +1,9 @@ -use crate::bus::EventBus; +use crate::{bus::EventBus, storage::storage::StorageInstance}; use std::sync::Arc; #[derive(Clone, Debug)] pub struct HttpServerState { pub name: Arc, pub event_bus: Arc, + pub storage: Arc, } diff --git a/src/ingestors/mqtt/mqtt_client.rs b/src/ingestors/mqtt/mqtt_client.rs index 375649f..fa8b763 100644 --- a/src/ingestors/mqtt/mqtt_client.rs +++ b/src/ingestors/mqtt/mqtt_client.rs @@ -100,7 +100,7 @@ pub async fn mqtt_client(config: MqttConfig, event_bus: Arc) -> Result let payload = publish.payload; println!("Received message on topic: {}", topic); println!("Payload: {:?}", payload); - let mut geobuf = geobuf::geobuf_pb::Data::new(); + /*let mut geobuf = geobuf::geobuf_pb::Data::new(); use protobuf::Message; geobuf.merge_from_bytes(&payload); //.unwrap(); match geobuf::decode::Decoder::decode(&geobuf).unwrap() { @@ -108,7 +108,7 @@ pub async fn mqtt_client(config: MqttConfig, event_bus: Arc) -> Result println!("GeoJSON: {:?}", geojson); } _ => {} - } + }*/ } Ok(_) => {} Err(e) => { diff --git a/src/main.rs b/src/main.rs index 3f6d1cc..40734af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#![forbid(unsafe_code)] use crate::bus::message; use crate::config::load_configuration; use crate::ingestors::amqp::amqp_example; @@ -6,6 +7,7 @@ use crate::ingestors::http::state::HttpServerState; use axum::http::StatusCode; use futures::stream::StreamExt; use futures::TryStreamExt; +use rustls::crypto::CryptoProvider; use std::io; use std::net::SocketAddr; use std::sync::Arc; @@ -36,6 +38,10 @@ fn main() { }, )); + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .expect("Failed to install CryptoProvider"); + tokio::runtime::Builder::new_multi_thread() .enable_all() .build() @@ -95,16 +101,17 @@ async fn async_main() { //let storage = create_storage_from_connection_string("sqlite://toto.db") //let storage = create_storage_from_connection_string("postgres://localhost:5432/postgres") //let storage = create_storage_from_connection_string("duckdb://caca.db") - let storage = create_storage_from_connection_string( - "bigquery://key.json?project_id=smartbuildinghub&dataset_id=sensapp_dev_3", - ) - .await - .expect("Failed to create storage"); - - storage - .create_or_migrate() + //let storage = create_storage_from_connection_string( + // "bigquery://key.json?project_id=smartbuildinghub&dataset_id=sensapp_dev_3", + //) + let storage = create_storage_from_connection_string("rrdcached://localhost:42217?preset=munin") .await - .expect("Failed to create or migrate database"); + .expect("Failed to create storage"); + + /*storage + .create_or_migrate() + .await + .expect("Failed to create or migrate database");*/ /*let duckdb_storage = DuckDBStorage::connect("sensapp.db") .await @@ -165,6 +172,8 @@ async fn async_main() { std::process::exit(1); })); + let storage_for_publish = storage.clone(); + // spawn a task that prints the events to stdout tokio::spawn(async move { while let Ok(message) = wololo.recv().await { @@ -172,7 +181,7 @@ async fn async_main() { use crate::storage::storage::StorageInstance; //let toto: &dyn StorageInstance = &storage; - let toto: &dyn StorageInstance = storage.as_ref(); + let toto: &dyn StorageInstance = storage_for_publish.as_ref(); match message { message::Message::Publish(message::PublishMessage { @@ -280,6 +289,8 @@ async fn async_main() { HttpServerState { name: Arc::new("SensApp".to_string()), event_bus, + //storage: storage.clone(), + storage, }, SocketAddr::from((endpoint, port)), ) @@ -294,52 +305,52 @@ async fn async_main() { } } -async fn handler() -> &'static str { - "Hello, world!" -} - -async fn publish_stream_handler(body: axum::body::Body) -> Result { - let mut count = 0usize; - let mut stream = body.into_data_stream(); - - loop { - let chunk = stream.try_next().await; - match chunk { - Ok(bytes) => match bytes { - Some(bytes) => count += bytes.into_iter().filter(|b| *b == b'\n').count(), - None => break, - }, - Err(_) => { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "Error reading body".to_string(), - )) - } - } - } - - Ok(count.to_string()) -} - -async fn publish_csv(body: axum::body::Body) -> Result { - let stream = body.into_data_stream(); - let stream = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err)); - let reader = stream.into_async_read(); - let mut csv_reader = csv_async::AsyncReaderBuilder::new() - .has_headers(true) - .delimiter(b';') - .create_reader(reader); - - println!("{:?}", csv_reader.has_headers()); - println!("{:?}", csv_reader.headers().await.unwrap()); - let mut records = csv_reader.records(); - - println!("Reading CSV"); - while let Some(record) = records.next().await { - let record = record.unwrap(); - println!("{:?}", record); - } - println!("Done reading CSV"); - - Ok("ok".to_string()) -} +// async fn handler() -> &'static str { +// "Hello, world!" +// } + +// async fn publish_stream_handler(body: axum::body::Body) -> Result { +// let mut count = 0usize; +// let mut stream = body.into_data_stream(); + +// loop { +// let chunk = stream.try_next().await; +// match chunk { +// Ok(bytes) => match bytes { +// Some(bytes) => count += bytes.into_iter().filter(|b| *b == b'\n').count(), +// None => break, +// }, +// Err(_) => { +// return Err(( +// StatusCode::INTERNAL_SERVER_ERROR, +// "Error reading body".to_string(), +// )) +// } +// } +// } + +// Ok(count.to_string()) +// } + +// async fn publish_csv(body: axum::body::Body) -> Result { +// let stream = body.into_data_stream(); +// let stream = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err)); +// let reader = stream.into_async_read(); +// let mut csv_reader = csv_async::AsyncReaderBuilder::new() +// .has_headers(true) +// .delimiter(b';') +// .create_reader(reader); + +// println!("{:?}", csv_reader.has_headers()); +// println!("{:?}", csv_reader.headers().await.unwrap()); +// let mut records = csv_reader.records(); + +// println!("Reading CSV"); +// while let Some(record) = records.next().await { +// let record = record.unwrap(); +// println!("{:?}", record); +// } +// println!("Done reading CSV"); + +// Ok("ok".to_string()) +// } diff --git a/src/storage/bigquery/mod.rs b/src/storage/bigquery/mod.rs index a7006a5..d0553e8 100644 --- a/src/storage/bigquery/mod.rs +++ b/src/storage/bigquery/mod.rs @@ -35,6 +35,15 @@ pub struct BigQueryStorage { dataset_id: String, } +impl std::fmt::Debug for BigQueryStorage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BigQueryStorage") + .field("project_id", &self.project_id) + .field("dataset_id", &self.dataset_id) + .finish() + } +} + fn parse_connection_string(connection_string: &str) -> Result<(String, String, String)> { let url = Url::parse(connection_string)?; if url.scheme() != "bigquery" { @@ -223,4 +232,8 @@ impl StorageInstance for BigQueryStorage { // Implement vacuum logic here Ok(()) } + + async fn list_sensors(&self) -> Result> { + unimplemented!(); + } } diff --git a/src/storage/duckdb/duckdb_utilities.rs b/src/storage/duckdb/duckdb_utilities.rs index 08ac21c..17152ab 100644 --- a/src/storage/duckdb/duckdb_utilities.rs +++ b/src/storage/duckdb/duckdb_utilities.rs @@ -116,12 +116,10 @@ pub fn get_sensor_id_or_create_sensor(transaction: &Transaction, sensor: &Sensor let mut insert_stmt: CachedStatement = transaction.prepare_cached( "INSERT INTO sensors (uuid, name, type, unit) VALUES (?, ?, ?, ?) RETURNING sensor_id", )?; - println!("bonjour - 1"); let sensor_id: i64 = insert_stmt.query_row( params![uuid_string, sensor.name, sensor_type_string, unit_id], |row| row.get(0), )?; - println!("bonjour - 2"); // Add the labels let mut label_insert_stmt: CachedStatement = transaction diff --git a/src/storage/duckdb/mod.rs b/src/storage/duckdb/mod.rs index 4fb9934..fd4241e 100644 --- a/src/storage/duckdb/mod.rs +++ b/src/storage/duckdb/mod.rs @@ -75,7 +75,7 @@ impl StorageInstance for DuckDBStorage { } async fn vacuum(&self) -> Result<()> { - let mut connection = self.connection.lock().await; + let connection = self.connection.lock().await; /*let transaction = connection.transaction()?; transaction.execute( @@ -91,6 +91,10 @@ impl StorageInstance for DuckDBStorage { connection.execute("VACUUM ANALYZE", [])?; Ok(()) } + + async fn list_sensors(&self) -> Result> { + unimplemented!(); + } } fn publish_single_sensor_batch( diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 263fa7f..105104a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,6 +1,7 @@ pub mod bigquery; pub mod duckdb; pub mod postgresql; +pub mod rrdcached; pub mod sqlite; pub mod storage; pub mod storage_factory; diff --git a/src/storage/postgresql/postgresql.rs b/src/storage/postgresql/postgresql.rs index 8d4e1f3..09f090c 100644 --- a/src/storage/postgresql/postgresql.rs +++ b/src/storage/postgresql/postgresql.rs @@ -63,6 +63,10 @@ impl StorageInstance for PostgresStorage { self.vacuum().await?; Ok(()) } + + async fn list_sensors(&self) -> Result> { + unimplemented!(); + } } impl PostgresStorage { diff --git a/src/storage/rrdcached/mod.rs b/src/storage/rrdcached/mod.rs new file mode 100644 index 0000000..9089f9e --- /dev/null +++ b/src/storage/rrdcached/mod.rs @@ -0,0 +1,329 @@ +use crate::{ + datamodel::{Sensor, SensorType, TypedSamples}, + storage::storage::StorageInstance, +}; +use anyhow::{anyhow, bail, Result}; +use axum::async_trait; +use rrdcached_client::{ + batch_update::BatchUpdate, + consolidation_function::ConsolidationFunction, + create::{CreateArguments, CreateDataSource, CreateDataSourceType, CreateRoundRobinArchive}, + errors::RRDCachedClientError, + RRDCachedClient, +}; +use std::{collections::HashSet, sync::Arc, time::Duration}; +use tokio::{sync::RwLock, time::timeout}; +use url::Url; +use uuid::Uuid; + +#[derive(Debug, Clone, PartialEq)] +pub enum Preset { + Munin, + Hoarder, +} + +impl Preset { + pub fn get_round_robin_archives(&self) -> Vec { + match self { + Preset::Munin => vec![ + // Every 5 minutes for 600 entries + CreateRoundRobinArchive { + consolidation_function: ConsolidationFunction::Average, + xfiles_factor: 0.5, + steps: 30, + rows: 600, + }, + // Every 30 minutes for 700 entries + CreateRoundRobinArchive { + consolidation_function: ConsolidationFunction::Average, + xfiles_factor: 0.5, + steps: 180, + rows: 700, + }, + // Every 2 hours for 775 entries + CreateRoundRobinArchive { + consolidation_function: ConsolidationFunction::Average, + xfiles_factor: 0.5, + steps: 720, + rows: 775, + }, + // Every day for 797 entries + CreateRoundRobinArchive { + consolidation_function: ConsolidationFunction::Average, + xfiles_factor: 0.5, + steps: 8640, + rows: 797, + }, + ], + Preset::Hoarder => vec![ + // Every 10 seconds for 1 day + CreateRoundRobinArchive { + consolidation_function: ConsolidationFunction::Average, + xfiles_factor: 0.5, + steps: 1, + rows: 8640, + }, + // Every minute for 2 days + CreateRoundRobinArchive { + consolidation_function: ConsolidationFunction::Average, + xfiles_factor: 0.5, + steps: 6, + rows: 2880, + }, + // Every 10 minutes for 7 days + CreateRoundRobinArchive { + consolidation_function: ConsolidationFunction::Average, + xfiles_factor: 0.5, + steps: 60, + rows: 1008, + }, + // Every hour for 1 year + CreateRoundRobinArchive { + consolidation_function: ConsolidationFunction::Average, + xfiles_factor: 0.5, + steps: 360, + rows: 8760, + }, + // Every day for 10 years + CreateRoundRobinArchive { + consolidation_function: ConsolidationFunction::Average, + xfiles_factor: 0.5, + steps: 8640, + rows: 3650, + }, + ], + } + } +} + +impl std::str::FromStr for Preset { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "munin" => Ok(Preset::Munin), + "hoarder" => Ok(Preset::Hoarder), + _ => bail!("Invalid preset: {}", s), + } + } +} + +#[derive(Debug)] +pub struct RrdCachedStorage { + client: Arc>, + + created_sensors: Arc>>, + + preset: Preset, +} + +impl RrdCachedStorage { + pub async fn connect(connection_string: &str) -> Result { + let url = Url::parse(connection_string)?; + let scheme = url.scheme(); + + let preset = url + .query_pairs() + .find(|(key, _)| key == "preset") + .map(|(_, value)| value.parse()) + .transpose()? + .unwrap_or(Preset::Hoarder); // Default to Hoarder if not specified + + match scheme { + "rrdcached" | "rrdcached+tcp" => { + // extract host and port + let host = url.host_str().ok_or_else(|| anyhow!("No host in URL"))?; + let port = url.port().ok_or_else(|| anyhow!("No port in URL"))?; + + let client = RRDCachedClient::connect_tcp(&format!("{}:{}", host, port)).await?; + Ok(Self { + client: Arc::new(RwLock::new(client)), + created_sensors: Arc::new(RwLock::new(HashSet::new())), + preset, + }) + } + "rrdcached+unix" => { + unimplemented!() + } + _ => bail!("Invalid scheme in connection string: {}", scheme), + } + } + + async fn create_sensors(&self, sensors: &[Arc], start_timestamp: u64) -> Result<()> { + if sensors.is_empty() { + return Ok(()); + } + let mut client = self.client.write().await; + let mut created_sensors = self.created_sensors.write().await; + for sensor in sensors { + client + .create(CreateArguments { + path: sensor.uuid.to_string(), + data_sources: vec![CreateDataSource { + name: "sensapp".to_string(), + minimum: None, + maximum: None, + heartbeat: 20, + serie_type: CreateDataSourceType::Gauge, + }], + round_robin_archives: self.preset.get_round_robin_archives(), + start_timestamp, + step_seconds: 10, + }) + .await?; + created_sensors.insert(sensor.uuid); + } + Ok(()) + } +} + +#[async_trait] +impl StorageInstance for RrdCachedStorage { + async fn create_or_migrate(&self) -> Result<()> { + Ok(()) + } + async fn publish( + &self, + batch: std::sync::Arc, + sync_sender: async_broadcast::Sender<()>, + ) -> Result<()> { + if batch.sensors.is_empty() { + return Ok(()); + } + + let mut batch_updates = vec![]; + let mut min_timestamp = usize::MAX; + + for single_sensor_batch in batch.sensors.as_ref() { + let samples_guard = single_sensor_batch.samples.read().await; + let uuid = single_sensor_batch.sensor.uuid; + let name = uuid.to_string(); + match &*samples_guard { + TypedSamples::Float(samples) => { + for value in samples { + let timestamp = value.datetime.to_unix_seconds().floor() as usize; + if timestamp < min_timestamp { + min_timestamp = timestamp; + } + batch_updates.push(BatchUpdate::new( + &name, + Some(timestamp), + vec![value.value], + )?); + } + } + TypedSamples::Numeric(samples) => { + for value in samples { + let timestamp = value.datetime.to_unix_seconds().floor() as usize; + if timestamp < min_timestamp { + min_timestamp = timestamp; + } + use rust_decimal::prelude::ToPrimitive; + batch_updates.push(BatchUpdate::new( + &name, + Some(timestamp), + vec![value.value.to_f64().unwrap_or(f64::NAN)], + )?); + } + } + TypedSamples::Integer(samples) => { + for value in samples { + let timestamp = value.datetime.to_unix_seconds().floor() as usize; + if timestamp < min_timestamp { + min_timestamp = timestamp; + } + batch_updates.push(BatchUpdate::new( + &name, + Some(timestamp), + vec![value.value as f64], + )?); + } + } + TypedSamples::Boolean(samples) => { + for value in samples { + let timestamp = value.datetime.to_unix_seconds().floor() as usize; + if timestamp < min_timestamp { + min_timestamp = timestamp; + } + batch_updates.push(BatchUpdate::new( + &name, + Some(timestamp), + vec![if value.value { 1.0 } else { 0.0 }], + )?); + } + } + _ => { + print!("Unsupported type"); + } + } + } + + // Find the sensors that need to be created + let sensors_to_create: Vec>; + { + let created_sensors = self.created_sensors.read().await; + sensors_to_create = batch + .sensors + .iter() + .filter(|single_sensor_batch| { + let sensor = &single_sensor_batch.sensor; + !created_sensors.contains(&sensor.uuid) + && (sensor.sensor_type == SensorType::Float + || sensor.sensor_type == SensorType::Numeric + || sensor.sensor_type == SensorType::Integer + || sensor.sensor_type == SensorType::Boolean) + }) + .map(|single_sensor_batch| single_sensor_batch.sensor.clone()) + .collect::>(); + } + if !sensors_to_create.is_empty() { + self.create_sensors(&sensors_to_create, min_timestamp as u64 - 10) + .await?; + } + + { + let mut client = self.client.write().await; + match client.batch(batch_updates).await { + Ok(_) => {} + Err(e) => { + println!("Failed to batch update: {:?}", e); + match e { + RRDCachedClientError::BatchUpdateErrorResponse(string, errors) => { + println!("Batch update error response: {:?}", string); + for error in errors { + println!("Batch update error: {:?}", error); + } + } + _ => {} + } + } + } + } + + self.sync(sync_sender).await?; + + Ok(()) + } + + async fn sync(&self, sync_sender: async_broadcast::Sender<()>) -> Result<()> { + // Flush ! + { + let mut client = self.client.write().await; + client.flush_all().await?; + } + + if sync_sender.receiver_count() > 0 && !sync_sender.is_closed() { + let _ = timeout(Duration::from_secs(15), sync_sender.broadcast(())).await?; + } + + Ok(()) + } + + async fn vacuum(&self) -> Result<()> { + Ok(()) + } + + async fn list_sensors(&self) -> Result> { + unimplemented!(); + } +} diff --git a/src/storage/sqlite/sqlite.rs b/src/storage/sqlite/sqlite.rs index ce35229..f89e058 100644 --- a/src/storage/sqlite/sqlite.rs +++ b/src/storage/sqlite/sqlite.rs @@ -77,6 +77,10 @@ impl StorageInstance for SqliteStorage { self.vacuum().await?; Ok(()) } + + async fn list_sensors(&self) -> Result> { + unimplemented!(); + } } impl SqliteStorage { diff --git a/src/storage/storage.rs b/src/storage/storage.rs index 10039c5..5d801c4 100644 --- a/src/storage/storage.rs +++ b/src/storage/storage.rs @@ -1,14 +1,9 @@ -use anyhow::{bail, Result}; -use async_broadcast::Sender; +use anyhow::Result; use async_trait::async_trait; -use sqlx::Sqlite; -use std::sync::Arc; - -use crate::datamodel::batch::Batch; +use std::fmt::Debug; #[async_trait] -//#[enum_delegate::register] -pub trait StorageInstance: Send + Sync { +pub trait StorageInstance: Send + Sync + Debug { async fn create_or_migrate(&self) -> Result<()>; async fn publish( &self, @@ -17,30 +12,6 @@ pub trait StorageInstance: Send + Sync { ) -> Result<()>; async fn sync(&self, sync_sender: async_broadcast::Sender<()>) -> Result<()>; async fn vacuum(&self) -> Result<()>; -} -/*#[derive(Debug)] -enum GenericStorages { - Sqlite(crate::storage::sqlite::SqliteStorage), - Postgres(crate::storage::postgresql::PostgresStorage), + async fn list_sensors(&self) -> Result>; } - -#[derive(Debug)] -pub struct Storage { - generic_storage: GenericStorages, -} - -impl Storage { - pub async fn publish_batch(&self, batch: Batch) -> Result<()> { - match self.generic_storage { - GenericStorages::Sqlite(ref sqlite_storage) => { - sqlite_storage.publish_batch(batch).await? - } - GenericStorages::Postgres(ref postgres_storage) => { - postgres_storage.publish_batch(batch).await? - } - } - // Implement batch publishing logic here - Ok(()) - } -}*/ diff --git a/src/storage/storage_factory.rs b/src/storage/storage_factory.rs index 712700b..485b7c6 100644 --- a/src/storage/storage_factory.rs +++ b/src/storage/storage_factory.rs @@ -4,7 +4,8 @@ use anyhow::{bail, Result}; use super::{ bigquery::BigQueryStorage, duckdb::DuckDBStorage, postgresql::PostgresStorage, - sqlite::SqliteStorage, storage::StorageInstance, timescaledb::TimeScaleDBStorage, + rrdcached::RrdCachedStorage, sqlite::SqliteStorage, storage::StorageInstance, + timescaledb::TimeScaleDBStorage, }; /*#[enum_delegate::implement(StorageInstance)] @@ -35,6 +36,7 @@ pub async fn create_storage_from_connection_string( s if s.starts_with("postgres:") => Arc::new(PostgresStorage::connect(s).await?), s if s.starts_with("sqlite:") => Arc::new(SqliteStorage::connect(s).await?), s if s.starts_with("timescaledb:") => Arc::new(TimeScaleDBStorage::connect(s).await?), + s if s.starts_with("rrdcached:") => Arc::new(RrdCachedStorage::connect(s).await?), _ => bail!("Unsupported storage type: {}", connection_string), }) } diff --git a/src/storage/timescaledb/timescaledb.rs b/src/storage/timescaledb/timescaledb.rs index 83981f0..6566531 100644 --- a/src/storage/timescaledb/timescaledb.rs +++ b/src/storage/timescaledb/timescaledb.rs @@ -63,6 +63,10 @@ impl StorageInstance for TimeScaleDBStorage { self.vacuum().await?; Ok(()) } + + async fn list_sensors(&self) -> Result> { + unimplemented!(); + } } impl TimeScaleDBStorage { @@ -113,4 +117,8 @@ impl TimeScaleDBStorage { Ok(()) } + + async fn list_sensors(&self) -> Result> { + unimplemented!(); + } }