diff --git a/src/hub/actor.rs b/src/hub/actor.rs index 3daf3cb7..8019c138 100644 --- a/src/hub/actor.rs +++ b/src/hub/actor.rs @@ -7,12 +7,11 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, + hub::{HubCommand, HubStatsInner}, protocol::Protocol, stats::driver::DriverStatsInfo, }; -use super::protocol::HubCommand; - pub struct HubActor { drivers: HashMap>, bcst_sender: broadcast::Sender>, @@ -20,6 +19,8 @@ pub struct HubActor { component_id: Arc>, system_id: Arc>, heartbeat_task: tokio::task::JoinHandle>, + stats_task: tokio::task::JoinHandle>, + stats: Arc>, } impl HubActor { @@ -70,6 +71,12 @@ impl HubActor { Self::heartbeat_task(bcst_sender, component_id, system_id, frequency) }); + let stats = Arc::new(RwLock::new(HubStatsInner::default())); + let stats_task = tokio::spawn({ + let bcst_sender = bcst_sender.clone(); + let stats = stats.clone(); + + Self::stats_task(bcst_sender, stats) }); Self { @@ -79,6 +86,8 @@ impl HubActor { component_id, system_id, heartbeat_task, + stats_task, + stats, } } @@ -158,6 +167,43 @@ impl HubActor { } } + async fn stats_task( + bcst_sender: broadcast::Sender>, + hub_stats: Arc>, + ) -> Result<()> { + let mut bsct_receiver = bcst_sender.subscribe(); + + while let Ok(message) = bsct_receiver.recv().await { + let bytes = message.raw_bytes().len() as u64; + let system_id = message.system_id(); + let component_id = message.component_id(); + let message_id = message.message_id(); + let timestamp = message.timestamp; + + let mut hub_stats = hub_stats.write().await; + + hub_stats.timestamp = timestamp; + hub_stats.bytes += bytes; + hub_stats.messages += 1; + + // Aggregate statistics per system_id, component_id, and message_id + hub_stats + .message_stats + .entry(system_id) + .or_default() + .entry(component_id) + .or_default() + .entry(message_id) + .and_modify(|entry| { + entry.0 = timestamp; + entry.1 += 1; + }) + .or_insert((timestamp, 1)); + } + + Ok(()) + } + #[instrument(level = "debug", skip(self))] pub fn get_sender(&self) -> broadcast::Sender> { self.bcst_sender.clone() diff --git a/src/hub/mod.rs b/src/hub/mod.rs index e4bfa210..6bec2b4e 100644 --- a/src/hub/mod.rs +++ b/src/hub/mod.rs @@ -7,6 +7,7 @@ use std::{ }; use anyhow::Result; +use indexmap::IndexMap; use tokio::sync::{broadcast, mpsc, oneshot, RwLock}; use crate::{ @@ -18,6 +19,22 @@ use crate::{ use actor::HubActor; use protocol::HubCommand; +#[derive(Default, Debug, Clone)] +pub struct HubStatsInner { + timestamp: u64, + bytes: u64, + messages: u64, + message_stats: HubMessageStats, +} + +type SystemId = u8; +type ComponentId = u8; +type MessageId = u32; +type MessageCounter = u64; +type LastTimestamp = u64; +type HubMessageStats = + IndexMap>>; + #[derive(Clone)] pub struct Hub { sender: mpsc::Sender,