From 63468808fdfc0bffdb18d86916fe71818d9a29df Mon Sep 17 00:00:00 2001 From: eatradish Date: Tue, 18 Jun 2024 17:41:25 +0800 Subject: [PATCH] feat(worker): stream compile log to buildit-monitor --- Cargo.lock | 55 ++++++++++++++++++++ worker/Cargo.toml | 2 + worker/src/build.rs | 119 ++++++++++++++++++++++++++++++++++++++------ worker/src/lib.rs | 4 ++ 4 files changed, 164 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9e0528..3589e12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -736,6 +736,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + [[package]] name = "deranged" version = "0.3.11" @@ -3387,6 +3393,17 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha1_smol" version = "1.0.0" @@ -3870,6 +3887,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -4056,6 +4085,24 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -4134,6 +4181,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" @@ -4622,12 +4675,14 @@ dependencies = [ "dotenv", "env_logger", "fs2", + "futures-util", "gethostname", "log", "num_cpus", "reqwest 0.11.27", "sysinfo", "tokio", + "tokio-tungstenite", "vergen", ] diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 79604a2..fd10662 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -20,6 +20,8 @@ num_cpus = "1.16.0" reqwest = { version = "0.11.24", features = ["json"] } sysinfo = "0.30.5" tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "fs"] } +tokio-tungstenite = "0.23.1" +futures-util = "0.3.30" [build-dependencies] vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] } diff --git a/worker/src/build.rs b/worker/src/build.rs index a6196ad..d6cc710 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -1,19 +1,33 @@ use crate::{get_memory_bytes, Args}; +use anyhow::Context; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; +use futures_util::{stream::SplitSink, SinkExt, StreamExt}; use log::{error, info, warn}; use std::{ path::Path, - process::Output, + process::{Output, Stdio}, + sync::Arc, time::{Duration, Instant}, }; -use tokio::{fs, process::Command, time::sleep}; +use tokio::{ + fs, + io::{AsyncBufReadExt, BufReader}, + net::TcpStream, + process::Command, + sync::Mutex, + time::sleep, +}; +use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; + +type WsWriter = SplitSink>, Message>; async fn get_output_logged( cmd: &str, args: &[&str], cwd: &Path, logs: &mut Vec, + write: Arc>, ) -> anyhow::Result { let begin = Instant::now(); let msg = format!( @@ -26,13 +40,42 @@ async fn get_output_logged( logs.extend(msg.as_bytes()); info!("{}", msg.trim()); - let output = Command::new(cmd) + let mut output = Command::new(cmd) .args(args) .current_dir(cwd) - .output() - .await?; + .stdout(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; let elapsed = begin.elapsed(); + + let stdout = output.stdout.as_mut().context("Failed to get stdout")?; + let mut stdout_reader = BufReader::new(stdout).lines(); + let stderr = output.stderr.take().context("Failed to get stderr")?; + + let wc = write.clone(); + + let stderr_task = tokio::spawn(async move { + let mut res = vec![]; + let mut stderr_reader = BufReader::new(stderr).lines(); + while let Ok(Some(v)) = stderr_reader.next_line().await { + let mut lock = wc.lock().await; + let _ = lock.send(Message::Text(v.clone())).await; + res.push(v); + } + + res + }); + + let mut stdout_out = vec![]; + while let Ok(Some(v)) = stdout_reader.next_line().await { + let mut lock = write.lock().await; + lock.send(Message::Text(v.clone())).await?; + stdout_out.push(v); + } + + let output = output.wait_with_output().await?; + logs.extend( format!( "{}: `{} {}` finished in {:?} with {}\n", @@ -40,14 +83,14 @@ async fn get_output_logged( cmd, args.join(" "), elapsed, - output.status + output.status.success() ) .as_bytes(), ); logs.extend("STDOUT:\n".as_bytes()); - logs.extend(output.stdout.clone()); + logs.extend(stdout_out.join("\n").as_bytes()); logs.extend("STDERR:\n".as_bytes()); - logs.extend(output.stderr.clone()); + logs.extend(stderr_task.await?.join("\n").as_bytes()); Ok(output) } @@ -58,12 +101,13 @@ async fn run_logged_with_retry( args: &[&str], cwd: &Path, logs: &mut Vec, + write: Arc>, ) -> anyhow::Result { for i in 0..5 { if i > 0 { info!("Attempt #{i} to run `{cmd} {}`", args.join(" ")); } - match get_output_logged(cmd, args, cwd, logs).await { + match get_output_logged(cmd, args, cwd, logs, write.clone()).await { Ok(output) => { if output.status.success() { return Ok(true); @@ -90,6 +134,7 @@ async fn build( job: &WorkerPollResponse, tree_path: &Path, args: &Args, + write: Arc>, ) -> anyhow::Result { let begin = Instant::now(); let mut successful_packages = vec![]; @@ -103,7 +148,14 @@ async fn build( // clear output directory if output_path.exists() { - get_output_logged("rm", &["-rf", "debs"], &output_path, &mut logs).await?; + get_output_logged( + "rm", + &["-rf", "debs"], + &output_path, + &mut logs, + write.clone(), + ) + .await?; } // switch to git ref @@ -116,6 +168,7 @@ async fn build( ], tree_path, &mut logs, + write.clone(), ) .await?; @@ -129,10 +182,18 @@ async fn build( &["checkout", "-b", &job.git_branch], tree_path, &mut logs, + write.clone(), ) .await?; // checkout to branch - get_output_logged("git", &["checkout", &job.git_branch], tree_path, &mut logs).await?; + get_output_logged( + "git", + &["checkout", &job.git_branch], + tree_path, + &mut logs, + write.clone(), + ) + .await?; // switch to the commit by sha // to avoid race condition, resolve branch name to sha in server @@ -141,17 +202,32 @@ async fn build( &["reset", &job.git_sha, "--hard"], tree_path, &mut logs, + write.clone(), ) .await?; if output.status.success() { // update container - get_output_logged("ciel", &["update-os"], &args.ciel_path, &mut logs).await?; + get_output_logged( + "ciel", + &["update-os"], + &args.ciel_path, + &mut logs, + write.clone(), + ) + .await?; // build packages let mut ciel_args = vec!["build", "-i", &args.ciel_instance]; ciel_args.extend(job.packages.split(',')); - let output = get_output_logged("ciel", &ciel_args, &args.ciel_path, &mut logs).await?; + let output = get_output_logged( + "ciel", + &ciel_args, + &args.ciel_path, + &mut logs, + write.clone(), + ) + .await?; build_success = output.status.success(); @@ -216,8 +292,14 @@ async fn build( // allow force push if noarch and non stable args.insert(0, "--force-push-noarch-package"); } - pushpkg_success = - run_logged_with_retry("pushpkg", &args, &output_path, &mut logs).await?; + pushpkg_success = run_logged_with_retry( + "pushpkg", + &args, + &output_path, + &mut logs, + write.clone(), + ) + .await?; } } } @@ -248,6 +330,7 @@ async fn build( ], &tree_path, &mut scp_log, + write, ) .await? { @@ -288,6 +371,10 @@ async fn build( } async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { + let ws = &args.websocket; + let (ws_stream, _) = connect_async(ws).await?; + let (write, _) = ws_stream.split(); + let write = Arc::new(Mutex::new(write)); let mut tree_path = args.ciel_path.clone(); tree_path.push("TREE"); @@ -317,7 +404,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { { info!("Processing job {:?}", job); - match build(&job, &tree_path, args).await { + match build(&job, &tree_path, args, write.clone()).await { Ok(result) => { // post result info!("Finished to run job {:?} with result {:?}", job, result); diff --git a/worker/src/lib.rs b/worker/src/lib.rs index 6f03771..00dc5f4 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -49,6 +49,10 @@ pub struct Args { /// Performance number of the worker (smaller is better) #[arg(short = 'p', long, env = "BUILDIT_WORKER_PERFORMANCE")] pub worker_performance: Option, + + /// Websocket uri + #[arg(short = 'w', long, env = "BUILDIT_WS")] + pub websocket: String, } pub fn get_memory_bytes() -> i64 {