diff --git a/Cargo.lock b/Cargo.lock index c9e0528..b3b25dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,6 +276,7 @@ checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core 0.4.3", + "base64 0.21.7", "bytes", "futures-util", "http 1.1.0", @@ -294,8 +295,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper 1.0.0", "tokio", + "tokio-tungstenite 0.21.0", "tower", "tower-layer", "tower-service", @@ -736,6 +739,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + [[package]] name = "deranged" version = "0.3.11" @@ -1024,6 +1033,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2310,6 +2331,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -3387,6 +3417,17 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha1_smol" version = "1.0.0" @@ -3517,6 +3558,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "strsim" @@ -3870,6 +3914,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.23.0", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -4056,6 +4124,43 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "tungstenite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -4134,6 +4239,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" @@ -4621,13 +4732,16 @@ dependencies = [ "common", "dotenv", "env_logger", + "flume", "fs2", + "futures-util", "gethostname", "log", "num_cpus", "reqwest 0.11.27", "sysinfo", "tokio", + "tokio-tungstenite 0.23.1", "vergen", ] diff --git a/server/Cargo.toml b/server/Cargo.toml index 7225692..ba25910 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -24,7 +24,7 @@ buildit-utils = { path = "../buildit-utils" } jsonwebtoken = "9.2.0" size = "0.4.1" dickens = { git = "https://github.com/AOSC-Dev/dickens.git", version = "0.1.0" } -axum = "0.7.4" +axum = { version = "0.7.4", features = ["ws"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tracing = "0.1.40" tower-http = { version = "0.5.2", features = ["trace", "fs", "cors"] } diff --git a/server/src/main.rs b/server/src/main.rs index d4425fe..0d8294f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -15,13 +15,15 @@ use server::bot::{answer, Command}; use server::recycler::recycler_worker; use server::routes::{ dashboard_status, job_info, job_list, job_restart, ping, pipeline_info, pipeline_list, - pipeline_new_pr, worker_info, worker_job_update, worker_list, worker_poll, AppState, + pipeline_new_pr, worker_info, worker_job_update, worker_list, worker_poll, ws_handler, + AppState, PeerMap, }; use server::routes::{pipeline_new, worker_heartbeat}; use server::routes::{pipeline_status, worker_status}; use server::{DbPool, ARGS}; +use std::collections::HashMap; use std::os::unix::fs::PermissionsExt; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use teloxide::prelude::*; use tokio::net::unix::UCred; use tokio::net::UnixStream; @@ -99,7 +101,9 @@ async fn main() -> anyhow::Result<()> { let state = AppState { pool: pool.clone(), bot, + ws_peer_map: PeerMap::new(RwLock::new(HashMap::new())), }; + let mut app = Router::new() .route("/api/ping", get(ping)) .route("/api/pipeline/new", post(pipeline_new)) @@ -117,6 +121,7 @@ async fn main() -> anyhow::Result<()> { .route("/api/worker/list", get(worker_list)) .route("/api/worker/info", get(worker_info)) .route("/api/dashboard/status", get(dashboard_status)) + .route("/api/ws/:hostname", get(ws_handler)) .nest_service("/assets", ServeDir::new("frontend/dist/assets")) .route_service("/favicon.ico", ServeFile::new("frontend/dist/favicon.ico")) .fallback_service(ServeFile::new("frontend/dist/index.html")) diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs index 38ec404..14b6961 100644 --- a/server/src/routes/mod.rs +++ b/server/src/routes/mod.rs @@ -13,28 +13,39 @@ use diesel::dsl::{count, sum}; use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl}; +use futures::channel::mpsc::UnboundedSender; use serde::Serialize; -use std::collections::BTreeMap; +use std::{ + collections::{BTreeMap, HashMap}, + net::SocketAddr, + sync::{Arc, RwLock}, +}; use teloxide::prelude::*; use tracing::info; pub mod job; pub mod pipeline; +pub mod websocket; pub mod worker; pub use job::*; pub use pipeline::*; +pub use websocket::*; pub use worker::*; pub async fn ping() -> &'static str { "PONG" } +type Tx = (UnboundedSender, String); +pub type PeerMap = Arc>>; + #[derive(Clone)] pub struct AppState { pub pool: DbPool, pub bot: Option, + pub ws_peer_map: PeerMap, } // learned from https://github.com/tokio-rs/axum/blob/main/examples/anyhow-error-response/src/main.rs diff --git a/server/src/routes/websocket.rs b/server/src/routes/websocket.rs new file mode 100644 index 0000000..3be5945 --- /dev/null +++ b/server/src/routes/websocket.rs @@ -0,0 +1,67 @@ +use std::net::SocketAddr; + +use axum::{ + extract::{ws::WebSocket, ConnectInfo, Path, State, WebSocketUpgrade}, + response::IntoResponse, +}; +use futures::{channel::mpsc::unbounded, future, pin_mut, StreamExt, TryStreamExt}; +use tracing::info; + +use super::{AppState, PeerMap}; + +/// The handler for the HTTP request (this gets called when the HTTP GET lands at the start +/// of websocket negotiation). After this completes, the actual switching from HTTP to +/// websocket protocol will occur. +/// This is the last point where we can extract TCP/IP metadata such as IP address of the client +/// as well as things from HTTP headers such as user-agent of the browser etc. +pub async fn ws_handler( + Path(hostname): Path, + ws: WebSocketUpgrade, + ConnectInfo(addr): ConnectInfo, + State(state): State, +) -> impl IntoResponse { + // finalize the upgrade process by returning upgrade callback. + // we can customize the callback by sending additional info such as address. + ws.on_upgrade(move |socket| handle_socket(socket, addr, hostname, state.ws_peer_map)) +} + +/// Actual websocket statemachine (one will be spawned per connection) +async fn handle_socket(socket: WebSocket, who: SocketAddr, hostname: String, peer_map: PeerMap) { + let (tx, rx) = unbounded(); + peer_map + .write() + .unwrap() + .insert(who, (tx, hostname.clone())); + + let (outgoing, incoming) = socket.split(); + + let broadcast_incoming = incoming.try_for_each(|msg| { + info!("Received a message from {}: {:?}", who, msg); + + let peers = peer_map.read().unwrap(); + + // We want to broadcast the message to everyone except ourselves. + let broadcast_recipients = peers + .iter() + .filter(|(peer_addr, _)| peer_addr != &&who) + .map(|(_, (ws_sink, port))| (ws_sink, port)); + + for recp in broadcast_recipients { + let recp_path = recp.1; + + if *recp_path == hostname { + recp.0.unbounded_send(msg.clone()).unwrap(); + } + } + + future::ok(()) + }); + + let receive_from_others = rx.map(Ok).forward(outgoing); + + pin_mut!(broadcast_incoming, receive_from_others); + future::select(broadcast_incoming, receive_from_others).await; + + info!("{} disconnected", &who); + peer_map.write().unwrap().remove(&who); +} diff --git a/server/src/routes/worker.rs b/server/src/routes/worker.rs index 5f96ca5..933e230 100644 --- a/server/src/routes/worker.rs +++ b/server/src/routes/worker.rs @@ -309,7 +309,7 @@ pub async fn worker_poll( } pub async fn worker_job_update( - State(AppState { pool, bot }): State, + State(AppState { pool, bot, .. }): State, Json(payload): Json, ) -> Result<(), AnyhowError> { if payload.worker_secret != ARGS.worker_secret { diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 79604a2..f92e331 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -20,6 +20,9 @@ num_cpus = "1.16.0" reqwest = { version = "0.11.24", features = ["json"] } sysinfo = "0.30.5" tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "fs"] } +tokio-tungstenite = "0.23.1" +futures-util = "0.3.30" +flume = "0.11.0" [build-dependencies] vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] } diff --git a/worker/src/build.rs b/worker/src/build.rs index a6196ad..4386d69 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -1,19 +1,30 @@ use crate::{get_memory_bytes, Args}; +use anyhow::Context; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; +use flume::{unbounded, Receiver, Sender}; +use futures_util::StreamExt; use log::{error, info, warn}; +use reqwest::Url; use std::{ path::Path, - process::Output, + process::{Output, Stdio}, time::{Duration, Instant}, }; -use tokio::{fs, process::Command, time::sleep}; +use tokio::{ + fs, + io::{AsyncBufReadExt, BufReader}, + process::Command, + time::sleep, +}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; async fn get_output_logged( cmd: &str, args: &[&str], cwd: &Path, logs: &mut Vec, + tx: Sender, ) -> anyhow::Result { let begin = Instant::now(); let msg = format!( @@ -26,13 +37,40 @@ async fn get_output_logged( logs.extend(msg.as_bytes()); info!("{}", msg.trim()); - let output = Command::new(cmd) + let mut output = Command::new(cmd) .args(args) .current_dir(cwd) - .output() - .await?; + .stdout(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; let elapsed = begin.elapsed(); + + let stdout = output.stdout.as_mut().context("Failed to get stdout")?; + let mut stdout_reader = BufReader::new(stdout).lines(); + let stderr = output.stderr.take().context("Failed to get stderr")?; + + let txc = tx.clone(); + + let stderr_task = tokio::spawn(async move { + let mut res = vec![]; + let mut stderr_reader = BufReader::new(stderr).lines(); + while let Ok(Some(v)) = stderr_reader.next_line().await { + let _ = txc.clone().into_send_async(Message::Text(v.clone())).await; + res.push(v); + } + + res + }); + + let mut stdout_out = vec![]; + while let Ok(Some(v)) = stdout_reader.next_line().await { + tx.clone().into_send_async(Message::Text(v.clone())).await?; + stdout_out.push(v); + } + + let output = output.wait_with_output().await?; + logs.extend( format!( "{}: `{} {}` finished in {:?} with {}\n", @@ -40,14 +78,14 @@ async fn get_output_logged( cmd, args.join(" "), elapsed, - output.status + output.status.success() ) .as_bytes(), ); logs.extend("STDOUT:\n".as_bytes()); - logs.extend(output.stdout.clone()); + logs.extend(stdout_out.join("\n").as_bytes()); logs.extend("STDERR:\n".as_bytes()); - logs.extend(output.stderr.clone()); + logs.extend(stderr_task.await?.join("\n").as_bytes()); Ok(output) } @@ -58,12 +96,13 @@ async fn run_logged_with_retry( args: &[&str], cwd: &Path, logs: &mut Vec, + tx: Sender, ) -> anyhow::Result { for i in 0..5 { if i > 0 { info!("Attempt #{i} to run `{cmd} {}`", args.join(" ")); } - match get_output_logged(cmd, args, cwd, logs).await { + match get_output_logged(cmd, args, cwd, logs, tx.clone()).await { Ok(output) => { if output.status.success() { return Ok(true); @@ -90,6 +129,7 @@ async fn build( job: &WorkerPollResponse, tree_path: &Path, args: &Args, + tx: Sender, ) -> anyhow::Result { let begin = Instant::now(); let mut successful_packages = vec![]; @@ -103,7 +143,7 @@ async fn build( // clear output directory if output_path.exists() { - get_output_logged("rm", &["-rf", "debs"], &output_path, &mut logs).await?; + get_output_logged("rm", &["-rf", "debs"], &output_path, &mut logs, tx.clone()).await?; } // switch to git ref @@ -116,6 +156,7 @@ async fn build( ], tree_path, &mut logs, + tx.clone(), ) .await?; @@ -129,10 +170,18 @@ async fn build( &["checkout", "-b", &job.git_branch], tree_path, &mut logs, + tx.clone(), ) .await?; // checkout to branch - get_output_logged("git", &["checkout", &job.git_branch], tree_path, &mut logs).await?; + get_output_logged( + "git", + &["checkout", &job.git_branch], + tree_path, + &mut logs, + tx.clone(), + ) + .await?; // switch to the commit by sha // to avoid race condition, resolve branch name to sha in server @@ -141,17 +190,27 @@ async fn build( &["reset", &job.git_sha, "--hard"], tree_path, &mut logs, + tx.clone(), ) .await?; if output.status.success() { // update container - get_output_logged("ciel", &["update-os"], &args.ciel_path, &mut logs).await?; + get_output_logged( + "ciel", + &["update-os"], + &args.ciel_path, + &mut logs, + tx.clone(), + ) + .await?; // build packages let mut ciel_args = vec!["build", "-i", &args.ciel_instance]; ciel_args.extend(job.packages.split(',')); - let output = get_output_logged("ciel", &ciel_args, &args.ciel_path, &mut logs).await?; + let output = + get_output_logged("ciel", &ciel_args, &args.ciel_path, &mut logs, tx.clone()) + .await?; build_success = output.status.success(); @@ -216,8 +275,14 @@ async fn build( // allow force push if noarch and non stable args.insert(0, "--force-push-noarch-package"); } - pushpkg_success = - run_logged_with_retry("pushpkg", &args, &output_path, &mut logs).await?; + pushpkg_success = run_logged_with_retry( + "pushpkg", + &args, + &output_path, + &mut logs, + tx.clone(), + ) + .await?; } } } @@ -248,6 +313,7 @@ async fn build( ], &tree_path, &mut scp_log, + tx, ) .await? { @@ -297,8 +363,10 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { .timeout(Duration::from_secs(30)) .build() .unwrap(); + + let hostname = gethostname::gethostname().to_string_lossy().to_string(); let req = WorkerPollRequest { - hostname: gethostname::gethostname().to_string_lossy().to_string(), + hostname: hostname.clone(), arch: args.arch.clone(), worker_secret: args.worker_secret.clone(), memory_bytes: get_memory_bytes(), @@ -306,6 +374,14 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { logical_cores: num_cpus::get() as i32, }; + let ws = Url::parse(&args.websocket)?.join(&hostname)?; + + let (tx, rx) = unbounded(); + + tokio::spawn(async move { + websocket_connect(rx, ws).await; + }); + loop { if let Some(job) = client .post(format!("{}/api/worker/poll", args.server)) @@ -317,7 +393,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { { info!("Processing job {:?}", job); - match build(&job, &tree_path, args).await { + match build(&job, &tree_path, args, tx.clone()).await { Ok(result) => { // post result info!("Finished to run job {:?} with result {:?}", job, result); @@ -356,3 +432,23 @@ pub async fn build_worker(args: Args) -> ! { tokio::time::sleep(Duration::from_secs(5)).await; } } + +pub async fn websocket_connect(rx: Receiver, ws: Url) -> ! { + loop { + info!("Starting websocket connect"); + match connect_async(ws.as_str()).await { + Ok((ws_stream, _)) => { + let (write, _) = ws_stream.split(); + let rx = rx.clone().into_stream(); + if let Err(e) = rx.map(Ok).forward(write).await { + warn!("{e}"); + } + } + Err(err) => { + warn!("Got error connecting to websocket: {}", err); + } + } + + tokio::time::sleep(Duration::from_secs(5)).await; + } +} diff --git a/worker/src/lib.rs b/worker/src/lib.rs index 6f03771..00dc5f4 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -49,6 +49,10 @@ pub struct Args { /// Performance number of the worker (smaller is better) #[arg(short = 'p', long, env = "BUILDIT_WORKER_PERFORMANCE")] pub worker_performance: Option, + + /// Websocket uri + #[arg(short = 'w', long, env = "BUILDIT_WS")] + pub websocket: String, } pub fn get_memory_bytes() -> i64 { diff --git a/worker/src/main.rs b/worker/src/main.rs index 03dc7b9..b6d87f9 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -15,7 +15,6 @@ async fn main() -> anyhow::Result<()> { s.refresh_memory(); tokio::spawn(heartbeat_worker(args.clone())); - build_worker(args.clone()).await; Ok(()) }