From b013078002538f7923edcb520af97ce56d64b0d6 Mon Sep 17 00:00:00 2001 From: eatradish Date: Wed, 19 Jun 2024 11:49:48 +0800 Subject: [PATCH] feat(server): add `/ws/:hostname` to stream compile log --- Cargo.lock | 38 +++++++++++++++++- server/Cargo.toml | 2 +- server/src/main.rs | 9 ++++- server/src/routes/mod.rs | 13 +++++- server/src/routes/websocket.rs | 73 ++++++++++++++++++++++++++++++++++ server/src/routes/worker.rs | 2 +- 6 files changed, 130 insertions(+), 7 deletions(-) create mode 100644 server/src/routes/websocket.rs diff --git a/Cargo.lock b/Cargo.lock index f238c51..b3b25dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,6 +276,7 @@ checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core 0.4.3", + "base64 0.21.7", "bytes", "futures-util", "http 1.1.0", @@ -294,8 +295,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper 1.0.0", "tokio", + "tokio-tungstenite 0.21.0", "tower", "tower-layer", "tower-service", @@ -3911,6 +3914,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", +] + [[package]] name = "tokio-tungstenite" version = "0.23.1" @@ -3920,7 +3935,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.23.0", ] [[package]] @@ -4109,6 +4124,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.23.0" @@ -4707,7 +4741,7 @@ dependencies = [ "reqwest 0.11.27", "sysinfo", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.23.1", "vergen", ] diff --git a/server/Cargo.toml b/server/Cargo.toml index 7225692..ba25910 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -24,7 +24,7 @@ buildit-utils = { path = "../buildit-utils" } jsonwebtoken = "9.2.0" size = "0.4.1" dickens = { git = "https://github.com/AOSC-Dev/dickens.git", version = "0.1.0" } -axum = "0.7.4" +axum = { version = "0.7.4", features = ["ws"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tracing = "0.1.40" tower-http = { version = "0.5.2", features = ["trace", "fs", "cors"] } diff --git a/server/src/main.rs b/server/src/main.rs index d4425fe..0d8294f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -15,13 +15,15 @@ use server::bot::{answer, Command}; use server::recycler::recycler_worker; use server::routes::{ dashboard_status, job_info, job_list, job_restart, ping, pipeline_info, pipeline_list, - pipeline_new_pr, worker_info, worker_job_update, worker_list, worker_poll, AppState, + pipeline_new_pr, worker_info, worker_job_update, worker_list, worker_poll, ws_handler, + AppState, PeerMap, }; use server::routes::{pipeline_new, worker_heartbeat}; use server::routes::{pipeline_status, worker_status}; use server::{DbPool, ARGS}; +use std::collections::HashMap; use std::os::unix::fs::PermissionsExt; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use teloxide::prelude::*; use tokio::net::unix::UCred; use tokio::net::UnixStream; @@ -99,7 +101,9 @@ async fn main() -> anyhow::Result<()> { let state = AppState { pool: pool.clone(), bot, + ws_peer_map: PeerMap::new(RwLock::new(HashMap::new())), }; + let mut app = Router::new() .route("/api/ping", get(ping)) .route("/api/pipeline/new", post(pipeline_new)) @@ -117,6 +121,7 @@ async fn main() -> anyhow::Result<()> { .route("/api/worker/list", get(worker_list)) .route("/api/worker/info", get(worker_info)) .route("/api/dashboard/status", get(dashboard_status)) + .route("/api/ws/:hostname", get(ws_handler)) .nest_service("/assets", ServeDir::new("frontend/dist/assets")) .route_service("/favicon.ico", ServeFile::new("frontend/dist/favicon.ico")) .fallback_service(ServeFile::new("frontend/dist/index.html")) diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs index 38ec404..14b6961 100644 --- a/server/src/routes/mod.rs +++ b/server/src/routes/mod.rs @@ -13,28 +13,39 @@ use diesel::dsl::{count, sum}; use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl}; +use futures::channel::mpsc::UnboundedSender; use serde::Serialize; -use std::collections::BTreeMap; +use std::{ + collections::{BTreeMap, HashMap}, + net::SocketAddr, + sync::{Arc, RwLock}, +}; use teloxide::prelude::*; use tracing::info; pub mod job; pub mod pipeline; +pub mod websocket; pub mod worker; pub use job::*; pub use pipeline::*; +pub use websocket::*; pub use worker::*; pub async fn ping() -> &'static str { "PONG" } +type Tx = (UnboundedSender, String); +pub type PeerMap = Arc>>; + #[derive(Clone)] pub struct AppState { pub pool: DbPool, pub bot: Option, + pub ws_peer_map: PeerMap, } // learned from https://github.com/tokio-rs/axum/blob/main/examples/anyhow-error-response/src/main.rs diff --git a/server/src/routes/websocket.rs b/server/src/routes/websocket.rs new file mode 100644 index 0000000..9611ed7 --- /dev/null +++ b/server/src/routes/websocket.rs @@ -0,0 +1,73 @@ +use std::net::SocketAddr; + +use axum::{ + extract::{ + ws::{Message, WebSocket}, + ConnectInfo, Path, State, WebSocketUpgrade, + }, + response::IntoResponse, +}; +use futures::{ + channel::mpsc::{unbounded, UnboundedSender}, + future, pin_mut, StreamExt, TryStreamExt, +}; +use tracing::info; + +use super::{AppState, PeerMap}; + +/// The handler for the HTTP request (this gets called when the HTTP GET lands at the start +/// of websocket negotiation). After this completes, the actual switching from HTTP to +/// websocket protocol will occur. +/// This is the last point where we can extract TCP/IP metadata such as IP address of the client +/// as well as things from HTTP headers such as user-agent of the browser etc. +pub async fn ws_handler( + Path(hostname): Path, + ws: WebSocketUpgrade, + ConnectInfo(addr): ConnectInfo, + State(state): State, +) -> impl IntoResponse { + // finalize the upgrade process by returning upgrade callback. + // we can customize the callback by sending additional info such as address. + ws.on_upgrade(move |socket| handle_socket(socket, addr, hostname, state.ws_peer_map)) +} + +/// Actual websocket statemachine (one will be spawned per connection) +async fn handle_socket(socket: WebSocket, who: SocketAddr, hostname: String, peer_map: PeerMap) { + let (tx, rx): (UnboundedSender, _) = unbounded(); + peer_map + .write() + .unwrap() + .insert(who, (tx, hostname.clone())); + + let (outgoing, incoming) = socket.split(); + + let broadcast_incoming = incoming.try_for_each(|msg| { + info!("Received a message from {}: {:?}", who, msg); + + let peers = peer_map.read().unwrap(); + + // We want to broadcast the message to everyone except ourselves. + let broadcast_recipients = peers + .iter() + .filter(|(peer_addr, _)| peer_addr != &&who) + .map(|(_, (ws_sink, port))| (ws_sink, port)); + + for recp in broadcast_recipients { + let recp_path = recp.1; + + if *recp_path == hostname { + recp.0.unbounded_send(msg.clone()).unwrap(); + } + } + + future::ok(()) + }); + + let receive_from_others = rx.map(Ok).forward(outgoing); + + pin_mut!(broadcast_incoming, receive_from_others); + future::select(broadcast_incoming, receive_from_others).await; + + info!("{} disconnected", &who); + peer_map.write().unwrap().remove(&who); +} diff --git a/server/src/routes/worker.rs b/server/src/routes/worker.rs index 5f96ca5..933e230 100644 --- a/server/src/routes/worker.rs +++ b/server/src/routes/worker.rs @@ -309,7 +309,7 @@ pub async fn worker_poll( } pub async fn worker_job_update( - State(AppState { pool, bot }): State, + State(AppState { pool, bot, .. }): State, Json(payload): Json, ) -> Result<(), AnyhowError> { if payload.worker_secret != ARGS.worker_secret {