Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Oct 29, 2024
1 parent c1d718a commit 02cb4ac
Show file tree
Hide file tree
Showing 16 changed files with 1,428 additions and 1,186 deletions.
372 changes: 183 additions & 189 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions cross_build_and_install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env bash

set -e

TARGET=armv7-unknown-linux-gnueabihf
BUILDTYPE=release

cross build --$BUILDTYPE --target=$TARGET

/home/joaoantoniocardoso/BlueRobotics/cross_build_dev/old/upload_to_blueos.sh \
target/$TARGET/$BUILDTYPE/mavlink-server \
/home/pi/mavlink-server

echo ""
echo ""
echo 'clear; sshpass -p raspberry scp -o StrictHostKeyChecking=no pi@localhost:/home/pi/mavlink-server "$(which mavlink-server)" ; /home/pi/services/ardupilot_manager/main.py'
echo ""
echo ""
8 changes: 4 additions & 4 deletions src/lib/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ lazy_static! {
#[derive(Clone)]
struct Stats {
sender: mpsc::Sender<StatsCommand>,
task: Arc<Mutex<tokio::task::JoinHandle<()>>>,
_task: Arc<Mutex<tokio::task::JoinHandle<()>>>,
}

#[derive(Debug, Clone, Default, Serialize)]
Expand Down Expand Up @@ -59,8 +59,8 @@ impl Stats {
fn new(update_period: tokio::time::Duration) -> Self {
let (sender, receiver) = mpsc::channel(32);
let actor = StatsActor::new(update_period);
let task = Arc::new(Mutex::new(tokio::spawn(actor.start(receiver))));
Self { sender, task }
let _task = Arc::new(Mutex::new(tokio::spawn(actor.start(receiver))));
Self { sender, _task }
}
}

Expand Down Expand Up @@ -155,7 +155,7 @@ impl StatsInner {
last_message_time_us: current_stats.last_update_us,
bytes: byte_stats,
messages: message_stats,
delay_stats: delay_stats,
delay_stats,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/web/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub async fn message_id_from_name(name: Path<String>) -> impl IntoResponse {
.unwrap_or_else(|_| (StatusCode::NOT_FOUND, "404 Not Found").into_response())
}

pub async fn driver_stats() -> impl IntoResponse {
pub async fn drivers_stats() -> impl IntoResponse {
Json(stats::drivers_stats().await.unwrap())
}

Expand Down
167 changes: 154 additions & 13 deletions src/lib/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@ use uuid::Uuid;

use lazy_static::lazy_static;

use crate::{hub, stats};
use crate::stats;

fn default_router(state: AppState) -> Router {
Router::new()
.route("/", get(endpoints::root))
.route("/:path", get(endpoints::root))
.route("/info", get(endpoints::info))
.route("/info_full", get(endpoints::info_full))
.route("/stats/driver", get(endpoints::driver_stats))
.route("/stats/drivers", get(endpoints::drivers_stats))
.route("/stats/hub", get(endpoints::hub_stats))
.route("/stats/messages", get(endpoints::hub_messages_stats))
.route("/stats/ws", get(stats_websocket_handler))
.route("/stats/drivers/ws", get(drivers_stats_websocket_handler))
.route("/stats/hub/ws", get(hub_stats_websocket_handler))
.route(
"/stats/messages/ws",
get(hub_messages_stats_websocket_handler),
)
.route("/rest/ws", get(websocket_handler))
// We are matching all possible keys for the user
.route("/rest/mavlink", get(endpoints::mavlink))
Expand All @@ -54,6 +59,8 @@ fn default_router(state: AppState) -> Router {
async fn websocket_handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> Response {
ws.on_upgrade(|socket| async { websocket_connection(socket, state).await })
}

#[instrument(level = "debug", skip(socket, state))]
async fn websocket_connection(socket: WebSocket, state: AppState) {
let identifier = Uuid::new_v4();
debug!("WS client connected with ID: {identifier}");
Expand Down Expand Up @@ -95,20 +102,21 @@ async fn websocket_connection(socket: WebSocket, state: AppState) {
send_task.await.unwrap();
}

#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
struct FrequencyQuery {
frequency: Option<u8>,
}

async fn stats_websocket_handler(
async fn hub_messages_stats_websocket_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
Query(freq_query): Query<FrequencyQuery>,
) -> Response {
ws.on_upgrade(|socket| stats_websocket_connection(socket, state, freq_query))
ws.on_upgrade(|socket| hub_messages_stats_websocket_connection(socket, state, freq_query))
}

async fn stats_websocket_connection(
#[instrument(level = "debug", skip(socket, state))]
async fn hub_messages_stats_websocket_connection(
socket: WebSocket,
state: AppState,
freq_query: FrequencyQuery,
Expand All @@ -122,29 +130,162 @@ async fn stats_websocket_connection(
let interval_duration = tokio::time::Duration::from_secs_f32(1.0 / frequency as f32);
let periodic_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(interval_duration);
let mut first = true;
loop {
interval.tick().await;
let hub_message_stats = match hub::hub_messages_stats().await {
if first {
first = false;
} else {
interval.tick().await;
}

let hub_message_stats = match stats::hub_messages_stats().await {
Ok(hub_message_stats) => hub_message_stats,
Err(error) => {
warn!("Failed getting hub message stats: {error:?}");
warn!("Failed getting Hub Messages Stats: {error:?}");
continue;
}
};

let json = match serde_json::to_string(&hub_message_stats) {
Ok(json) => json,
Err(error) => {
warn!("Failed to create json from Hub Message Stats: {error:?}");
warn!("Failed to create json from Hub Messages Stats: {error:?}");
continue;
}
};

if let Err(error) = sender.send(Message::Text(json)).await {
warn!("Failed to send message to WebSocket: {error:?}");
break;
}
}
});
if let Err(error) = periodic_task.await {
error!("Failed finishing task Hub Messages Stats WebSocket task: {error:?}");
}

// Clean up when the connection is closed
state.clients.write().await.remove(&identifier);
debug!("WS client {identifier} removed");
}

async fn hub_stats_websocket_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
Query(freq_query): Query<FrequencyQuery>,
) -> Response {
ws.on_upgrade(|socket| hub_stats_websocket_connection(socket, state, freq_query))
}

#[instrument(level = "debug", skip(socket, state))]
async fn hub_stats_websocket_connection(
socket: WebSocket,
state: AppState,
freq_query: FrequencyQuery,
) {
let frequency = freq_query.frequency.unwrap_or(1);
let identifier = Uuid::new_v4();
debug!("WS client connected with ID: {identifier}");

let (mut sender, _receiver) = socket.split();

let interval_duration = tokio::time::Duration::from_secs_f32(1.0 / frequency as f32);
let periodic_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(interval_duration);
let mut first = true;
loop {
if first {
first = false;
} else {
interval.tick().await;
}

let hub_message_stats = match stats::hub_stats().await {
Ok(hub_message_stats) => hub_message_stats,
Err(error) => {
warn!("Failed getting Hub Stats: {error:?}");
continue;
}
};
if sender.send(Message::Text(json)).await.is_err() {

let json = match serde_json::to_string(&hub_message_stats) {
Ok(json) => json,
Err(error) => {
warn!("Failed to create json from Hub Stats: {error:?}");
continue;
}
};

if let Err(error) = sender.send(Message::Text(json)).await {
warn!("Failed to send message to WebSocket: {error:?}");
break;
}
}
});
if let Err(error) = periodic_task.await {
error!("Failed finishing task Hub Stats WebSocket task: {error:?}");
}

// Clean up when the connection is closed
state.clients.write().await.remove(&identifier);
debug!("WS client {identifier} removed");
}

async fn drivers_stats_websocket_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
Query(freq_query): Query<FrequencyQuery>,
) -> Response {
ws.on_upgrade(|socket| drivers_stats_websocket_connection(socket, state, freq_query))
}

#[instrument(level = "debug", skip(socket, state))]
async fn drivers_stats_websocket_connection(
socket: WebSocket,
state: AppState,
freq_query: FrequencyQuery,
) {
let frequency = freq_query.frequency.unwrap_or(1);
let identifier = Uuid::new_v4();
debug!("WS client connected with ID: {identifier}");

let (mut sender, _receiver) = socket.split();

let interval_duration = tokio::time::Duration::from_secs_f32(1.0 / frequency as f32);
let periodic_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(interval_duration);
let mut first = true;
loop {
if first {
first = false;
} else {
interval.tick().await;
}

let drivers_stats = match stats::drivers_stats().await {
Ok(drivers_stats) => drivers_stats,
Err(error) => {
warn!("Failed getting Drivers Stats: {error:?}");
continue;
}
};

let json = match serde_json::to_string(&drivers_stats) {
Ok(json) => json,
Err(error) => {
warn!("Failed to create json from Drivers Stats: {error:?}");
continue;
}
};

if let Err(error) = sender.send(Message::Text(json)).await {
warn!("Failed to send message to WebSocket: {error:?}");
break;
}
}
});
if let Err(error) = periodic_task.await {
error!("Failed finishing task Stats WebSocket task: {error:?}");
error!("Failed finishing task Drivers Stats WebSocket task: {error:?}");
}

// Clean up when the connection is closed
Expand Down
5 changes: 5 additions & 0 deletions src/webpage/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[build]
target = "wasm32-unknown-unknown"

[config]
target_arch = "wasm32"
Loading

0 comments on commit 02cb4ac

Please sign in to comment.