From 143467d0fc38b3ed1d6115a0ecd7b51a97ca8e8c Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sun, 3 Nov 2024 11:03:24 +0100 Subject: [PATCH 01/11] Optionally expose usage metrics --- Cargo.lock | 206 ++++++++++++++++++++++++++++++++++++++- common/src/settings.rs | 37 ++++++- server/Cargo.toml | 2 + server/src/lib.rs | 23 ++++- server/src/logic.rs | 29 ++++-- server/src/monitoring.rs | 69 +++++++++++++ 6 files changed, 351 insertions(+), 15 deletions(-) create mode 100644 server/src/monitoring.rs diff --git a/Cargo.lock b/Cargo.lock index ca3f11b..9f98841 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -461,6 +461,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -476,6 +486,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -705,6 +730,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -888,13 +919,22 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ + "foldhash", +] + [[package]] name = "hashlink" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown", + "hashbrown 0.14.5", ] [[package]] @@ -1006,6 +1046,24 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.6" @@ -1061,14 +1119,20 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.0", ] +[[package]] +name = "ipnet" +version = "2.10.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" + [[package]] name = "is_terminal_polyfill" version = "1.70.0" @@ -1285,6 +1349,51 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "metrics" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae428771d17306715c5091d446327d1cfdedc82185c65ba8423ab404e45bf10" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85b6f8152da6d7892ff1b7a1c0fa3f435e92b5918ad67035c3bb432111d9a29b" +dependencies = [ + "base64 0.22.1", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "indexmap", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15b482df36c13dd1869d73d14d28cd4855fbd6cfc32294bee109908a9f4a4ed7" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.15.0", + "metrics", + "quanta", + "sketches-ddsketch", +] + [[package]] name = "miette" version = "7.2.0" @@ -1472,6 +1581,12 @@ dependencies = [ "syn 2.0.70", ] +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "openssl-sys" version = "0.9.103" @@ -1584,6 +1699,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "postgres-openssl" version = "0.5.0" @@ -1676,6 +1797,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.36.0" @@ -1724,6 +1860,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rdkafka" version = "0.36.2" @@ -1914,6 +2059,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -1963,6 +2121,15 @@ dependencies = [ "sdd", ] +[[package]] +name = "schannel" +version = "0.1.26" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "01227be5826fa0690321a2ba6c5cd57a19cf3f6a09e76973b58e61de6ab9d1c1" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1975,6 +2142,29 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b84345e4c9bd703274a082fb80caaa99b7612be48dfaa1dd9266577ec412309d" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.204" @@ -2089,6 +2279,8 @@ dependencies = [ "log", "log-mdc", "log4rs", + "metrics", + "metrics-exporter-prometheus", "mime", "ppp", "quick-xml", @@ -2160,6 +2352,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.9" diff --git a/common/src/settings.rs b/common/src/settings.rs index a3391cf..a48288b 100644 --- a/common/src/settings.rs +++ b/common/src/settings.rs @@ -322,7 +322,7 @@ impl Cli { #[serde(deny_unknown_fields)] pub struct FilesOutput { // Time after which an unused file descriptor is closed - files_descriptor_close_timeout: Option + files_descriptor_close_timeout: Option, } impl FilesOutput { @@ -351,7 +351,7 @@ pub struct Outputs { #[serde(default)] files: FilesOutput, #[serde(default)] - kafka: KafkaOutput + kafka: KafkaOutput, } impl Outputs { @@ -368,6 +368,33 @@ impl Outputs { } } +#[derive(Debug, Deserialize, Clone, Default)] +#[serde(deny_unknown_fields)] +pub struct Monitoring { + listen_address: String, + listen_port: u16, + http_requests_histogram_buckets: Option>, +} + +impl Monitoring { + pub fn listen_address(&self) -> &str { + &self.listen_address + } + + pub fn listen_port(&self) -> u16 { + self.listen_port + } + + pub fn http_requests_histogram_buckets(&self) -> &[f64] { + match &self.http_requests_histogram_buckets { + Some(bucket) => bucket, + None => &[ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, + ], + } + } +} + #[derive(Debug, Deserialize, Clone)] #[serde(deny_unknown_fields)] pub struct Settings { @@ -381,6 +408,8 @@ pub struct Settings { cli: Cli, #[serde(default)] outputs: Outputs, + #[serde(default)] + monitoring: Option, } impl std::str::FromStr for Settings { @@ -422,6 +451,10 @@ impl Settings { pub fn outputs(&self) -> &Outputs { &self.outputs } + + pub fn monitoring(&self) -> Option<&Monitoring> { + self.monitoring.as_ref() + } } #[cfg(test)] diff --git a/server/Cargo.toml b/server/Cargo.toml index 532022a..3740f63 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -52,3 +52,5 @@ ppp = "2.2.0" tokio-rustls = "0.26.0" strum = { version = "0.26.1", features = ["derive"] } leon = "3.0.1" +metrics = "0.24.0" 
+metrics-exporter-prometheus = { version = "0.16.0", features = ["http-listener"] } \ No newline at end of file diff --git a/server/src/lib.rs b/server/src/lib.rs index d691b5d..51496f4 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -8,6 +8,7 @@ mod heartbeat; mod kerberos; mod logging; mod logic; +mod monitoring; mod multipart; mod output; mod proxy_protocol; @@ -38,6 +39,11 @@ use hyper_util::rt::TokioIo; use kerberos::AuthenticationError; use libgssapi::error::MajorFlags; use log::{debug, error, info, trace, warn}; +use metrics::histogram; +use monitoring::{ + HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, HTTP_REQUESTS_METHOD, HTTP_REQUESTS_STATUS, + HTTP_REQUESTS_URI, +}; use quick_xml::writer::Writer; use soap::Serializable; use socket2::{SockRef, TcpKeepalive}; @@ -379,14 +385,23 @@ fn log_response( principal: &str, conn_status: ConnectionStatus, ) { - let duration: f32 = start.elapsed().as_micros() as f32; + let duration = start.elapsed(); + + histogram!(HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, + HTTP_REQUESTS_METHOD => method.to_owned(), + HTTP_REQUESTS_STATUS => status.to_string(), + HTTP_REQUESTS_URI => uri.to_owned()) + .record(duration.as_secs_f64()); // MDC is thread related, so it should be safe to use it in a non-async // function. log_mdc::insert("http_status", status.as_str()); log_mdc::insert("http_method", method); log_mdc::insert("http_uri", uri); - log_mdc::insert("response_time", format!("{:.3}", duration / 1000.0)); + log_mdc::insert( + "response_time", + format!("{:.3}", duration.as_micros() / 1000), + ); log_mdc::insert("ip", addr.ip().to_string()); log_mdc::insert("port", addr.port().to_string()); log_mdc::insert("principal", principal); @@ -1041,6 +1056,10 @@ pub async fn run(settings: Settings, verbosity: u8) { panic!("Failed to setup logging: {:?}", e); } + if let Some(monitoring_settings) = settings.monitoring() { + monitoring::init(monitoring_settings).expect("Failed to set metric exporter"); + } + let rt_handle = Handle::current(); // Start monitoring thread diff --git a/server/src/logic.rs b/server/src/logic.rs index 2595c3d..ea88374 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -1,6 +1,9 @@ use crate::{ event::{EventData, EventMetadata}, heartbeat::{store_heartbeat, WriteHeartbeatMessage}, + monitoring::{ + EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME, EVENTS_SUBSCRIPTION_UUID, FAILED_EVENTS_COUNTER, MESSAGES_ACTION, MESSAGES_ACTION_ENUMERATE, MESSAGES_ACTION_EVENTS, MESSAGES_ACTION_HEARTBEAT, MESSAGES_COUNTER + }, output::get_formatter, soap::{ Body, Header, Message, OptionSetValue, Subscription as SoapSubscription, SubscriptionBody, @@ -17,6 +20,7 @@ use common::{ }; use hyper::http::status::StatusCode; use log::{debug, error, warn}; +use metrics::counter; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -217,6 +221,9 @@ async fn handle_enumerate( }); } + counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_ENUMERATE) + .increment(1); + Ok(Response::ok( ACTION_ENUMERATE_RESPONSE, Some(Body::EnumerateResponse(res_subscriptions)), @@ -283,6 +290,9 @@ async fn handle_heartbeat( ) .await .context("Failed to store heartbeat")?; + + counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_HEARTBEAT).increment(1); + Ok(Response::ok(ACTION_ACK, None)) } @@ -354,6 +364,14 @@ async fn handle_events( return Ok(Response::err(StatusCode::FORBIDDEN)); } + // Retrieve the public version sent by the client, not the one stored in memory + let public_version = if let Some(public_version) = message.header().version() { + 
public_version + } else { + warn!("Missing subscription version in message events"); + return Ok(Response::err(StatusCode::BAD_REQUEST)); + }; + debug!( "Received {} events from {}:{} ({}) for subscription {} ({})", events.len(), @@ -364,13 +382,9 @@ async fn handle_events( subscription.uuid_string() ); - // Retrieve the public version sent by the client, not the one stored in memory - let public_version = if let Some(public_version) = message.header().version() { - public_version - } else { - warn!("Missing subscription version in message events"); - return Ok(Response::err(StatusCode::BAD_REQUEST)); - }; + counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_EVENTS) + .increment(1); + counter!(EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()).increment(events.len().try_into()?); let metadata = Arc::new(EventMetadata::new( request_data.remote_addr(), @@ -457,6 +471,7 @@ async fn handle_events( } if !succeed { + counter!(FAILED_EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()).increment(events.len().try_into()?); return Ok(Response::err(StatusCode::SERVICE_UNAVAILABLE)); } diff --git a/server/src/monitoring.rs b/server/src/monitoring.rs new file mode 100644 index 0000000..063d71c --- /dev/null +++ b/server/src/monitoring.rs @@ -0,0 +1,69 @@ +use std::{ + net::{IpAddr, SocketAddr}, + str::FromStr, +}; + +use anyhow::Result; +use common::settings::Monitoring; +use log::info; +use metrics::{describe_counter, describe_histogram, Unit}; +use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; + +pub const MESSAGES_COUNTER: &str = "openwec_message_total"; +pub const MESSAGES_ACTION: &str = "action"; +pub const MESSAGES_ACTION_HEARTBEAT: &str = "heartbeat"; +pub const MESSAGES_ACTION_EVENTS: &str = "events"; +pub const MESSAGES_ACTION_ENUMERATE: &str = "enumerate"; + +pub const EVENTS_COUNTER: &str = "openwec_event_received_total"; +pub const EVENTS_SUBSCRIPTION_UUID: &str = "subscription_uuid"; +pub const EVENTS_SUBSCRIPTION_NAME: &str = "subscription_name"; + +pub const FAILED_EVENTS_COUNTER: &str = "openwec_event_output_failure_total"; + +pub const HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM: &str = "http_request_duration_seconds"; +pub const HTTP_REQUESTS_METHOD: &str = "method"; +pub const HTTP_REQUESTS_URI: &str = "uri"; +pub const HTTP_REQUESTS_STATUS: &str = "status"; + +pub fn init(settings: &Monitoring) -> Result<()> { + let addr = SocketAddr::from(( + IpAddr::from_str(settings.listen_address()) + .expect("Failed to parse monitoring.listen_address"), + settings.listen_port(), + )); + + let builder = PrometheusBuilder::new() + .with_http_listener(addr) + .set_buckets_for_metric( + Matcher::Full(HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM.to_string()), + settings.http_requests_histogram_buckets(), + )?; + + info!("Starting monitoring server on {}", addr); + + builder.install()?; + + describe_counter!( + MESSAGES_COUNTER, + Unit::Count, + "Number of messages received by openwec" + ); + describe_counter!( + EVENTS_COUNTER, + Unit::Count, + "Number of events received by openwec" + ); + describe_counter!( + FAILED_EVENTS_COUNTER, + Unit::Count, + "Number of events that could not be written to outputs by openwec" + ); + describe_histogram!( + HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, + Unit::Seconds, + "HTTP requests duration histogram" + ); + + Ok(()) +} From 0d0abca39486c30f2b2fb3a39f26589bb4ad90d7 Mon Sep 
17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Tue, 12 Nov 2024 10:18:20 +0100 Subject: [PATCH 02/11] Fix duration ms in access logs --- server/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/lib.rs b/server/src/lib.rs index 51496f4..a4c6692 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -385,13 +385,13 @@ fn log_response( principal: &str, conn_status: ConnectionStatus, ) { - let duration = start.elapsed(); + let duration = start.elapsed().as_secs_f64(); histogram!(HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, HTTP_REQUESTS_METHOD => method.to_owned(), HTTP_REQUESTS_STATUS => status.to_string(), HTTP_REQUESTS_URI => uri.to_owned()) - .record(duration.as_secs_f64()); + .record(duration); // MDC is thread related, so it should be safe to use it in a non-async // function. @@ -400,7 +400,7 @@ fn log_response( log_mdc::insert("http_uri", uri); log_mdc::insert( "response_time", - format!("{:.3}", duration.as_micros() / 1000), + format!("{:.3}", duration * 1000.0), ); log_mdc::insert("ip", addr.ip().to_string()); log_mdc::insert("port", addr.port().to_string()); From 1e4e4a9a852ba70c6590beee967dece835ae3cab Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sat, 23 Nov 2024 16:42:41 +0100 Subject: [PATCH 03/11] Rename metrics and optionally count events per machine --- common/src/settings.rs | 5 +++++ server/src/lib.rs | 20 +++++++++++++++----- server/src/logic.rs | 32 +++++++++++++++++++++++++------- server/src/monitoring.rs | 7 ++++--- 4 files changed, 49 insertions(+), 15 deletions(-) diff --git a/common/src/settings.rs b/common/src/settings.rs index a48288b..ff4c041 100644 --- a/common/src/settings.rs +++ b/common/src/settings.rs @@ -374,6 +374,7 @@ pub struct Monitoring { listen_address: String, listen_port: u16, http_requests_histogram_buckets: Option>, + count_received_events_per_machine: Option, } impl Monitoring { @@ -393,6 +394,10 @@ impl Monitoring { ], } } + + pub fn count_received_events_per_machine(&self) -> bool { + self.count_received_events_per_machine.unwrap_or(false) + } } #[derive(Debug, Deserialize, Clone)] diff --git a/server/src/lib.rs b/server/src/lib.rs index a4c6692..98b4d8e 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -20,7 +20,7 @@ mod tls; use anyhow::{anyhow, bail, Context, Result}; use common::database::{db_from_settings, schema_is_up_to_date, Db}; use common::encoding::decode_utf16le; -use common::settings::{Authentication, Kerberos, Tls}; +use common::settings::{Authentication, Kerberos, Monitoring, Tls}; use common::settings::{Collector, Server as ServerSettings, Settings}; use core::pin::Pin; use futures::Future; @@ -293,6 +293,7 @@ async fn authenticate( async fn handle_payload( server: &ServerSettings, collector: &Collector, + monitoring: &Option, db: Db, subscriptions: Subscriptions, heartbeat_tx: mpsc::Sender, @@ -313,6 +314,7 @@ async fn handle_payload( let response = logic::handle_message( server, collector, + monitoring, db, subscriptions, heartbeat_tx, @@ -398,10 +400,7 @@ fn log_response( log_mdc::insert("http_status", status.as_str()); log_mdc::insert("http_method", method); log_mdc::insert("http_uri", uri); - log_mdc::insert( - "response_time", - format!("{:.3}", duration * 1000.0), - ); + log_mdc::insert("response_time", format!("{:.3}", duration * 1000.0)); log_mdc::insert("ip", addr.ip().to_string()); log_mdc::insert("port", addr.port().to_string()); log_mdc::insert("principal", principal); @@ -422,6 
+421,7 @@ fn build_error_response(status: StatusCode) -> Response, db: Db, subscriptions: Subscriptions, heartbeat_tx: mpsc::Sender, @@ -536,6 +536,7 @@ async fn handle( let res = handle_payload( &server, &collector, + &monitoring, db, subscriptions, heartbeat_tx, @@ -693,6 +694,7 @@ fn create_kerberos_server( collector_subscriptions: Subscriptions, collector_heartbeat_tx: mpsc::Sender, collector_server_settings: ServerSettings, + monitoring_settings: Option, collector_shutdown_ct: CancellationToken, server_addr: SocketAddr, ) -> Pin> + Send>> { @@ -745,6 +747,7 @@ fn create_kerberos_server( let svc_db = collector_db.clone(); let svc_server_settings = collector_server_settings.clone(); let svc_server_principal = server_principal.clone(); + let svc_monitoring_settings = monitoring_settings.clone(); let subscriptions = collector_subscriptions.clone(); let collector_heartbeat_tx = collector_heartbeat_tx.clone(); @@ -784,6 +787,7 @@ fn create_kerberos_server( handle( svc_server_settings.clone(), collector_settings.clone(), + svc_monitoring_settings.clone(), svc_db.clone(), subscriptions.clone(), collector_heartbeat_tx.clone(), @@ -842,6 +846,7 @@ fn create_tls_server( collector_subscriptions: Subscriptions, collector_heartbeat_tx: mpsc::Sender, collector_server_settings: ServerSettings, + monitoring_settings: Option, collector_shutdown_ct: CancellationToken, server_addr: SocketAddr, ) -> Pin> + Send>> { @@ -890,6 +895,7 @@ fn create_tls_server( let collector_settings = collector_settings.clone(); let svc_db = collector_db.clone(); let svc_server_settings = collector_server_settings.clone(); + let svc_monitoring_settings = monitoring_settings.clone(); let subscriptions = collector_subscriptions.clone(); let collector_heartbeat_tx = collector_heartbeat_tx.clone(); let thumbprint = tls_config.thumbprint.clone(); @@ -960,6 +966,7 @@ fn create_tls_server( handle( svc_server_settings.clone(), collector_settings.clone(), + svc_monitoring_settings.clone(), svc_db.clone(), subscriptions.clone(), collector_heartbeat_tx.clone(), @@ -1152,6 +1159,7 @@ pub async fn run(settings: Settings, verbosity: u8) { let collector_heartbeat_tx = heartbeat_tx.clone(); let collector_server_settings = settings.server().clone(); let collector_shutdown_ct = shutdown_ct.clone(); + let collector_monitoring_settings = settings.monitoring().cloned(); // Construct our SocketAddr to listen on... 
let addr = SocketAddr::from(( @@ -1172,6 +1180,7 @@ pub async fn run(settings: Settings, verbosity: u8) { collector_subscriptions, collector_heartbeat_tx, collector_server_settings, + collector_monitoring_settings, collector_shutdown_ct, addr, )); @@ -1184,6 +1193,7 @@ pub async fn run(settings: Settings, verbosity: u8) { collector_subscriptions, collector_heartbeat_tx, collector_server_settings, + collector_monitoring_settings, collector_shutdown_ct, addr, )); diff --git a/server/src/logic.rs b/server/src/logic.rs index ea88374..e29e3bb 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -2,7 +2,9 @@ use crate::{ event::{EventData, EventMetadata}, heartbeat::{store_heartbeat, WriteHeartbeatMessage}, monitoring::{ - EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME, EVENTS_SUBSCRIPTION_UUID, FAILED_EVENTS_COUNTER, MESSAGES_ACTION, MESSAGES_ACTION_ENUMERATE, MESSAGES_ACTION_EVENTS, MESSAGES_ACTION_HEARTBEAT, MESSAGES_COUNTER + EVENTS_COUNTER, EVENTS_MACHINE, EVENTS_SUBSCRIPTION_NAME, EVENTS_SUBSCRIPTION_UUID, + FAILED_EVENTS_COUNTER, MESSAGES_ACTION, MESSAGES_ACTION_ENUMERATE, MESSAGES_ACTION_EVENTS, + MESSAGES_ACTION_HEARTBEAT, MESSAGES_COUNTER, }, output::get_formatter, soap::{ @@ -15,7 +17,7 @@ use crate::{ }; use common::{ database::Db, - settings::{Collector, Server}, + settings::{Collector, Monitoring, Server}, subscription::{SubscriptionOutputFormat, SubscriptionUuid}, }; use hyper::http::status::StatusCode; @@ -221,8 +223,7 @@ async fn handle_enumerate( }); } - counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_ENUMERATE) - .increment(1); + counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_ENUMERATE).increment(1); Ok(Response::ok( ACTION_ENUMERATE_RESPONSE, @@ -326,6 +327,7 @@ fn get_formatted_events( async fn handle_events( server: &Server, + monitoring: &Option, db: &Db, subscriptions: Subscriptions, heartbeat_tx: mpsc::Sender, @@ -382,9 +384,23 @@ async fn handle_events( subscription.uuid_string() ); - counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_EVENTS) - .increment(1); - counter!(EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()).increment(events.len().try_into()?); + counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_EVENTS).increment(1); + + match monitoring { + Some(monitoring_conf) if monitoring_conf.count_received_events_per_machine() => { + counter!(EVENTS_COUNTER, + EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string(), + EVENTS_MACHINE => request_data.principal().to_string()) + .increment(events.len().try_into()?); + } + _ => { + counter!(EVENTS_COUNTER, + EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()) + .increment(events.len().try_into()?); + } + } let metadata = Arc::new(EventMetadata::new( request_data.remote_addr(), @@ -515,6 +531,7 @@ async fn handle_events( pub async fn handle_message( server: &Server, collector: &Collector, + monitoring: &Option, db: Db, subscriptions: Subscriptions, heartbeat_tx: mpsc::Sender, @@ -538,6 +555,7 @@ pub async fn handle_message( } else if action == ACTION_EVENTS { handle_events( server, + monitoring, &db, subscriptions, heartbeat_tx, diff --git a/server/src/monitoring.rs b/server/src/monitoring.rs index 063d71c..540f2e3 100644 --- a/server/src/monitoring.rs +++ b/server/src/monitoring.rs @@ -9,17 +9,18 @@ use log::info; use 
metrics::{describe_counter, describe_histogram, Unit}; use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; -pub const MESSAGES_COUNTER: &str = "openwec_message_total"; +pub const MESSAGES_COUNTER: &str = "openwec_messages_total"; pub const MESSAGES_ACTION: &str = "action"; pub const MESSAGES_ACTION_HEARTBEAT: &str = "heartbeat"; pub const MESSAGES_ACTION_EVENTS: &str = "events"; pub const MESSAGES_ACTION_ENUMERATE: &str = "enumerate"; -pub const EVENTS_COUNTER: &str = "openwec_event_received_total"; +pub const EVENTS_COUNTER: &str = "openwec_received_events_total"; pub const EVENTS_SUBSCRIPTION_UUID: &str = "subscription_uuid"; pub const EVENTS_SUBSCRIPTION_NAME: &str = "subscription_name"; +pub const EVENTS_MACHINE: &str = "machine"; -pub const FAILED_EVENTS_COUNTER: &str = "openwec_event_output_failure_total"; +pub const FAILED_EVENTS_COUNTER: &str = "openwec_event_output_failures_total"; pub const HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM: &str = "http_request_duration_seconds"; pub const HTTP_REQUESTS_METHOD: &str = "method"; From d6444381e14c5c6c9b5927cfead89bcb857956e7 Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sat, 23 Nov 2024 18:22:26 +0100 Subject: [PATCH 04/11] Add metrics for the size of events and the size of http requests body --- common/src/settings.rs | 17 ++++++++ server/src/lib.rs | 90 +++++++++++++++++++++++++++++++--------- server/src/logic.rs | 32 +++++++++++--- server/src/monitoring.rs | 23 ++++++++++ 4 files changed, 136 insertions(+), 26 deletions(-) diff --git a/common/src/settings.rs b/common/src/settings.rs index ff4c041..77f595d 100644 --- a/common/src/settings.rs +++ b/common/src/settings.rs @@ -375,6 +375,9 @@ pub struct Monitoring { listen_port: u16, http_requests_histogram_buckets: Option>, count_received_events_per_machine: Option, + count_event_size_per_machine: Option, + count_http_request_body_network_size_per_machine: Option, + count_http_request_body_real_size_per_machine: Option, } impl Monitoring { @@ -398,6 +401,20 @@ impl Monitoring { pub fn count_received_events_per_machine(&self) -> bool { self.count_received_events_per_machine.unwrap_or(false) } + + pub fn count_event_size_per_machine(&self) -> bool { + self.count_event_size_per_machine.unwrap_or(false) + } + + pub fn count_http_request_body_network_size_per_machine(&self) -> bool { + self.count_http_request_body_network_size_per_machine + .unwrap_or(false) + } + + pub fn count_http_request_body_real_size_per_machine(&self) -> bool { + self.count_http_request_body_real_size_per_machine + .unwrap_or(false) + } } #[derive(Debug, Deserialize, Clone)] diff --git a/server/src/lib.rs b/server/src/lib.rs index 98b4d8e..db765fc 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -39,10 +39,11 @@ use hyper_util::rt::TokioIo; use kerberos::AuthenticationError; use libgssapi::error::MajorFlags; use log::{debug, error, info, trace, warn}; -use metrics::histogram; +use metrics::{counter, histogram}; use monitoring::{ - HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, HTTP_REQUESTS_METHOD, HTTP_REQUESTS_STATUS, - HTTP_REQUESTS_URI, + HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, HTTP_REQUESTS_MACHINE, HTTP_REQUESTS_METHOD, + HTTP_REQUESTS_STATUS, HTTP_REQUESTS_URI, HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, + HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, }; use quick_xml::writer::Writer; use soap::Serializable; @@ -95,6 +96,8 @@ pub struct RequestData { principal: String, remote_addr: SocketAddr, category: RequestCategory, + uri: String, + method: 
String, } impl RequestData { @@ -103,6 +106,8 @@ impl RequestData { principal: principal.to_owned(), remote_addr: remote_addr.to_owned(), category: RequestCategory::try_from(req)?, + method: req.method().to_string(), + uri: req.uri().to_string(), }) } @@ -120,6 +125,14 @@ impl RequestData { pub fn category(&self) -> &RequestCategory { &self.category } + + pub fn uri(&self) -> &str { + &self.uri + } + + pub fn method(&self) -> &str { + &self.method + } } #[derive(Debug, Clone)] @@ -143,7 +156,9 @@ fn full>(chunk: T) -> BoxBody { async fn get_request_payload( collector: &Collector, + monitoring: &Option, auth_ctx: &AuthenticationContext, + request_data: &RequestData, req: Request, ) -> Result> { let (parts, body) = req.into_parts(); @@ -174,6 +189,23 @@ async fn get_request_payload( return Ok(None); } + let http_request_body_network_size_bytes_counter = match monitoring { + Some(monitoring_conf) + if monitoring_conf.count_http_request_body_network_size_per_machine() => + { + counter!(HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, + HTTP_REQUESTS_METHOD => request_data.method().to_string(), + HTTP_REQUESTS_URI => request_data.uri().to_string(), + HTTP_REQUESTS_MACHINE => request_data.principal().to_string()) + } + _ => { + counter!(HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, + HTTP_REQUESTS_METHOD => request_data.method().to_string(), + HTTP_REQUESTS_URI => request_data.uri().to_string()) + } + }; + http_request_body_network_size_bytes_counter.increment(data.len().try_into()?); + let message = match auth_ctx { AuthenticationContext::Tls(_, _) => tls::get_request_payload(parts, data).await?, AuthenticationContext::Kerberos(conn_state) => { @@ -183,6 +215,23 @@ async fn get_request_payload( match message { Some(bytes) => { + let http_request_body_real_size_bytes_counter = match monitoring { + Some(monitoring_conf) + if monitoring_conf.count_http_request_body_real_size_per_machine() => + { + counter!(HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, + HTTP_REQUESTS_METHOD => request_data.method().to_string(), + HTTP_REQUESTS_URI => request_data.uri().to_string(), + HTTP_REQUESTS_MACHINE => request_data.principal().to_string()) + } + _ => { + counter!(HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, + HTTP_REQUESTS_METHOD => request_data.method().to_string(), + HTTP_REQUESTS_URI => request_data.uri().to_string()) + } + }; + http_request_body_real_size_bytes_counter.increment(bytes.len().try_into()?); + // Spawn a blocking task to decode utf16 tokio::task::spawn_blocking(|| Ok(Some(decode_utf16le(bytes)?))).await? 
} @@ -489,23 +538,24 @@ async fn handle( }; // Get request payload - let request_payload = match get_request_payload(&collector, &auth_ctx, req).await { - Ok(payload) => payload, - Err(e) => { - error!("Failed to retrieve request payload: {:?}", e); - let status = StatusCode::BAD_REQUEST; - log_response( - &addr, - &method, - &uri, - &start, - status, - &principal, - ConnectionStatus::Alive, - ); - return Ok(build_error_response(status)); - } - }; + let request_payload = + match get_request_payload(&collector, &monitoring, &auth_ctx, &request_data, req).await { + Ok(payload) => payload, + Err(e) => { + error!("Failed to retrieve request payload: {:?}", e); + let status = StatusCode::BAD_REQUEST; + log_response( + &addr, + &method, + &uri, + &start, + status, + &principal, + ConnectionStatus::Alive, + ); + return Ok(build_error_response(status)); + } + }; trace!( "Received payload: {:?}", diff --git a/server/src/logic.rs b/server/src/logic.rs index e29e3bb..c89fd00 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -3,8 +3,9 @@ use crate::{ heartbeat::{store_heartbeat, WriteHeartbeatMessage}, monitoring::{ EVENTS_COUNTER, EVENTS_MACHINE, EVENTS_SUBSCRIPTION_NAME, EVENTS_SUBSCRIPTION_UUID, - FAILED_EVENTS_COUNTER, MESSAGES_ACTION, MESSAGES_ACTION_ENUMERATE, MESSAGES_ACTION_EVENTS, - MESSAGES_ACTION_HEARTBEAT, MESSAGES_COUNTER, + EVENT_SIZE_BYTES_COUNTER, FAILED_EVENTS_COUNTER, MESSAGES_ACTION, + MESSAGES_ACTION_ENUMERATE, MESSAGES_ACTION_EVENTS, MESSAGES_ACTION_HEARTBEAT, + MESSAGES_COUNTER, }, output::get_formatter, soap::{ @@ -386,21 +387,40 @@ async fn handle_events( counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_EVENTS).increment(1); - match monitoring { + let events_counter = match monitoring { Some(monitoring_conf) if monitoring_conf.count_received_events_per_machine() => { counter!(EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string(), EVENTS_MACHINE => request_data.principal().to_string()) - .increment(events.len().try_into()?); } _ => { counter!(EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()) - .increment(events.len().try_into()?); } - } + }; + events_counter.increment(events.len().try_into()?); + + let event_size_counter = match monitoring { + Some(monitoring_conf) if monitoring_conf.count_event_size_per_machine() => { + counter!(EVENT_SIZE_BYTES_COUNTER, + EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string(), + EVENTS_MACHINE => request_data.principal().to_string()) + } + _ => { + counter!(EVENT_SIZE_BYTES_COUNTER, + EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()) + } + }; + event_size_counter.increment( + events + .iter() + .fold(0, |acc, event| acc + event.len()) + .try_into()?, + ); let metadata = Arc::new(EventMetadata::new( request_data.remote_addr(), diff --git a/server/src/monitoring.rs b/server/src/monitoring.rs index 540f2e3..f582358 100644 --- a/server/src/monitoring.rs +++ b/server/src/monitoring.rs @@ -26,6 +26,14 @@ pub const HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM: &str = "http_request_duratio pub const HTTP_REQUESTS_METHOD: &str = "method"; pub const HTTP_REQUESTS_URI: &str = "uri"; pub const HTTP_REQUESTS_STATUS: &str = "status"; +pub const HTTP_REQUESTS_MACHINE: &str = "machine"; + +pub const 
HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER: &str = + "http_request_body_network_size_bytes_total"; +pub const HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER: &str = + "http_request_body_real_size_bytes_total"; + +pub const EVENT_SIZE_BYTES_COUNTER: &str = "openwec_event_size_bytes_total"; pub fn init(settings: &Monitoring) -> Result<()> { let addr = SocketAddr::from(( @@ -65,6 +73,21 @@ pub fn init(settings: &Monitoring) -> Result<()> { Unit::Seconds, "HTTP requests duration histogram" ); + describe_counter!( + HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, + Unit::Bytes, + "The total size of all http requests body received by openwec" + ); + describe_counter!( + HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, + Unit::Bytes, + "The total size of all http requests body received by openwec after decryption and decompression" + ); + describe_counter!( + EVENT_SIZE_BYTES_COUNTER, + Unit::Bytes, + "The total size of all events received by openwec" + ); Ok(()) } From 474f2786b298c8af4f54a4f055a51324479aac92 Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sat, 23 Nov 2024 19:42:20 +0100 Subject: [PATCH 05/11] Add monitoring section in sample config file --- openwec.conf.sample.toml | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/openwec.conf.sample.toml b/openwec.conf.sample.toml index 5f6d4d8..d757eae 100644 --- a/openwec.conf.sample.toml +++ b/openwec.conf.sample.toml @@ -302,3 +302,41 @@ # - If you configure Kafka options in an output, a dedicated Kafka client will be # used for that output regardless of this setting. # options = {} + +########################## +## Monitoring settings ## +########################## + +# OpenWEC can expose internal metrics on a Prometheus-compatible endpoint. +# Monitoring is disabled by default. +# You can enable it by uncommenting the [monitoring] section. 
+ +# [monitoring] + +# [Required] +# Listen address of the Prometheus-compatible endpoint +# listen_address = + +# [Required] +# Listen port of the Prometheus-compatible endpoint +# listen_port = + +# [Optional] +# If set, a "machine" label will be added to the "openwec_received_events_total" metric +# Warning: this may cause a HUGE increase in metric cardinality +# count_received_events_per_machine = false + +# [Optional] +# If set, a "machine" label will be added to the "openwec_event_size_bytes_total" metric +# Warning: this may cause a HUGE increase in metric cardinality +# count_event_size_per_machine = false + +# [Optional] +# If set, a "machine" label will be added to the "http_request_body_network_size_bytes_total" metric +# Warning: this may cause a HUGE increase in metric cardinality +# count_http_request_body_network_size_per_machine = false + +# [Optional] +# If set, a "machine" label will be added to the "http_request_body_real_size_bytes_total" metric +# Warning: this may cause a HUGE increase in metric cardinality +# count_http_request_body_real_size_per_machine = false From f5f4e36ce55755c2082bc75a351809e4e82465a5 Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sat, 23 Nov 2024 19:43:10 +0100 Subject: [PATCH 06/11] Add tests for monitoring settings --- common/src/settings.rs | 63 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/common/src/settings.rs b/common/src/settings.rs index 77f595d..31fdf3e 100644 --- a/common/src/settings.rs +++ b/common/src/settings.rs @@ -539,9 +539,12 @@ mod tests { assert!(s.logging().verbosity().is_none()); assert!(s.logging().access_logs().is_none()); assert_eq!(s.logging().server_logs(), LoggingType::Stderr); + assert_eq!(s.server().tcp_keepalive_time(), 3600); assert_eq!(s.server().tcp_keepalive_intvl().unwrap(), 1); assert_eq!(s.server().tcp_keepalive_probes().unwrap(), 10); + + assert!(s.monitoring().is_none()); } const CONFIG_TLS_POSTGRES: &str = r#" @@ -568,6 +571,10 @@ mod tests { server_certificate = "/etc/server_certificate.pem" server_private_key = "/etc/server_private_key.pem" ca_certificate = "/etc/ca_certificate.pem" + + [monitoring] + listen_address = "127.0.0.1" + listen_port = 9090 "#; #[test] @@ -610,13 +617,40 @@ mod tests { assert_eq!(s.logging().server_logs(), LoggingType::Stdout); assert_eq!(s.logging().server_logs_pattern().unwrap(), "toto"); assert_eq!(s.logging().access_logs_pattern(), "tutu"); + assert_eq!(s.server().tcp_keepalive_time(), 7200); assert!(s.server().tcp_keepalive_intvl().is_none()); assert!(s.server().tcp_keepalive_probes().is_none()); + assert_eq!(s.cli().read_only_subscriptions(), false); + assert_eq!(s.outputs().garbage_collect_interval(), 600); assert_eq!(s.outputs().files().files_descriptor_close_timeout(), 600); assert!(s.outputs().kafka().options().is_empty()); + + assert!(s.monitoring().is_some()); + assert_eq!(s.monitoring().unwrap().listen_address(), "127.0.0.1"); + assert_eq!(s.monitoring().unwrap().listen_port(), 9090); + assert_eq!( + s.monitoring().unwrap().count_event_size_per_machine(), + false + ); + assert_eq!( + s.monitoring() + .unwrap() + .count_http_request_body_network_size_per_machine(), + false + ); + assert_eq!( + s.monitoring() + .unwrap() + .count_http_request_body_real_size_per_machine(), + false + ); + assert_eq!( + s.monitoring().unwrap().count_received_events_per_machine(), + false + ); } const CONFIG_TLS_POSTGRES_WITH_CLI: &str = r#" @@ -645,12 +679,41 @@ mod tests { [cli] 
read_only_subscriptions = true + + [monitoring] + listen_address = "127.0.0.1" + listen_port = 9090 + count_event_size_per_machine = true + count_http_request_body_network_size_per_machine = true + count_http_request_body_real_size_per_machine = true + count_received_events_per_machine = true "#; #[test] fn test_settings_tls_postgres_with_cli() { let s = Settings::from_str(CONFIG_TLS_POSTGRES_WITH_CLI).unwrap(); assert_eq!(s.cli().read_only_subscriptions(), true); + + assert!(s.monitoring().is_some()); + assert_eq!(s.monitoring().unwrap().listen_address(), "127.0.0.1"); + assert_eq!(s.monitoring().unwrap().listen_port(), 9090); + assert_eq!(s.monitoring().unwrap().count_event_size_per_machine(), true); + assert_eq!( + s.monitoring() + .unwrap() + .count_http_request_body_network_size_per_machine(), + true + ); + assert_eq!( + s.monitoring() + .unwrap() + .count_http_request_body_real_size_per_machine(), + true + ); + assert_eq!( + s.monitoring().unwrap().count_received_events_per_machine(), + true + ); } const CONFIG_TLS_POSTGRES_WITH_OUTPUTS: &str = r#" From 22a3974978fbb8d54641c6d693497d9b1405846e Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sat, 23 Nov 2024 19:43:37 +0100 Subject: [PATCH 07/11] Update monitoring documentation --- doc/monitoring.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/doc/monitoring.md b/doc/monitoring.md index a954fa7..b5d7283 100644 --- a/doc/monitoring.md +++ b/doc/monitoring.md @@ -28,3 +28,27 @@ $ openwec heartbeats -a 192.168.1.0 -s my-test-subscription ``` Two formats are available: `text` (default) and `json` (`--format`). + +## Prometheus-compatible endpoint + +OpenWEC can expose a Prometheus-compatible endpoint with multiple metrics. + +### Configuration + +This feature is **disabled** by default. + +Metrics collection and publication can be enabled in the OpenWEC settings (see `monitoring` section of [openwec.conf.sample.toml](../openwec.conf.sample.toml)). + +### Available metrics + +| **Metric** | **Type** | **Labels** | **Description** | +|---|---|---|---| +| `openwec_received_events_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | Number of events received by openwec | +| `openwec_event_size_bytes_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total size of all events received by openwec | +| `http_request_body_real_size_bytes_total` | `Counter` | `method`, `uri`, `machine` (optional*) | The total size of all http requests body received by openwec after decryption and decompression | +| `http_request_body_network_size_bytes_total` | `Counter` | `method`, `uri`, `machine` (optional*) | The total size of all http requests body received by openwec | +| `openwec_messages_total` | `Counter` | `action` (one of `"enumerate"`, `"heartbeat"`, `"events"`) | Number of messages received by openwec | +| `http_request_duration_seconds` | `Histogram` | `method`, `status`, `uri` | HTTP requests duration histogram | + +> [!WARNING] +> Enabling the `machine` labels may cause a **huge** increase in metric cardinality! 
\ No newline at end of file From 500f05316242b0238145531b50bb094acfebbf15 Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sat, 23 Nov 2024 20:01:29 +0100 Subject: [PATCH 08/11] Improve "http_request_duration_seconds" metric and add doc --- common/src/settings.rs | 15 ++++++++++++--- openwec.conf.sample.toml | 4 ++++ server/src/lib.rs | 34 +++++++++++++++++----------------- server/src/monitoring.rs | 16 ++++++++-------- 4 files changed, 41 insertions(+), 28 deletions(-) diff --git a/common/src/settings.rs b/common/src/settings.rs index 31fdf3e..2123ee4 100644 --- a/common/src/settings.rs +++ b/common/src/settings.rs @@ -373,7 +373,7 @@ impl Outputs { pub struct Monitoring { listen_address: String, listen_port: u16, - http_requests_histogram_buckets: Option>, + http_request_duration_buckets: Option>, count_received_events_per_machine: Option, count_event_size_per_machine: Option, count_http_request_body_network_size_per_machine: Option, @@ -389,8 +389,8 @@ impl Monitoring { self.listen_port } - pub fn http_requests_histogram_buckets(&self) -> &[f64] { - match &self.http_requests_histogram_buckets { + pub fn http_request_duration_buckets(&self) -> &[f64] { + match &self.http_request_duration_buckets { Some(bucket) => bucket, None => &[ 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, @@ -651,6 +651,10 @@ mod tests { s.monitoring().unwrap().count_received_events_per_machine(), false ); + assert_eq!( + s.monitoring().unwrap().http_request_duration_buckets(), + &[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,] + ); } const CONFIG_TLS_POSTGRES_WITH_CLI: &str = r#" @@ -683,6 +687,7 @@ mod tests { [monitoring] listen_address = "127.0.0.1" listen_port = 9090 + http_request_duration_buckets = [0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0] count_event_size_per_machine = true count_http_request_body_network_size_per_machine = true count_http_request_body_real_size_per_machine = true @@ -714,6 +719,10 @@ mod tests { s.monitoring().unwrap().count_received_events_per_machine(), true ); + assert_eq!( + s.monitoring().unwrap().http_request_duration_buckets(), + &[0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0] + ); } const CONFIG_TLS_POSTGRES_WITH_OUTPUTS: &str = r#" diff --git a/openwec.conf.sample.toml b/openwec.conf.sample.toml index d757eae..49a24c1 100644 --- a/openwec.conf.sample.toml +++ b/openwec.conf.sample.toml @@ -321,6 +321,10 @@ # Listen port of the Prometheus-compatible endpoint # listen_port = +# [Optional] +# Request duration buckets (in seconds) used by the "http_request_duration_seconds" histogram +# http_request_duration_buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0] + # [Optional] # If set, a "machine" label will be added to the "openwec_received_events_total" metric # Warning: this may cause a HUGE increase in metric cardinality diff --git a/server/src/lib.rs b/server/src/lib.rs index db765fc..d521138 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -41,9 +41,9 @@ use libgssapi::error::MajorFlags; use log::{debug, error, info, trace, warn}; use metrics::{counter, histogram}; use monitoring::{ - HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, HTTP_REQUESTS_MACHINE, HTTP_REQUESTS_METHOD, - HTTP_REQUESTS_STATUS, HTTP_REQUESTS_URI, HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, - HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, + HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, + 
HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, HTTP_REQUEST_MACHINE, HTTP_REQUEST_METHOD, + HTTP_REQUEST_STATUS, HTTP_REQUEST_URI, }; use quick_xml::writer::Writer; use soap::Serializable; @@ -194,14 +194,14 @@ async fn get_request_payload( if monitoring_conf.count_http_request_body_network_size_per_machine() => { counter!(HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, - HTTP_REQUESTS_METHOD => request_data.method().to_string(), - HTTP_REQUESTS_URI => request_data.uri().to_string(), - HTTP_REQUESTS_MACHINE => request_data.principal().to_string()) + HTTP_REQUEST_METHOD => request_data.method().to_string(), + HTTP_REQUEST_URI => request_data.uri().to_string(), + HTTP_REQUEST_MACHINE => request_data.principal().to_string()) } _ => { counter!(HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, - HTTP_REQUESTS_METHOD => request_data.method().to_string(), - HTTP_REQUESTS_URI => request_data.uri().to_string()) + HTTP_REQUEST_METHOD => request_data.method().to_string(), + HTTP_REQUEST_URI => request_data.uri().to_string()) } }; http_request_body_network_size_bytes_counter.increment(data.len().try_into()?); @@ -220,14 +220,14 @@ async fn get_request_payload( if monitoring_conf.count_http_request_body_real_size_per_machine() => { counter!(HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, - HTTP_REQUESTS_METHOD => request_data.method().to_string(), - HTTP_REQUESTS_URI => request_data.uri().to_string(), - HTTP_REQUESTS_MACHINE => request_data.principal().to_string()) + HTTP_REQUEST_METHOD => request_data.method().to_string(), + HTTP_REQUEST_URI => request_data.uri().to_string(), + HTTP_REQUEST_MACHINE => request_data.principal().to_string()) } _ => { counter!(HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, - HTTP_REQUESTS_METHOD => request_data.method().to_string(), - HTTP_REQUESTS_URI => request_data.uri().to_string()) + HTTP_REQUEST_METHOD => request_data.method().to_string(), + HTTP_REQUEST_URI => request_data.uri().to_string()) } }; http_request_body_real_size_bytes_counter.increment(bytes.len().try_into()?); @@ -438,10 +438,10 @@ fn log_response( ) { let duration = start.elapsed().as_secs_f64(); - histogram!(HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, - HTTP_REQUESTS_METHOD => method.to_owned(), - HTTP_REQUESTS_STATUS => status.to_string(), - HTTP_REQUESTS_URI => uri.to_owned()) + histogram!(HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, + HTTP_REQUEST_METHOD => method.to_owned(), + HTTP_REQUEST_STATUS => status.to_string(), + HTTP_REQUEST_URI => uri.to_owned()) .record(duration); // MDC is thread related, so it should be safe to use it in a non-async diff --git a/server/src/monitoring.rs b/server/src/monitoring.rs index f582358..9625bf5 100644 --- a/server/src/monitoring.rs +++ b/server/src/monitoring.rs @@ -22,11 +22,11 @@ pub const EVENTS_MACHINE: &str = "machine"; pub const FAILED_EVENTS_COUNTER: &str = "openwec_event_output_failures_total"; -pub const HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM: &str = "http_request_duration_seconds"; -pub const HTTP_REQUESTS_METHOD: &str = "method"; -pub const HTTP_REQUESTS_URI: &str = "uri"; -pub const HTTP_REQUESTS_STATUS: &str = "status"; -pub const HTTP_REQUESTS_MACHINE: &str = "machine"; +pub const HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM: &str = "http_request_duration_seconds"; +pub const HTTP_REQUEST_METHOD: &str = "method"; +pub const HTTP_REQUEST_URI: &str = "uri"; +pub const HTTP_REQUEST_STATUS: &str = "status"; +pub const HTTP_REQUEST_MACHINE: &str = "machine"; pub const HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER: &str = "http_request_body_network_size_bytes_total"; @@ -45,8 +45,8 
@@ pub fn init(settings: &Monitoring) -> Result<()> { let builder = PrometheusBuilder::new() .with_http_listener(addr) .set_buckets_for_metric( - Matcher::Full(HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM.to_string()), - settings.http_requests_histogram_buckets(), + Matcher::Full(HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM.to_string()), + settings.http_request_duration_buckets(), )?; info!("Starting monitoring server on {}", addr); @@ -69,7 +69,7 @@ pub fn init(settings: &Monitoring) -> Result<()> { "Number of events that could not be written to outputs by openwec" ); describe_histogram!( - HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, + HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, Unit::Seconds, "HTTP requests duration histogram" ); From a5e67860d8444e9e8f50e165642aa0ef75414ce6 Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sat, 23 Nov 2024 20:06:41 +0100 Subject: [PATCH 09/11] Add doc for "openwec_event_output_failures_total" metric --- doc/monitoring.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/monitoring.md b/doc/monitoring.md index b5d7283..c9a4581 100644 --- a/doc/monitoring.md +++ b/doc/monitoring.md @@ -48,6 +48,7 @@ Metrics collection and publication can be enabled in the OpenWEC settings (see ` | `http_request_body_real_size_bytes_total` | `Counter` | `method`, `uri`, `machine` (optional*) | The total size of all http requests body received by openwec after decryption and decompression | | `http_request_body_network_size_bytes_total` | `Counter` | `method`, `uri`, `machine` (optional*) | The total size of all http requests body received by openwec | | `openwec_messages_total` | `Counter` | `action` (one of `"enumerate"`, `"heartbeat"`, `"events"`) | Number of messages received by openwec | +| `openwec_event_output_failures_total` | `Counter` | `subscription_uuid`, `subscription_name` | Number of events that could not be written to outputs by openwec | | `http_request_duration_seconds` | `Histogram` | `method`, `status`, `uri` | HTTP requests duration histogram | > [!WARNING] From 3c968b883b9f4a51e6ba6fb2b0c253b0fe870a03 Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Tue, 26 Nov 2024 10:28:33 +0100 Subject: [PATCH 10/11] Update metrics --- common/src/settings.rs | 31 +++++++---- common/src/subscription.rs | 26 +++++---- doc/monitoring.md | 17 +++--- openwec.conf.sample.toml | 14 ++--- server/src/event.rs | 110 +++++++++++++++++++++---------------- server/src/lib.rs | 21 ++++--- server/src/logic.rs | 109 +++++++++++++++++++++++++++--------- server/src/monitoring.rs | 77 ++++++++++++++++++-------- server/src/output.rs | 4 ++ 9 files changed, 267 insertions(+), 142 deletions(-) diff --git a/common/src/settings.rs b/common/src/settings.rs index 2123ee4..8b097a8 100644 --- a/common/src/settings.rs +++ b/common/src/settings.rs @@ -374,8 +374,8 @@ pub struct Monitoring { listen_address: String, listen_port: u16, http_request_duration_buckets: Option>, - count_received_events_per_machine: Option, - count_event_size_per_machine: Option, + count_input_events_per_machine: Option, + count_input_event_bytes_per_machine: Option, count_http_request_body_network_size_per_machine: Option, count_http_request_body_real_size_per_machine: Option, } @@ -398,12 +398,12 @@ impl Monitoring { } } - pub fn count_received_events_per_machine(&self) -> bool { - self.count_received_events_per_machine.unwrap_or(false) + pub fn count_input_events_per_machine(&self) -> bool { + 
self.count_input_events_per_machine.unwrap_or(false) } - pub fn count_event_size_per_machine(&self) -> bool { - self.count_event_size_per_machine.unwrap_or(false) + pub fn count_input_event_bytes_per_machine(&self) -> bool { + self.count_input_event_bytes_per_machine.unwrap_or(false) } pub fn count_http_request_body_network_size_per_machine(&self) -> bool { @@ -632,7 +632,9 @@ mod tests { assert_eq!(s.monitoring().unwrap().listen_address(), "127.0.0.1"); assert_eq!(s.monitoring().unwrap().listen_port(), 9090); assert_eq!( - s.monitoring().unwrap().count_event_size_per_machine(), + s.monitoring() + .unwrap() + .count_input_event_bytes_per_machine(), false ); assert_eq!( @@ -648,7 +650,7 @@ mod tests { false ); assert_eq!( - s.monitoring().unwrap().count_received_events_per_machine(), + s.monitoring().unwrap().count_input_events_per_machine(), false ); assert_eq!( @@ -688,10 +690,10 @@ mod tests { listen_address = "127.0.0.1" listen_port = 9090 http_request_duration_buckets = [0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0] - count_event_size_per_machine = true + count_input_event_bytes_per_machine = true count_http_request_body_network_size_per_machine = true count_http_request_body_real_size_per_machine = true - count_received_events_per_machine = true + count_input_events_per_machine = true "#; #[test] @@ -702,7 +704,12 @@ mod tests { assert!(s.monitoring().is_some()); assert_eq!(s.monitoring().unwrap().listen_address(), "127.0.0.1"); assert_eq!(s.monitoring().unwrap().listen_port(), 9090); - assert_eq!(s.monitoring().unwrap().count_event_size_per_machine(), true); + assert_eq!( + s.monitoring() + .unwrap() + .count_input_event_bytes_per_machine(), + true + ); assert_eq!( s.monitoring() .unwrap() @@ -716,7 +723,7 @@ mod tests { true ); assert_eq!( - s.monitoring().unwrap().count_received_events_per_machine(), + s.monitoring().unwrap().count_input_events_per_machine(), true ); assert_eq!( diff --git a/common/src/subscription.rs b/common/src/subscription.rs index 81d00ad..e1598b3 100644 --- a/common/src/subscription.rs +++ b/common/src/subscription.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{anyhow, bail, Result}; use log::{info, warn}; use serde::{Deserialize, Serialize}; -use strum::{AsRefStr, EnumString, VariantNames}; +use strum::{AsRefStr, EnumString, IntoStaticStr, VariantNames}; use uuid::Uuid; use crate::utils::VersionHasher; @@ -96,12 +96,8 @@ pub struct FilesConfiguration { } impl FilesConfiguration { - pub fn new( - path: String, - ) -> Self { - Self { - path - } + pub fn new(path: String) -> Self { + Self { path } } pub fn path(&self) -> &str { @@ -182,7 +178,17 @@ impl Display for SubscriptionOutput { } } #[derive( - Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash, VariantNames, AsRefStr, EnumString, + Debug, + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + Hash, + VariantNames, + AsRefStr, + EnumString, + IntoStaticStr, )] #[strum(serialize_all = "snake_case", ascii_case_insensitive)] pub enum SubscriptionOutputFormat { @@ -683,8 +689,8 @@ impl SubscriptionData { self } - /// Set the subscription's max elements. - pub fn set_max_elements(&mut self, max_elements: Option) -> &mut Self { + /// Set the subscription's max elements. 
+ pub fn set_max_elements(&mut self, max_elements: Option) -> &mut Self { self.parameters.max_elements = max_elements; self.update_internal_version(); self diff --git a/doc/monitoring.md b/doc/monitoring.md index c9a4581..8a1e920 100644 --- a/doc/monitoring.md +++ b/doc/monitoring.md @@ -43,13 +43,16 @@ Metrics collection and publication can be enabled in the OpenWEC settings (see ` | **Metric** | **Type** | **Labels** | **Description** | |---|---|---|---| -| `openwec_received_events_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | Number of events received by openwec | -| `openwec_event_size_bytes_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total size of all events received by openwec | -| `http_request_body_real_size_bytes_total` | `Counter` | `method`, `uri`, `machine` (optional*) | The total size of all http requests body received by openwec after decryption and decompression | -| `http_request_body_network_size_bytes_total` | `Counter` | `method`, `uri`, `machine` (optional*) | The total size of all http requests body received by openwec | -| `openwec_messages_total` | `Counter` | `action` (one of `"enumerate"`, `"heartbeat"`, `"events"`) | Number of messages received by openwec | -| `openwec_event_output_failures_total` | `Counter` | `subscription_uuid`, `subscription_name` | Number of events that could not be written to outputs by openwec | -| `http_request_duration_seconds` | `Histogram` | `method`, `status`, `uri` | HTTP requests duration histogram | +| `openwec_input_events_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total number of events received by openwec | +| `openwec_input_event_bytes_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total size of all events received by openwec | +| `openwec_input_messages_total` | `Counter` | `action` (one of `"enumerate"`, `"heartbeat"`, `"events"`) | The total number of messages received by openwec | +| `openwec_input_event_parsing_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `type` | The total number of event parsing failures | +| `openwec_http_requests_total` | `Counter` | `uri`, `code` | The total number of HTTP requests handled by openwec | +| `openwec_http_request_duration_seconds` | `Histogram` | `uri` | Histogram of response duration for HTTP requests | +| `openwec_http_request_body_network_size_bytes_total` | `Counter` | `uri`, `machine` (optional*) | The total size of all http requests body received by openwec | +| `openwec_http_request_body_real_size_bytes_total` | `Counter` | `uri`, `machine` (optional*) | The total size of all http requests body received by openwec after decryption and decompression | +| `openwec_output_driver_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `driver` | The total number of output driver failures | +| `openwec_output_format_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `format` | The total number of output format failures | > [!WARNING] > Enabling the `machine` labels may cause a **huge** increase in metric cardinality! 
\ No newline at end of file diff --git a/openwec.conf.sample.toml b/openwec.conf.sample.toml index 49a24c1..acd6843 100644 --- a/openwec.conf.sample.toml +++ b/openwec.conf.sample.toml @@ -322,25 +322,25 @@ # listen_port = # [Optional] -# Request duration buckets (in seconds) used by the "http_request_duration_seconds" histogram +# Request duration buckets (in seconds) used by the "openwec_http_request_duration_seconds" histogram # http_request_duration_buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0] # [Optional] -# If set, a "machine" label will be added to the "openwec_received_events_total" metric +# If set, a "machine" label will be added to the "openwec_input_events_total" metric # Warning: this may cause a HUGE increase in metric cardinality -# count_received_events_per_machine = false +# count_input_events_per_machine = false # [Optional] -# If set, a "machine" label will be added to the "openwec_event_size_bytes_total" metric +# If set, a "machine" label will be added to the "openwec_input_event_bytes_total" metric # Warning: this may cause a HUGE increase in metric cardinality -# count_event_size_per_machine = false +# count_input_event_bytes_per_machine = false # [Optional] -# If set, a "machine" label will be added to the "http_request_body_network_size_bytes_total" metric +# If set, a "machine" label will be added to the "openwec_http_request_body_network_size_bytes_total" metric # Warning: this may cause a HUGE increase in metric cardinality # count_http_request_body_network_size_per_machine = false # [Optional] -# If set, a "machine" label will be added to the "http_request_body_real_size_bytes_total" metric +# If set, a "machine" label will be added to the "openwec_http_request_body_real_size_bytes_total" metric # Warning: this may cause a HUGE increase in metric cardinality # count_http_request_body_real_size_per_machine = false diff --git a/server/src/event.rs b/server/src/event.rs index 8df6632..d1bf4ff 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -4,6 +4,7 @@ use log::{debug, info, trace, warn}; use roxmltree::{Document, Error, Node}; use serde::Serialize; use std::{collections::HashMap, fmt::Display, net::SocketAddr, sync::Arc}; +use strum::IntoStaticStr; use crate::subscription::Subscription; @@ -47,7 +48,7 @@ pub enum DataType { Unknown, } -#[derive(Debug, Clone, Default, Eq, PartialEq)] +#[derive(Debug, Clone, Default, Eq, PartialEq, IntoStaticStr)] pub enum ErrorType { /// Initial XML parsing failed but Raw content could be recovered RawContentRecovered(String), @@ -65,9 +66,9 @@ impl Display for ErrorType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { ErrorType::RawContentRecovered(message) => write!(f, "{}", message), - ErrorType::FailedToRecoverRawContent(message ) => write!(f, "{}", message), - ErrorType::Unrecoverable(message ) => write!(f, "{}", message), - ErrorType::FailedToFeedEvent (message ) => write!(f, "{}", message), + ErrorType::FailedToRecoverRawContent(message) => write!(f, "{}", message), + ErrorType::Unrecoverable(message) => write!(f, "{}", message), + ErrorType::FailedToFeedEvent(message) => write!(f, "{}", message), ErrorType::Unknown => write!(f, "Unknown error"), } } @@ -192,7 +193,7 @@ impl Event { pub fn from_str(content: &str) -> Self { let mut event = Event::default(); - + let doc_parse_attempt = Document::parse(content); match doc_parse_attempt { Ok(doc) => { @@ -257,7 +258,9 @@ fn parse_debug_data(debug_data_node: &Node) -> Result { } else if 
node.tag_name().name() == "LevelName" { debug_data.level_name = node.text().map(str::to_string); } else if node.tag_name().name() == "Component" { - node.text().unwrap_or_default().clone_into(&mut debug_data.component); + node.text() + .unwrap_or_default() + .clone_into(&mut debug_data.component); } else if node.tag_name().name() == "SubComponent" { debug_data.sub_component = node.text().map(str::to_string); } else if node.tag_name().name() == "FileLine" { @@ -265,7 +268,9 @@ fn parse_debug_data(debug_data_node: &Node) -> Result { } else if node.tag_name().name() == "Function" { debug_data.function = node.text().map(str::to_string); } else if node.tag_name().name() == "Message" { - node.text().unwrap_or_default().clone_into(&mut debug_data.message); + node.text() + .unwrap_or_default() + .clone_into(&mut debug_data.message); } } Ok(DataType::DebugData(debug_data)) @@ -277,9 +282,13 @@ fn parse_processing_error_data(processing_error_data_node: &Node) -> Result) { - self.time_received = time_received; + self.time_received = time_received; } /// Get a reference to the event metadata's addr. @@ -583,16 +592,13 @@ impl EventData { } else { None }; - Self { - raw, - event - } + Self { raw, event } } pub fn raw(&self) -> Arc { self.raw.clone() } - + pub fn event(&self) -> Option<&Event> { self.event.as_ref() } @@ -919,9 +925,7 @@ mod tests { #[test] fn test_4689_parsing() { - let event = Event::from_str( - EVENT_4689, - ); + let event = Event::from_str(EVENT_4689); assert!(event.additional.error.is_none()) } @@ -931,16 +935,17 @@ mod tests { fn test_serialize_malformed_raw_content_recovered() { // Try to serialize a malformed event, and use the recovering strategy to // recover its Raw content - let event = Event::from_str( - RAW_CONTENT_RECOVERED, - ); + let event = Event::from_str(RAW_CONTENT_RECOVERED); let error = event.additional.error.unwrap(); assert_eq!(error.error_type, ErrorType::RawContentRecovered("Failed to parse event XML (the root node was opened but never closed) but Raw content could be recovered.".to_string())); assert_eq!(error.original_content, RAW_CONTENT_RECOVERED); let system = event.system.unwrap(); - assert_eq!(system.provider.name.unwrap(), "Microsoft-Windows-Security-Auditing".to_string()); + assert_eq!( + system.provider.name.unwrap(), + "Microsoft-Windows-Security-Auditing".to_string() + ); assert_eq!(system.event_id, 4798); assert_eq!(system.execution.unwrap().thread_id, 16952); @@ -948,10 +953,16 @@ mod tests { match event.data { DataType::EventData(data) => { - assert_eq!(data.named_data.get("TargetDomainName").unwrap(), "xxxxx_xps"); - assert_eq!(data.named_data.get("TargetSid").unwrap(), "S-1-5-21-1604529354-1295832394-4197355770-1001"); - }, - _ => panic!("Wrong event data type") + assert_eq!( + data.named_data.get("TargetDomainName").unwrap(), + "xxxxx_xps" + ); + assert_eq!( + data.named_data.get("TargetSid").unwrap(), + "S-1-5-21-1604529354-1295832394-4197355770-1001" + ); + } + _ => panic!("Wrong event data type"), }; } @@ -960,20 +971,23 @@ mod tests { #[test] fn test_serialize_malformed_unrecoverable_1() { // Try to serialize an event for which there is no recovering strategy - let event = Event::from_str( - UNRECOVERABLE_1, - ); + let event = Event::from_str(UNRECOVERABLE_1); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = 
event.additional.error.unwrap(); - assert_eq!(error.error_type, ErrorType::Unrecoverable("Failed to parse event XML: the root node was opened but never closed".to_string())); + assert_eq!( + error.error_type, + ErrorType::Unrecoverable( + "Failed to parse event XML: the root node was opened but never closed".to_string() + ) + ); assert_eq!(error.original_content, UNRECOVERABLE_1); } @@ -983,20 +997,23 @@ mod tests { fn test_serialize_malformed_unrecoverable_2() { // Try to serialize a malformed event for which no recovery // is possible. - let event = Event::from_str( - UNRECOVERABLE_2, - ); + let event = Event::from_str(UNRECOVERABLE_2); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); - assert_eq!(error.error_type, ErrorType::Unrecoverable("Failed to parse event XML: unexpected end of stream".to_string())); + assert_eq!( + error.error_type, + ErrorType::Unrecoverable( + "Failed to parse event XML: unexpected end of stream".to_string() + ) + ); assert_eq!(error.original_content, UNRECOVERABLE_2); } @@ -1006,16 +1023,14 @@ mod tests { fn test_serialize_failed_to_recover() { // Try to serialize a malformed event for which the recovering strategy can // not succeed - let event = Event::from_str( - FAILED_TO_RECOVER_RAW_CONTENT, - ); + let event = Event::from_str(FAILED_TO_RECOVER_RAW_CONTENT); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); @@ -1029,20 +1044,23 @@ mod tests { fn test_serialize_malformed_failed_to_feed_event() { // Try to serialize a malformed event for which the recovering strategy can // not succeed because is invalid. 
- let event = Event::from_str( - FAILED_TO_FEED_EVENT, - ); + let event = Event::from_str(FAILED_TO_FEED_EVENT); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); - assert_eq!(error.error_type, ErrorType::FailedToFeedEvent("Could not feed event from document: Parsing failure in System".to_string())); + assert_eq!( + error.error_type, + ErrorType::FailedToFeedEvent( + "Could not feed event from document: Parsing failure in System".to_string() + ) + ); assert_eq!(error.original_content, FAILED_TO_FEED_EVENT); } -} \ No newline at end of file +} diff --git a/server/src/lib.rs b/server/src/lib.rs index d521138..01e5813 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -41,9 +41,9 @@ use libgssapi::error::MajorFlags; use log::{debug, error, info, trace, warn}; use metrics::{counter, histogram}; use monitoring::{ - HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, - HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, HTTP_REQUEST_MACHINE, HTTP_REQUEST_METHOD, - HTTP_REQUEST_STATUS, HTTP_REQUEST_URI, + HTTP_REQUESTS_COUNTER, HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, + HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, + HTTP_REQUEST_STATUS_CODE, HTTP_REQUEST_URI, MACHINE, }; use quick_xml::writer::Writer; use soap::Serializable; @@ -194,13 +194,11 @@ async fn get_request_payload( if monitoring_conf.count_http_request_body_network_size_per_machine() => { counter!(HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, - HTTP_REQUEST_METHOD => request_data.method().to_string(), HTTP_REQUEST_URI => request_data.uri().to_string(), - HTTP_REQUEST_MACHINE => request_data.principal().to_string()) + MACHINE => request_data.principal().to_string()) } _ => { counter!(HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, - HTTP_REQUEST_METHOD => request_data.method().to_string(), HTTP_REQUEST_URI => request_data.uri().to_string()) } }; @@ -220,13 +218,11 @@ async fn get_request_payload( if monitoring_conf.count_http_request_body_real_size_per_machine() => { counter!(HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, - HTTP_REQUEST_METHOD => request_data.method().to_string(), HTTP_REQUEST_URI => request_data.uri().to_string(), - HTTP_REQUEST_MACHINE => request_data.principal().to_string()) + MACHINE => request_data.principal().to_string()) } _ => { counter!(HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, - HTTP_REQUEST_METHOD => request_data.method().to_string(), HTTP_REQUEST_URI => request_data.uri().to_string()) } }; @@ -439,11 +435,14 @@ fn log_response( let duration = start.elapsed().as_secs_f64(); histogram!(HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, - HTTP_REQUEST_METHOD => method.to_owned(), - HTTP_REQUEST_STATUS => status.to_string(), HTTP_REQUEST_URI => uri.to_owned()) .record(duration); + counter!(HTTP_REQUESTS_COUNTER, + HTTP_REQUEST_STATUS_CODE => status.as_str().to_owned(), + HTTP_REQUEST_URI => uri.to_owned()) + .increment(1); + // MDC is thread related, so it should be safe to use it in a non-async // function. 
log_mdc::insert("http_status", status.as_str()); diff --git a/server/src/logic.rs b/server/src/logic.rs index c89fd00..9c1a89c 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -2,10 +2,11 @@ use crate::{ event::{EventData, EventMetadata}, heartbeat::{store_heartbeat, WriteHeartbeatMessage}, monitoring::{ - EVENTS_COUNTER, EVENTS_MACHINE, EVENTS_SUBSCRIPTION_NAME, EVENTS_SUBSCRIPTION_UUID, - EVENT_SIZE_BYTES_COUNTER, FAILED_EVENTS_COUNTER, MESSAGES_ACTION, + INPUT_EVENTS_COUNTER, INPUT_EVENT_BYTES_COUNTER, INPUT_EVENT_PARSING_FAILURES, + INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE, INPUT_MESSAGES_COUNTER, MACHINE, MESSAGES_ACTION, MESSAGES_ACTION_ENUMERATE, MESSAGES_ACTION_EVENTS, MESSAGES_ACTION_HEARTBEAT, - MESSAGES_COUNTER, + OUTPUT_DRIVER, OUTPUT_DRIVER_FAILURES, OUTPUT_FORMAT, OUTPUT_FORMAT_FAILURES, + SUBSCRIPTION_NAME, SUBSCRIPTION_UUID, }, output::get_formatter, soap::{ @@ -33,6 +34,12 @@ use uuid::Uuid; use anyhow::{anyhow, bail, Context, Result}; +#[derive(Debug)] +struct OutputDriverError { + pub driver: String, + pub error: anyhow::Error, +} + pub enum Response { Ok(String, Option), Err(StatusCode), @@ -224,7 +231,7 @@ async fn handle_enumerate( }); } - counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_ENUMERATE).increment(1); + counter!(INPUT_MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_ENUMERATE).increment(1); Ok(Response::ok( ACTION_ENUMERATE_RESPONSE, @@ -293,7 +300,7 @@ async fn handle_heartbeat( .await .context("Failed to store heartbeat")?; - counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_HEARTBEAT).increment(1); + counter!(INPUT_MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_HEARTBEAT).increment(1); Ok(Response::ok(ACTION_ACK, None)) } @@ -308,7 +315,36 @@ fn get_formatted_events( for raw in events.iter() { // EventData parses the raw event into an Event struct // (once for all formatters). 
- events_data.push(EventData::new(raw.clone(), need_to_parse_event)) + let event_data = EventData::new(raw.clone(), need_to_parse_event); + + if need_to_parse_event { + // Count failures + match event_data.event() { + Some(event) => { + if let Some(error) = &event.additional.error { + let error_type_str: &'static str = error.error_type.clone().into(); + counter!(INPUT_EVENT_PARSING_FAILURES, + SUBSCRIPTION_NAME => metadata.subscription_name().to_owned(), + SUBSCRIPTION_UUID => metadata.subscription_uuid().to_owned(), + INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE => error_type_str) + .increment(1); + warn!("Failed to parse an event: {:?}", error) + } + } + None => { + counter!(INPUT_EVENT_PARSING_FAILURES, + SUBSCRIPTION_NAME => metadata.subscription_name().to_owned(), + SUBSCRIPTION_UUID => metadata.subscription_uuid().to_owned(), + INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE => "Unknown") + .increment(1); + warn!( + "Event should have been parsed but it was not: {}", + event_data.raw() + ) + } + } + } + events_data.push(event_data) } let mut formatted_events: HashMap>>> = @@ -319,6 +355,14 @@ fn get_formatted_events( for event_data in events_data.iter() { if let Some(str) = formatter.format(metadata, event_data) { content.push(str.clone()) + } else { + let format_str: &'static str = format.into(); + counter!(OUTPUT_FORMAT_FAILURES, + SUBSCRIPTION_NAME => metadata.subscription_name().to_owned(), + SUBSCRIPTION_UUID => metadata.subscription_uuid().to_owned(), + OUTPUT_FORMAT => format_str) + .increment(1); + warn!("Failed to format an event using {}", format_str); } } formatted_events.insert(format.clone(), Arc::new(content)); @@ -385,34 +429,34 @@ async fn handle_events( subscription.uuid_string() ); - counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_EVENTS).increment(1); + counter!(INPUT_MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_EVENTS).increment(1); let events_counter = match monitoring { - Some(monitoring_conf) if monitoring_conf.count_received_events_per_machine() => { - counter!(EVENTS_COUNTER, - EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), - EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string(), - EVENTS_MACHINE => request_data.principal().to_string()) + Some(monitoring_conf) if monitoring_conf.count_input_events_per_machine() => { + counter!(INPUT_EVENTS_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + MACHINE => request_data.principal().to_string()) } _ => { - counter!(EVENTS_COUNTER, - EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), - EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()) + counter!(INPUT_EVENTS_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string()) } }; events_counter.increment(events.len().try_into()?); let event_size_counter = match monitoring { - Some(monitoring_conf) if monitoring_conf.count_event_size_per_machine() => { - counter!(EVENT_SIZE_BYTES_COUNTER, - EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), - EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string(), - EVENTS_MACHINE => request_data.principal().to_string()) + Some(monitoring_conf) if monitoring_conf.count_input_event_bytes_per_machine() => { + counter!(INPUT_EVENT_BYTES_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + MACHINE => request_data.principal().to_string()) } _ => { - 
counter!(EVENT_SIZE_BYTES_COUNTER, - EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), - EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()) + counter!(INPUT_EVENT_BYTES_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string()) } }; event_size_counter.increment( @@ -487,6 +531,10 @@ async fn handle_events( output_cloned.describe() ) }) + .map_err(|e| OutputDriverError { + driver: output_cloned.driver(), + error: e, + }) }); } @@ -497,17 +545,26 @@ async fn handle_events( Ok(Ok(())) => (), Ok(Err(err)) => { succeed = false; - warn!("Failed to process output and send event: {:?}", err); + warn!("Failed to process output and send event: {:?}", err.error); + counter!(OUTPUT_DRIVER_FAILURES, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + OUTPUT_DRIVER => err.driver.clone()) + .increment(1); } Err(err) => { succeed = false; - warn!("Something bad happened with a process task: {:?}", err) + warn!("Something bad happened with a process task: {:?}", err); + counter!(OUTPUT_DRIVER_FAILURES, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + OUTPUT_DRIVER => "Unknown") + .increment(1); } } } if !succeed { - counter!(FAILED_EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()).increment(events.len().try_into()?); return Ok(Response::err(StatusCode::SERVICE_UNAVAILABLE)); } diff --git a/server/src/monitoring.rs b/server/src/monitoring.rs index 9625bf5..a43e9b4 100644 --- a/server/src/monitoring.rs +++ b/server/src/monitoring.rs @@ -9,31 +9,42 @@ use log::info; use metrics::{describe_counter, describe_histogram, Unit}; use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; -pub const MESSAGES_COUNTER: &str = "openwec_messages_total"; +// input metrics + +pub const INPUT_MESSAGES_COUNTER: &str = "openwec_input_messages_total"; pub const MESSAGES_ACTION: &str = "action"; pub const MESSAGES_ACTION_HEARTBEAT: &str = "heartbeat"; pub const MESSAGES_ACTION_EVENTS: &str = "events"; pub const MESSAGES_ACTION_ENUMERATE: &str = "enumerate"; -pub const EVENTS_COUNTER: &str = "openwec_received_events_total"; -pub const EVENTS_SUBSCRIPTION_UUID: &str = "subscription_uuid"; -pub const EVENTS_SUBSCRIPTION_NAME: &str = "subscription_name"; -pub const EVENTS_MACHINE: &str = "machine"; +pub const INPUT_EVENTS_COUNTER: &str = "openwec_input_events_total"; +pub const SUBSCRIPTION_UUID: &str = "subscription_uuid"; +pub const SUBSCRIPTION_NAME: &str = "subscription_name"; +pub const MACHINE: &str = "machine"; + +pub const INPUT_EVENT_BYTES_COUNTER: &str = "openwec_input_event_bytes_total"; +pub const INPUT_EVENT_PARSING_FAILURES: &str = "openwec_input_event_parsing_failures_total"; +pub const INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE: &str = "type"; + +// http metrics -pub const FAILED_EVENTS_COUNTER: &str = "openwec_event_output_failures_total"; +pub const HTTP_REQUESTS_COUNTER: &str = "openwec_http_requests_total"; -pub const HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM: &str = "http_request_duration_seconds"; -pub const HTTP_REQUEST_METHOD: &str = "method"; +pub const HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM: &str = "openwec_http_request_duration_seconds"; pub const HTTP_REQUEST_URI: &str = "uri"; -pub const HTTP_REQUEST_STATUS: &str = "status"; -pub const HTTP_REQUEST_MACHINE: &str = "machine"; +pub const 
HTTP_REQUEST_STATUS_CODE: &str = "code"; pub const HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER: &str = - "http_request_body_network_size_bytes_total"; + "openwec_http_request_body_network_size_bytes_total"; pub const HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER: &str = - "http_request_body_real_size_bytes_total"; + "openwec_http_request_body_real_size_bytes_total"; -pub const EVENT_SIZE_BYTES_COUNTER: &str = "openwec_event_size_bytes_total"; +// output metrics + +pub const OUTPUT_DRIVER_FAILURES: &str = "openwec_output_driver_failures_total"; +pub const OUTPUT_DRIVER: &str = "driver"; +pub const OUTPUT_FORMAT_FAILURES: &str = "openwec_output_format_failures_total"; +pub const OUTPUT_FORMAT: &str = "format"; pub fn init(settings: &Monitoring) -> Result<()> { let addr = SocketAddr::from(( @@ -53,25 +64,38 @@ pub fn init(settings: &Monitoring) -> Result<()> { builder.install()?; + // input describe_counter!( - MESSAGES_COUNTER, + INPUT_EVENTS_COUNTER, Unit::Count, - "Number of messages received by openwec" + "The total number of events received by openwec" + ); + describe_counter!( + INPUT_EVENT_BYTES_COUNTER, + Unit::Bytes, + "The total size of all events received by openwec" ); describe_counter!( - EVENTS_COUNTER, + INPUT_MESSAGES_COUNTER, Unit::Count, - "Number of events received by openwec" + "The total number of messages received by openwec" ); describe_counter!( - FAILED_EVENTS_COUNTER, + INPUT_EVENT_PARSING_FAILURES, Unit::Count, - "Number of events that could not be written to outputs by openwec" + "The total number of event parsing failures" + ); + + // http + describe_counter!( + HTTP_REQUESTS_COUNTER, + Unit::Count, + "The total number of HTTP requests handled by openwec" ); describe_histogram!( HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, Unit::Seconds, - "HTTP requests duration histogram" + "Histogram of response duration for HTTP requests" ); describe_counter!( HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, @@ -83,10 +107,17 @@ pub fn init(settings: &Monitoring) -> Result<()> { Unit::Bytes, "The total size of all http requests body received by openwec after decryption and decompression" ); + + // output describe_counter!( - EVENT_SIZE_BYTES_COUNTER, - Unit::Bytes, - "The total size of all events received by openwec" + OUTPUT_DRIVER_FAILURES, + Unit::Count, + "The total number of output driver failures" + ); + describe_counter!( + OUTPUT_FORMAT_FAILURES, + Unit::Count, + "The total number of output format failures" ); Ok(()) diff --git a/server/src/output.rs b/server/src/output.rs index 99a428f..156b741 100644 --- a/server/src/output.rs +++ b/server/src/output.rs @@ -138,6 +138,10 @@ impl Output { ) } + pub fn driver(&self) -> String { + format!("{:?}", self.subscription_output_driver) + } + pub async fn write( &self, metadata: Arc, From 80652f493d93c918598f586b20c29b3aa1788773 Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Tue, 26 Nov 2024 18:30:22 +0100 Subject: [PATCH 11/11] Add machines gauge --- common/src/settings.rs | 13 +++++ common/src/subscription.rs | 1 + doc/monitoring.md | 7 +-- openwec.conf.sample.toml | 4 ++ server/src/lib.rs | 8 +-- server/src/monitoring.rs | 101 +++++++++++++++++++++++++++++++++++-- 6 files changed, 123 insertions(+), 11 deletions(-) diff --git a/common/src/settings.rs b/common/src/settings.rs index 8b097a8..c12345e 100644 --- a/common/src/settings.rs +++ b/common/src/settings.rs @@ -378,6 +378,7 @@ pub struct Monitoring { count_input_event_bytes_per_machine: Option, 
count_http_request_body_network_size_per_machine: Option, count_http_request_body_real_size_per_machine: Option, + machines_refresh_interval: Option, } impl Monitoring { @@ -415,6 +416,10 @@ impl Monitoring { self.count_http_request_body_real_size_per_machine .unwrap_or(false) } + + pub fn machines_refresh_interval(&self) -> u64 { + self.machines_refresh_interval.unwrap_or(30) + } } #[derive(Debug, Deserialize, Clone)] @@ -657,6 +662,9 @@ mod tests { s.monitoring().unwrap().http_request_duration_buckets(), &[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,] ); + assert_eq!( + s.monitoring().unwrap().machines_refresh_interval(), 30 + ); } const CONFIG_TLS_POSTGRES_WITH_CLI: &str = r#" @@ -694,6 +702,7 @@ mod tests { count_http_request_body_network_size_per_machine = true count_http_request_body_real_size_per_machine = true count_input_events_per_machine = true + machines_refresh_interval = 10 "#; #[test] @@ -730,6 +739,10 @@ mod tests { s.monitoring().unwrap().http_request_duration_buckets(), &[0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0] ); + assert_eq!( + s.monitoring().unwrap().machines_refresh_interval(), + 10 + ); } const CONFIG_TLS_POSTGRES_WITH_OUTPUTS: &str = r#" diff --git a/common/src/subscription.rs b/common/src/subscription.rs index e1598b3..6f97441 100644 --- a/common/src/subscription.rs +++ b/common/src/subscription.rs @@ -913,6 +913,7 @@ impl SubscriptionStatsCounters { } } +#[derive(IntoStaticStr)] pub enum SubscriptionMachineState { Alive, Active, diff --git a/doc/monitoring.md b/doc/monitoring.md index 8a1e920..30b0215 100644 --- a/doc/monitoring.md +++ b/doc/monitoring.md @@ -41,6 +41,9 @@ Metrics collection and publication can be enabled in the OpenWEC settings (see ` ### Available metrics +> [!CAUTION] +> Enabling the `machine` labels may cause a **huge** increase in metric cardinality! This is disabled by default. + | **Metric** | **Type** | **Labels** | **Description** | |---|---|---|---| | `openwec_input_events_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total number of events received by openwec | @@ -53,6 +56,4 @@ Metrics collection and publication can be enabled in the OpenWEC settings (see ` | `openwec_http_request_body_real_size_bytes_total` | `Counter` | `uri`, `machine` (optional*) | The total size of all http requests body received by openwec after decryption and decompression | | `openwec_output_driver_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `driver` | The total number of output driver failures | | `openwec_output_format_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `format` | The total number of output format failures | - -> [!WARNING] -> Enabling the `machine` labels may cause a **huge** increase in metric cardinality! 
\ No newline at end of file +| `openwec_machines` | `Gauge` | `subscription_uuid`, `subscription_name`, `state` | The number of machines known by openwec | \ No newline at end of file diff --git a/openwec.conf.sample.toml b/openwec.conf.sample.toml index acd6843..366bfc4 100644 --- a/openwec.conf.sample.toml +++ b/openwec.conf.sample.toml @@ -321,6 +321,10 @@ # Listen port of the Prometheus-compatible endpoint # listen_port = +# [Optional] +# The refresh interval of "openwec_machines" gauge +# machines_refresh_interval = 30 + # [Optional] # Request duration buckets (in seconds) used by the "openwec_http_request_duration_seconds" histogram # http_request_duration_buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0] diff --git a/server/src/lib.rs b/server/src/lib.rs index 01e5813..bda3f86 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1112,10 +1112,6 @@ pub async fn run(settings: Settings, verbosity: u8) { panic!("Failed to setup logging: {:?}", e); } - if let Some(monitoring_settings) = settings.monitoring() { - monitoring::init(monitoring_settings).expect("Failed to set metric exporter"); - } - let rt_handle = Handle::current(); // Start monitoring thread @@ -1137,6 +1133,10 @@ pub async fn run(settings: Settings, verbosity: u8) { let subscriptions = Arc::new(RwLock::new(HashMap::new())); + if let Some(monitoring_settings) = settings.monitoring() { + monitoring::init(&db, subscriptions.clone(), monitoring_settings).expect("Failed to initialize metrics exporter"); + } + let reload_interval = settings.server().db_sync_interval(); let outputs_settings = settings.outputs().clone(); let update_task_db = db.clone(); diff --git a/server/src/monitoring.rs b/server/src/monitoring.rs index a43e9b4..3331b38 100644 --- a/server/src/monitoring.rs +++ b/server/src/monitoring.rs @@ -1,13 +1,17 @@ use std::{ net::{IpAddr, SocketAddr}, str::FromStr, + time::{Duration, SystemTime}, }; use anyhow::Result; -use common::settings::Monitoring; -use log::info; -use metrics::{describe_counter, describe_histogram, Unit}; +use common::{database::Db, settings::Monitoring, subscription::SubscriptionMachineState}; +use log::{debug, info}; +use metrics::{describe_counter, describe_gauge, describe_histogram, gauge, Unit}; use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; +use tokio::time; + +use crate::subscription::Subscriptions; // input metrics @@ -46,7 +50,26 @@ pub const OUTPUT_DRIVER: &str = "driver"; pub const OUTPUT_FORMAT_FAILURES: &str = "openwec_output_format_failures_total"; pub const OUTPUT_FORMAT: &str = "format"; -pub fn init(settings: &Monitoring) -> Result<()> { +// machines metrics + +pub const MACHINES_GAUGE: &str = "openwec_machines"; +pub const MACHINES_STATE: &str = "state"; + +pub fn init(db: &Db, subscriptions: Subscriptions, settings: &Monitoring) -> Result<()> { + let refresh_interval = settings.machines_refresh_interval(); + let refresh_task_db = db.clone(); + let refresh_task_subscriptions = subscriptions.clone(); + + // Launch a task responsible for refreshing machines gauge + tokio::spawn(async move { + refresh_machines_task( + refresh_task_db, + refresh_task_subscriptions, + refresh_interval, + ) + .await + }); + let addr = SocketAddr::from(( IpAddr::from_str(settings.listen_address()) .expect("Failed to parse monitoring.listen_address"), @@ -120,5 +143,75 @@ pub fn init(settings: &Monitoring) -> Result<()> { "The total number of output format failures" ); + // machines + describe_gauge!( + MACHINES_GAUGE, + Unit::Count, + "The number of 
machines known by openwec"
+    );
+
     Ok(())
 }
+
+async fn refresh_machines_task(
+    db: Db,
+    subscriptions: Subscriptions,
+    refresh_interval: u64,
+) -> Result<()> {
+    info!("Starting refresh machines task for monitoring");
+    let mut refresh = time::interval(Duration::from_secs(refresh_interval));
+    // We don't want the first tick to complete immediately
+    refresh.reset_after(Duration::from_secs(refresh_interval));
+    loop {
+        tokio::select! {
+            _ = refresh.tick() => {
+                debug!("Refreshing machines stats for monitoring");
+
+                // We can't await with the lock on "subscriptions"
+                // So we first copy all data we need from "subscriptions"
+                let subscriptions_data = {
+                    let subscriptions_unlocked = subscriptions.read().unwrap();
+                    let mut subscriptions_data = Vec::with_capacity(subscriptions_unlocked.len());
+                    for (_, subscription) in subscriptions.read().unwrap().iter() {
+                        subscriptions_data.push((subscription.uuid_string(), subscription.data().name().to_string(), subscription.data().heartbeat_interval()));
+                    }
+                    subscriptions_data
+                };
+
+                for (subscription_uuid, subscription_name, heartbeat_interval) in subscriptions_data {
+                    let now: i64 = SystemTime::now()
+                        .duration_since(SystemTime::UNIX_EPOCH)?
+                        .as_secs()
+                        .try_into()?;
+
+                    let stats = db
+                        .get_stats(&subscription_uuid, now - (heartbeat_interval as i64))
+                        .await?;
+
+                    debug!("Update {} values with active={}, alive={}, dead={}", MACHINES_GAUGE, stats.active_machines_count(), stats.alive_machines_count(), stats.dead_machines_count());
+
+                    let alive_str: &'static str = SubscriptionMachineState::Alive.into();
+                    gauge!(MACHINES_GAUGE,
+                        SUBSCRIPTION_NAME => subscription_name.clone(),
+                        SUBSCRIPTION_UUID => subscription_uuid.clone(),
+                        MACHINES_STATE => alive_str)
+                    .set(stats.alive_machines_count() as f64);
+
+                    let active_str: &'static str = SubscriptionMachineState::Active.into();
+                    gauge!(MACHINES_GAUGE,
+                        SUBSCRIPTION_NAME => subscription_name.clone(),
+                        SUBSCRIPTION_UUID => subscription_uuid.clone(),
+                        MACHINES_STATE => active_str)
+                    .set(stats.active_machines_count() as f64);
+
+                    let dead_str: &'static str = SubscriptionMachineState::Dead.into();
+                    gauge!(MACHINES_GAUGE,
+                        SUBSCRIPTION_NAME => subscription_name.clone(),
+                        SUBSCRIPTION_UUID => subscription_uuid.clone(),
+                        MACHINES_STATE => dead_str)
+                    .set(stats.dead_machines_count() as f64);
+                }
+            }
+        }
+    }
+}
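
Note: the patches above all follow the same `metrics` facade pattern: monitoring::init() installs a metrics-exporter-prometheus recorder with an HTTP listener, the describe_*! macros register units and help text, and the counter!/gauge!/histogram! macros record labelled samples from the request path. The minimal standalone sketch below illustrates that pattern outside of OpenWEC; the listen address, the #[tokio::main] entry point, the sleep at the end, and the dependency list in the comment are illustrative assumptions, not part of the patch series.

    use std::net::SocketAddr;
    use std::time::Duration;

    use metrics::{counter, describe_counter, Unit};
    use metrics_exporter_prometheus::PrometheusBuilder;

    // Illustrative Cargo dependencies: metrics, metrics-exporter-prometheus,
    // tokio (features: macros, rt-multi-thread, time).
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Illustrative address; OpenWEC builds it from monitoring.listen_address/listen_port.
        let addr: SocketAddr = "127.0.0.1:9090".parse()?;

        // Install the global recorder and expose a Prometheus-compatible endpoint on `addr`,
        // as server/src/monitoring.rs does in init().
        PrometheusBuilder::new().with_http_listener(addr).install()?;

        // Register unit and help text, then record a labelled sample, mirroring the
        // INPUT_MESSAGES_COUNTER usage in server/src/logic.rs.
        describe_counter!(
            "openwec_input_messages_total",
            Unit::Count,
            "The total number of messages received by openwec"
        );
        counter!("openwec_input_messages_total", "action" => "events").increment(1);

        // A real server keeps running; the exporter then serves the accumulated
        // metrics over HTTP for Prometheus to scrape.
        tokio::time::sleep(Duration::from_secs(60)).await;
        Ok(())
    }

Scraping the listener on 127.0.0.1:9090 then returns the # HELP and # TYPE lines registered by describe_counter! followed by the current counter value with its action="events" label, which is the same exposition format the OpenWEC monitoring endpoint produces for the metrics documented in doc/monitoring.md.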