From 3c968b883b9f4a51e6ba6fb2b0c253b0fe870a03 Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Tue, 26 Nov 2024 10:28:33 +0100 Subject: [PATCH] Update metrics --- common/src/settings.rs | 31 +++++++---- common/src/subscription.rs | 26 +++++---- doc/monitoring.md | 17 +++--- openwec.conf.sample.toml | 14 ++--- server/src/event.rs | 110 +++++++++++++++++++++---------------- server/src/lib.rs | 21 ++++--- server/src/logic.rs | 109 +++++++++++++++++++++++++++--------- server/src/monitoring.rs | 77 ++++++++++++++++++-------- server/src/output.rs | 4 ++ 9 files changed, 267 insertions(+), 142 deletions(-) diff --git a/common/src/settings.rs b/common/src/settings.rs index 2123ee4..8b097a8 100644 --- a/common/src/settings.rs +++ b/common/src/settings.rs @@ -374,8 +374,8 @@ pub struct Monitoring { listen_address: String, listen_port: u16, http_request_duration_buckets: Option>, - count_received_events_per_machine: Option, - count_event_size_per_machine: Option, + count_input_events_per_machine: Option, + count_input_event_bytes_per_machine: Option, count_http_request_body_network_size_per_machine: Option, count_http_request_body_real_size_per_machine: Option, } @@ -398,12 +398,12 @@ impl Monitoring { } } - pub fn count_received_events_per_machine(&self) -> bool { - self.count_received_events_per_machine.unwrap_or(false) + pub fn count_input_events_per_machine(&self) -> bool { + self.count_input_events_per_machine.unwrap_or(false) } - pub fn count_event_size_per_machine(&self) -> bool { - self.count_event_size_per_machine.unwrap_or(false) + pub fn count_input_event_bytes_per_machine(&self) -> bool { + self.count_input_event_bytes_per_machine.unwrap_or(false) } pub fn count_http_request_body_network_size_per_machine(&self) -> bool { @@ -632,7 +632,9 @@ mod tests { assert_eq!(s.monitoring().unwrap().listen_address(), "127.0.0.1"); assert_eq!(s.monitoring().unwrap().listen_port(), 9090); assert_eq!( - s.monitoring().unwrap().count_event_size_per_machine(), + s.monitoring() + .unwrap() + .count_input_event_bytes_per_machine(), false ); assert_eq!( @@ -648,7 +650,7 @@ mod tests { false ); assert_eq!( - s.monitoring().unwrap().count_received_events_per_machine(), + s.monitoring().unwrap().count_input_events_per_machine(), false ); assert_eq!( @@ -688,10 +690,10 @@ mod tests { listen_address = "127.0.0.1" listen_port = 9090 http_request_duration_buckets = [0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0] - count_event_size_per_machine = true + count_input_event_bytes_per_machine = true count_http_request_body_network_size_per_machine = true count_http_request_body_real_size_per_machine = true - count_received_events_per_machine = true + count_input_events_per_machine = true "#; #[test] @@ -702,7 +704,12 @@ mod tests { assert!(s.monitoring().is_some()); assert_eq!(s.monitoring().unwrap().listen_address(), "127.0.0.1"); assert_eq!(s.monitoring().unwrap().listen_port(), 9090); - assert_eq!(s.monitoring().unwrap().count_event_size_per_machine(), true); + assert_eq!( + s.monitoring() + .unwrap() + .count_input_event_bytes_per_machine(), + true + ); assert_eq!( s.monitoring() .unwrap() @@ -716,7 +723,7 @@ mod tests { true ); assert_eq!( - s.monitoring().unwrap().count_received_events_per_machine(), + s.monitoring().unwrap().count_input_events_per_machine(), true ); assert_eq!( diff --git a/common/src/subscription.rs b/common/src/subscription.rs index 81d00ad..e1598b3 100644 --- a/common/src/subscription.rs +++ b/common/src/subscription.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{anyhow, bail, Result}; use log::{info, warn}; use serde::{Deserialize, Serialize}; -use strum::{AsRefStr, EnumString, VariantNames}; +use strum::{AsRefStr, EnumString, IntoStaticStr, VariantNames}; use uuid::Uuid; use crate::utils::VersionHasher; @@ -96,12 +96,8 @@ pub struct FilesConfiguration { } impl FilesConfiguration { - pub fn new( - path: String, - ) -> Self { - Self { - path - } + pub fn new(path: String) -> Self { + Self { path } } pub fn path(&self) -> &str { @@ -182,7 +178,17 @@ impl Display for SubscriptionOutput { } } #[derive( - Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash, VariantNames, AsRefStr, EnumString, + Debug, + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + Hash, + VariantNames, + AsRefStr, + EnumString, + IntoStaticStr, )] #[strum(serialize_all = "snake_case", ascii_case_insensitive)] pub enum SubscriptionOutputFormat { @@ -683,8 +689,8 @@ impl SubscriptionData { self } - /// Set the subscription's max elements. - pub fn set_max_elements(&mut self, max_elements: Option) -> &mut Self { + /// Set the subscription's max elements. + pub fn set_max_elements(&mut self, max_elements: Option) -> &mut Self { self.parameters.max_elements = max_elements; self.update_internal_version(); self diff --git a/doc/monitoring.md b/doc/monitoring.md index c9a4581..8a1e920 100644 --- a/doc/monitoring.md +++ b/doc/monitoring.md @@ -43,13 +43,16 @@ Metrics collection and publication can be enabled in the OpenWEC settings (see ` | **Metric** | **Type** | **Labels** | **Description** | |---|---|---|---| -| `openwec_received_events_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | Number of events received by openwec | -| `openwec_event_size_bytes_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total size of all events received by openwec | -| `http_request_body_real_size_bytes_total` | `Counter` | `method`, `uri`, `machine` (optional*) | The total size of all http requests body received by openwec after decryption and decompression | -| `http_request_body_network_size_bytes_total` | `Counter` | `method`, `uri`, `machine` (optional*) | The total size of all http requests body received by openwec | -| `openwec_messages_total` | `Counter` | `action` (one of `"enumerate"`, `"heartbeat"`, `"events"`) | Number of messages received by openwec | -| `openwec_event_output_failures_total` | `Counter` | `subscription_uuid`, `subscription_name` | Number of events that could not be written to outputs by openwec | -| `http_request_duration_seconds` | `Histogram` | `method`, `status`, `uri` | HTTP requests duration histogram | +| `openwec_input_events_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total number of events received by openwec | +| `openwec_input_event_bytes_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total size of all events received by openwec | +| `openwec_input_messages_total` | `Counter` | `action` (one of `"enumerate"`, `"heartbeat"`, `"events"`) | The total number of messages received by openwec | +| `openwec_input_event_parsing_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `type` | The total number of event parsing failures | +| `openwec_http_requests_total` | `Counter` | `uri`, `code` | The total number of HTTP requests handled by openwec | +| `openwec_http_request_duration_seconds` | `Histogram` | `uri` | Histogram of response duration for HTTP requests | +| `openwec_http_request_body_network_size_bytes_total` | `Counter` | `uri`, `machine` (optional*) | The total size of all http requests body received by openwec | +| `openwec_http_request_body_real_size_bytes_total` | `Counter` | `uri`, `machine` (optional*) | The total size of all http requests body received by openwec after decryption and decompression | +| `openwec_output_driver_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `driver` | The total number of output driver failures | +| `openwec_output_format_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `format` | The total number of output format failures | > [!WARNING] > Enabling the `machine` labels may cause a **huge** increase in metric cardinality! \ No newline at end of file diff --git a/openwec.conf.sample.toml b/openwec.conf.sample.toml index 49a24c1..acd6843 100644 --- a/openwec.conf.sample.toml +++ b/openwec.conf.sample.toml @@ -322,25 +322,25 @@ # listen_port = # [Optional] -# Request duration buckets (in seconds) used by the "http_request_duration_seconds" histogram +# Request duration buckets (in seconds) used by the "openwec_http_request_duration_seconds" histogram # http_request_duration_buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0] # [Optional] -# If set, a "machine" label will be added to the "openwec_received_events_total" metric +# If set, a "machine" label will be added to the "openwec_input_events_total" metric # Warning: this may cause a HUGE increase in metric cardinality -# count_received_events_per_machine = false +# count_input_events_per_machine = false # [Optional] -# If set, a "machine" label will be added to the "openwec_event_size_bytes_total" metric +# If set, a "machine" label will be added to the "openwec_input_event_bytes_total" metric # Warning: this may cause a HUGE increase in metric cardinality -# count_event_size_per_machine = false +# count_input_event_bytes_per_machine = false # [Optional] -# If set, a "machine" label will be added to the "http_request_body_network_size_bytes_total" metric +# If set, a "machine" label will be added to the "openwec_http_request_body_network_size_bytes_total" metric # Warning: this may cause a HUGE increase in metric cardinality # count_http_request_body_network_size_per_machine = false # [Optional] -# If set, a "machine" label will be added to the "http_request_body_real_size_bytes_total" metric +# If set, a "machine" label will be added to the "openwec_http_request_body_real_size_bytes_total" metric # Warning: this may cause a HUGE increase in metric cardinality # count_http_request_body_real_size_per_machine = false diff --git a/server/src/event.rs b/server/src/event.rs index 8df6632..d1bf4ff 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -4,6 +4,7 @@ use log::{debug, info, trace, warn}; use roxmltree::{Document, Error, Node}; use serde::Serialize; use std::{collections::HashMap, fmt::Display, net::SocketAddr, sync::Arc}; +use strum::IntoStaticStr; use crate::subscription::Subscription; @@ -47,7 +48,7 @@ pub enum DataType { Unknown, } -#[derive(Debug, Clone, Default, Eq, PartialEq)] +#[derive(Debug, Clone, Default, Eq, PartialEq, IntoStaticStr)] pub enum ErrorType { /// Initial XML parsing failed but Raw content could be recovered RawContentRecovered(String), @@ -65,9 +66,9 @@ impl Display for ErrorType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { ErrorType::RawContentRecovered(message) => write!(f, "{}", message), - ErrorType::FailedToRecoverRawContent(message ) => write!(f, "{}", message), - ErrorType::Unrecoverable(message ) => write!(f, "{}", message), - ErrorType::FailedToFeedEvent (message ) => write!(f, "{}", message), + ErrorType::FailedToRecoverRawContent(message) => write!(f, "{}", message), + ErrorType::Unrecoverable(message) => write!(f, "{}", message), + ErrorType::FailedToFeedEvent(message) => write!(f, "{}", message), ErrorType::Unknown => write!(f, "Unknown error"), } } @@ -192,7 +193,7 @@ impl Event { pub fn from_str(content: &str) -> Self { let mut event = Event::default(); - + let doc_parse_attempt = Document::parse(content); match doc_parse_attempt { Ok(doc) => { @@ -257,7 +258,9 @@ fn parse_debug_data(debug_data_node: &Node) -> Result { } else if node.tag_name().name() == "LevelName" { debug_data.level_name = node.text().map(str::to_string); } else if node.tag_name().name() == "Component" { - node.text().unwrap_or_default().clone_into(&mut debug_data.component); + node.text() + .unwrap_or_default() + .clone_into(&mut debug_data.component); } else if node.tag_name().name() == "SubComponent" { debug_data.sub_component = node.text().map(str::to_string); } else if node.tag_name().name() == "FileLine" { @@ -265,7 +268,9 @@ fn parse_debug_data(debug_data_node: &Node) -> Result { } else if node.tag_name().name() == "Function" { debug_data.function = node.text().map(str::to_string); } else if node.tag_name().name() == "Message" { - node.text().unwrap_or_default().clone_into(&mut debug_data.message); + node.text() + .unwrap_or_default() + .clone_into(&mut debug_data.message); } } Ok(DataType::DebugData(debug_data)) @@ -277,9 +282,13 @@ fn parse_processing_error_data(processing_error_data_node: &Node) -> Result) { - self.time_received = time_received; + self.time_received = time_received; } /// Get a reference to the event metadata's addr. @@ -583,16 +592,13 @@ impl EventData { } else { None }; - Self { - raw, - event - } + Self { raw, event } } pub fn raw(&self) -> Arc { self.raw.clone() } - + pub fn event(&self) -> Option<&Event> { self.event.as_ref() } @@ -919,9 +925,7 @@ mod tests { #[test] fn test_4689_parsing() { - let event = Event::from_str( - EVENT_4689, - ); + let event = Event::from_str(EVENT_4689); assert!(event.additional.error.is_none()) } @@ -931,16 +935,17 @@ mod tests { fn test_serialize_malformed_raw_content_recovered() { // Try to serialize a malformed event, and use the recovering strategy to // recover its Raw content - let event = Event::from_str( - RAW_CONTENT_RECOVERED, - ); + let event = Event::from_str(RAW_CONTENT_RECOVERED); let error = event.additional.error.unwrap(); assert_eq!(error.error_type, ErrorType::RawContentRecovered("Failed to parse event XML (the root node was opened but never closed) but Raw content could be recovered.".to_string())); assert_eq!(error.original_content, RAW_CONTENT_RECOVERED); let system = event.system.unwrap(); - assert_eq!(system.provider.name.unwrap(), "Microsoft-Windows-Security-Auditing".to_string()); + assert_eq!( + system.provider.name.unwrap(), + "Microsoft-Windows-Security-Auditing".to_string() + ); assert_eq!(system.event_id, 4798); assert_eq!(system.execution.unwrap().thread_id, 16952); @@ -948,10 +953,16 @@ mod tests { match event.data { DataType::EventData(data) => { - assert_eq!(data.named_data.get("TargetDomainName").unwrap(), "xxxxx_xps"); - assert_eq!(data.named_data.get("TargetSid").unwrap(), "S-1-5-21-1604529354-1295832394-4197355770-1001"); - }, - _ => panic!("Wrong event data type") + assert_eq!( + data.named_data.get("TargetDomainName").unwrap(), + "xxxxx_xps" + ); + assert_eq!( + data.named_data.get("TargetSid").unwrap(), + "S-1-5-21-1604529354-1295832394-4197355770-1001" + ); + } + _ => panic!("Wrong event data type"), }; } @@ -960,20 +971,23 @@ mod tests { #[test] fn test_serialize_malformed_unrecoverable_1() { // Try to serialize an event for which there is no recovering strategy - let event = Event::from_str( - UNRECOVERABLE_1, - ); + let event = Event::from_str(UNRECOVERABLE_1); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); - assert_eq!(error.error_type, ErrorType::Unrecoverable("Failed to parse event XML: the root node was opened but never closed".to_string())); + assert_eq!( + error.error_type, + ErrorType::Unrecoverable( + "Failed to parse event XML: the root node was opened but never closed".to_string() + ) + ); assert_eq!(error.original_content, UNRECOVERABLE_1); } @@ -983,20 +997,23 @@ mod tests { fn test_serialize_malformed_unrecoverable_2() { // Try to serialize a malformed event for which no recovery // is possible. - let event = Event::from_str( - UNRECOVERABLE_2, - ); + let event = Event::from_str(UNRECOVERABLE_2); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); - assert_eq!(error.error_type, ErrorType::Unrecoverable("Failed to parse event XML: unexpected end of stream".to_string())); + assert_eq!( + error.error_type, + ErrorType::Unrecoverable( + "Failed to parse event XML: unexpected end of stream".to_string() + ) + ); assert_eq!(error.original_content, UNRECOVERABLE_2); } @@ -1006,16 +1023,14 @@ mod tests { fn test_serialize_failed_to_recover() { // Try to serialize a malformed event for which the recovering strategy can // not succeed - let event = Event::from_str( - FAILED_TO_RECOVER_RAW_CONTENT, - ); + let event = Event::from_str(FAILED_TO_RECOVER_RAW_CONTENT); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); @@ -1029,20 +1044,23 @@ mod tests { fn test_serialize_malformed_failed_to_feed_event() { // Try to serialize a malformed event for which the recovering strategy can // not succeed because is invalid. - let event = Event::from_str( - FAILED_TO_FEED_EVENT, - ); + let event = Event::from_str(FAILED_TO_FEED_EVENT); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); - assert_eq!(error.error_type, ErrorType::FailedToFeedEvent("Could not feed event from document: Parsing failure in System".to_string())); + assert_eq!( + error.error_type, + ErrorType::FailedToFeedEvent( + "Could not feed event from document: Parsing failure in System".to_string() + ) + ); assert_eq!(error.original_content, FAILED_TO_FEED_EVENT); } -} \ No newline at end of file +} diff --git a/server/src/lib.rs b/server/src/lib.rs index d521138..01e5813 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -41,9 +41,9 @@ use libgssapi::error::MajorFlags; use log::{debug, error, info, trace, warn}; use metrics::{counter, histogram}; use monitoring::{ - HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, - HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, HTTP_REQUEST_MACHINE, HTTP_REQUEST_METHOD, - HTTP_REQUEST_STATUS, HTTP_REQUEST_URI, + HTTP_REQUESTS_COUNTER, HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, + HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, + HTTP_REQUEST_STATUS_CODE, HTTP_REQUEST_URI, MACHINE, }; use quick_xml::writer::Writer; use soap::Serializable; @@ -194,13 +194,11 @@ async fn get_request_payload( if monitoring_conf.count_http_request_body_network_size_per_machine() => { counter!(HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, - HTTP_REQUEST_METHOD => request_data.method().to_string(), HTTP_REQUEST_URI => request_data.uri().to_string(), - HTTP_REQUEST_MACHINE => request_data.principal().to_string()) + MACHINE => request_data.principal().to_string()) } _ => { counter!(HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, - HTTP_REQUEST_METHOD => request_data.method().to_string(), HTTP_REQUEST_URI => request_data.uri().to_string()) } }; @@ -220,13 +218,11 @@ async fn get_request_payload( if monitoring_conf.count_http_request_body_real_size_per_machine() => { counter!(HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, - HTTP_REQUEST_METHOD => request_data.method().to_string(), HTTP_REQUEST_URI => request_data.uri().to_string(), - HTTP_REQUEST_MACHINE => request_data.principal().to_string()) + MACHINE => request_data.principal().to_string()) } _ => { counter!(HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, - HTTP_REQUEST_METHOD => request_data.method().to_string(), HTTP_REQUEST_URI => request_data.uri().to_string()) } }; @@ -439,11 +435,14 @@ fn log_response( let duration = start.elapsed().as_secs_f64(); histogram!(HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, - HTTP_REQUEST_METHOD => method.to_owned(), - HTTP_REQUEST_STATUS => status.to_string(), HTTP_REQUEST_URI => uri.to_owned()) .record(duration); + counter!(HTTP_REQUESTS_COUNTER, + HTTP_REQUEST_STATUS_CODE => status.as_str().to_owned(), + HTTP_REQUEST_URI => uri.to_owned()) + .increment(1); + // MDC is thread related, so it should be safe to use it in a non-async // function. log_mdc::insert("http_status", status.as_str()); diff --git a/server/src/logic.rs b/server/src/logic.rs index c89fd00..9c1a89c 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -2,10 +2,11 @@ use crate::{ event::{EventData, EventMetadata}, heartbeat::{store_heartbeat, WriteHeartbeatMessage}, monitoring::{ - EVENTS_COUNTER, EVENTS_MACHINE, EVENTS_SUBSCRIPTION_NAME, EVENTS_SUBSCRIPTION_UUID, - EVENT_SIZE_BYTES_COUNTER, FAILED_EVENTS_COUNTER, MESSAGES_ACTION, + INPUT_EVENTS_COUNTER, INPUT_EVENT_BYTES_COUNTER, INPUT_EVENT_PARSING_FAILURES, + INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE, INPUT_MESSAGES_COUNTER, MACHINE, MESSAGES_ACTION, MESSAGES_ACTION_ENUMERATE, MESSAGES_ACTION_EVENTS, MESSAGES_ACTION_HEARTBEAT, - MESSAGES_COUNTER, + OUTPUT_DRIVER, OUTPUT_DRIVER_FAILURES, OUTPUT_FORMAT, OUTPUT_FORMAT_FAILURES, + SUBSCRIPTION_NAME, SUBSCRIPTION_UUID, }, output::get_formatter, soap::{ @@ -33,6 +34,12 @@ use uuid::Uuid; use anyhow::{anyhow, bail, Context, Result}; +#[derive(Debug)] +struct OutputDriverError { + pub driver: String, + pub error: anyhow::Error, +} + pub enum Response { Ok(String, Option), Err(StatusCode), @@ -224,7 +231,7 @@ async fn handle_enumerate( }); } - counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_ENUMERATE).increment(1); + counter!(INPUT_MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_ENUMERATE).increment(1); Ok(Response::ok( ACTION_ENUMERATE_RESPONSE, @@ -293,7 +300,7 @@ async fn handle_heartbeat( .await .context("Failed to store heartbeat")?; - counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_HEARTBEAT).increment(1); + counter!(INPUT_MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_HEARTBEAT).increment(1); Ok(Response::ok(ACTION_ACK, None)) } @@ -308,7 +315,36 @@ fn get_formatted_events( for raw in events.iter() { // EventData parses the raw event into an Event struct // (once for all formatters). - events_data.push(EventData::new(raw.clone(), need_to_parse_event)) + let event_data = EventData::new(raw.clone(), need_to_parse_event); + + if need_to_parse_event { + // Count failures + match event_data.event() { + Some(event) => { + if let Some(error) = &event.additional.error { + let error_type_str: &'static str = error.error_type.clone().into(); + counter!(INPUT_EVENT_PARSING_FAILURES, + SUBSCRIPTION_NAME => metadata.subscription_name().to_owned(), + SUBSCRIPTION_UUID => metadata.subscription_uuid().to_owned(), + INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE => error_type_str) + .increment(1); + warn!("Failed to parse an event: {:?}", error) + } + } + None => { + counter!(INPUT_EVENT_PARSING_FAILURES, + SUBSCRIPTION_NAME => metadata.subscription_name().to_owned(), + SUBSCRIPTION_UUID => metadata.subscription_uuid().to_owned(), + INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE => "Unknown") + .increment(1); + warn!( + "Event should have been parsed but it was not: {}", + event_data.raw() + ) + } + } + } + events_data.push(event_data) } let mut formatted_events: HashMap>>> = @@ -319,6 +355,14 @@ fn get_formatted_events( for event_data in events_data.iter() { if let Some(str) = formatter.format(metadata, event_data) { content.push(str.clone()) + } else { + let format_str: &'static str = format.into(); + counter!(OUTPUT_FORMAT_FAILURES, + SUBSCRIPTION_NAME => metadata.subscription_name().to_owned(), + SUBSCRIPTION_UUID => metadata.subscription_uuid().to_owned(), + OUTPUT_FORMAT => format_str) + .increment(1); + warn!("Failed to format an event using {}", format_str); } } formatted_events.insert(format.clone(), Arc::new(content)); @@ -385,34 +429,34 @@ async fn handle_events( subscription.uuid_string() ); - counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_EVENTS).increment(1); + counter!(INPUT_MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_EVENTS).increment(1); let events_counter = match monitoring { - Some(monitoring_conf) if monitoring_conf.count_received_events_per_machine() => { - counter!(EVENTS_COUNTER, - EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), - EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string(), - EVENTS_MACHINE => request_data.principal().to_string()) + Some(monitoring_conf) if monitoring_conf.count_input_events_per_machine() => { + counter!(INPUT_EVENTS_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + MACHINE => request_data.principal().to_string()) } _ => { - counter!(EVENTS_COUNTER, - EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), - EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()) + counter!(INPUT_EVENTS_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string()) } }; events_counter.increment(events.len().try_into()?); let event_size_counter = match monitoring { - Some(monitoring_conf) if monitoring_conf.count_event_size_per_machine() => { - counter!(EVENT_SIZE_BYTES_COUNTER, - EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), - EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string(), - EVENTS_MACHINE => request_data.principal().to_string()) + Some(monitoring_conf) if monitoring_conf.count_input_event_bytes_per_machine() => { + counter!(INPUT_EVENT_BYTES_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + MACHINE => request_data.principal().to_string()) } _ => { - counter!(EVENT_SIZE_BYTES_COUNTER, - EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), - EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()) + counter!(INPUT_EVENT_BYTES_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string()) } }; event_size_counter.increment( @@ -487,6 +531,10 @@ async fn handle_events( output_cloned.describe() ) }) + .map_err(|e| OutputDriverError { + driver: output_cloned.driver(), + error: e, + }) }); } @@ -497,17 +545,26 @@ async fn handle_events( Ok(Ok(())) => (), Ok(Err(err)) => { succeed = false; - warn!("Failed to process output and send event: {:?}", err); + warn!("Failed to process output and send event: {:?}", err.error); + counter!(OUTPUT_DRIVER_FAILURES, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + OUTPUT_DRIVER => err.driver.clone()) + .increment(1); } Err(err) => { succeed = false; - warn!("Something bad happened with a process task: {:?}", err) + warn!("Something bad happened with a process task: {:?}", err); + counter!(OUTPUT_DRIVER_FAILURES, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + OUTPUT_DRIVER => "Unknown") + .increment(1); } } } if !succeed { - counter!(FAILED_EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()).increment(events.len().try_into()?); return Ok(Response::err(StatusCode::SERVICE_UNAVAILABLE)); } diff --git a/server/src/monitoring.rs b/server/src/monitoring.rs index 9625bf5..a43e9b4 100644 --- a/server/src/monitoring.rs +++ b/server/src/monitoring.rs @@ -9,31 +9,42 @@ use log::info; use metrics::{describe_counter, describe_histogram, Unit}; use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; -pub const MESSAGES_COUNTER: &str = "openwec_messages_total"; +// input metrics + +pub const INPUT_MESSAGES_COUNTER: &str = "openwec_input_messages_total"; pub const MESSAGES_ACTION: &str = "action"; pub const MESSAGES_ACTION_HEARTBEAT: &str = "heartbeat"; pub const MESSAGES_ACTION_EVENTS: &str = "events"; pub const MESSAGES_ACTION_ENUMERATE: &str = "enumerate"; -pub const EVENTS_COUNTER: &str = "openwec_received_events_total"; -pub const EVENTS_SUBSCRIPTION_UUID: &str = "subscription_uuid"; -pub const EVENTS_SUBSCRIPTION_NAME: &str = "subscription_name"; -pub const EVENTS_MACHINE: &str = "machine"; +pub const INPUT_EVENTS_COUNTER: &str = "openwec_input_events_total"; +pub const SUBSCRIPTION_UUID: &str = "subscription_uuid"; +pub const SUBSCRIPTION_NAME: &str = "subscription_name"; +pub const MACHINE: &str = "machine"; + +pub const INPUT_EVENT_BYTES_COUNTER: &str = "openwec_input_event_bytes_total"; +pub const INPUT_EVENT_PARSING_FAILURES: &str = "openwec_input_event_parsing_failures_total"; +pub const INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE: &str = "type"; + +// http metrics -pub const FAILED_EVENTS_COUNTER: &str = "openwec_event_output_failures_total"; +pub const HTTP_REQUESTS_COUNTER: &str = "openwec_http_requests_total"; -pub const HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM: &str = "http_request_duration_seconds"; -pub const HTTP_REQUEST_METHOD: &str = "method"; +pub const HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM: &str = "openwec_http_request_duration_seconds"; pub const HTTP_REQUEST_URI: &str = "uri"; -pub const HTTP_REQUEST_STATUS: &str = "status"; -pub const HTTP_REQUEST_MACHINE: &str = "machine"; +pub const HTTP_REQUEST_STATUS_CODE: &str = "code"; pub const HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER: &str = - "http_request_body_network_size_bytes_total"; + "openwec_http_request_body_network_size_bytes_total"; pub const HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER: &str = - "http_request_body_real_size_bytes_total"; + "openwec_http_request_body_real_size_bytes_total"; -pub const EVENT_SIZE_BYTES_COUNTER: &str = "openwec_event_size_bytes_total"; +// output metrics + +pub const OUTPUT_DRIVER_FAILURES: &str = "openwec_output_driver_failures_total"; +pub const OUTPUT_DRIVER: &str = "driver"; +pub const OUTPUT_FORMAT_FAILURES: &str = "openwec_output_format_failures_total"; +pub const OUTPUT_FORMAT: &str = "format"; pub fn init(settings: &Monitoring) -> Result<()> { let addr = SocketAddr::from(( @@ -53,25 +64,38 @@ pub fn init(settings: &Monitoring) -> Result<()> { builder.install()?; + // input describe_counter!( - MESSAGES_COUNTER, + INPUT_EVENTS_COUNTER, Unit::Count, - "Number of messages received by openwec" + "The total number of events received by openwec" + ); + describe_counter!( + INPUT_EVENT_BYTES_COUNTER, + Unit::Bytes, + "The total size of all events received by openwec" ); describe_counter!( - EVENTS_COUNTER, + INPUT_MESSAGES_COUNTER, Unit::Count, - "Number of events received by openwec" + "The total number of messages received by openwec" ); describe_counter!( - FAILED_EVENTS_COUNTER, + INPUT_EVENT_PARSING_FAILURES, Unit::Count, - "Number of events that could not be written to outputs by openwec" + "The total number of event parsing failures" + ); + + // http + describe_counter!( + HTTP_REQUESTS_COUNTER, + Unit::Count, + "The total number of HTTP requests handled by openwec" ); describe_histogram!( HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, Unit::Seconds, - "HTTP requests duration histogram" + "Histogram of response duration for HTTP requests" ); describe_counter!( HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, @@ -83,10 +107,17 @@ pub fn init(settings: &Monitoring) -> Result<()> { Unit::Bytes, "The total size of all http requests body received by openwec after decryption and decompression" ); + + // output describe_counter!( - EVENT_SIZE_BYTES_COUNTER, - Unit::Bytes, - "The total size of all events received by openwec" + OUTPUT_DRIVER_FAILURES, + Unit::Count, + "The total number of output driver failures" + ); + describe_counter!( + OUTPUT_FORMAT_FAILURES, + Unit::Count, + "The total number of output format failures" ); Ok(()) diff --git a/server/src/output.rs b/server/src/output.rs index 99a428f..156b741 100644 --- a/server/src/output.rs +++ b/server/src/output.rs @@ -138,6 +138,10 @@ impl Output { ) } + pub fn driver(&self) -> String { + format!("{:?}", self.subscription_output_driver) + } + pub async fn write( &self, metadata: Arc,