Skip to content

Commit

Permalink
src: hub: Add driver stats to the hub
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 8, 2024
1 parent c63da3a commit 67a2027
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 2 deletions.
35 changes: 35 additions & 0 deletions src/hub/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tracing::*;
use crate::{
drivers::{Driver, DriverInfo},
protocol::Protocol,
stats::driver::DriverStatsInfo,
};

use super::protocol::HubCommand;
Expand Down Expand Up @@ -40,6 +41,13 @@ impl HubActor {
HubCommand::GetSender { response } => {
let _ = response.send(self.bcst_sender.clone());
}
HubCommand::GetStats { response } => {
let stats = self.get_stats().await;
let _ = response.send(stats);
}
HubCommand::ResetAllStats { response } => {
let _ = response.send(self.reset_all_stats().await);
}
}
}
}
Expand Down Expand Up @@ -159,4 +167,31 @@ impl HubActor {
pub fn get_sender(&self) -> broadcast::Sender<Arc<Protocol>> {
self.bcst_sender.clone()
}

#[instrument(level = "debug", skip(self))]
pub async fn get_stats(&self) -> Vec<(String, DriverStatsInfo)> {
let drivers = self.drivers.read().await;

let mut drivers_stats = Vec::with_capacity(drivers.len());
for (_id, driver) in drivers.iter() {
let stats = driver.stats().await;
let info = driver.info();
let name = info.name().to_owned();

drivers_stats.push((name, stats));
}

drivers_stats
}

#[instrument(level = "debug", skip(self))]
pub async fn reset_all_stats(&self) -> Result<()> {
let drivers = self.drivers.write().await;

for (_id, driver) in drivers.iter() {
driver.reset_stats().await;
}

Ok(())
}
}
28 changes: 26 additions & 2 deletions src/hub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use std::{
use anyhow::Result;
use tokio::sync::{broadcast, mpsc, oneshot, RwLock};

use crate::drivers::{Driver, DriverInfo};
use crate::protocol::Protocol;
use crate::{
drivers::{Driver, DriverInfo},
protocol::Protocol,
stats::driver::DriverStatsInfo,
};

use actor::HubActor;
use protocol::HubCommand;
Expand Down Expand Up @@ -77,4 +80,25 @@ impl Hub {
let res = response_rx.await?;
Ok(res)
}

pub async fn stats(&self) -> Result<Vec<(String, DriverStatsInfo)>> {
let (response_tx, response_rx) = oneshot::channel();
self.sender
.send(HubCommand::GetStats {
response: response_tx,
})
.await?;
let res = response_rx.await?;
Ok(res)
}

pub async fn reset_all_stats(&self) -> Result<()> {
let (response_tx, response_rx) = oneshot::channel();
self.sender
.send(HubCommand::ResetAllStats {
response: response_tx,
})
.await?;
response_rx.await?
}
}
7 changes: 7 additions & 0 deletions src/hub/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::sync::{broadcast, oneshot};
use crate::{
drivers::{Driver, DriverInfo},
protocol::Protocol,
stats::driver::DriverStatsInfo,
};

pub enum HubCommand {
Expand All @@ -23,4 +24,10 @@ pub enum HubCommand {
GetSender {
response: oneshot::Sender<broadcast::Sender<Arc<Protocol>>>,
},
GetStats {
response: oneshot::Sender<Vec<(String, DriverStatsInfo)>>,
},
ResetAllStats {
response: oneshot::Sender<Result<()>>,
},
}

0 comments on commit 67a2027

Please sign in to comment.