Skip to content

Commit

Permalink
WIP: src: Add hub stats
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 10, 2024
1 parent 4dde7df commit a2537ac
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
50 changes: 48 additions & 2 deletions src/hub/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ use tracing::*;

use crate::{
drivers::{Driver, DriverInfo},
hub::{HubCommand, HubStatsInner},
protocol::Protocol,
stats::driver::DriverStatsInfo,
};

use super::protocol::HubCommand;

pub struct HubActor {
drivers: HashMap<u64, Arc<dyn Driver>>,
bcst_sender: broadcast::Sender<Arc<Protocol>>,
last_driver_id: Arc<RwLock<u64>>,
component_id: Arc<RwLock<u8>>,
system_id: Arc<RwLock<u8>>,
heartbeat_task: tokio::task::JoinHandle<Result<()>>,
stats_task: tokio::task::JoinHandle<Result<()>>,
stats: Arc<RwLock<HubStatsInner>>,
}

impl HubActor {
Expand Down Expand Up @@ -70,6 +71,12 @@ impl HubActor {
Self::heartbeat_task(bcst_sender, component_id, system_id, frequency)
});

let stats = Arc::new(RwLock::new(HubStatsInner::default()));
let stats_task = tokio::spawn({
let bcst_sender = bcst_sender.clone();
let stats = stats.clone();

Self::stats_task(bcst_sender, stats)
});

Self {
Expand All @@ -79,6 +86,8 @@ impl HubActor {
component_id,
system_id,
heartbeat_task,
stats_task,
stats,
}
}

Expand Down Expand Up @@ -158,6 +167,43 @@ impl HubActor {
}
}

async fn stats_task(
bcst_sender: broadcast::Sender<Arc<Protocol>>,
hub_stats: Arc<RwLock<HubStatsInner>>,
) -> Result<()> {
let mut bsct_receiver = bcst_sender.subscribe();

while let Ok(message) = bsct_receiver.recv().await {
let bytes = message.raw_bytes().len() as u64;
let system_id = message.system_id();
let component_id = message.component_id();
let message_id = message.message_id();
let timestamp = message.timestamp;

let mut hub_stats = hub_stats.write().await;

hub_stats.timestamp = timestamp;
hub_stats.bytes += bytes;
hub_stats.messages += 1;

// Aggregate statistics per system_id, component_id, and message_id
hub_stats
.message_stats
.entry(system_id)
.or_default()
.entry(component_id)
.or_default()
.entry(message_id)
.and_modify(|entry| {
entry.0 = timestamp;
entry.1 += 1;
})
.or_insert((timestamp, 1));
}

Ok(())
}

#[instrument(level = "debug", skip(self))]
pub fn get_sender(&self) -> broadcast::Sender<Arc<Protocol>> {
self.bcst_sender.clone()
Expand Down
17 changes: 17 additions & 0 deletions src/hub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use anyhow::Result;
use indexmap::IndexMap;
use tokio::sync::{broadcast, mpsc, oneshot, RwLock};

use crate::{
Expand All @@ -18,6 +19,22 @@ use crate::{
use actor::HubActor;
use protocol::HubCommand;

#[derive(Default, Debug, Clone)]
pub struct HubStatsInner {
timestamp: u64,
bytes: u64,
messages: u64,
message_stats: HubMessageStats,
}

type SystemId = u8;
type ComponentId = u8;
type MessageId = u32;
type MessageCounter = u64;
type LastTimestamp = u64;
type HubMessageStats =
IndexMap<SystemId, IndexMap<ComponentId, IndexMap<MessageId, (LastTimestamp, MessageCounter)>>>;

#[derive(Clone)]
pub struct Hub {
sender: mpsc::Sender<HubCommand>,
Expand Down

0 comments on commit a2537ac

Please sign in to comment.