diff --git a/src/drivers/fake.rs b/src/drivers/fake.rs index 1418bafb..f8119ea1 100644 --- a/src/drivers/fake.rs +++ b/src/drivers/fake.rs @@ -8,21 +8,28 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, + stats::{ + accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider}, + driver::DriverId, + }, }; pub struct FakeSink { + name: String, + uuid: DriverId, on_message_input: Callbacks>, print: bool, - stats: Arc>, + stats: Arc>, } impl FakeSink { - pub fn builder() -> FakeSinkBuilder { + pub fn builder(name: &str) -> FakeSinkBuilder { FakeSinkBuilder(Self { + name: name.to_string(), + uuid: Self::generate_uuid(name), on_message_input: Callbacks::new(), print: false, - stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())), }) } } @@ -80,16 +87,20 @@ impl Driver for FakeSink { fn info(&self) -> Box { Box::new(FakeSinkInfo) } + + fn uuid(&self) -> &DriverId { + &self.uuid + } } #[async_trait::async_trait] impl AccumulatedDriverStatsProvider for FakeSink { - async fn stats(&self) -> AccumulatedDriverStats { + async fn stats(&self) -> AccumulatedDriverStatsInner { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = AccumulatedDriverStats { + *self.stats.write().await = AccumulatedDriverStatsInner { input: None, output: None, } @@ -129,22 +140,26 @@ impl DriverInfo for FakeSinkInfo { } fn create_endpoint_from_url(&self, _url: &url::Url) -> Option> { - Some(Arc::new(FakeSink::builder().build())) + None } } pub struct FakeSource { + name: String, + uuid: DriverId, period: std::time::Duration, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } impl FakeSource { - pub fn builder(period: std::time::Duration) -> FakeSourceBuilder { + pub fn builder(name: &str, period: std::time::Duration) -> FakeSourceBuilder { FakeSourceBuilder(Self { + name: name.to_string(), + uuid: Self::generate_uuid(name), period, on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())), }) } } @@ -234,16 +249,20 @@ impl Driver for FakeSource { fn info(&self) -> Box { Box::new(FakeSourceInfo) } + + fn uuid(&self) -> &DriverId { + &self.uuid + } } #[async_trait::async_trait] impl AccumulatedDriverStatsProvider for FakeSource { - async fn stats(&self) -> AccumulatedDriverStats { + async fn stats(&self) -> AccumulatedDriverStatsInner { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = AccumulatedDriverStats { + *self.stats.write().await = AccumulatedDriverStatsInner { input: None, output: None, } @@ -284,7 +303,7 @@ impl DriverInfo for FakeSourceInfo { } fn create_endpoint_from_url(&self, _url: &url::Url) -> Option> { - Some(Arc::new(FakeSink::builder().print().build())) + None } } @@ -309,7 +328,7 @@ mod test { let sink_messages = Arc::new(RwLock::new(Vec::>::with_capacity(1000))); // FakeSink and task - let sink = FakeSink::builder() + let sink = FakeSink::builder("test") .on_message_input({ let sink_messages = sink_messages.clone(); @@ -330,7 +349,7 @@ mod test { }); // FakeSource and task - let source = FakeSource::builder(message_period) + let source = FakeSource::builder("test", message_period) .on_message_output({ let source_messages = source_messages.clone(); move |message: Arc| { diff --git a/src/drivers/mod.rs b/src/drivers/mod.rs index a81171d1..2da5bf13 100644 --- a/src/drivers/mod.rs +++ b/src/drivers/mod.rs @@ -39,6 +39,23 @@ pub struct DriverDescriptionLegacy { pub trait Driver: Send + Sync + AccumulatedDriverStatsProvider { async fn run(&self, hub_sender: broadcast::Sender>) -> Result<()>; fn info(&self) -> Box; + + fn uuid(&self) -> &uuid::Uuid; + + fn generate_uuid(name: &str) -> uuid::Uuid + where + Self: Sized, + { + uuid::Uuid::new_v5( + &uuid::Uuid::NAMESPACE_DNS, + format!( + "{typ}:{name}", + typ = std::any::type_name::(), + name = name + ) + .as_bytes(), + ) + } } pub trait DriverInfo: Sync + Send { @@ -224,7 +241,7 @@ mod tests { use tokio::sync::RwLock; use tracing::*; - use crate::stats::accumulated::driver::AccumulatedDriverStats; + use crate::stats::{accumulated::driver::AccumulatedDriverStatsInner, driver::DriverId}; use super::*; @@ -242,17 +259,21 @@ mod tests { // Example struct implementing Driver pub struct ExampleDriver { + name: String, + uuid: uuid::Uuid, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } impl ExampleDriver { - pub fn new() -> ExampleDriverBuilder { + pub fn new(name: &str, id: &str) -> ExampleDriverBuilder { ExampleDriverBuilder(Self { + name: name.to_string(), + uuid: Self::generate_uuid(&format!("{name}:{id}")), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())), }) } } @@ -299,16 +320,20 @@ mod tests { fn info(&self) -> Box { Box::new(ExampleDriverInfo) } + + fn uuid(&self) -> &DriverId { + &self.uuid + } } #[async_trait::async_trait] impl AccumulatedDriverStatsProvider for ExampleDriver { - async fn stats(&self) -> AccumulatedDriverStats { + async fn stats(&self) -> AccumulatedDriverStatsInner { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = AccumulatedDriverStats { + *self.stats.write().await = AccumulatedDriverStatsInner { input: None, output: None, } @@ -343,7 +368,7 @@ mod tests { let (sender, _receiver) = tokio::sync::broadcast::channel(1); let called = Arc::new(RwLock::new(false)); - let driver = ExampleDriver::new() + let driver = ExampleDriver::new("test", "test") .on_message_input({ let called = called.clone(); move |_msg| { diff --git a/src/drivers/serial/mod.rs b/src/drivers/serial/mod.rs index 751e72c5..9066f6ad 100644 --- a/src/drivers/serial/mod.rs +++ b/src/drivers/serial/mod.rs @@ -12,15 +12,19 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, + stats::{ + accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider}, + driver::DriverId, + }, }; pub struct Serial { + uuid: uuid::Uuid, pub port_name: String, pub baud_rate: u32, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct SerialBuilder(Serial); @@ -51,11 +55,12 @@ impl Serial { #[instrument(level = "debug")] pub fn builder(port_name: &str, baud_rate: u32) -> SerialBuilder { SerialBuilder(Self { + uuid: Self::generate_uuid(&format!("{port_name}:{baud_rate}")), port_name: port_name.to_string(), baud_rate, on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())), }) } @@ -176,28 +181,30 @@ impl Driver for Serial { fn info(&self) -> Box { Box::new(SerialInfo) } + + fn uuid(&self) -> &DriverId { + &self.uuid + } } #[async_trait::async_trait] impl AccumulatedDriverStatsProvider for Serial { - async fn stats(&self) -> AccumulatedDriverStats { + async fn stats(&self) -> AccumulatedDriverStatsInner { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = AccumulatedDriverStats { + *self.stats.write().await = AccumulatedDriverStatsInner { input: None, output: None, } } } - pub struct SerialInfo; impl DriverInfo for SerialInfo { fn name(&self) -> &str { "Serial" } - fn valid_schemes(&self) -> Vec { vec!["serial".to_string()] } diff --git a/src/drivers/tcp/client.rs b/src/drivers/tcp/client.rs index 4fe4f648..190332a0 100644 --- a/src/drivers/tcp/client.rs +++ b/src/drivers/tcp/client.rs @@ -14,14 +14,18 @@ use crate::{ Driver, DriverInfo, }, protocol::Protocol, - stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, + stats::{ +accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider}, + driver::DriverId, + }, }; pub struct TcpClient { pub remote_addr: String, + uuid: DriverId, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TcpClientBuilder(TcpClient); @@ -53,9 +57,10 @@ impl TcpClient { pub fn builder(remote_addr: &str) -> TcpClientBuilder { TcpClientBuilder(Self { remote_addr: remote_addr.to_string(), + uuid: Self::generate_uuid(remote_addr), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())), }) } } @@ -102,16 +107,20 @@ impl Driver for TcpClient { fn info(&self) -> Box { return Box::new(TcpClientInfo); } + + fn uuid(&self) -> &DriverId{ + &self.uuid + } } #[async_trait::async_trait] impl AccumulatedDriverStatsProvider for TcpClient { - async fn stats(&self) -> AccumulatedDriverStats { + async fn stats(&self) -> AccumulatedDriverStatsInner { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = AccumulatedDriverStats { + *self.stats.write().await = AccumulatedDriverStatsInner { input: None, output: None, } diff --git a/src/drivers/tcp/mod.rs b/src/drivers/tcp/mod.rs index 7bc1b112..51d299ac 100644 --- a/src/drivers/tcp/mod.rs +++ b/src/drivers/tcp/mod.rs @@ -11,7 +11,7 @@ use tracing::*; use crate::{ protocol::{read_all_messages, Protocol}, - stats::accumulated::driver::AccumulatedDriverStats, + stats::accumulated::driver::AccumulatedDriverStatsInner, }; pub mod client; @@ -24,7 +24,7 @@ async fn tcp_receive_task( remote_addr: &str, hub_sender: Arc>>, on_message_input: &Callbacks>, - stats: &Arc>, + stats: &Arc>, ) -> Result<()> { let mut buf = Vec::with_capacity(1024); @@ -67,7 +67,7 @@ async fn tcp_send_task( remote_addr: &str, mut hub_receiver: broadcast::Receiver>, on_message_output: &Callbacks>, - stats: &Arc>, + stats: &Arc>, ) -> Result<()> { loop { let message = match hub_receiver.recv().await { diff --git a/src/drivers/tcp/server.rs b/src/drivers/tcp/server.rs index d8b83c2a..f7741d1d 100644 --- a/src/drivers/tcp/server.rs +++ b/src/drivers/tcp/server.rs @@ -14,15 +14,19 @@ use crate::{ Driver, DriverInfo, }, protocol::Protocol, - stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, + stats::{ +accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider}, + driver::DriverId, + }, }; #[derive(Clone)] pub struct TcpServer { pub local_addr: String, + uuid: DriverId, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TcpServerBuilder(TcpServer); @@ -54,9 +58,10 @@ impl TcpServer { pub fn builder(local_addr: &str) -> TcpServerBuilder { TcpServerBuilder(Self { local_addr: local_addr.to_string(), + uuid: Self::generate_uuid(local_addr), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())), }) } @@ -71,7 +76,7 @@ impl TcpServer { hub_sender: Arc>>, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, ) -> Result<()> { let hub_receiver = hub_sender.subscribe(); @@ -129,16 +134,20 @@ impl Driver for TcpServer { fn info(&self) -> Box { return Box::new(TcpServerInfo); } + + fn uuid(&self) -> &DriverId{ + &self.uuid + } } #[async_trait::async_trait] impl AccumulatedDriverStatsProvider for TcpServer { - async fn stats(&self) -> AccumulatedDriverStats { + async fn stats(&self) -> AccumulatedDriverStatsInner { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = AccumulatedDriverStats { + *self.stats.write().await = AccumulatedDriverStatsInner { input: None, output: None, } diff --git a/src/drivers/tlog/reader.rs b/src/drivers/tlog/reader.rs index 35a95705..adc3479a 100644 --- a/src/drivers/tlog/reader.rs +++ b/src/drivers/tlog/reader.rs @@ -10,13 +10,17 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, + stats::{ +accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider}, + driver::DriverId, + }, }; pub struct TlogReader { pub path: PathBuf, + uuid: DriverId, on_message_input: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TlogReaderBuilder(TlogReader); @@ -38,10 +42,13 @@ impl TlogReaderBuilder { impl TlogReader { #[instrument(level = "debug")] pub fn builder(path: PathBuf) -> TlogReaderBuilder { + let path_str = path.clone().display().to_string(); + TlogReaderBuilder(Self { path, + uuid: Self::generate_uuid(&path_str), on_message_input: Callbacks::new(), - stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())), }) } @@ -134,16 +141,20 @@ impl Driver for TlogReader { fn info(&self) -> Box { return Box::new(TlogReaderInfo); } + + fn uuid(&self) -> &DriverId{ + &self.uuid + } } #[async_trait::async_trait] impl AccumulatedDriverStatsProvider for TlogReader { - async fn stats(&self) -> AccumulatedDriverStats { + async fn stats(&self) -> AccumulatedDriverStatsInner { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = AccumulatedDriverStats { + *self.stats.write().await = AccumulatedDriverStatsInner { input: None, output: None, } diff --git a/src/drivers/tlog/writer.rs b/src/drivers/tlog/writer.rs index 60b11e9b..fb4b710d 100644 --- a/src/drivers/tlog/writer.rs +++ b/src/drivers/tlog/writer.rs @@ -11,13 +11,17 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, + stats::{ +accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider}, + driver::DriverId, + }, }; pub struct TlogWriter { pub path: PathBuf, + uuid: DriverId, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TlogWriterBuilder(TlogWriter); @@ -39,10 +43,13 @@ impl TlogWriterBuilder { impl TlogWriter { #[instrument(level = "debug")] pub fn builder(path: PathBuf) -> TlogWriterBuilder { + let path_str = path.clone().display().to_string(); + TlogWriterBuilder(Self { path, + uuid: Self::generate_uuid(&path_str), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())), }) } @@ -102,16 +109,20 @@ impl Driver for TlogWriter { fn info(&self) -> Box { return Box::new(TlogWriterInfo); } + + fn uuid(&self) -> &DriverId{ + &self.uuid + } } #[async_trait::async_trait] impl AccumulatedDriverStatsProvider for TlogWriter { - async fn stats(&self) -> AccumulatedDriverStats { + async fn stats(&self) -> AccumulatedDriverStatsInner { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = AccumulatedDriverStats { + *self.stats.write().await = AccumulatedDriverStatsInner { input: None, output: None, } diff --git a/src/drivers/udp/client.rs b/src/drivers/udp/client.rs index b78c356b..a75bdbe2 100644 --- a/src/drivers/udp/client.rs +++ b/src/drivers/udp/client.rs @@ -11,14 +11,18 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, + stats::{ + accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider}, + driver::DriverId, + }, }; pub struct UdpClient { pub remote_addr: String, + uuid: DriverId, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct UdpClientBuilder(UdpClient); @@ -50,9 +54,10 @@ impl UdpClient { pub fn builder(remote_addr: &str) -> UdpClientBuilder { UdpClientBuilder(Self { remote_addr: remote_addr.to_string(), + uuid: Self::generate_uuid(remote_addr), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())), }) } @@ -201,16 +206,20 @@ impl Driver for UdpClient { fn info(&self) -> Box { return Box::new(UdpClientInfo); } + + fn uuid(&self) -> &DriverId { + &self.uuid + } } #[async_trait::async_trait] impl AccumulatedDriverStatsProvider for UdpClient { - async fn stats(&self) -> AccumulatedDriverStats { + async fn stats(&self) -> AccumulatedDriverStatsInner { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = AccumulatedDriverStats { + *self.stats.write().await = AccumulatedDriverStatsInner { input: None, output: None, } diff --git a/src/drivers/udp/server.rs b/src/drivers/udp/server.rs index 5dda57db..a6931c2c 100644 --- a/src/drivers/udp/server.rs +++ b/src/drivers/udp/server.rs @@ -11,15 +11,19 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, + stats::{ + accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider}, + driver::DriverId, + }, }; pub struct UdpServer { pub local_addr: String, + uuid: DriverId, clients: Arc>>, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct UdpServerBuilder(UdpServer); @@ -48,13 +52,14 @@ impl UdpServerBuilder { impl UdpServer { #[instrument(level = "debug")] - pub fn builder(local_addr: String) -> UdpServerBuilder { + pub fn builder(local_addr: &str) -> UdpServerBuilder { UdpServerBuilder(Self { - local_addr, + local_addr: local_addr.to_string(), + uuid: Self::generate_uuid(local_addr), clients: Arc::new(RwLock::new(HashMap::new())), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())), }) } @@ -209,16 +214,20 @@ impl Driver for UdpServer { fn info(&self) -> Box { return Box::new(UdpServerInfo); } + + fn uuid(&self) -> &DriverId { + &self.uuid + } } #[async_trait::async_trait] impl AccumulatedDriverStatsProvider for UdpServer { - async fn stats(&self) -> AccumulatedDriverStats { + async fn stats(&self) -> AccumulatedDriverStatsInner { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = AccumulatedDriverStats { + *self.stats.write().await = AccumulatedDriverStatsInner { input: None, output: None, } @@ -267,7 +276,7 @@ impl DriverInfo for UdpServerInfo { let host = url.host_str().unwrap(); let port = url.port().unwrap(); Some(Arc::new( - UdpServer::builder(format!("{host}:{port}")).build(), + UdpServer::builder(&format!("{host}:{port}")).build(), )) } } diff --git a/src/hub/actor.rs b/src/hub/actor.rs index c81cd663..bb43e6e3 100644 --- a/src/hub/actor.rs +++ b/src/hub/actor.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, ops::Div, sync::Arc}; use anyhow::{anyhow, Context, Result}; +use indexmap::IndexMap; use mavlink::MAVLinkV2MessageRaw; use tokio::sync::{broadcast, mpsc, RwLock}; use tracing::*; @@ -10,7 +11,8 @@ use crate::{ hub::HubCommand, protocol::Protocol, stats::accumulated::{ - driver::AccumulatedDriverStats, messages::AccumulatedHubMessagesStats, + driver::{AccumulatedDriverStats, AccumulatedDriverStatsInner, AccumulatedDriversStats}, + messages::AccumulatedHubMessagesStats, AccumulatedStatsInner, }, }; @@ -204,14 +206,14 @@ impl HubActor { } #[instrument(level = "debug", skip(self))] - async fn get_drivers_stats(&self) -> Vec<(String, AccumulatedDriverStats)> { - let mut drivers_stats = Vec::with_capacity(self.drivers.len()); + async fn get_drivers_stats(&self) -> AccumulatedDriversStats { + let mut drivers_stats = IndexMap::with_capacity(self.drivers.len()); for (_id, driver) in self.drivers.iter() { let stats = driver.stats().await; - let info = driver.info(); - let name = info.name().to_owned(); + let name = driver.info().name().to_owned(); + let id = *driver.uuid(); - drivers_stats.push((name, stats)); + drivers_stats.insert(id, AccumulatedDriverStats { name, stats }); } drivers_stats diff --git a/src/hub/mod.rs b/src/hub/mod.rs index 45fdf7ca..4a89c8b0 100644 --- a/src/hub/mod.rs +++ b/src/hub/mod.rs @@ -13,7 +13,7 @@ use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, stats::accumulated::{ - driver::AccumulatedDriverStats, messages::AccumulatedHubMessagesStats, + driver::AccumulatedDriversStats, messages::AccumulatedHubMessagesStats, AccumulatedStatsInner, }, }; @@ -84,7 +84,7 @@ impl Hub { Ok(res) } - pub async fn drivers_stats(&self) -> Result> { + pub async fn drivers_stats(&self) -> Result { let (response_tx, response_rx) = oneshot::channel(); self.sender .send(HubCommand::GetDriversStats { diff --git a/src/hub/protocol.rs b/src/hub/protocol.rs index 3a30da60..1dbee5a9 100644 --- a/src/hub/protocol.rs +++ b/src/hub/protocol.rs @@ -7,7 +7,8 @@ use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, stats::accumulated::{ - driver::AccumulatedDriverStats, messages::AccumulatedHubMessagesStats, + driver::{AccumulatedDriverStatsInner, AccumulatedDriversStats}, + messages::AccumulatedHubMessagesStats, AccumulatedStatsInner, }, }; @@ -34,7 +35,7 @@ pub enum HubCommand { response: oneshot::Sender, }, GetDriversStats { - response: oneshot::Sender>, + response: oneshot::Sender, }, ResetAllStats { response: oneshot::Sender>, diff --git a/src/stats/accumulated/driver.rs b/src/stats/accumulated/driver.rs index 152ef05c..fe4d369d 100644 --- a/src/stats/accumulated/driver.rs +++ b/src/stats/accumulated/driver.rs @@ -1,22 +1,32 @@ use std::sync::Arc; -use crate::protocol::Protocol; +use indexmap::IndexMap; + +use crate::{protocol::Protocol, stats::driver::DriverId}; use super::AccumulatedStatsInner; +pub type AccumulatedDriversStats = IndexMap; + #[async_trait::async_trait] pub trait AccumulatedDriverStatsProvider { - async fn stats(&self) -> AccumulatedDriverStats; + async fn stats(&self) -> AccumulatedDriverStatsInner; async fn reset_stats(&self); } #[derive(Default, Debug, Clone)] pub struct AccumulatedDriverStats { + pub name: String, + pub stats: AccumulatedDriverStatsInner, +} + +#[derive(Default, Debug, Clone)] +pub struct AccumulatedDriverStatsInner { pub input: Option, pub output: Option, } -impl AccumulatedDriverStats { +impl AccumulatedDriverStatsInner { pub fn update_input(&mut self, message: &Arc) { if let Some(stats) = self.input.as_mut() { stats.update(message); diff --git a/src/stats/actor.rs b/src/stats/actor.rs index a8f8aea7..122daaac 100644 --- a/src/stats/actor.rs +++ b/src/stats/actor.rs @@ -1,21 +1,20 @@ -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::sync::Arc; use anyhow::Result; -use tokio::sync::{mpsc, RwLock}; +use indexmap::IndexMap; +use tokio::sync::{mpsc, Mutex, RwLock}; use tracing::*; use crate::{ hub::Hub, stats::{ accumulated::{ - driver::AccumulatedDriverStats, messages::AccumulatedHubMessagesStats, + driver::AccumulatedDriversStats, messages::AccumulatedHubMessagesStats, AccumulatedStatsInner, }, + driver::{DriverStats, DriverStatsInner}, messages::HubMessagesStats, - DriverStats, DriversStats, StatsCommand, StatsInner, + DriversStats, StatsCommand, StatsInner, }, }; @@ -23,11 +22,11 @@ pub struct StatsActor { hub: Hub, start_time: Arc>, update_period: Arc>, - last_accumulated_drivers_stats: Arc>>, + last_accumulated_drivers_stats: Arc>, drivers_stats: Arc>, - last_accumulated_hub_stats: Arc>, + last_accumulated_hub_stats: Arc>, hub_stats: Arc>, - last_accumulated_hub_messages_stats: Arc>, + last_accumulated_hub_messages_stats: Arc>, hub_messages_stats: Arc>, } @@ -128,12 +127,13 @@ impl StatsActor { #[instrument(level = "debug", skip(hub))] pub async fn new(hub: Hub, update_period: tokio::time::Duration) -> Self { let update_period = Arc::new(RwLock::new(update_period)); - let last_accumulated_hub_stats = Arc::new(RwLock::new(AccumulatedStatsInner::default())); + let last_accumulated_hub_stats = Arc::new(Mutex::new(AccumulatedStatsInner::default())); let hub_stats = Arc::new(RwLock::new(StatsInner::default())); - let last_accumulated_drivers_stats = Arc::new(RwLock::new(Vec::new())); - let drivers_stats = Arc::new(RwLock::new(Vec::new())); + let last_accumulated_drivers_stats = + Arc::new(Mutex::new(AccumulatedDriversStats::default())); + let drivers_stats = Arc::new(RwLock::new(DriversStats::default())); let last_accumulated_hub_messages_stats = - Arc::new(RwLock::new(AccumulatedHubMessagesStats::default())); + Arc::new(Mutex::new(AccumulatedHubMessagesStats::default())); let hub_messages_stats = Arc::new(RwLock::new(HubMessagesStats::default())); let start_time = Arc::new(RwLock::new(chrono::Utc::now().timestamp_micros() as u64)); @@ -190,28 +190,27 @@ impl StatsActor { } *self.start_time.write().await = chrono::Utc::now().timestamp_micros() as u64; - self.last_accumulated_drivers_stats.write().await.clear(); + self.last_accumulated_drivers_stats.lock().await.clear(); driver_stats.clear(); *hub_messages_stats = HubMessagesStats::default(); - *self.last_accumulated_hub_messages_stats.write().await = + *self.last_accumulated_hub_messages_stats.lock().await = AccumulatedHubMessagesStats::default(); *hub_stats = StatsInner::default(); - *self.last_accumulated_hub_stats.write().await = AccumulatedStatsInner::default(); + *self.last_accumulated_hub_stats.lock().await = AccumulatedStatsInner::default(); Ok(()) } } -#[instrument(level = "debug", skip_all)] async fn update_hub_messages_stats( hub: &Hub, - last_accumulated_hub_messages_stats: &RwLock, + last_accumulated_hub_messages_stats: &Mutex, hub_messages_stats: &RwLock, start_time: &RwLock, ) { - let last_stats = last_accumulated_hub_messages_stats.read().await.clone(); + let mut last_stats = last_accumulated_hub_messages_stats.lock().await; let current_stats = hub.hub_messages_stats().await.unwrap(); let start_time = start_time.read().await.clone(); @@ -250,20 +249,19 @@ async fn update_hub_messages_stats( } } - trace!("{new_hub_messages_stats:#?}"); + debug!("{new_hub_messages_stats:#?}"); *hub_messages_stats.write().await = new_hub_messages_stats; - *last_accumulated_hub_messages_stats.write().await = current_stats; + *last_stats = current_stats; } -#[instrument(level = "debug", skip_all)] async fn update_hub_stats( hub: &Hub, - last_accumulated_hub_stats: &Arc>, + last_accumulated_hub_stats: &Arc>, hub_stats: &Arc>, start_time: &Arc>, ) { - let last_stats = last_accumulated_hub_stats.read().await.clone(); + let mut last_stats = last_accumulated_hub_stats.lock().await; let current_stats = hub.hub_stats().await.unwrap(); let start_time = start_time.read().await.clone(); @@ -272,56 +270,43 @@ async fn update_hub_stats( trace!("{new_hub_stats:#?}"); *hub_stats.write().await = new_hub_stats; - *last_accumulated_hub_stats.write().await = current_stats; + *last_stats = current_stats; } -#[instrument(level = "debug", skip_all)] async fn update_driver_stats( hub: &Hub, - last_accumulated_drivers_stats: &Arc>>, + last_accumulated_drivers_stats: &Arc>, driver_stats: &Arc>, start_time: &Arc>, ) { - let last_stats = last_accumulated_drivers_stats.read().await.clone(); - let current_stats = hub.drivers_stats().await.unwrap(); - - let last_map: HashMap<_, _> = last_stats.into_iter().collect(); - let current_map: HashMap<_, _> = current_stats - .iter() - .map(|(name, raw)| (name.clone(), raw.clone())) - .collect(); - - let merged_keys: HashSet = last_map.keys().chain(current_map.keys()).cloned().collect(); - - let merged_stats: Vec<( - String, - ( - Option, - Option, - ), - )> = merged_keys - .into_iter() - .map(|name| { - let last = last_map.get(&name).cloned(); - let current = current_map.get(&name).cloned(); - (name, (last, current)) - }) - .collect(); - - let mut new_driver_stats = Vec::new(); + let mut last_map = last_accumulated_drivers_stats.lock().await; // Use reference + let current_map = hub.drivers_stats().await.unwrap(); // Assuming this returns a full map + let start_time = *start_time.read().await; // Dereference directly without cloning - let start_time = start_time.read().await.clone(); + let mut merged_stats = IndexMap::with_capacity(last_map.len().max(current_map.len())); - for (name, (last, current)) in merged_stats { - if let Some(current_stats) = current { - let new_input_stats = if let Some(current_input_stats) = ¤t_stats.input { - let default_input = AccumulatedStatsInner::default(); + for (uuid, last_struct) in last_map.iter() { + merged_stats.insert(*uuid, (Some(last_struct), None)); + } + + for (uuid, current_struct) in ¤t_map { + merged_stats + .entry(*uuid) + .and_modify(|e| e.1 = Some(current_struct)) + .or_insert((None, Some(current_struct))); + } + + let mut new_map = IndexMap::with_capacity(merged_stats.len()); - let last_input_stats = if let Some(last_stats) = &last { - last_stats.input.as_ref().unwrap_or(&default_input) - } else { - &default_input - }; + let default_input = AccumulatedStatsInner::default(); + let default_output = AccumulatedStatsInner::default(); + + for (uuid, (last, current)) in merged_stats { + if let Some(current_stats) = current { + let new_input_stats = if let Some(current_input_stats) = ¤t_stats.stats.input { + let last_input_stats = last + .and_then(|l| l.stats.input.as_ref()) + .unwrap_or(&default_input); Some(StatsInner::from_accumulated( current_input_stats, @@ -332,14 +317,10 @@ async fn update_driver_stats( None }; - let new_output_stats = if let Some(current_output_stats) = ¤t_stats.output { - let default_output = AccumulatedStatsInner::default(); - - let last_output_stats = if let Some(last_stats) = &last { - last_stats.output.as_ref().unwrap_or(&default_output) - } else { - &default_output - }; + let new_output_stats = if let Some(current_output_stats) = ¤t_stats.stats.output { + let last_output_stats = last + .and_then(|l| l.stats.output.as_ref()) + .unwrap_or(&default_output); Some(StatsInner::from_accumulated( current_output_stats, @@ -350,18 +331,21 @@ async fn update_driver_stats( None }; - new_driver_stats.push(( - name, + new_map.insert( + uuid, DriverStats { - input: new_input_stats, - output: new_output_stats, + id: uuid, + inner: DriverStatsInner { + input: new_input_stats, + output: new_output_stats, + }, }, - )); + ); } } - trace!("{new_driver_stats:#?}"); + trace!("{new_map:#?}"); - *driver_stats.write().await = new_driver_stats; - *last_accumulated_drivers_stats.write().await = current_stats; + *driver_stats.write().await = new_map; + *last_map = current_map; } diff --git a/src/stats/driver.rs b/src/stats/driver.rs new file mode 100644 index 00000000..ecf4cbf4 --- /dev/null +++ b/src/stats/driver.rs @@ -0,0 +1,19 @@ +use indexmap::IndexMap; + +use super::StatsInner; + +pub type DriverId = uuid::Uuid; + +pub type DriversStats = IndexMap; + +#[derive(Debug, Clone)] +pub struct DriverStats { + pub id: DriverId, + pub inner: DriverStatsInner, +} + +#[derive(Debug, Clone)] +pub struct DriverStatsInner { + pub input: Option, + pub output: Option, +} diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 8c65e7d6..fe67a87f 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -1,5 +1,6 @@ pub mod accumulated; mod actor; +pub mod driver; mod messages; mod protocol; @@ -7,6 +8,7 @@ use std::sync::{Arc, Mutex}; use accumulated::AccumulatedStatsInner; use anyhow::Result; +use driver::DriversStats; use messages::HubMessagesStats; use tokio::sync::{mpsc, oneshot}; @@ -15,28 +17,12 @@ use protocol::StatsCommand; use crate::hub::Hub; -pub type DriversStats = Vec<(String, DriverStats)>; - -#[derive(Debug, Clone)] -pub struct DriverStats { - pub input: Option, - pub output: Option, -} - #[derive(Debug, Clone, Default)] pub struct StatsInner { pub last_message_time_us: u64, - - pub total_bytes: u64, - pub bytes_per_second: f64, - pub average_bytes_per_second: f64, - - pub total_messages: u64, - pub messages_per_second: f64, - pub average_messages_per_second: f64, - - pub delay: f64, - pub jitter: f64, + pub bytes: ByteStats, + pub messages: MessageStats, + pub delay_stats: DelayStats, } impl StatsInner { @@ -49,34 +35,109 @@ impl StatsInner { calculate_time_diff_us(last_stats.last_update_us, current_stats.last_update_us); let total_time = calculate_time_diff_us(start_time, current_stats.last_update_us); - let diff_messages = current_stats.messages - last_stats.messages; - let total_messages = current_stats.messages; - let messages_per_second = divide_safe(diff_messages as f64, time_diff); - let average_messages_per_second = divide_safe(total_messages as f64, total_time); + let byte_stats = ByteStats::from_accumulated( + current_stats.bytes, + last_stats.bytes, + time_diff, + total_time, + ); + + let message_stats = MessageStats::from_accumulated( + current_stats.messages, + last_stats.messages, + time_diff, + total_time, + ); + + let delay_stats = DelayStats::from_accumulated( + current_stats.delay, + last_stats.delay, + current_stats.messages, + last_stats.messages, + ); - let diff_bytes = current_stats.bytes - last_stats.bytes; - let total_bytes = current_stats.bytes; - let bytes_per_second = divide_safe(diff_bytes as f64, time_diff); - let average_bytes_per_second = divide_safe(total_bytes as f64, total_time); + Self { + last_message_time_us: current_stats.last_update_us, + bytes: byte_stats, + messages: message_stats, + delay_stats: delay_stats, + } + } +} - let delay = divide_safe(current_stats.delay as f64, current_stats.messages as f64); - let last_delay = divide_safe(last_stats.delay as f64, last_stats.messages as f64); - let jitter = (delay - last_delay).abs(); +#[derive(Debug, Clone, Default)] +pub struct ByteStats { + pub total_bytes: u64, + pub bytes_per_second: f64, + pub average_bytes_per_second: f64, +} + +impl ByteStats { + pub fn from_accumulated( + current_bytes: u64, + last_bytes: u64, + time_diff: f64, + total_time: f64, + ) -> Self { + let diff_bytes = current_bytes - last_bytes; + let bytes_per_second = divide_safe(diff_bytes as f64, time_diff); + let average_bytes_per_second = divide_safe(current_bytes as f64, total_time); Self { - last_message_time_us: current_stats.last_update_us, - total_bytes, + total_bytes: current_bytes, bytes_per_second, average_bytes_per_second, - total_messages, + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct MessageStats { + pub total_messages: u64, + pub messages_per_second: f64, + pub average_messages_per_second: f64, +} + +impl MessageStats { + pub fn from_accumulated( + current_messages: u64, + last_messages: u64, + time_diff: f64, + total_time: f64, + ) -> Self { + let diff_messages = current_messages - last_messages; + let messages_per_second = divide_safe(diff_messages as f64, time_diff); + let average_messages_per_second = divide_safe(current_messages as f64, total_time); + + Self { + total_messages: current_messages, messages_per_second, average_messages_per_second, - delay, - jitter, } } } +#[derive(Debug, Clone, Default)] +pub struct DelayStats { + pub delay: f64, + pub jitter: f64, +} + +impl DelayStats { + pub fn from_accumulated( + current_delay: u64, + last_delay: u64, + current_messages: u64, + last_messages: u64, + ) -> Self { + let delay = divide_safe(current_delay as f64, current_messages as f64); + let last_delay = divide_safe(last_delay as f64, last_messages as f64); + let jitter = (delay - last_delay).abs(); + + Self { delay, jitter } + } +} + #[derive(Clone)] pub struct Stats { sender: mpsc::Sender, @@ -147,6 +208,7 @@ fn calculate_time_diff_us(last_micros: u64, current_micros: u64) -> f64 { (current_micros as f64 - last_micros as f64) / 1_000_000.0 } +#[inline(always)] fn divide_safe(numerator: f64, denominator: f64) -> f64 { if denominator > 0.0 { numerator / denominator