diff --git a/worker/src/build.rs b/worker/src/build.rs index d6cc710..e5eee62 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -2,32 +2,29 @@ use crate::{get_memory_bytes, Args}; use anyhow::Context; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; -use futures_util::{stream::SplitSink, SinkExt, StreamExt}; +use futures_util::{SinkExt, StreamExt}; use log::{error, info, warn}; use std::{ path::Path, process::{Output, Stdio}, - sync::Arc, time::{Duration, Instant}, }; use tokio::{ fs, io::{AsyncBufReadExt, BufReader}, - net::TcpStream, process::Command, - sync::Mutex, + sync::mpsc::{unbounded_channel, UnboundedSender}, time::sleep, }; -use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; -type WsWriter = SplitSink>, Message>; async fn get_output_logged( cmd: &str, args: &[&str], cwd: &Path, logs: &mut Vec, - write: Arc>, + tx: UnboundedSender, ) -> anyhow::Result { let begin = Instant::now(); let msg = format!( @@ -53,14 +50,13 @@ async fn get_output_logged( let mut stdout_reader = BufReader::new(stdout).lines(); let stderr = output.stderr.take().context("Failed to get stderr")?; - let wc = write.clone(); + let txc = tx.clone(); let stderr_task = tokio::spawn(async move { let mut res = vec![]; let mut stderr_reader = BufReader::new(stderr).lines(); while let Ok(Some(v)) = stderr_reader.next_line().await { - let mut lock = wc.lock().await; - let _ = lock.send(Message::Text(v.clone())).await; + let _ = txc.send(v.clone()); res.push(v); } @@ -69,8 +65,7 @@ async fn get_output_logged( let mut stdout_out = vec![]; while let Ok(Some(v)) = stdout_reader.next_line().await { - let mut lock = write.lock().await; - lock.send(Message::Text(v.clone())).await?; + tx.send(v.clone())?; stdout_out.push(v); } @@ -101,13 +96,13 @@ async fn run_logged_with_retry( args: &[&str], cwd: &Path, logs: &mut Vec, - write: Arc>, + tx: UnboundedSender, ) -> anyhow::Result { for i in 0..5 { if i > 0 { info!("Attempt #{i} to run `{cmd} {}`", args.join(" ")); } - match get_output_logged(cmd, args, cwd, logs, write.clone()).await { + match get_output_logged(cmd, args, cwd, logs, tx.clone()).await { Ok(output) => { if output.status.success() { return Ok(true); @@ -134,7 +129,7 @@ async fn build( job: &WorkerPollResponse, tree_path: &Path, args: &Args, - write: Arc>, + tx: UnboundedSender, ) -> anyhow::Result { let begin = Instant::now(); let mut successful_packages = vec![]; @@ -148,14 +143,7 @@ async fn build( // clear output directory if output_path.exists() { - get_output_logged( - "rm", - &["-rf", "debs"], - &output_path, - &mut logs, - write.clone(), - ) - .await?; + get_output_logged("rm", &["-rf", "debs"], &output_path, &mut logs, tx.clone()).await?; } // switch to git ref @@ -168,7 +156,7 @@ async fn build( ], tree_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; @@ -182,7 +170,7 @@ async fn build( &["checkout", "-b", &job.git_branch], tree_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; // checkout to branch @@ -191,7 +179,7 @@ async fn build( &["checkout", &job.git_branch], tree_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; @@ -202,7 +190,7 @@ async fn build( &["reset", &job.git_sha, "--hard"], tree_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; @@ -213,21 +201,16 @@ async fn build( &["update-os"], &args.ciel_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; // build packages let mut ciel_args = vec!["build", "-i", &args.ciel_instance]; ciel_args.extend(job.packages.split(',')); - let output = get_output_logged( - "ciel", - &ciel_args, - &args.ciel_path, - &mut logs, - write.clone(), - ) - .await?; + let output = + get_output_logged("ciel", &ciel_args, &args.ciel_path, &mut logs, tx.clone()) + .await?; build_success = output.status.success(); @@ -297,7 +280,7 @@ async fn build( &args, &output_path, &mut logs, - write.clone(), + tx.clone(), ) .await?; } @@ -330,7 +313,7 @@ async fn build( ], &tree_path, &mut scp_log, - write, + tx, ) .await? { @@ -373,8 +356,10 @@ async fn build( async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { let ws = &args.websocket; let (ws_stream, _) = connect_async(ws).await?; - let (write, _) = ws_stream.split(); - let write = Arc::new(Mutex::new(write)); + let (mut write, _) = ws_stream.split(); + + let (tx, mut rx): (UnboundedSender, _) = unbounded_channel(); + let mut tree_path = args.ciel_path.clone(); tree_path.push("TREE"); @@ -393,6 +378,12 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { logical_cores: num_cpus::get() as i32, }; + tokio::spawn(async move { + if let Some(v) = rx.recv().await { + let _ = write.send(Message::Text(v)).await; + } + }); + loop { if let Some(job) = client .post(format!("{}/api/worker/poll", args.server)) @@ -404,7 +395,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { { info!("Processing job {:?}", job); - match build(&job, &tree_path, args, write.clone()).await { + match build(&job, &tree_path, args, tx.clone()).await { Ok(result) => { // post result info!("Finished to run job {:?} with result {:?}", job, result);