Skip to content

Commit

Permalink
web: Move mavlink endpoints to its own file
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick José Pereira <[email protected]>
  • Loading branch information
patrickelectric authored and joaoantoniocardoso committed Nov 13, 2024
1 parent c9fe632 commit 9ea2477
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 83 deletions.
32 changes: 1 addition & 31 deletions src/lib/web/endpoints.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use std::net::SocketAddr;

use axum::{
extract::{connect_info::ConnectInfo, Path, State},
extract::Path,
http::{header, StatusCode},
response::IntoResponse,
Json,
};
use include_dir::{include_dir, Dir};
use mime_guess::from_path;
use serde::{Deserialize, Serialize};
use tracing::*;

use crate::stats;
use crate::web::AppState;

static HTML_DIST: Dir = include_dir!("src/webpage/dist");

Expand Down Expand Up @@ -88,32 +84,6 @@ pub async fn info_full() -> impl IntoResponse {
serde_json::to_string(&content).unwrap()
}

pub async fn mavlink(path: Option<Path<String>>) -> impl IntoResponse {
let path = match path {
Some(path) => path.0.to_string(),
None => String::default(),
};
crate::drivers::rest::data::messages(&path)
}

pub async fn post_mavlink(
ConnectInfo(address): ConnectInfo<SocketAddr>,
State(state): State<AppState>,
message: String,
) {
debug!("Got message from: {address:?}, {message}");
if let Err(error) = state.message_tx.send(message) {
error!("Failed to send message to main loop: {error:?}");
}
}

pub async fn message_id_from_name(name: Path<String>) -> impl IntoResponse {
use mavlink::{self, Message};
mavlink::ardupilotmega::MavMessage::message_id_from_name(&name.0.to_ascii_uppercase())
.map(|id| (StatusCode::OK, Json(id)).into_response())
.unwrap_or_else(|_| (StatusCode::NOT_FOUND, "404 Not Found").into_response())
}

pub async fn drivers_stats() -> impl IntoResponse {
Json(stats::drivers_stats().await.unwrap())
}
Expand Down
91 changes: 91 additions & 0 deletions src/lib/web/mavlink_endpoints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use std::net::SocketAddr;

use axum::{
extract::{
connect_info::ConnectInfo,
ws::{Message, WebSocket, WebSocketUpgrade},
Path, State,
},
http::StatusCode,
response::{IntoResponse, Response},
Json,
};

use futures::{sink::SinkExt, stream::StreamExt};
use tokio::sync::mpsc;
use tracing::*;
use uuid::Uuid;

use crate::web::{broadcast_message_websockets, AppState};

pub async fn mavlink(path: Option<Path<String>>) -> impl IntoResponse {
let path = match path {
Some(path) => path.0.to_string(),
None => String::default(),
};
crate::drivers::rest::data::messages(&path)
}

pub async fn post_mavlink(
ConnectInfo(address): ConnectInfo<SocketAddr>,
State(state): State<AppState>,
message: String,
) {
debug!("Got message from: {address:?}, {message}");
if let Err(error) = state.message_tx.send(message) {
error!("Failed to send message to main loop: {error:?}");
}
}

pub async fn message_id_from_name(name: Path<String>) -> impl IntoResponse {
use mavlink::{self, Message};
mavlink::ardupilotmega::MavMessage::message_id_from_name(&name.0.to_ascii_uppercase())
.map(|id| (StatusCode::OK, Json(id)).into_response())
.unwrap_or_else(|_| (StatusCode::NOT_FOUND, "404 Not Found").into_response())
}

pub async fn websocket_handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> Response {
ws.on_upgrade(|socket| websocket_connection(socket, state))
}

#[instrument(level = "debug", skip_all)]
async fn websocket_connection(socket: WebSocket, state: AppState) {
let identifier = Uuid::new_v4();
debug!("WS client connected with ID: {identifier}");

let (mut sender, mut receiver) = socket.split();
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
state.clients.write().await.insert(identifier, tx);

// Spawn a task to forward messages from the channel to the websocket
let send_task = tokio::spawn(async move {
while let Some(message) = rx.recv().await {
if sender.send(message).await.is_err() {
break;
}
}
});

// Handle incoming messages
while let Some(Ok(message)) = receiver.next().await {
match message {
Message::Text(text) => {
trace!("WS client received from {identifier}: {text}");
if let Err(error) = state.message_tx.send(text.clone()) {
error!("Failed to send message to main loop: {error:?}");
}
broadcast_message_websockets(&state, identifier, Message::Text(text)).await;
}
Message::Close(frame) => {
debug!("WS client {identifier} disconnected: {frame:?}");
break;
}
_ => {}
}
}

// We should be disconnected now, let's remove it
state.clients.write().await.remove(&identifier);
debug!("WS client {identifier} removed");
send_task.await.unwrap();
}
59 changes: 7 additions & 52 deletions src/lib/web/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod endpoints;
mod mavlink_endpoints;

use std::{
collections::HashMap,
Expand Down Expand Up @@ -48,66 +49,20 @@ fn default_router(state: AppState) -> Router {
"/stats/messages/ws",
get(hub_messages_stats_websocket_handler),
)
.route("/rest/ws", get(websocket_handler))
.route("/rest/ws", get(mavlink_endpoints::websocket_handler))
// We are matching all possible keys for the user
.route("/rest/mavlink", get(endpoints::mavlink))
.route("/rest/mavlink", post(endpoints::post_mavlink))
.route("/rest/mavlink/", get(endpoints::mavlink))
.route("/rest/mavlink/*path", get(endpoints::mavlink))
.route("/rest/mavlink", get(mavlink_endpoints::mavlink))
.route("/rest/mavlink", post(mavlink_endpoints::post_mavlink))
.route("/rest/mavlink/", get(mavlink_endpoints::mavlink))
.route("/rest/mavlink/*path", get(mavlink_endpoints::mavlink))
.route(
"/rest/mavlink/message_id_from_name/*name",
get(endpoints::message_id_from_name),
get(mavlink_endpoints::message_id_from_name),
)
.fallback(get(|| async { (StatusCode::NOT_FOUND, "Not found :(") }))
.with_state(state)
}

async fn websocket_handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> Response {
ws.on_upgrade(|socket| websocket_connection(socket, state))
}

#[instrument(level = "debug", skip_all)]
async fn websocket_connection(socket: WebSocket, state: AppState) {
let identifier = Uuid::new_v4();
debug!("WS client connected with ID: {identifier}");

let (mut sender, mut receiver) = socket.split();
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
state.clients.write().await.insert(identifier, tx);

// Spawn a task to forward messages from the channel to the websocket
let send_task = tokio::spawn(async move {
while let Some(message) = rx.recv().await {
if sender.send(message).await.is_err() {
break;
}
}
});

// Handle incoming messages
while let Some(Ok(message)) = receiver.next().await {
match message {
Message::Text(text) => {
trace!("WS client received from {identifier}: {text}");
if let Err(error) = state.message_tx.send(text.clone()) {
error!("Failed to send message to main loop: {error:?}");
}
broadcast_message_websockets(&state, identifier, Message::Text(text)).await;
}
Message::Close(frame) => {
debug!("WS client {identifier} disconnected: {frame:?}");
break;
}
_ => {}
}
}

// We should be disconnected now, let's remove it
state.clients.write().await.remove(&identifier);
debug!("WS client {identifier} removed");
send_task.await.unwrap();
}

async fn log_websocket_handler(ws: WebSocketUpgrade) -> Response {
ws.on_upgrade(log_websocket_connection)
}
Expand Down

0 comments on commit 9ea2477

Please sign in to comment.