Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update frontend data #66

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
36dede6
src: lib: stats: Rework period internal API
joaoantoniocardoso Nov 4, 2024
e5b4fea
src: lib: web: Add stats stream internal API
joaoantoniocardoso Nov 4, 2024
8365a56
src: lib: web: Add stats frequency external API
joaoantoniocardoso Nov 4, 2024
4ce4be8
src: lib: web: Rename Drivers Stats external API
joaoantoniocardoso Nov 4, 2024
011416f
src: lib: web: Add stats stream external API
joaoantoniocardoso Nov 4, 2024
35084f7
src: lib: stats: underline unused property
joaoantoniocardoso Nov 4, 2024
b6841ee
src: lib: stats: Fix redundant field initialization
joaoantoniocardoso Nov 4, 2024
9cdf85e
src: lib: web: Remove unused imports
joaoantoniocardoso Nov 4, 2024
e3ae23f
src: lib: web: remove redundant async syntax
joaoantoniocardoso Nov 4, 2024
0d7534a
src: lib: web: instrument websocket_handler
joaoantoniocardoso Nov 4, 2024
383090b
src: lib: web: Don't delay on startup
joaoantoniocardoso Nov 4, 2024
a787bbb
cargo: Update cargo lock file
joaoantoniocardoso Nov 4, 2024
a6aad10
src: webpage: cargo: Update ewebsock crate
joaoantoniocardoso Nov 4, 2024
4c6040a
src: webpage: cargo: Update tokio_with_wasm crate
joaoantoniocardoso Nov 4, 2024
37310c0
src: webpage: cargo: Add indexmap, ehttp, ringbuffer, uuid crates
joaoantoniocardoso Nov 4, 2024
8370f2a
src: webpage: cargo: Update cargo lock file
joaoantoniocardoso Nov 4, 2024
49c961b
src: webpage: .cargo: Add cargo config for wasm32 project
joaoantoniocardoso Nov 4, 2024
6750f11
src: webpage: src: Add stats mod, move vehicle messages to its own mo…
joaoantoniocardoso Nov 4, 2024
852c0c0
src: webpage: src: Move from new to default
joaoantoniocardoso Nov 4, 2024
e562d11
src: webpage: src: Refactor connection/reconnection
joaoantoniocardoso Nov 4, 2024
85ef84a
src: webpage: src: Refactor message and hub message stats receiving
joaoantoniocardoso Nov 4, 2024
d69d29f
src: webpage: src: Refactor message and hub message UI
joaoantoniocardoso Nov 4, 2024
ecbd0c4
src: webpage: src: Add Hub Stats
joaoantoniocardoso Nov 4, 2024
9395934
src: webpage: src: Add Drivers Stats
joaoantoniocardoso Nov 4, 2024
39bea78
src: webpage: src: Add frequency control
joaoantoniocardoso Nov 4, 2024
6b1e9aa
src: webpage: src: Refactor the App UI
joaoantoniocardoso Nov 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
866 changes: 533 additions & 333 deletions Cargo.lock

Large diffs are not rendered by default.

111 changes: 107 additions & 4 deletions src/lib/stats/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use anyhow::Result;
use indexmap::IndexMap;
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::sync::{mpsc, Mutex, Notify, RwLock};
use tracing::*;

