From 63468808fdfc0bffdb18d86916fe71818d9a29df Mon Sep 17 00:00:00 2001 From: eatradish Date: Tue, 18 Jun 2024 17:41:25 +0800 Subject: [PATCH 1/7] feat(worker): stream compile log to buildit-monitor --- Cargo.lock | 55 ++++++++++++++++++++ worker/Cargo.toml | 2 + worker/src/build.rs | 119 ++++++++++++++++++++++++++++++++++++++------ worker/src/lib.rs | 4 ++ 4 files changed, 164 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9e0528..3589e12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -736,6 +736,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + [[package]] name = "deranged" version = "0.3.11" @@ -3387,6 +3393,17 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha1_smol" version = "1.0.0" @@ -3870,6 +3887,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -4056,6 +4085,24 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -4134,6 +4181,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" @@ -4622,12 +4675,14 @@ dependencies = [ "dotenv", "env_logger", "fs2", + "futures-util", "gethostname", "log", "num_cpus", "reqwest 0.11.27", "sysinfo", "tokio", + "tokio-tungstenite", "vergen", ] diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 79604a2..fd10662 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -20,6 +20,8 @@ num_cpus = "1.16.0" reqwest = { version = "0.11.24", features = ["json"] } sysinfo = "0.30.5" tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "fs"] } +tokio-tungstenite = "0.23.1" +futures-util = "0.3.30" [build-dependencies] vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] } diff --git a/worker/src/build.rs b/worker/src/build.rs index a6196ad..d6cc710 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -1,19 +1,33 @@ use crate::{get_memory_bytes, Args}; +use anyhow::Context; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; +use futures_util::{stream::SplitSink, SinkExt, StreamExt}; use log::{error, info, warn}; use std::{ path::Path, - process::Output, + process::{Output, Stdio}, + sync::Arc, time::{Duration, Instant}, }; -use tokio::{fs, process::Command, time::sleep}; +use tokio::{ + fs, + io::{AsyncBufReadExt, BufReader}, + net::TcpStream, + process::Command, + sync::Mutex, + time::sleep, +}; +use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; + +type WsWriter = SplitSink>, Message>; async fn get_output_logged( cmd: &str, args: &[&str], cwd: &Path, logs: &mut Vec, + write: Arc>, ) -> anyhow::Result { let begin = Instant::now(); let msg = format!( @@ -26,13 +40,42 @@ async fn get_output_logged( logs.extend(msg.as_bytes()); info!("{}", msg.trim()); - let output = Command::new(cmd) + let mut output = Command::new(cmd) .args(args) .current_dir(cwd) - .output() - .await?; + .stdout(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; let elapsed = begin.elapsed(); + + let stdout = output.stdout.as_mut().context("Failed to get stdout")?; + let mut stdout_reader = BufReader::new(stdout).lines(); + let stderr = output.stderr.take().context("Failed to get stderr")?; + + let wc = write.clone(); + + let stderr_task = tokio::spawn(async move { + let mut res = vec![]; + let mut stderr_reader = BufReader::new(stderr).lines(); + while let Ok(Some(v)) = stderr_reader.next_line().await { + let mut lock = wc.lock().await; + let _ = lock.send(Message::Text(v.clone())).await; + res.push(v); + } + + res + }); + + let mut stdout_out = vec![]; + while let Ok(Some(v)) = stdout_reader.next_line().await { + let mut lock = write.lock().await; + lock.send(Message::Text(v.clone())).await?; + stdout_out.push(v); + } + + let output = output.wait_with_output().await?; + logs.extend( format!( "{}: `{} {}` finished in {:?} with {}\n", @@ -40,14 +83,14 @@ async fn get_output_logged( cmd, args.join(" "), elapsed, - output.status + output.status.success() ) .as_bytes(), ); logs.extend("STDOUT:\n".as_bytes()); - logs.extend(output.stdout.clone()); + logs.extend(stdout_out.join("\n").as_bytes()); logs.extend("STDERR:\n".as_bytes()); - logs.extend(output.stderr.clone()); + logs.extend(stderr_task.await?.join("\n").as_bytes()); Ok(output) } @@ -58,12 +101,13 @@ async fn run_logged_with_retry( args: &[&str], cwd: &Path, logs: &mut Vec, + write: Arc>, ) -> anyhow::Result { for i in 0..5 { if i > 0 { info!("Attempt #{i} to run `{cmd} {}`", args.join(" ")); } - match get_output_logged(cmd, args, cwd, logs).await { + match get_output_logged(cmd, args, cwd, logs, write.clone()).await { Ok(output) => { if output.status.success() { return Ok(true); @@ -90,6 +134,7 @@ async fn build( job: &WorkerPollResponse, tree_path: &Path, args: &Args, + write: Arc>, ) -> anyhow::Result { let begin = Instant::now(); let mut successful_packages = vec![]; @@ -103,7 +148,14 @@ async fn build( // clear output directory if output_path.exists() { - get_output_logged("rm", &["-rf", "debs"], &output_path, &mut logs).await?; + get_output_logged( + "rm", + &["-rf", "debs"], + &output_path, + &mut logs, + write.clone(), + ) + .await?; } // switch to git ref @@ -116,6 +168,7 @@ async fn build( ], tree_path, &mut logs, + write.clone(), ) .await?; @@ -129,10 +182,18 @@ async fn build( &["checkout", "-b", &job.git_branch], tree_path, &mut logs, + write.clone(), ) .await?; // checkout to branch - get_output_logged("git", &["checkout", &job.git_branch], tree_path, &mut logs).await?; + get_output_logged( + "git", + &["checkout", &job.git_branch], + tree_path, + &mut logs, + write.clone(), + ) + .await?; // switch to the commit by sha // to avoid race condition, resolve branch name to sha in server @@ -141,17 +202,32 @@ async fn build( &["reset", &job.git_sha, "--hard"], tree_path, &mut logs, + write.clone(), ) .await?; if output.status.success() { // update container - get_output_logged("ciel", &["update-os"], &args.ciel_path, &mut logs).await?; + get_output_logged( + "ciel", + &["update-os"], + &args.ciel_path, + &mut logs, + write.clone(), + ) + .await?; // build packages let mut ciel_args = vec!["build", "-i", &args.ciel_instance]; ciel_args.extend(job.packages.split(',')); - let output = get_output_logged("ciel", &ciel_args, &args.ciel_path, &mut logs).await?; + let output = get_output_logged( + "ciel", + &ciel_args, + &args.ciel_path, + &mut logs, + write.clone(), + ) + .await?; build_success = output.status.success(); @@ -216,8 +292,14 @@ async fn build( // allow force push if noarch and non stable args.insert(0, "--force-push-noarch-package"); } - pushpkg_success = - run_logged_with_retry("pushpkg", &args, &output_path, &mut logs).await?; + pushpkg_success = run_logged_with_retry( + "pushpkg", + &args, + &output_path, + &mut logs, + write.clone(), + ) + .await?; } } } @@ -248,6 +330,7 @@ async fn build( ], &tree_path, &mut scp_log, + write, ) .await? { @@ -288,6 +371,10 @@ async fn build( } async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { + let ws = &args.websocket; + let (ws_stream, _) = connect_async(ws).await?; + let (write, _) = ws_stream.split(); + let write = Arc::new(Mutex::new(write)); let mut tree_path = args.ciel_path.clone(); tree_path.push("TREE"); @@ -317,7 +404,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { { info!("Processing job {:?}", job); - match build(&job, &tree_path, args).await { + match build(&job, &tree_path, args, write.clone()).await { Ok(result) => { // post result info!("Finished to run job {:?} with result {:?}", job, result); diff --git a/worker/src/lib.rs b/worker/src/lib.rs index 6f03771..00dc5f4 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -49,6 +49,10 @@ pub struct Args { /// Performance number of the worker (smaller is better) #[arg(short = 'p', long, env = "BUILDIT_WORKER_PERFORMANCE")] pub worker_performance: Option, + + /// Websocket uri + #[arg(short = 'w', long, env = "BUILDIT_WS")] + pub websocket: String, } pub fn get_memory_bytes() -> i64 { From c2db307ac5c5df92995d00705d6501d9ecc02021 Mon Sep 17 00:00:00 2001 From: eatradish Date: Tue, 18 Jun 2024 18:05:34 +0800 Subject: [PATCH 2/7] refactor: use `UnboundChannel` to replaced `Mutex` --- worker/src/build.rs | 73 ++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 41 deletions(-) diff --git a/worker/src/build.rs b/worker/src/build.rs index d6cc710..e5eee62 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -2,32 +2,29 @@ use crate::{get_memory_bytes, Args}; use anyhow::Context; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; -use futures_util::{stream::SplitSink, SinkExt, StreamExt}; +use futures_util::{SinkExt, StreamExt}; use log::{error, info, warn}; use std::{ path::Path, process::{Output, Stdio}, - sync::Arc, time::{Duration, Instant}, }; use tokio::{ fs, io::{AsyncBufReadExt, BufReader}, - net::TcpStream, process::Command, - sync::Mutex, + sync::mpsc::{unbounded_channel, UnboundedSender}, time::sleep, }; -use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; -type WsWriter = SplitSink>, Message>; async fn get_output_logged( cmd: &str, args: &[&str], cwd: &Path, logs: &mut Vec, - write: Arc>, + tx: UnboundedSender, ) -> anyhow::Result { let begin = Instant::now(); let msg = format!( @@ -53,14 +50,13 @@ async fn get_output_logged( let mut stdout_reader = BufReader::new(stdout).lines(); let stderr = output.stderr.take().context("Failed to get stderr")?; - let wc = write.clone(); + let txc = tx.clone(); let stderr_task = tokio::spawn(async move { let mut res = vec![]; let mut stderr_reader = BufReader::new(stderr).lines(); while let Ok(Some(v)) = stderr_reader.next_line().await { - let mut lock = wc.lock().await; - let _ = lock.send(Message::Text(v.clone())).await; + let _ = txc.send(v.clone()); res.push(v); } @@ -69,8 +65,7 @@ async fn get_output_logged( let mut stdout_out = vec![]; while let Ok(Some(v)) = stdout_reader.next_line().await { - let mut lock = write.lock().await; - lock.send(Message::Text(v.clone())).await?; + tx.send(v.clone())?; stdout_out.push(v); } @@ -101,13 +96,13 @@ async fn run_logged_with_retry( args: &[&str], cwd: &Path, logs: &mut Vec, - write: Arc>, + tx: UnboundedSender, ) -> anyhow::Result { for i in 0..5 { if i > 0 { info!("Attempt #{i} to run `{cmd} {}`", args.join(" ")); } - match get_output_logged(cmd, args, cwd, logs, write.clone()).await { + match get_output_logged(cmd, args, cwd, logs, tx.clone()).await { Ok(output) => { if output.status.success() { return Ok(true); @@ -134,7 +129,7 @@ async fn build( job: &WorkerPollResponse, tree_path: &Path, args: &Args, - write: Arc>, + tx: UnboundedSender, ) -> anyhow::Result { let begin = Instant::now(); let mut successful_packages = vec![]; @@ -148,14 +143,7 @@ async fn build( // clear output directory if output_path.exists() { - get_output_logged( - "rm", - &["-rf", "debs"], - &output_path, - &mut logs, - write.clone(), - ) - .await?; + get_output_logged("rm", &["-rf", "debs"], &output_path, &mut logs, tx.clone()).await?; } // switch to git ref @@ -168,7 +156,7 @@ async fn build( ], tree_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; @@ -182,7 +170,7 @@ async fn build( &["checkout", "-b", &job.git_branch], tree_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; // checkout to branch @@ -191,7 +179,7 @@ async fn build( &["checkout", &job.git_branch], tree_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; @@ -202,7 +190,7 @@ async fn build( &["reset", &job.git_sha, "--hard"], tree_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; @@ -213,21 +201,16 @@ async fn build( &["update-os"], &args.ciel_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; // build packages let mut ciel_args = vec!["build", "-i", &args.ciel_instance]; ciel_args.extend(job.packages.split(',')); - let output = get_output_logged( - "ciel", - &ciel_args, - &args.ciel_path, - &mut logs, - write.clone(), - ) - .await?; + let output = + get_output_logged("ciel", &ciel_args, &args.ciel_path, &mut logs, tx.clone()) + .await?; build_success = output.status.success(); @@ -297,7 +280,7 @@ async fn build( &args, &output_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; } @@ -330,7 +313,7 @@ async fn build( ], &tree_path, &mut scp_log, - write, + tx, ) .await? { @@ -373,8 +356,10 @@ async fn build( async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { let ws = &args.websocket; let (ws_stream, _) = connect_async(ws).await?; - let (write, _) = ws_stream.split(); - let write = Arc::new(Mutex::new(write)); + let (mut write, _) = ws_stream.split(); + + let (tx, mut rx): (UnboundedSender, _) = unbounded_channel(); + let mut tree_path = args.ciel_path.clone(); tree_path.push("TREE"); @@ -393,6 +378,12 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { logical_cores: num_cpus::get() as i32, }; + tokio::spawn(async move { + if let Some(v) = rx.recv().await { + let _ = write.send(Message::Text(v)).await; + } + }); + loop { if let Some(job) = client .post(format!("{}/api/worker/poll", args.server)) @@ -404,7 +395,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { { info!("Processing job {:?}", job); - match build(&job, &tree_path, args, write.clone()).await { + match build(&job, &tree_path, args, tx.clone()).await { Ok(result) => { // post result info!("Finished to run job {:?} with result {:?}", job, result); From 83c08899c8eeedf78d05aca773fc025e43c62476 Mon Sep 17 00:00:00 2001 From: eatradish Date: Tue, 18 Jun 2024 18:15:14 +0800 Subject: [PATCH 3/7] refactor: use `forward` to send channcel to websocket write --- Cargo.lock | 1 + worker/Cargo.toml | 1 + worker/src/build.rs | 25 ++++++++++--------------- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3589e12..28fd4fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4675,6 +4675,7 @@ dependencies = [ "dotenv", "env_logger", "fs2", + "futures-channel", "futures-util", "gethostname", "log", diff --git a/worker/Cargo.toml b/worker/Cargo.toml index fd10662..e1fa733 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -22,6 +22,7 @@ sysinfo = "0.30.5" tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "fs"] } tokio-tungstenite = "0.23.1" futures-util = "0.3.30" +futures-channel = "0.3.30" [build-dependencies] vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] } diff --git a/worker/src/build.rs b/worker/src/build.rs index e5eee62..8b32465 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -2,6 +2,7 @@ use crate::{get_memory_bytes, Args}; use anyhow::Context; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; +use futures_channel::mpsc::{unbounded, UnboundedSender}; use futures_util::{SinkExt, StreamExt}; use log::{error, info, warn}; use std::{ @@ -13,18 +14,16 @@ use tokio::{ fs, io::{AsyncBufReadExt, BufReader}, process::Command, - sync::mpsc::{unbounded_channel, UnboundedSender}, time::sleep, }; use tokio_tungstenite::{connect_async, tungstenite::Message}; - async fn get_output_logged( cmd: &str, args: &[&str], cwd: &Path, logs: &mut Vec, - tx: UnboundedSender, + mut tx: UnboundedSender, ) -> anyhow::Result { let begin = Instant::now(); let msg = format!( @@ -50,13 +49,13 @@ async fn get_output_logged( let mut stdout_reader = BufReader::new(stdout).lines(); let stderr = output.stderr.take().context("Failed to get stderr")?; - let txc = tx.clone(); + let mut txc = tx.clone(); let stderr_task = tokio::spawn(async move { let mut res = vec![]; let mut stderr_reader = BufReader::new(stderr).lines(); while let Ok(Some(v)) = stderr_reader.next_line().await { - let _ = txc.send(v.clone()); + let _ = txc.send(Message::Text(v.clone())).await; res.push(v); } @@ -65,7 +64,7 @@ async fn get_output_logged( let mut stdout_out = vec![]; while let Ok(Some(v)) = stdout_reader.next_line().await { - tx.send(v.clone())?; + tx.send(Message::Text(v.clone())).await?; stdout_out.push(v); } @@ -96,7 +95,7 @@ async fn run_logged_with_retry( args: &[&str], cwd: &Path, logs: &mut Vec, - tx: UnboundedSender, + tx: UnboundedSender, ) -> anyhow::Result { for i in 0..5 { if i > 0 { @@ -129,7 +128,7 @@ async fn build( job: &WorkerPollResponse, tree_path: &Path, args: &Args, - tx: UnboundedSender, + tx: UnboundedSender, ) -> anyhow::Result { let begin = Instant::now(); let mut successful_packages = vec![]; @@ -356,9 +355,9 @@ async fn build( async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { let ws = &args.websocket; let (ws_stream, _) = connect_async(ws).await?; - let (mut write, _) = ws_stream.split(); + let (write, _) = ws_stream.split(); - let (tx, mut rx): (UnboundedSender, _) = unbounded_channel(); + let (tx, rx) = unbounded(); let mut tree_path = args.ciel_path.clone(); tree_path.push("TREE"); @@ -378,11 +377,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { logical_cores: num_cpus::get() as i32, }; - tokio::spawn(async move { - if let Some(v) = rx.recv().await { - let _ = write.send(Message::Text(v)).await; - } - }); + tokio::spawn(async move { rx.map(Ok).forward(write) }); loop { if let Some(job) = client From cad1fcaaf0f935176c3c16fadb02e1a24e954018 Mon Sep 17 00:00:00 2001 From: eatradish Date: Wed, 19 Jun 2024 02:21:16 +0800 Subject: [PATCH 4/7] refactor: retry websocket connect --- Cargo.lock | 26 ++++++++++++++++++++++- worker/Cargo.toml | 2 +- worker/src/build.rs | 50 +++++++++++++++++++++++++++++++-------------- worker/src/main.rs | 1 - 4 files changed, 61 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 28fd4fa..f238c51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1030,6 +1030,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2316,6 +2328,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -3534,6 +3555,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "strsim" @@ -4674,8 +4698,8 @@ dependencies = [ "common", "dotenv", "env_logger", + "flume", "fs2", - "futures-channel", "futures-util", "gethostname", "log", diff --git a/worker/Cargo.toml b/worker/Cargo.toml index e1fa733..f92e331 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -22,7 +22,7 @@ sysinfo = "0.30.5" tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "fs"] } tokio-tungstenite = "0.23.1" futures-util = "0.3.30" -futures-channel = "0.3.30" +flume = "0.11.0" [build-dependencies] vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] } diff --git a/worker/src/build.rs b/worker/src/build.rs index 8b32465..49ce7bd 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -2,8 +2,8 @@ use crate::{get_memory_bytes, Args}; use anyhow::Context; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; -use futures_channel::mpsc::{unbounded, UnboundedSender}; -use futures_util::{SinkExt, StreamExt}; +use flume::{unbounded, Receiver, Sender}; +use futures_util::StreamExt; use log::{error, info, warn}; use std::{ path::Path, @@ -23,7 +23,7 @@ async fn get_output_logged( args: &[&str], cwd: &Path, logs: &mut Vec, - mut tx: UnboundedSender, + tx: Sender, ) -> anyhow::Result { let begin = Instant::now(); let msg = format!( @@ -49,13 +49,13 @@ async fn get_output_logged( let mut stdout_reader = BufReader::new(stdout).lines(); let stderr = output.stderr.take().context("Failed to get stderr")?; - let mut txc = tx.clone(); + let txc = tx.clone(); let stderr_task = tokio::spawn(async move { let mut res = vec![]; let mut stderr_reader = BufReader::new(stderr).lines(); while let Ok(Some(v)) = stderr_reader.next_line().await { - let _ = txc.send(Message::Text(v.clone())).await; + let _ = txc.clone().into_send_async(Message::Text(v.clone())).await; res.push(v); } @@ -64,7 +64,7 @@ async fn get_output_logged( let mut stdout_out = vec![]; while let Ok(Some(v)) = stdout_reader.next_line().await { - tx.send(Message::Text(v.clone())).await?; + tx.clone().into_send_async(Message::Text(v.clone())).await?; stdout_out.push(v); } @@ -95,7 +95,7 @@ async fn run_logged_with_retry( args: &[&str], cwd: &Path, logs: &mut Vec, - tx: UnboundedSender, + tx: Sender, ) -> anyhow::Result { for i in 0..5 { if i > 0 { @@ -128,7 +128,7 @@ async fn build( job: &WorkerPollResponse, tree_path: &Path, args: &Args, - tx: UnboundedSender, + tx: Sender, ) -> anyhow::Result { let begin = Instant::now(); let mut successful_packages = vec![]; @@ -353,12 +353,6 @@ async fn build( } async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { - let ws = &args.websocket; - let (ws_stream, _) = connect_async(ws).await?; - let (write, _) = ws_stream.split(); - - let (tx, rx) = unbounded(); - let mut tree_path = args.ciel_path.clone(); tree_path.push("TREE"); @@ -377,7 +371,13 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { logical_cores: num_cpus::get() as i32, }; - tokio::spawn(async move { rx.map(Ok).forward(write) }); + let ws = args.websocket.clone(); + + let (tx, rx) = unbounded(); + + tokio::spawn(async move { + websocket_connect(rx, ws).await; + }); loop { if let Some(job) = client @@ -429,3 +429,23 @@ pub async fn build_worker(args: Args) -> ! { tokio::time::sleep(Duration::from_secs(5)).await; } } + +pub async fn websocket_connect(rx: Receiver, ws: String) -> ! { + loop { + info!("Starting websocket connect"); + match connect_async(&ws).await { + Ok((ws_stream, _)) => { + let (write, _) = ws_stream.split(); + let rx = rx.clone().into_stream(); + if let Err(e) = rx.map(Ok).forward(write).await { + warn!("{e}"); + } + } + Err(err) => { + warn!("Got error connecting to websocket: {}", err); + } + } + + tokio::time::sleep(Duration::from_secs(5)).await; + } +} diff --git a/worker/src/main.rs b/worker/src/main.rs index 03dc7b9..b6d87f9 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -15,7 +15,6 @@ async fn main() -> anyhow::Result<()> { s.refresh_memory(); tokio::spawn(heartbeat_worker(args.clone())); - build_worker(args.clone()).await; Ok(()) } From df6acff7fa32f4b456cd04e46ba68ded30f0903b Mon Sep 17 00:00:00 2001 From: eatradish Date: Wed, 19 Jun 2024 05:48:08 +0800 Subject: [PATCH 5/7] feat(worker): filter machine port to recv message --- worker/src/build.rs | 7 ++++--- worker/src/lib.rs | 4 ++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/worker/src/build.rs b/worker/src/build.rs index 49ce7bd..c4ae52f 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -5,6 +5,7 @@ use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollRespons use flume::{unbounded, Receiver, Sender}; use futures_util::StreamExt; use log::{error, info, warn}; +use reqwest::Url; use std::{ path::Path, process::{Output, Stdio}, @@ -371,7 +372,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { logical_cores: num_cpus::get() as i32, }; - let ws = args.websocket.clone(); + let ws = Url::parse(&args.websocket)?.join(&args.port.to_string())?; let (tx, rx) = unbounded(); @@ -430,10 +431,10 @@ pub async fn build_worker(args: Args) -> ! { } } -pub async fn websocket_connect(rx: Receiver, ws: String) -> ! { +pub async fn websocket_connect(rx: Receiver, ws: Url) -> ! { loop { info!("Starting websocket connect"); - match connect_async(&ws).await { + match connect_async(ws.as_str()).await { Ok((ws_stream, _)) => { let (write, _) = ws_stream.split(); let rx = rx.clone().into_stream(); diff --git a/worker/src/lib.rs b/worker/src/lib.rs index 00dc5f4..44ba1a8 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -53,6 +53,10 @@ pub struct Args { /// Websocket uri #[arg(short = 'w', long, env = "BUILDIT_WS")] pub websocket: String, + + /// Worker machine relay port + #[arg(long, env = "BUILDIT_RELAY_PORT")] + pub port: u16, } pub fn get_memory_bytes() -> i64 { From 807728ee7228f20e9db757fed2bb0253ef37453c Mon Sep 17 00:00:00 2001 From: eatradish Date: Wed, 19 Jun 2024 11:49:48 +0800 Subject: [PATCH 6/7] feat(server): add `/ws/:hostname` to stream compile log --- Cargo.lock | 38 ++++++++++++++++++- server/Cargo.toml | 2 +- server/src/main.rs | 9 ++++- server/src/routes/mod.rs | 13 ++++++- server/src/routes/websocket.rs | 67 ++++++++++++++++++++++++++++++++++ server/src/routes/worker.rs | 2 +- 6 files changed, 124 insertions(+), 7 deletions(-) create mode 100644 server/src/routes/websocket.rs diff --git a/Cargo.lock b/Cargo.lock index f238c51..b3b25dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,6 +276,7 @@ checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core 0.4.3", + "base64 0.21.7", "bytes", "futures-util", "http 1.1.0", @@ -294,8 +295,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper 1.0.0", "tokio", + "tokio-tungstenite 0.21.0", "tower", "tower-layer", "tower-service", @@ -3911,6 +3914,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", +] + [[package]] name = "tokio-tungstenite" version = "0.23.1" @@ -3920,7 +3935,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.23.0", ] [[package]] @@ -4109,6 +4124,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.23.0" @@ -4707,7 +4741,7 @@ dependencies = [ "reqwest 0.11.27", "sysinfo", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.23.1", "vergen", ] diff --git a/server/Cargo.toml b/server/Cargo.toml index 7225692..ba25910 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -24,7 +24,7 @@ buildit-utils = { path = "../buildit-utils" } jsonwebtoken = "9.2.0" size = "0.4.1" dickens = { git = "https://github.com/AOSC-Dev/dickens.git", version = "0.1.0" } -axum = "0.7.4" +axum = { version = "0.7.4", features = ["ws"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tracing = "0.1.40" tower-http = { version = "0.5.2", features = ["trace", "fs", "cors"] } diff --git a/server/src/main.rs b/server/src/main.rs index d4425fe..0d8294f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -15,13 +15,15 @@ use server::bot::{answer, Command}; use server::recycler::recycler_worker; use server::routes::{ dashboard_status, job_info, job_list, job_restart, ping, pipeline_info, pipeline_list, - pipeline_new_pr, worker_info, worker_job_update, worker_list, worker_poll, AppState, + pipeline_new_pr, worker_info, worker_job_update, worker_list, worker_poll, ws_handler, + AppState, PeerMap, }; use server::routes::{pipeline_new, worker_heartbeat}; use server::routes::{pipeline_status, worker_status}; use server::{DbPool, ARGS}; +use std::collections::HashMap; use std::os::unix::fs::PermissionsExt; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use teloxide::prelude::*; use tokio::net::unix::UCred; use tokio::net::UnixStream; @@ -99,7 +101,9 @@ async fn main() -> anyhow::Result<()> { let state = AppState { pool: pool.clone(), bot, + ws_peer_map: PeerMap::new(RwLock::new(HashMap::new())), }; + let mut app = Router::new() .route("/api/ping", get(ping)) .route("/api/pipeline/new", post(pipeline_new)) @@ -117,6 +121,7 @@ async fn main() -> anyhow::Result<()> { .route("/api/worker/list", get(worker_list)) .route("/api/worker/info", get(worker_info)) .route("/api/dashboard/status", get(dashboard_status)) + .route("/api/ws/:hostname", get(ws_handler)) .nest_service("/assets", ServeDir::new("frontend/dist/assets")) .route_service("/favicon.ico", ServeFile::new("frontend/dist/favicon.ico")) .fallback_service(ServeFile::new("frontend/dist/index.html")) diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs index 38ec404..14b6961 100644 --- a/server/src/routes/mod.rs +++ b/server/src/routes/mod.rs @@ -13,28 +13,39 @@ use diesel::dsl::{count, sum}; use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl}; +use futures::channel::mpsc::UnboundedSender; use serde::Serialize; -use std::collections::BTreeMap; +use std::{ + collections::{BTreeMap, HashMap}, + net::SocketAddr, + sync::{Arc, RwLock}, +}; use teloxide::prelude::*; use tracing::info; pub mod job; pub mod pipeline; +pub mod websocket; pub mod worker; pub use job::*; pub use pipeline::*; +pub use websocket::*; pub use worker::*; pub async fn ping() -> &'static str { "PONG" } +type Tx = (UnboundedSender, String); +pub type PeerMap = Arc>>; + #[derive(Clone)] pub struct AppState { pub pool: DbPool, pub bot: Option, + pub ws_peer_map: PeerMap, } // learned from https://github.com/tokio-rs/axum/blob/main/examples/anyhow-error-response/src/main.rs diff --git a/server/src/routes/websocket.rs b/server/src/routes/websocket.rs new file mode 100644 index 0000000..3be5945 --- /dev/null +++ b/server/src/routes/websocket.rs @@ -0,0 +1,67 @@ +use std::net::SocketAddr; + +use axum::{ + extract::{ws::WebSocket, ConnectInfo, Path, State, WebSocketUpgrade}, + response::IntoResponse, +}; +use futures::{channel::mpsc::unbounded, future, pin_mut, StreamExt, TryStreamExt}; +use tracing::info; + +use super::{AppState, PeerMap}; + +/// The handler for the HTTP request (this gets called when the HTTP GET lands at the start +/// of websocket negotiation). After this completes, the actual switching from HTTP to +/// websocket protocol will occur. +/// This is the last point where we can extract TCP/IP metadata such as IP address of the client +/// as well as things from HTTP headers such as user-agent of the browser etc. +pub async fn ws_handler( + Path(hostname): Path, + ws: WebSocketUpgrade, + ConnectInfo(addr): ConnectInfo, + State(state): State, +) -> impl IntoResponse { + // finalize the upgrade process by returning upgrade callback. + // we can customize the callback by sending additional info such as address. + ws.on_upgrade(move |socket| handle_socket(socket, addr, hostname, state.ws_peer_map)) +} + +/// Actual websocket statemachine (one will be spawned per connection) +async fn handle_socket(socket: WebSocket, who: SocketAddr, hostname: String, peer_map: PeerMap) { + let (tx, rx) = unbounded(); + peer_map + .write() + .unwrap() + .insert(who, (tx, hostname.clone())); + + let (outgoing, incoming) = socket.split(); + + let broadcast_incoming = incoming.try_for_each(|msg| { + info!("Received a message from {}: {:?}", who, msg); + + let peers = peer_map.read().unwrap(); + + // We want to broadcast the message to everyone except ourselves. + let broadcast_recipients = peers + .iter() + .filter(|(peer_addr, _)| peer_addr != &&who) + .map(|(_, (ws_sink, port))| (ws_sink, port)); + + for recp in broadcast_recipients { + let recp_path = recp.1; + + if *recp_path == hostname { + recp.0.unbounded_send(msg.clone()).unwrap(); + } + } + + future::ok(()) + }); + + let receive_from_others = rx.map(Ok).forward(outgoing); + + pin_mut!(broadcast_incoming, receive_from_others); + future::select(broadcast_incoming, receive_from_others).await; + + info!("{} disconnected", &who); + peer_map.write().unwrap().remove(&who); +} diff --git a/server/src/routes/worker.rs b/server/src/routes/worker.rs index 5f96ca5..933e230 100644 --- a/server/src/routes/worker.rs +++ b/server/src/routes/worker.rs @@ -309,7 +309,7 @@ pub async fn worker_poll( } pub async fn worker_job_update( - State(AppState { pool, bot }): State, + State(AppState { pool, bot, .. }): State, Json(payload): Json, ) -> Result<(), AnyhowError> { if payload.worker_secret != ARGS.worker_secret { From 8b75f5b30647b1ffa67e1eb1cd8dfc19cf8798aa Mon Sep 17 00:00:00 2001 From: eatradish Date: Wed, 19 Jun 2024 11:57:51 +0800 Subject: [PATCH 7/7] feat(worker): use hostname as ws arg --- worker/src/build.rs | 6 ++++-- worker/src/lib.rs | 4 ---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/worker/src/build.rs b/worker/src/build.rs index c4ae52f..4386d69 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -363,8 +363,10 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { .timeout(Duration::from_secs(30)) .build() .unwrap(); + + let hostname = gethostname::gethostname().to_string_lossy().to_string(); let req = WorkerPollRequest { - hostname: gethostname::gethostname().to_string_lossy().to_string(), + hostname: hostname.clone(), arch: args.arch.clone(), worker_secret: args.worker_secret.clone(), memory_bytes: get_memory_bytes(), @@ -372,7 +374,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { logical_cores: num_cpus::get() as i32, }; - let ws = Url::parse(&args.websocket)?.join(&args.port.to_string())?; + let ws = Url::parse(&args.websocket)?.join(&hostname)?; let (tx, rx) = unbounded(); diff --git a/worker/src/lib.rs b/worker/src/lib.rs index 44ba1a8..00dc5f4 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -53,10 +53,6 @@ pub struct Args { /// Websocket uri #[arg(short = 'w', long, env = "BUILDIT_WS")] pub websocket: String, - - /// Worker machine relay port - #[arg(long, env = "BUILDIT_RELAY_PORT")] - pub port: u16, } pub fn get_memory_bytes() -> i64 {