Skip to content

Commit

Permalink
src: stats: Add hub stats to the stats actor
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 16, 2024
1 parent 5907551 commit 9f41887
Showing 1 changed file with 82 additions and 0 deletions.
82 changes: 82 additions & 0 deletions src/stats/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub struct StatsActor {
update_period: Arc<RwLock<tokio::time::Duration>>,
last_accumulated_drivers_stats: Arc<RwLock<Vec<(String, AccumulatedDriverStats)>>>,
drivers_stats: Arc<RwLock<DriversStats>>,
last_accumulated_hub_stats: Arc<RwLock<AccumulatedStatsInner>>,
hub_stats: Arc<RwLock<StatsInner>>,
}

impl StatsActor {
Expand All @@ -47,6 +49,23 @@ impl StatsActor {
}
});

let hub_stats_task = tokio::spawn({
let hub = self.hub.clone();
let update_period = Arc::clone(&self.update_period);
let last_accumulated_hub_stats = Arc::clone(&self.last_accumulated_hub_stats);
let hub_stats = Arc::clone(&self.hub_stats);
let start_time = Arc::clone(&self.start_time);

async move {
loop {
update_hub_stats(&hub, &last_accumulated_hub_stats, &hub_stats, &start_time)
.await;

tokio::time::sleep(*update_period.read().await).await;
}
}
});

while let Some(command) = receiver.recv().await {
match command {
StatsCommand::SetPeriod { period, response } => {
Expand All @@ -65,24 +84,36 @@ impl StatsActor {
}

drivers_stats_task.abort();
hub_stats_task.abort();
}

#[instrument(level = "debug", skip(hub))]
pub async fn new(hub: Hub, update_period: tokio::time::Duration) -> Self {
let update_period = Arc::new(RwLock::new(update_period));
let last_accumulated_hub_stats = Arc::new(RwLock::new(AccumulatedStatsInner::default()));
let hub_stats = Arc::new(RwLock::new(StatsInner::default()));
let last_accumulated_drivers_stats = Arc::new(RwLock::new(Vec::new()));
let drivers_stats = Arc::new(RwLock::new(Vec::new()));
let start_time = Arc::new(RwLock::new(chrono::Utc::now().timestamp_micros() as u64));

Self {
hub,
update_period,
last_accumulated_hub_stats,
hub_stats,
last_accumulated_drivers_stats,
drivers_stats,
start_time,
}
}

#[instrument(level = "debug", skip(self))]
async fn hub_stats(&self) -> Result<StatsInner> {
let hub_stats = self.hub_stats.read().await.clone();

Ok(hub_stats)
}

#[instrument(level = "debug", skip(self))]
async fn drivers_stats(&mut self) -> Result<DriversStats> {
let drivers_stats = self.drivers_stats.read().await.clone();
Expand All @@ -100,6 +131,7 @@ impl StatsActor {
#[instrument(level = "debug", skip(self))]
async fn reset(&mut self) -> Result<()> {
// note: hold the guards until the hub clear each driver stats to minimize weird states
let mut hub_stats = self.hub_stats.write().await;
let mut driver_stats = self.drivers_stats.write().await;

if let Err(error) = self.hub.reset_all_stats().await {
Expand All @@ -109,10 +141,60 @@ impl StatsActor {
self.last_accumulated_drivers_stats.write().await.clear();
driver_stats.clear();

*hub_stats = StatsInner::default();
*self.last_accumulated_hub_stats.write().await = AccumulatedStatsInner::default();

Ok(())
}
}

#[instrument(level = "debug", skip_all)]
async fn update_hub_stats(
hub: &Hub,
last_accumulated_hub_stats: &Arc<RwLock<AccumulatedStatsInner>>,
hub_stats: &Arc<RwLock<StatsInner>>,
start_time: &Arc<RwLock<u64>>,
) {
let last_stats = last_accumulated_hub_stats.read().await.clone();
let current_stats = hub.hub_stats().await.unwrap();

let start_time = start_time.read().await.clone();

let time_diff = calculate_time_diff(last_stats.last_update, current_stats.last_update);
let total_time = calculate_time_diff(start_time, current_stats.last_update);

let diff_messages = current_stats.messages - last_stats.messages;
let total_messages = current_stats.messages;
let messages_per_second = divide_safe(diff_messages as f64, time_diff);
let average_messages_per_second = divide_safe(total_messages as f64, total_time);

let diff_bytes = current_stats.bytes - last_stats.bytes;
let total_bytes = current_stats.bytes;
let bytes_per_second = divide_safe(diff_bytes as f64, time_diff);
let average_bytes_per_second = divide_safe(total_bytes as f64, total_time);

let delay = divide_safe(current_stats.delay as f64, current_stats.messages as f64);
let last_delay = divide_safe(last_stats.delay as f64, last_stats.messages as f64);
let jitter = (delay - last_delay).abs();

let new_hub_stats = StatsInner {
last_message_time: current_stats.last_update,
total_bytes,
bytes_per_second,
average_bytes_per_second,
total_messages,
messages_per_second,
average_messages_per_second,
delay,
jitter,
};

trace!("{new_hub_stats:#?}");

*hub_stats.write().await = new_hub_stats;
*last_accumulated_hub_stats.write().await = current_stats;
}

#[instrument(level = "debug", skip_all)]
async fn update_driver_stats(
hub: &Hub,
Expand Down

0 comments on commit 9f41887

Please sign in to comment.