use crate::{
Expand All @@ -23,10 +23,13 @@ pub struct StatsActor {
update_period: Arc<RwLock<tokio::time::Duration>>,
last_accumulated_drivers_stats: Arc<Mutex<AccumulatedDriversStats>>,
drivers_stats: Arc<RwLock<DriversStats>>,
drivers_stats_notify: Arc<Notify>,
last_accumulated_hub_stats: Arc<Mutex<AccumulatedStatsInner>>,
hub_stats: Arc<RwLock<StatsInner>>,
hub_stats_notify: Arc<Notify>,
last_accumulated_hub_messages_stats: Arc<Mutex<AccumulatedHubMessagesStats>>,
hub_messages_stats: Arc<RwLock<HubMessagesStats>>,
hub_messages_stats_notify: Arc<Notify>,
}

impl StatsActor {
Expand All @@ -35,6 +38,7 @@ impl StatsActor {
let update_period = self.update_period.clone();
let last_accumulated_drivers_stats = self.last_accumulated_drivers_stats.clone();
let drivers_stats = self.drivers_stats.clone();
let drivers_stats_notify = self.drivers_stats_notify.clone();
let start_time = self.start_time.clone();

async move {
Expand All @@ -46,6 +50,8 @@ impl StatsActor {
)
.await;

drivers_stats_notify.notify_waiters();

tokio::time::sleep(*update_period.read().await).await;
}
}
Expand All @@ -55,12 +61,15 @@ impl StatsActor {
let update_period = self.update_period.clone();
let last_accumulated_hub_stats = self.last_accumulated_hub_stats.clone();
let hub_stats = self.hub_stats.clone();
let hub_stats_notify = self.hub_stats_notify.clone();
let start_time = self.start_time.clone();

async move {
loop {
update_hub_stats(&last_accumulated_hub_stats, &hub_stats, &start_time).await;

hub_stats_notify.notify_waiters();

tokio::time::sleep(*update_period.read().await).await;
}
}
Expand All @@ -71,6 +80,7 @@ impl StatsActor {
let last_accumulated_hub_messages_stats =
self.last_accumulated_hub_messages_stats.clone();
let hub_messages_stats = self.hub_messages_stats.clone();
let hub_messages_stats_notify = self.hub_messages_stats_notify.clone();
let start_time = self.start_time.clone();

async move {
Expand All @@ -82,6 +92,8 @@ impl StatsActor {
)
.await;

hub_messages_stats_notify.notify_waiters();

tokio::time::sleep(*update_period.read().await).await;
}
}
Expand All @@ -93,22 +105,38 @@ impl StatsActor {
let result = self.set_period(period).await;
let _ = response.send(result);
}
StatsCommand::GetPeriod { response } => {
let result = self.period().await;
let _ = response.send(result);
}
StatsCommand::Reset { response } => {
let result = self.reset().await;
let result: std::result::Result<(), anyhow::Error> = self.reset().await;
let _ = response.send(result);
}
StatsCommand::GetDriversStats { response } => {
let result = self.drivers_stats().await;
let _ = response.send(result);
}
StatsCommand::GetDriversStatsStream { response } => {
let result = self.drivers_stats_stream().await;
let _ = response.send(result);
}
StatsCommand::GetHubStats { response } => {
let result = self.hub_stats().await;
let _ = response.send(result);
}
StatsCommand::GetHubStatsStream { response } => {
let result = self.hub_stats_stream().await;
let _ = response.send(result);
}
StatsCommand::GetHubMessagesStats { response } => {
let result = self.hub_messages_stats().await;
let _ = response.send(result);
}
StatsCommand::GetHubMessagesStatsStream { response } => {
let result = self.hub_messages_stats_stream().await;
let _ = response.send(result);
}
}
}

Expand All @@ -122,23 +150,29 @@ impl StatsActor {
let update_period = Arc::new(RwLock::new(update_period));
let last_accumulated_hub_stats = Arc::new(Mutex::new(AccumulatedStatsInner::default()));
let hub_stats = Arc::new(RwLock::new(StatsInner::default()));
let hub_stats_notify = Arc::new(Notify::new());
let last_accumulated_drivers_stats =
Arc::new(Mutex::new(AccumulatedDriversStats::default()));
let drivers_stats = Arc::new(RwLock::new(DriversStats::default()));
let drivers_stats_notify = Arc::new(Notify::new());
let last_accumulated_hub_messages_stats =
Arc::new(Mutex::new(AccumulatedHubMessagesStats::default()));
let hub_messages_stats = Arc::new(RwLock::new(HubMessagesStats::default()));
let hub_messages_stats_notify = Arc::new(Notify::new());
let start_time = Arc::new(RwLock::new(chrono::Utc::now().timestamp_micros() as u64));

Self {
start_time,
update_period,
last_accumulated_hub_stats,
hub_stats,
hub_stats_notify,
last_accumulated_drivers_stats,
drivers_stats,
drivers_stats_notify,
last_accumulated_hub_messages_stats,
hub_messages_stats,
hub_messages_stats_notify,
}
}

Expand All @@ -149,13 +183,55 @@ impl StatsActor {
Ok(hub_stats)
}

#[instrument(level = "debug", skip(self))]
async fn hub_stats_stream(&self) -> Result<mpsc::Receiver<StatsInner>> {
let (sender, receiver) = mpsc::channel(100);

let hub_stats = self.hub_stats.clone();
let hub_stats_notify = self.hub_stats_notify.clone();

tokio::spawn(async move {
loop {
hub_stats_notify.notified().await;

if let Err(error) = sender.send(hub_stats.read().await.clone()).await {
trace!("Finishing Hub Stats stream: {error:?}");
break;
}
}
});

Ok(receiver)
}

#[instrument(level = "debug", skip(self))]
async fn hub_messages_stats(&self) -> Result<HubMessagesStats> {
let hub_messages_stats = self.hub_messages_stats.read().await.clone();

Ok(hub_messages_stats)
}

#[instrument(level = "debug", skip(self))]
async fn hub_messages_stats_stream(&self) -> Result<mpsc::Receiver<HubMessagesStats>> {
let (sender, receiver) = mpsc::channel(100);

let hub_messages_stats = self.hub_messages_stats.clone();
let hub_messages_stats_notify = self.hub_messages_stats_notify.clone();

tokio::spawn(async move {
loop {
hub_messages_stats_notify.notified().await;

if let Err(error) = sender.send(hub_messages_stats.read().await.clone()).await {
trace!("Finishing Hub Messages Stats stream: {error:?}");
break;
}
}
});

Ok(receiver)
}

#[instrument(level = "debug", skip(self))]
async fn drivers_stats(&mut self) -> Result<DriversStats> {
let drivers_stats = self.drivers_stats.read().await.clone();
Expand All @@ -164,10 +240,37 @@ impl StatsActor {
}

#[instrument(level = "debug", skip(self))]
async fn set_period(&mut self, period: tokio::time::Duration) -> Result<()> {
async fn drivers_stats_stream(&self) -> Result<mpsc::Receiver<DriversStats>> {
let (sender, receiver) = mpsc::channel(100);

let drivers_stats = self.drivers_stats.clone();
let drivers_stats_notify = self.drivers_stats_notify.clone();

tokio::spawn(async move {
loop {
drivers_stats_notify.notified().await;

if let Err(error) = sender.send(drivers_stats.read().await.clone()).await {
trace!("Finishing Drivers Stats stream: {error:?}");
break;
}
}
});

Ok(receiver)
}

#[instrument(level = "debug", skip(self))]
async fn set_period(&mut self, period: tokio::time::Duration) -> Result<tokio::time::Duration> {
let period = tokio::time::Duration::from_secs_f32(period.as_secs_f32().clamp(0.1, 10.));
*self.update_period.write().await = period;

Ok(())
Ok(*self.update_period.read().await)
}

#[instrument(level = "debug", skip(self))]
async fn period(&mut self) -> Result<tokio::time::Duration> {
Ok(*self.update_period.read().await)
}

#[instrument(level = "debug", skip(self))]
Expand Down
54 changes: 49 additions & 5 deletions src/lib/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ lazy_static! {
#[derive(Clone)]
struct Stats {
sender: mpsc::Sender<StatsCommand>,
task: Arc<Mutex<tokio::task::JoinHandle<()>>>,
_task: Arc<Mutex<tokio::task::JoinHandle<()>>>,
}

#[derive(Debug, Clone, Default, Serialize)]
Expand Down Expand Up @@ -59,8 +59,8 @@ impl Stats {
fn new(update_period: tokio::time::Duration) -> Self {
let (sender, receiver) = mpsc::channel(32);
let actor = StatsActor::new(update_period);
let task = Arc::new(Mutex::new(tokio::spawn(actor.start(receiver))));
Self { sender, task }
let _task = Arc::new(Mutex::new(tokio::spawn(actor.start(receiver))));
Self { sender, _task }
}
}

Expand All @@ -75,6 +75,17 @@ pub async fn drivers_stats() -> Result<DriversStats> {
response_rx.await?
}

pub async fn drivers_stats_stream() -> Result<mpsc::Receiver<DriversStats>> {
let (response_tx, response_rx) = oneshot::channel();
STATS
.sender
.send(StatsCommand::GetDriversStatsStream {
response: response_tx,
})
.await?;
response_rx.await?
}

pub async fn hub_stats() -> Result<StatsInner> {
let (response_tx, response_rx) = oneshot::channel();
STATS
Expand All @@ -86,6 +97,17 @@ pub async fn hub_stats() -> Result<StatsInner> {
response_rx.await?
}

pub async fn hub_stats_stream() -> Result<mpsc::Receiver<StatsInner>> {
let (response_tx, response_rx) = oneshot::channel();
STATS
.sender
.send(StatsCommand::GetHubStatsStream {
response: response_tx,
})
.await?;
response_rx.await?
}

pub async fn hub_messages_stats() -> Result<HubMessagesStats> {
let (response_tx, response_rx) = oneshot::channel();
STATS
Expand All @@ -97,7 +119,29 @@ pub async fn hub_messages_stats() -> Result<HubMessagesStats> {
response_rx.await?
}

pub async fn set_period(period: tokio::time::Duration) -> Result<()> {
pub async fn hub_messages_stats_stream() -> Result<mpsc::Receiver<HubMessagesStats>> {
let (response_tx, response_rx) = oneshot::channel();
STATS
.sender
.send(StatsCommand::GetHubMessagesStatsStream {
response: response_tx,
})
.await?;
response_rx.await?
}

pub async fn period() -> Result<tokio::time::Duration> {
let (response_tx, response_rx) = oneshot::channel();
STATS
.sender
.send(StatsCommand::GetPeriod {
response: response_tx,
})
.await?;
response_rx.await?
}

pub async fn set_period(period: tokio::time::Duration) -> Result<tokio::time::Duration> {
let (response_tx, response_rx) = oneshot::channel();
STATS
.sender
Expand Down Expand Up @@ -155,7 +199,7 @@ impl StatsInner {
last_message_time_us: current_stats.last_update_us,
bytes: byte_stats,
messages: message_stats,
delay_stats: delay_stats,
delay_stats,
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions src/lib/stats/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};

use crate::stats::{DriversStats, StatsInner};

Expand All @@ -8,18 +8,30 @@ use super::messages::HubMessagesStats;
pub enum StatsCommand {
SetPeriod {
period: tokio::time::Duration,
response: oneshot::Sender<Result<()>>,
response: oneshot::Sender<Result<std::time::Duration>>,
},
GetPeriod {
response: oneshot::Sender<Result<std::time::Duration>>,
},
Reset {
response: oneshot::Sender<Result<()>>,
},
GetDriversStats {
response: oneshot::Sender<Result<DriversStats>>,
},
GetDriversStatsStream {
response: oneshot::Sender<Result<mpsc::Receiver<DriversStats>>>,
},
GetHubStats {
response: oneshot::Sender<Result<StatsInner>>,
},
GetHubStatsStream {
response: oneshot::Sender<Result<mpsc::Receiver<StatsInner>>>,
},
GetHubMessagesStats {
response: oneshot::Sender<Result<HubMessagesStats>>,
},
GetHubMessagesStatsStream {
response: oneshot::Sender<Result<mpsc::Receiver<HubMessagesStats>>>,
},
}
Loading