Skip to content

Commit

Permalink
src: stats: Add hub messages stats to the stats actor
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 17, 2024
1 parent ebb5aad commit 518e999
Showing 1 changed file with 95 additions and 2 deletions.
97 changes: 95 additions & 2 deletions src/stats/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ use tracing::*;
use crate::{
hub::Hub,
stats::{
accumulated::{driver::AccumulatedDriverStats, AccumulatedStatsInner},
accumulated::{
driver::AccumulatedDriverStats, messages::AccumulatedHubMessagesStats,
AccumulatedStatsInner,
},
messages::HubMessagesStats,
DriverStats, DriversStats, StatsCommand, StatsInner,
},
};
Expand All @@ -23,6 +27,8 @@ pub struct StatsActor {
drivers_stats: Arc<RwLock<DriversStats>>,
last_accumulated_hub_stats: Arc<RwLock<AccumulatedStatsInner>>,
hub_stats: Arc<RwLock<StatsInner>>,
last_accumulated_hub_messages_stats: Arc<RwLock<AccumulatedHubMessagesStats>>,
hub_messages_stats: Arc<RwLock<HubMessagesStats>>,
}

impl StatsActor {
Expand Down Expand Up @@ -66,6 +72,29 @@ impl StatsActor {
}
});

let hub_messages_stats_task = tokio::spawn({
let hub = self.hub.clone();
let update_period = self.update_period.clone();
let last_accumulated_hub_messages_stats =
self.last_accumulated_hub_messages_stats.clone();
let hub_messages_stats = self.hub_messages_stats.clone();
let start_time = self.start_time.clone();

async move {
loop {
update_hub_messages_stats(
&hub,
&last_accumulated_hub_messages_stats,
&hub_messages_stats,
&start_time,
)
.await;

tokio::time::sleep(*update_period.read().await).await;
}
}
});

while let Some(command) = receiver.recv().await {
match command {
StatsCommand::SetPeriod { period, response } => {
Expand All @@ -87,6 +116,7 @@ impl StatsActor {
}
}

hub_messages_stats_task.abort();
drivers_stats_task.abort();
hub_stats_task.abort();
}
Expand All @@ -98,16 +128,21 @@ impl StatsActor {
let hub_stats = Arc::new(RwLock::new(StatsInner::default()));
let last_accumulated_drivers_stats = Arc::new(RwLock::new(Vec::new()));
let drivers_stats = Arc::new(RwLock::new(Vec::new()));
let last_accumulated_hub_messages_stats =
Arc::new(RwLock::new(AccumulatedHubMessagesStats::default()));
let hub_messages_stats = Arc::new(RwLock::new(HubMessagesStats::default()));
let start_time = Arc::new(RwLock::new(chrono::Utc::now().timestamp_micros() as u64));

Self {
hub,
start_time,
update_period,
last_accumulated_hub_stats,
hub_stats,
last_accumulated_drivers_stats,
drivers_stats,
start_time,
last_accumulated_hub_messages_stats,
hub_messages_stats,
}
}

Expand Down Expand Up @@ -137,21 +172,79 @@ impl StatsActor {
// note: hold the guards until the hub clear each driver stats to minimize weird states
let mut hub_stats = self.hub_stats.write().await;
let mut driver_stats = self.drivers_stats.write().await;
let mut hub_messages_stats = self.hub_messages_stats.write().await;

if let Err(error) = self.hub.reset_all_stats().await {
error!("Failed resetting stats: {error:?}");
}
*self.start_time.write().await = chrono::Utc::now().timestamp_micros() as u64;

self.last_accumulated_drivers_stats.write().await.clear();
driver_stats.clear();

*hub_messages_stats = HubMessagesStats::default();
*self.last_accumulated_hub_messages_stats.write().await =
AccumulatedHubMessagesStats::default();

*hub_stats = StatsInner::default();
*self.last_accumulated_hub_stats.write().await = AccumulatedStatsInner::default();

Ok(())
}
}

#[instrument(level = "debug", skip_all)]
async fn update_hub_messages_stats(
hub: &Hub,
last_accumulated_hub_messages_stats: &RwLock<AccumulatedHubMessagesStats>,
hub_messages_stats: &RwLock<HubMessagesStats>,
start_time: &RwLock<u64>,
) {
let last_stats = last_accumulated_hub_messages_stats.read().await.clone();
let current_stats = hub.hub_messages_stats().await.unwrap();
let start_time = start_time.read().await.clone();

let mut new_hub_messages_stats = HubMessagesStats::default();

for (system_id, current_system_stats) in &current_stats.systems_messages_stats {
for (component_id, current_component_stats) in
&current_system_stats.components_messages_stats
{
for (message_id, current_message_stats) in &current_component_stats.messages_stats {
let default_message_stats = AccumulatedStatsInner::default();

let last_message_stats = last_stats
.systems_messages_stats
.get(system_id)
.and_then(|sys| sys.components_messages_stats.get(component_id))
.and_then(|comp| comp.messages_stats.get(message_id))
.unwrap_or(&default_message_stats);

let new_stats = StatsInner::from_accumulated(
current_message_stats,
last_message_stats,
start_time,
);

new_hub_messages_stats
.systems_messages_stats
.entry(*system_id)
.or_default()
.components_messages_stats
.entry(*component_id)
.or_default()
.messages_stats
.insert(*message_id, new_stats);
}
}
}

trace!("{new_hub_messages_stats:#?}");

*hub_messages_stats.write().await = new_hub_messages_stats;
*last_accumulated_hub_messages_stats.write().await = current_stats;
}

#[instrument(level = "debug", skip_all)]
async fn update_hub_stats(
hub: &Hub,
Expand Down

0 comments on commit 518e999

Please sign in to comment.