Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Oct 23, 2024
1 parent eb7ca2b commit e595edd
Show file tree
Hide file tree
Showing 17 changed files with 1,345 additions and 1,244 deletions.
372 changes: 183 additions & 189 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions cross_build_and_install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env bash

set -e

TARGET=armv7-unknown-linux-gnueabihf
BUILDTYPE=release

cross build --$BUILDTYPE --target=$TARGET

/home/joaoantoniocardoso/BlueRobotics/cross_build_dev/old/upload_to_blueos.sh \
target/$TARGET/$BUILDTYPE/mavlink-server \
/home/pi/mavlink-server

echo ""
echo ""
echo 'clear; sshpass -p raspberry scp -o StrictHostKeyChecking=no pi@localhost:/home/pi/mavlink-server "$(which mavlink-server)" ; /home/pi/services/ardupilot_manager/main.py'
echo ""
echo ""
92 changes: 0 additions & 92 deletions src/build.rs
Original file line number Diff line number Diff line change
@@ -1,62 +1,5 @@
use std::process::{exit, Command};

use vergen_gix::{BuildBuilder, CargoBuilder, DependencyKind, GixBuilder};

macro_rules! info {
($($tokens: tt)*) => {
println!("cargo:warning={}", format!($($tokens)*))
}
}

fn is_wasm_target_installed() -> bool {
let output = Command::new("rustup")
.args(["target", "list", "--installed"])
.output()
.expect("Failed to execute rustup");

let installed_targets = String::from_utf8_lossy(&output.stdout);
installed_targets.contains("wasm32-unknown-unknown")
}

fn install_wasm_target() {
info!("Adding wasm32-unknown-unknown target...");
let output = Command::new("rustup")
.args(["target", "add", "wasm32-unknown-unknown"])
.output()
.expect("Failed to execute rustup");

if !output.status.success() {
eprintln!("T{}", String::from_utf8_lossy(&output.stderr));
exit(1);
}
}

fn get_trunk_version() -> Option<String> {
Command::new("trunk")
.arg("--version")
.output()
.ok()
.and_then(|output| String::from_utf8(output.stdout).ok())
.and_then(|version_string| version_string.split_whitespace().last().map(String::from))
}

fn install_trunk() -> Result<(), Box<dyn std::error::Error>> {
info!("Installing trunk...");

let output = Command::new("cargo")
.arg("install")
.arg("trunk")
.arg("--force")
.output()?;

if !output.status.success() {
eprintln!("TT{}", String::from_utf8_lossy(&output.stderr));
exit(1);
}

Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("cargo:rerun-if-changed=./src/webpage/");

Expand All @@ -68,40 +11,5 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
)?
.emit()?;

if std::env::var("SKIP_FRONTEND").is_ok() {
return Ok(());
}

if !is_wasm_target_installed() {
install_wasm_target();
}

if get_trunk_version().is_none() {
info!("trunk not found");
install_trunk().unwrap_or_else(|e| {
eprintln!("Error: {}", e);
exit(1);
});
}

let mut trunk_command = Command::new("trunk");
trunk_command.args(["build", "./src/webpage/index.html"]);

// Add --release argument if not in debug mode
if cfg!(not(debug_assertions)) {
trunk_command.args(["--release", "--locked"]);
}

let trunk_output = trunk_command.output().expect("Failed to execute trunk");

if !trunk_output.status.success() {
eprintln!(
"Trunk build failed: {}",
String::from_utf8_lossy(&trunk_output.stderr)
);
exit(1);
}
info!("{}", String::from_utf8_lossy(&trunk_output.stdout));

