diff --git a/src/drivers/fake.rs b/src/drivers/fake.rs index 10afef1c..7e744eeb 100644 --- a/src/drivers/fake.rs +++ b/src/drivers/fake.rs @@ -8,13 +8,13 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct FakeSink { on_message_input: Callbacks>, print: bool, - stats: Arc>, + stats: Arc>, } impl FakeSink { @@ -22,7 +22,7 @@ impl FakeSink { FakeSinkBuilder(Self { on_message_input: Callbacks::new(), print: false, - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } } @@ -93,13 +93,13 @@ impl Driver for FakeSink { } #[async_trait::async_trait] -impl DriverStats for FakeSink { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for FakeSink { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } @@ -146,7 +146,7 @@ impl DriverInfo for FakeSinkInfo { pub struct FakeSource { period: std::time::Duration, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } impl FakeSource { @@ -154,7 +154,7 @@ impl FakeSource { FakeSourceBuilder(Self { period, on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } } @@ -248,13 +248,13 @@ impl Driver for FakeSource { } #[async_trait::async_trait] -impl DriverStats for FakeSource { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for FakeSource { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/mod.rs b/src/drivers/mod.rs index 953a85ad..afe2e1d8 100644 --- a/src/drivers/mod.rs +++ b/src/drivers/mod.rs @@ -12,7 +12,7 @@ use tokio::sync::broadcast; use tracing::*; use url::Url; -use crate::{protocol::Protocol, stats::driver::DriverStats}; +use crate::{protocol::Protocol, stats::driver::AccumulatedDriverStatsProvider}; #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub enum Type { @@ -36,7 +36,7 @@ pub struct DriverDescriptionLegacy { } #[async_trait::async_trait] -pub trait Driver: Send + Sync + DriverStats { +pub trait Driver: Send + Sync + AccumulatedDriverStatsProvider { async fn run(&self, hub_sender: broadcast::Sender>) -> Result<()>; fn info(&self) -> Box; } @@ -224,7 +224,7 @@ mod tests { use tokio::sync::RwLock; use tracing::*; - use crate::stats::driver::DriverStatsInfo; + use crate::stats::driver::AccumulatedDriverStats; use super::*; @@ -244,7 +244,7 @@ mod tests { pub struct ExampleDriver { on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } impl ExampleDriver { @@ -252,7 +252,7 @@ mod tests { ExampleDriverBuilder(Self { on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } } @@ -306,13 +306,13 @@ mod tests { } #[async_trait::async_trait] - impl DriverStats for ExampleDriver { - async fn stats(&self) -> DriverStatsInfo { + impl AccumulatedDriverStatsProvider for ExampleDriver { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/serial/mod.rs b/src/drivers/serial/mod.rs index 70bfdc34..4c1a6546 100644 --- a/src/drivers/serial/mod.rs +++ b/src/drivers/serial/mod.rs @@ -12,7 +12,7 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct Serial { @@ -20,7 +20,7 @@ pub struct Serial { pub baud_rate: u32, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct SerialBuilder(Serial); @@ -55,7 +55,7 @@ impl Serial { baud_rate, on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -179,13 +179,13 @@ impl Driver for Serial { } #[async_trait::async_trait] -impl DriverStats for Serial { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for Serial { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/tcp/client.rs b/src/drivers/tcp/client.rs index f570402c..782e82f1 100644 --- a/src/drivers/tcp/client.rs +++ b/src/drivers/tcp/client.rs @@ -14,14 +14,14 @@ use crate::{ Driver, DriverInfo, }, protocol::Protocol, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, }; pub struct TcpClient { pub remote_addr: String, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TcpClientBuilder(TcpClient); @@ -55,7 +55,7 @@ impl TcpClient { remote_addr: remote_addr.to_string(), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } } @@ -105,13 +105,13 @@ impl Driver for TcpClient { } #[async_trait::async_trait] -impl DriverStats for TcpClient { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for TcpClient { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/tcp/mod.rs b/src/drivers/tcp/mod.rs index 987e93f6..0c546cba 100644 --- a/src/drivers/tcp/mod.rs +++ b/src/drivers/tcp/mod.rs @@ -11,7 +11,7 @@ use tracing::*; use crate::{ protocol::{read_all_messages, Protocol}, - stats::driver::DriverStatsInfo, + stats::driver::AccumulatedDriverStats, }; pub mod client; @@ -24,7 +24,7 @@ async fn tcp_receive_task( remote_addr: &str, hub_sender: Arc>>, on_message_input: &Callbacks>, - stats: &Arc>, + stats: &Arc>, ) -> Result<()> { let mut buf = Vec::with_capacity(1024); @@ -67,7 +67,7 @@ async fn tcp_send_task( remote_addr: &str, mut hub_receiver: broadcast::Receiver>, on_message_output: &Callbacks>, - stats: &Arc>, + stats: &Arc>, ) -> Result<()> { loop { let message = match hub_receiver.recv().await { diff --git a/src/drivers/tcp/server.rs b/src/drivers/tcp/server.rs index c824f726..711d39ee 100644 --- a/src/drivers/tcp/server.rs +++ b/src/drivers/tcp/server.rs @@ -14,7 +14,7 @@ use crate::{ Driver, DriverInfo, }, protocol::Protocol, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, }; #[derive(Clone)] @@ -22,7 +22,7 @@ pub struct TcpServer { pub local_addr: String, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TcpServerBuilder(TcpServer); @@ -56,7 +56,7 @@ impl TcpServer { local_addr: local_addr.to_string(), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -71,7 +71,7 @@ impl TcpServer { hub_sender: Arc>>, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, ) -> Result<()> { let hub_receiver = hub_sender.subscribe(); @@ -132,13 +132,13 @@ impl Driver for TcpServer { } #[async_trait::async_trait] -impl DriverStats for TcpServer { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for TcpServer { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/tlog/reader.rs b/src/drivers/tlog/reader.rs index 3150849b..10777f41 100644 --- a/src/drivers/tlog/reader.rs +++ b/src/drivers/tlog/reader.rs @@ -10,13 +10,13 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, }; pub struct TlogReader { pub path: PathBuf, on_message_input: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TlogReaderBuilder(TlogReader); @@ -41,7 +41,7 @@ impl TlogReader { TlogReaderBuilder(Self { path, on_message_input: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -141,13 +141,13 @@ impl Driver for TlogReader { } #[async_trait::async_trait] -impl DriverStats for TlogReader { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for TlogReader { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/tlog/writer.rs b/src/drivers/tlog/writer.rs index 3bb4f38c..8da74a26 100644 --- a/src/drivers/tlog/writer.rs +++ b/src/drivers/tlog/writer.rs @@ -11,13 +11,13 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, }; pub struct TlogWriter { pub path: PathBuf, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TlogWriterBuilder(TlogWriter); @@ -42,7 +42,7 @@ impl TlogWriter { TlogWriterBuilder(Self { path, on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -109,13 +109,13 @@ impl Driver for TlogWriter { } #[async_trait::async_trait] -impl DriverStats for TlogWriter { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for TlogWriter { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/udp/client.rs b/src/drivers/udp/client.rs index c50d4fb7..cd8e9a26 100644 --- a/src/drivers/udp/client.rs +++ b/src/drivers/udp/client.rs @@ -11,14 +11,14 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct UdpClient { pub remote_addr: String, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct UdpClientBuilder(UdpClient); @@ -52,7 +52,7 @@ impl UdpClient { remote_addr: remote_addr.to_string(), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -209,13 +209,13 @@ impl Driver for UdpClient { } #[async_trait::async_trait] -impl DriverStats for UdpClient { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for UdpClient { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/udp/server.rs b/src/drivers/udp/server.rs index bfacd948..b8572804 100644 --- a/src/drivers/udp/server.rs +++ b/src/drivers/udp/server.rs @@ -11,7 +11,7 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct UdpServer { @@ -19,7 +19,7 @@ pub struct UdpServer { clients: Arc>>, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct UdpServerBuilder(UdpServer); @@ -54,7 +54,7 @@ impl UdpServer { clients: Arc::new(RwLock::new(HashMap::new())), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -217,13 +217,13 @@ impl Driver for UdpServer { } #[async_trait::async_trait] -impl DriverStats for UdpServer { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for UdpServer { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/hub/actor.rs b/src/hub/actor.rs index 3daf3cb7..02e2b1a2 100644 --- a/src/hub/actor.rs +++ b/src/hub/actor.rs @@ -7,12 +7,11 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, + hub::HubCommand, protocol::Protocol, - stats::driver::DriverStatsInfo, + stats::{driver::AccumulatedDriverStats, hub::AccumulatedHubStats}, }; -use super::protocol::HubCommand; - pub struct HubActor { drivers: HashMap>, bcst_sender: broadcast::Sender>, @@ -20,6 +19,8 @@ pub struct HubActor { component_id: Arc>, system_id: Arc>, heartbeat_task: tokio::task::JoinHandle>, + stats_task: tokio::task::JoinHandle>, + hub_stats: Arc>, } impl HubActor { @@ -41,9 +42,13 @@ impl HubActor { HubCommand::GetSender { response } => { let _ = response.send(self.bcst_sender.clone()); } - HubCommand::GetStats { response } => { - let stats = self.get_stats().await; - let _ = response.send(stats); + HubCommand::GetDriversStats { response } => { + let drivers_stats = self.get_drivers_stats().await; + let _ = response.send(drivers_stats); + } + HubCommand::GetHubStats { response } => { + let hub_stats = self.get_hub_stats().await; + let _ = response.send(hub_stats); } HubCommand::ResetAllStats { response } => { let _ = response.send(self.reset_all_stats().await); @@ -70,6 +75,12 @@ impl HubActor { Self::heartbeat_task(bcst_sender, component_id, system_id, frequency) }); + let hub_stats = Arc::new(RwLock::new(AccumulatedHubStats::default())); + let stats_task = tokio::spawn({ + let bcst_sender = bcst_sender.clone(); + let hub_stats = hub_stats.clone(); + + Self::stats_task(bcst_sender, hub_stats) }); Self { @@ -79,11 +90,13 @@ impl HubActor { component_id, system_id, heartbeat_task, + stats_task, + hub_stats, } } #[instrument(level = "debug", skip(self, driver))] - pub async fn add_driver(&mut self, driver: Arc) -> Result { + async fn add_driver(&mut self, driver: Arc) -> Result { let mut last_id = self.last_driver_id.write().await; let id = *last_id; *last_id += 1; @@ -102,7 +115,7 @@ impl HubActor { } #[instrument(level = "debug", skip(self))] - pub async fn remove_driver(&mut self, id: u64) -> Result<()> { + async fn remove_driver(&mut self, id: u64) -> Result<()> { self.drivers .remove(&id) .context("Driver id {id:?} not found")?; @@ -110,7 +123,7 @@ impl HubActor { } #[instrument(level = "debug", skip(self))] - pub async fn drivers(&self) -> HashMap> { + async fn drivers(&self) -> HashMap> { self.drivers .iter() .map(|(&id, driver)| (id, driver.info())) @@ -158,13 +171,57 @@ impl HubActor { } } + async fn stats_task( + bcst_sender: broadcast::Sender>, + hub_stats: Arc>, + ) -> Result<()> { + let mut bsct_receiver = bcst_sender.subscribe(); + + while let Ok(message) = bsct_receiver.recv().await { + let bytes = message.raw_bytes().len() as u64; + let timestamp = message.timestamp; + let current_time = chrono::Utc::now().timestamp_micros() as u64; + + { + let mut hub_stats = hub_stats.write().await; + + hub_stats.last_message_time = timestamp; + hub_stats.bytes += bytes; + hub_stats.messages += 1; + hub_stats.delay += current_time - timestamp; + } + + { + // let system_id = message.system_id(); + // let component_id = message.component_id(); + // let message_id = message.message_id(); + // let mut messages_stats = messages_stats.write().await; + + // // Aggregate statistics per system_id, component_id, and message_id + // message_stats + // .entry(system_id) + // .or_default() + // .entry(component_id) + // .or_default() + // .entry(message_id) + // .and_modify(|entry| { + // entry.0 = timestamp; + // entry.1 += 1; + // }) + // .or_insert((timestamp, 1)); + } + } + + Ok(()) + } + #[instrument(level = "debug", skip(self))] - pub fn get_sender(&self) -> broadcast::Sender> { + fn get_sender(&self) -> broadcast::Sender> { self.bcst_sender.clone() } #[instrument(level = "debug", skip(self))] - pub async fn get_stats(&self) -> Vec<(String, DriverStatsInfo)> { + async fn get_drivers_stats(&self) -> Vec<(String, AccumulatedDriverStats)> { let mut drivers_stats = Vec::with_capacity(self.drivers.len()); for (_id, driver) in self.drivers.iter() { let stats = driver.stats().await; @@ -178,11 +235,18 @@ impl HubActor { } #[instrument(level = "debug", skip(self))] - pub async fn reset_all_stats(&mut self) -> Result<()> { + async fn get_hub_stats(&self) -> AccumulatedHubStats { + self.hub_stats.read().await.clone() + } + + #[instrument(level = "debug", skip(self))] + async fn reset_all_stats(&mut self) -> Result<()> { for (_id, driver) in self.drivers.iter() { driver.reset_stats().await; } + *self.hub_stats.write().await = AccumulatedHubStats::default(); + Ok(()) } } diff --git a/src/hub/mod.rs b/src/hub/mod.rs index e4bfa210..6e188119 100644 --- a/src/hub/mod.rs +++ b/src/hub/mod.rs @@ -12,7 +12,7 @@ use tokio::sync::{broadcast, mpsc, oneshot, RwLock}; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::driver::DriverStatsInfo, + stats::{driver::AccumulatedDriverStats, hub::AccumulatedHubStats}, }; use actor::HubActor; @@ -21,7 +21,7 @@ use protocol::HubCommand; #[derive(Clone)] pub struct Hub { sender: mpsc::Sender, - task: Arc>>, + _task: Arc>>, } impl Hub { @@ -33,8 +33,8 @@ impl Hub { ) -> Self { let (sender, receiver) = mpsc::channel(32); let hub = HubActor::new(buffer_size, component_id, system_id, frequency).await; - let task = Arc::new(Mutex::new(tokio::spawn(hub.start(receiver)))); - Self { sender, task } + let _task = Arc::new(Mutex::new(tokio::spawn(hub.start(receiver)))); + Self { sender, _task } } pub async fn add_driver(&self, driver: Arc) -> Result { @@ -81,10 +81,21 @@ impl Hub { Ok(res) } - pub async fn stats(&self) -> Result> { + pub async fn drivers_stats(&self) -> Result> { let (response_tx, response_rx) = oneshot::channel(); self.sender - .send(HubCommand::GetStats { + .send(HubCommand::GetDriversStats { + response: response_tx, + }) + .await?; + let res = response_rx.await?; + Ok(res) + } + + pub async fn hub_stats(&self) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + self.sender + .send(HubCommand::GetHubStats { response: response_tx, }) .await?; diff --git a/src/hub/protocol.rs b/src/hub/protocol.rs index b77ab8da..39a83508 100644 --- a/src/hub/protocol.rs +++ b/src/hub/protocol.rs @@ -6,7 +6,7 @@ use tokio::sync::{broadcast, oneshot}; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::driver::DriverStatsInfo, + stats::{driver::AccumulatedDriverStats, hub::AccumulatedHubStats}, }; pub enum HubCommand { @@ -24,8 +24,11 @@ pub enum HubCommand { GetSender { response: oneshot::Sender>>, }, - GetStats { - response: oneshot::Sender>, + GetHubStats { + response: oneshot::Sender, + }, + GetDriversStats { + response: oneshot::Sender>, }, ResetAllStats { response: oneshot::Sender>, diff --git a/src/main.rs b/src/main.rs index 67d96fa9..f2bde997 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ use anyhow::*; use tokio::sync::RwLock; use tracing::*; -#[tokio::main(flavor = "multi_thread", worker_threads = 10)] +#[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { // CLI should be started before logger to allow control over verbosity cli::init(); diff --git a/src/stats/actor.rs b/src/stats/actor.rs index 12caf6c8..3fd84f8e 100644 --- a/src/stats/actor.rs +++ b/src/stats/actor.rs @@ -10,8 +10,9 @@ use tracing::*; use crate::{ hub::Hub, stats::{ - driver::{DriverStatsInfo, DriverStatsInfoInner}, - DriverStats, DriverStatsInner, StatsCommand, + driver::{AccumulatedDriverStats, AccumulatedDriverStatsInner}, + hub::AccumulatedHubStats, + DriverStats, DriversStats, StatsCommand, StatsInner, }, }; @@ -19,22 +20,47 @@ pub struct StatsActor { hub: Hub, start_time: Arc>, update_period: Arc>, - last_raw: Arc>>, - driver_stats: Arc>>, + last_accumulated_drivers_stats: Arc>>, + drivers_stats: Arc>, + last_accumulated_hub_stats: Arc>, + hub_stats: Arc>, } impl StatsActor { pub async fn start(mut self, mut receiver: mpsc::Receiver) { - let task = tokio::spawn({ + let drivers_stats_task = tokio::spawn({ let hub = self.hub.clone(); let update_period = Arc::clone(&self.update_period); - let last_raw = Arc::clone(&self.last_raw); - let driver_stats = Arc::clone(&self.driver_stats); + let last_accumulated_drivers_stats = Arc::clone(&self.last_accumulated_drivers_stats); + let drivers_stats = Arc::clone(&self.drivers_stats); let start_time = Arc::clone(&self.start_time); async move { loop { - update_driver_stats(&hub, &last_raw, &driver_stats, &start_time).await; + update_driver_stats( + &hub, + &last_accumulated_drivers_stats, + &drivers_stats, + &start_time, + ) + .await; + + tokio::time::sleep(*update_period.read().await).await; + } + } + }); + + let hub_stats_task = tokio::spawn({ + let hub = self.hub.clone(); + let update_period = Arc::clone(&self.update_period); + let last_accumulated_hub_stats = Arc::clone(&self.last_accumulated_hub_stats); + let hub_stats = Arc::clone(&self.hub_stats); + let start_time = Arc::clone(&self.start_time); + + async move { + loop { + update_hub_stats(&hub, &last_accumulated_hub_stats, &hub_stats, &start_time) + .await; tokio::time::sleep(*update_period.read().await).await; } @@ -55,31 +81,47 @@ impl StatsActor { let result = self.drivers_stats().await; let _ = response.send(result); } + StatsCommand::GetHubStats { response } => { + let result = self.hub_stats().await; + let _ = response.send(result); + } } } - task.abort(); + drivers_stats_task.abort(); + hub_stats_task.abort(); } #[instrument(level = "debug", skip(hub))] pub async fn new(hub: Hub, update_period: tokio::time::Duration) -> Self { let update_period = Arc::new(RwLock::new(update_period)); - let last_raw = Arc::new(RwLock::new(Vec::new())); - let driver_stats = Arc::new(RwLock::new(Vec::new())); + let last_accumulated_hub_stats = Arc::new(RwLock::new(AccumulatedHubStats::default())); + let hub_stats = Arc::new(RwLock::new(StatsInner::default())); + let last_accumulated_drivers_stats = Arc::new(RwLock::new(Vec::new())); + let drivers_stats = Arc::new(RwLock::new(Vec::new())); let start_time = Arc::new(RwLock::new(chrono::Utc::now().timestamp_micros() as u64)); Self { hub, update_period, - last_raw, - driver_stats, + last_accumulated_hub_stats, + hub_stats, + last_accumulated_drivers_stats, + drivers_stats, start_time, } } #[instrument(level = "debug", skip(self))] - pub async fn drivers_stats(&mut self) -> Result> { - let drivers_stats = self.driver_stats.read().await.clone(); + async fn hub_stats(&self) -> Result { + let hub_stats = self.hub_stats.read().await.clone(); + + Ok(hub_stats) + } + + #[instrument(level = "debug", skip(self))] + pub async fn drivers_stats(&mut self) -> Result { + let drivers_stats = self.drivers_stats.read().await.clone(); Ok(drivers_stats) } @@ -93,46 +135,106 @@ impl StatsActor { #[instrument(level = "debug", skip(self))] pub async fn reset(&mut self) -> Result<()> { - // note: hold the driver_stats locked until the hub clear each driver stats to minimize weird states - let mut driver_stats = self.driver_stats.write().await; + // note: hold the guards until the hub clear each driver stats to minimize weird states + let mut hub_stats = self.hub_stats.write().await; + let mut driver_stats = self.drivers_stats.write().await; + if let Err(error) = self.hub.reset_all_stats().await { - error!("Failed resetting driver stats: {error:?}"); + error!("Failed resetting stats: {error:?}"); } *self.start_time.write().await = chrono::Utc::now().timestamp_micros() as u64; - self.last_raw.write().await.clear(); + self.last_accumulated_drivers_stats.write().await.clear(); driver_stats.clear(); + *hub_stats = StatsInner::default(); + *self.last_accumulated_hub_stats.write().await = AccumulatedHubStats::default(); + Ok(()) } } +#[instrument(level = "debug", skip_all)] +async fn update_hub_stats( + hub: &Hub, + last_accumulated_hub_stats: &Arc>, + hub_stats: &Arc>, + start_time: &Arc>, +) { + let last_stats = last_accumulated_hub_stats.read().await.clone(); + let current_stats = hub.hub_stats().await.unwrap(); + + let start_time = start_time.read().await.clone(); + + let time_diff = calculate_time_diff( + last_stats.last_message_time, + current_stats.last_message_time, + ); + let total_time = calculate_time_diff(start_time, current_stats.last_message_time); + + let diff_messages = current_stats.messages - last_stats.messages; + let total_messages = current_stats.messages; + let messages_per_second = divide_safe(diff_messages as f64, time_diff); + let average_messages_per_second = divide_safe(total_messages as f64, total_time); + + let diff_bytes = current_stats.bytes - last_stats.bytes; + let total_bytes = current_stats.bytes; + let bytes_per_second = divide_safe(diff_bytes as f64, time_diff); + let average_bytes_per_second = divide_safe(total_bytes as f64, total_time); + + let delay = divide_safe(current_stats.delay as f64, current_stats.messages as f64); + let last_delay = divide_safe(last_stats.delay as f64, last_stats.messages as f64); + let jitter = (delay - last_delay).abs(); + + let new_hub_stats = StatsInner { + last_message_time: current_stats.last_message_time, + total_bytes, + bytes_per_second, + average_bytes_per_second, + total_messages, + messages_per_second, + average_messages_per_second, + delay, + jitter, + }; + + trace!("{new_hub_stats:#?}"); + + *hub_stats.write().await = new_hub_stats; + *last_accumulated_hub_stats.write().await = current_stats; +} + #[instrument(level = "debug", skip_all)] async fn update_driver_stats( hub: &Hub, - last_raw: &Arc>>, - driver_stats: &Arc>>, + last_accumulated_drivers_stats: &Arc>>, + driver_stats: &Arc>, start_time: &Arc>, ) { - let last_raw_stats: Vec<(String, DriverStatsInfo)> = last_raw.read().await.clone(); - let current_raw_stats: Vec<(String, DriverStatsInfo)> = hub.stats().await.unwrap(); + let last_stats = last_accumulated_drivers_stats.read().await.clone(); + let current_stats = hub.drivers_stats().await.unwrap(); - let last_map: HashMap<_, _> = last_raw_stats.into_iter().collect(); - let current_map: HashMap<_, _> = current_raw_stats + let last_map: HashMap<_, _> = last_stats.into_iter().collect(); + let current_map: HashMap<_, _> = current_stats .iter() .map(|(name, raw)| (name.clone(), raw.clone())) .collect(); let merged_keys: HashSet = last_map.keys().chain(current_map.keys()).cloned().collect(); - let merged_stats: Vec<(String, (Option, Option))> = - merged_keys - .into_iter() - .map(|name| { - let last = last_map.get(&name).cloned(); - let current = current_map.get(&name).cloned(); - (name, (last, current)) - }) - .collect(); + let merged_stats: Vec<( + String, + ( + Option, + Option, + ), + )> = merged_keys + .into_iter() + .map(|name| { + let last = last_map.get(&name).cloned(); + let current = current_map.get(&name).cloned(); + (name, (last, current)) + }) + .collect(); let mut new_driver_stats = Vec::new(); @@ -164,18 +266,18 @@ async fn update_driver_stats( trace!("{new_driver_stats:#?}"); *driver_stats.write().await = new_driver_stats; - *last_raw.write().await = current_raw_stats; + *last_accumulated_drivers_stats.write().await = current_stats; } /// Function to calculate the driver stats for either input or output, with proper averages #[instrument(level = "debug")] fn calculate_driver_stats( - last_stats: Option, - current_stats: Option, + last_stats: Option, + current_stats: Option, start_time: u64, -) -> Option { +) -> Option { if let Some(current_stats) = current_stats { - let time_diff = time_diff(last_stats.as_ref(), ¤t_stats); + let time_diff = accumulated_driver_stats_time_diff(last_stats.as_ref(), ¤t_stats); let total_time = total_time_since_start(start_time, ¤t_stats); let diff_messages = current_stats.messages as u64 @@ -199,7 +301,7 @@ fn calculate_driver_stats( ); let jitter = (delay - last_delay).abs(); - Some(DriverStatsInner { + Some(StatsInner { last_message_time: current_stats.last_update, total_bytes, bytes_per_second, @@ -217,24 +319,28 @@ fn calculate_driver_stats( /// Function to calculate the total time since the start (in seconds) #[instrument(level = "debug")] -fn total_time_since_start(start_time: u64, current_stats: &DriverStatsInfoInner) -> f64 { - (current_stats.last_update as f64 - start_time as f64) / 1_000_000.0 +fn total_time_since_start(start_time: u64, current_stats: &AccumulatedDriverStatsInner) -> f64 { + calculate_time_diff(start_time, current_stats.last_update) } /// Function to calculate the time difference (in seconds) #[instrument(level = "debug")] -fn time_diff( - last_stats: Option<&DriverStatsInfoInner>, - current_stats: &DriverStatsInfoInner, +fn accumulated_driver_stats_time_diff( + last_stats: Option<&AccumulatedDriverStatsInner>, + current_stats: &AccumulatedDriverStatsInner, ) -> f64 { if let Some(last_stats) = last_stats { // Microseconds to seconds - (current_stats.last_update as f64 - last_stats.last_update as f64) / 1_000_000.0 + calculate_time_diff(last_stats.last_update, current_stats.last_update) } else { f64::INFINITY } } +fn calculate_time_diff(last_time: u64, current_time: u64) -> f64 { + (current_time as f64 - last_time as f64) / 1_000_000.0 +} + #[instrument(level = "debug")] fn divide_safe(numerator: f64, denominator: f64) -> f64 { if denominator > 0.0 { diff --git a/src/stats/driver.rs b/src/stats/driver.rs index 18236a37..efbfc90f 100644 --- a/src/stats/driver.rs +++ b/src/stats/driver.rs @@ -3,23 +3,23 @@ use std::sync::Arc; use crate::protocol::Protocol; #[async_trait::async_trait] -pub trait DriverStats { - async fn stats(&self) -> DriverStatsInfo; +pub trait AccumulatedDriverStatsProvider { + async fn stats(&self) -> AccumulatedDriverStats; async fn reset_stats(&self); } #[derive(Default, Debug, Clone)] -pub struct DriverStatsInfo { - pub input: Option, - pub output: Option, +pub struct AccumulatedDriverStats { + pub input: Option, + pub output: Option, } -impl DriverStatsInfo { +impl AccumulatedDriverStats { pub async fn update_input(&mut self, message: Arc) { if let Some(stats) = self.input.as_mut() { stats.update(message).await; } else { - self.input.replace(DriverStatsInfoInner::default()); + self.input.replace(AccumulatedDriverStatsInner::default()); } } @@ -27,20 +27,20 @@ impl DriverStatsInfo { if let Some(stats) = self.output.as_mut() { stats.update(message).await; } else { - self.output.replace(DriverStatsInfoInner::default()); + self.output.replace(AccumulatedDriverStatsInner::default()); } } } #[derive(Clone, Debug)] -pub struct DriverStatsInfoInner { +pub struct AccumulatedDriverStatsInner { pub last_update: u64, pub messages: usize, pub bytes: usize, pub delay: u64, } -impl Default for DriverStatsInfoInner { +impl Default for AccumulatedDriverStatsInner { fn default() -> Self { Self { last_update: chrono::Utc::now().timestamp_micros() as u64, @@ -51,7 +51,7 @@ impl Default for DriverStatsInfoInner { } } -impl DriverStatsInfoInner { +impl AccumulatedDriverStatsInner { pub async fn update(&mut self, message: Arc) { self.last_update = chrono::Utc::now().timestamp_micros() as u64; self.bytes += message.raw_bytes().len(); diff --git a/src/stats/hub.rs b/src/stats/hub.rs new file mode 100644 index 00000000..86cf6bba --- /dev/null +++ b/src/stats/hub.rs @@ -0,0 +1,30 @@ +use indexmap::IndexMap; + +#[derive(Default, Debug, Clone)] +pub struct AccumulatedHubStats { + pub last_message_time: u64, + pub bytes: u64, + pub messages: u64, + pub delay: u64, +} + +pub type SystemId = u8; +pub type ComponentId = u8; +pub type MessageId = u32; + +pub struct HubMessagesStats { + pub systems_messages_stats: IndexMap, +} + +pub struct SystemMessagesStats { + pub components_messages_stats: IndexMap, +} + +pub struct ComponentIdMessageStats { + pub messages_stats: IndexMap, +} + +pub struct MessageStats { + pub last_message_time: u64, + pub counter: u64, +} diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 0b8b264f..3132b4f3 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -1,5 +1,6 @@ mod actor; pub mod driver; +pub mod hub; mod protocol; use std::sync::{Arc, Mutex}; @@ -12,26 +13,28 @@ use protocol::StatsCommand; use crate::hub::Hub; +pub type DriversStats = Vec<(String, DriverStats)>; + #[derive(Debug, Clone)] pub struct DriverStats { - input: Option, - output: Option, + pub input: Option, + pub output: Option, } -#[derive(Debug, Clone)] -pub struct DriverStatsInner { - last_message_time: u64, +#[derive(Debug, Clone, Default)] +pub struct StatsInner { + pub last_message_time: u64, - total_bytes: u64, - bytes_per_second: f64, - average_bytes_per_second: f64, + pub total_bytes: u64, + pub bytes_per_second: f64, + pub average_bytes_per_second: f64, - total_messages: u64, - messages_per_second: f64, - average_messages_per_second: f64, + pub total_messages: u64, + pub messages_per_second: f64, + pub average_messages_per_second: f64, - delay: f64, - jitter: f64, + pub delay: f64, + pub jitter: f64, } #[derive(Clone)] @@ -48,7 +51,7 @@ impl Stats { Self { sender, task } } - pub async fn driver_stats(&mut self) -> Result> { + pub async fn driver_stats(&mut self) -> Result { let (response_tx, response_rx) = oneshot::channel(); self.sender .send(StatsCommand::GetDriversStats { @@ -58,6 +61,16 @@ impl Stats { response_rx.await? } + pub async fn hub_stats(&mut self) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + self.sender + .send(StatsCommand::GetHubStats { + response: response_tx, + }) + .await?; + response_rx.await? + } + pub async fn set_period(&mut self, period: tokio::time::Duration) -> Result<()> { let (response_tx, response_rx) = oneshot::channel(); self.sender diff --git a/src/stats/protocol.rs b/src/stats/protocol.rs index 297a155f..62422d4a 100644 --- a/src/stats/protocol.rs +++ b/src/stats/protocol.rs @@ -1,7 +1,7 @@ use anyhow::Result; use tokio::sync::oneshot; -use crate::stats::DriverStats; +use crate::stats::{DriversStats, StatsInner}; pub enum StatsCommand { SetPeriod { @@ -12,6 +12,9 @@ pub enum StatsCommand { response: oneshot::Sender>, }, GetDriversStats { - response: oneshot::Sender>>, + response: oneshot::Sender>, + }, + GetHubStats { + response: oneshot::Sender>, }, }