Ok(())
}
8 changes: 4 additions & 4 deletions src/lib/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ lazy_static! {
#[derive(Clone)]
struct Stats {
sender: mpsc::Sender<StatsCommand>,
task: Arc<Mutex<tokio::task::JoinHandle<()>>>,
_task: Arc<Mutex<tokio::task::JoinHandle<()>>>,
}

#[derive(Debug, Clone, Default, Serialize)]
Expand Down Expand Up @@ -59,8 +59,8 @@ impl Stats {
fn new(update_period: tokio::time::Duration) -> Self {
let (sender, receiver) = mpsc::channel(32);
let actor = StatsActor::new(update_period);
let task = Arc::new(Mutex::new(tokio::spawn(actor.start(receiver))));
Self { sender, task }
let _task = Arc::new(Mutex::new(tokio::spawn(actor.start(receiver))));
Self { sender, _task }
}
}

Expand Down Expand Up @@ -155,7 +155,7 @@ impl StatsInner {
last_message_time_us: current_stats.last_update_us,
bytes: byte_stats,
messages: message_stats,
delay_stats: delay_stats,
delay_stats,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/web/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub async fn mavlink(path: Option<Path<String>>) -> impl IntoResponse {
crate::drivers::rest::data::messages(&path)
}

pub async fn driver_stats() -> impl IntoResponse {
pub async fn drivers_stats() -> impl IntoResponse {
Json(stats::drivers_stats().await.unwrap())
}

Expand Down
167 changes: 154 additions & 13 deletions src/lib/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@ use uuid::Uuid;

use lazy_static::lazy_static;

use crate::{hub, stats};
use crate::stats;

fn default_router(state: AppState) -> Router {
Router::new()
.route("/", get(endpoints::root))
.route("/:path", get(endpoints::root))
.route("/info", get(endpoints::info))
.route("/stats/driver", get(endpoints::driver_stats))
.route("/stats/drivers", get(endpoints::drivers_stats))
.route("/stats/hub", get(endpoints::hub_stats))
.route("/stats/messages", get(endpoints::hub_messages_stats))
.route("/stats/ws", get(stats_websocket_handler))
.route("/stats/drivers/ws", get(drivers_stats_websocket_handler))
.route("/stats/hub/ws", get(hub_stats_websocket_handler))
.route(
"/stats/messages/ws",
get(hub_messages_stats_websocket_handler),
)
.route("/rest/ws", get(websocket_handler))
// We are matching all possible keys for the user
.route("/rest/mavlink", get(endpoints::mavlink))
Expand All @@ -49,6 +54,8 @@ fn default_router(state: AppState) -> Router {
async fn websocket_handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> Response {
ws.on_upgrade(|socket| async { websocket_connection(socket, state).await })
}

#[instrument(level = "debug", skip(socket, state))]
async fn websocket_connection(socket: WebSocket, state: AppState) {
let identifier = Uuid::new_v4();
debug!("WS client connected with ID: {identifier}");
Expand Down Expand Up @@ -90,20 +97,21 @@ async fn websocket_connection(socket: WebSocket, state: AppState) {
send_task.await.unwrap();
}

#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
struct FrequencyQuery {
frequency: Option<u8>,
}

async fn stats_websocket_handler(
async fn hub_messages_stats_websocket_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
Query(freq_query): Query<FrequencyQuery>,
) -> Response {
ws.on_upgrade(|socket| stats_websocket_connection(socket, state, freq_query))
ws.on_upgrade(|socket| hub_messages_stats_websocket_connection(socket, state, freq_query))
}

async fn stats_websocket_connection(
#[instrument(level = "debug", skip(socket, state))]
async fn hub_messages_stats_websocket_connection(
socket: WebSocket,
state: AppState,
freq_query: FrequencyQuery,
Expand All @@ -117,29 +125,162 @@ async fn stats_websocket_connection(
let interval_duration = tokio::time::Duration::from_secs_f32(1.0 / frequency as f32);
let periodic_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(interval_duration);
let mut first = true;
loop {
interval.tick().await;
let hub_message_stats = match hub::hub_messages_stats().await {
if first {
first = false;
} else {
interval.tick().await;
}

let hub_message_stats = match stats::hub_messages_stats().await {
Ok(hub_message_stats) => hub_message_stats,
Err(error) => {
warn!("Failed getting hub message stats: {error:?}");
warn!("Failed getting Hub Messages Stats: {error:?}");
continue;
}
};

let json = match serde_json::to_string(&hub_message_stats) {
Ok(json) => json,
Err(error) => {
warn!("Failed to create json from Hub Message Stats: {error:?}");
warn!("Failed to create json from Hub Messages Stats: {error:?}");
continue;
}
};
if sender.send(Message::Text(json)).await.is_err() {

if let Err(error) = sender.send(Message::Text(json)).await {
warn!("Failed to send message to WebSocket: {error:?}");
break;
}
}
});
if let Err(error) = periodic_task.await {
error!("Failed finishing task Hub Messages Stats WebSocket task: {error:?}");
}

// Clean up when the connection is closed
state.clients.write().await.remove(&identifier);
debug!("WS client {identifier} removed");
}

async fn hub_stats_websocket_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
Query(freq_query): Query<FrequencyQuery>,
) -> Response {
ws.on_upgrade(|socket| hub_stats_websocket_connection(socket, state, freq_query))
}

#[instrument(level = "debug", skip(socket, state))]
async fn hub_stats_websocket_connection(
socket: WebSocket,
state: AppState,
freq_query: FrequencyQuery,
) {
let frequency = freq_query.frequency.unwrap_or(1);
let identifier = Uuid::new_v4();
debug!("WS client connected with ID: {identifier}");

let (mut sender, _receiver) = socket.split();

let interval_duration = tokio::time::Duration::from_secs_f32(1.0 / frequency as f32);
let periodic_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(interval_duration);
let mut first = true;
loop {
if first {
first = false;
} else {
interval.tick().await;
}

let hub_message_stats = match stats::hub_stats().await {
Ok(hub_message_stats) => hub_message_stats,
Err(error) => {
warn!("Failed getting Hub Stats: {error:?}");
continue;
}
};

let json = match serde_json::to_string(&hub_message_stats) {
Ok(json) => json,
Err(error) => {
warn!("Failed to create json from Hub Stats: {error:?}");
continue;
}
};

if let Err(error) = sender.send(Message::Text(json)).await {
warn!("Failed to send message to WebSocket: {error:?}");
break;
}
}
});
if let Err(error) = periodic_task.await {
error!("Failed finishing task Hub Stats WebSocket task: {error:?}");
}

// Clean up when the connection is closed
state.clients.write().await.remove(&identifier);
debug!("WS client {identifier} removed");
}

async fn drivers_stats_websocket_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
Query(freq_query): Query<FrequencyQuery>,
) -> Response {
ws.on_upgrade(|socket| drivers_stats_websocket_connection(socket, state, freq_query))
}

#[instrument(level = "debug", skip(socket, state))]
async fn drivers_stats_websocket_connection(
socket: WebSocket,
state: AppState,
freq_query: FrequencyQuery,
) {
let frequency = freq_query.frequency.unwrap_or(1);
let identifier = Uuid::new_v4();
debug!("WS client connected with ID: {identifier}");

let (mut sender, _receiver) = socket.split();

let interval_duration = tokio::time::Duration::from_secs_f32(1.0 / frequency as f32);
let periodic_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(interval_duration);
let mut first = true;
loop {
if first {
first = false;
} else {
interval.tick().await;
}

let drivers_stats = match stats::drivers_stats().await {
Ok(drivers_stats) => drivers_stats,
Err(error) => {
warn!("Failed getting Drivers Stats: {error:?}");
continue;
}
};

let json = match serde_json::to_string(&drivers_stats) {
Ok(json) => json,
Err(error) => {
warn!("Failed to create json from Drivers Stats: {error:?}");
continue;
}
};

if let Err(error) = sender.send(Message::Text(json)).await {
warn!("Failed to send message to WebSocket: {error:?}");
break;
}
}
});
if let Err(error) = periodic_task.await {
error!("Failed finishing task Stats WebSocket task: {error:?}");
error!("Failed finishing task Drivers Stats WebSocket task: {error:?}");
}

// Clean up when the connection is closed
Expand Down
5 changes: 5 additions & 0 deletions src/webpage/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[build]
target = "wasm32-unknown-unknown"

[config]
target_arch = "wasm32"
Loading

0 comments on commit e595edd

Please sign in to comment.