Skip to content

Commit

Permalink
feat(worker): stream compile log to buildit-monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
eatradish committed Jun 18, 2024
1 parent 337d5b7 commit 6346880
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 16 deletions.
55 changes: 55 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ num_cpus = "1.16.0"
reqwest = { version = "0.11.24", features = ["json"] }
sysinfo = "0.30.5"
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "fs"] }
tokio-tungstenite = "0.23.1"
futures-util = "0.3.30"

[build-dependencies]
vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] }
119 changes: 103 additions & 16 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
use crate::{get_memory_bytes, Args};
use anyhow::Context;
use chrono::Local;
use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse};
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use log::{error, info, warn};
use std::{
path::Path,
process::Output,
process::{Output, Stdio},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{fs, process::Command, time::sleep};
use tokio::{
fs,
io::{AsyncBufReadExt, BufReader},
net::TcpStream,
process::Command,
sync::Mutex,
time::sleep,
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};

type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;

async fn get_output_logged(
cmd: &str,
args: &[&str],
cwd: &Path,
logs: &mut Vec<u8>,
write: Arc<Mutex<WsWriter>>,
) -> anyhow::Result<Output> {
let begin = Instant::now();
let msg = format!(
Expand All @@ -26,28 +40,57 @@ async fn get_output_logged(
logs.extend(msg.as_bytes());
info!("{}", msg.trim());

let output = Command::new(cmd)
let mut output = Command::new(cmd)
.args(args)
.current_dir(cwd)
.output()
.await?;
.stdout(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;

let elapsed = begin.elapsed();

let stdout = output.stdout.as_mut().context("Failed to get stdout")?;
let mut stdout_reader = BufReader::new(stdout).lines();
let stderr = output.stderr.take().context("Failed to get stderr")?;

let wc = write.clone();

let stderr_task = tokio::spawn(async move {
let mut res = vec![];
let mut stderr_reader = BufReader::new(stderr).lines();
while let Ok(Some(v)) = stderr_reader.next_line().await {
let mut lock = wc.lock().await;
let _ = lock.send(Message::Text(v.clone())).await;
res.push(v);
}

res
});

let mut stdout_out = vec![];
while let Ok(Some(v)) = stdout_reader.next_line().await {
let mut lock = write.lock().await;
lock.send(Message::Text(v.clone())).await?;
stdout_out.push(v);
}

let output = output.wait_with_output().await?;

logs.extend(
format!(
"{}: `{} {}` finished in {:?} with {}\n",
Local::now(),
cmd,
args.join(" "),
elapsed,
output.status
output.status.success()
)
.as_bytes(),
);
logs.extend("STDOUT:\n".as_bytes());
logs.extend(output.stdout.clone());
logs.extend(stdout_out.join("\n").as_bytes());
logs.extend("STDERR:\n".as_bytes());
logs.extend(output.stderr.clone());
logs.extend(stderr_task.await?.join("\n").as_bytes());

Ok(output)
}
Expand All @@ -58,12 +101,13 @@ async fn run_logged_with_retry(
args: &[&str],
cwd: &Path,
logs: &mut Vec<u8>,
write: Arc<Mutex<WsWriter>>,
) -> anyhow::Result<bool> {
for i in 0..5 {
if i > 0 {
info!("Attempt #{i} to run `{cmd} {}`", args.join(" "));
}
match get_output_logged(cmd, args, cwd, logs).await {
match get_output_logged(cmd, args, cwd, logs, write.clone()).await {
Ok(output) => {
if output.status.success() {
return Ok(true);
Expand All @@ -90,6 +134,7 @@ async fn build(
job: &WorkerPollResponse,
tree_path: &Path,
args: &Args,
write: Arc<Mutex<WsWriter>>,
) -> anyhow::Result<WorkerJobUpdateRequest> {
let begin = Instant::now();
let mut successful_packages = vec![];
Expand All @@ -103,7 +148,14 @@ async fn build(

// clear output directory
if output_path.exists() {
get_output_logged("rm", &["-rf", "debs"], &output_path, &mut logs).await?;
get_output_logged(
"rm",
&["-rf", "debs"],
&output_path,
&mut logs,
write.clone(),
)
.await?;
}

// switch to git ref
Expand All @@ -116,6 +168,7 @@ async fn build(
],
tree_path,
&mut logs,
write.clone(),
)
.await?;

Expand All @@ -129,10 +182,18 @@ async fn build(
&["checkout", "-b", &job.git_branch],
tree_path,
&mut logs,
write.clone(),
)
.await?;
// checkout to branch
get_output_logged("git", &["checkout", &job.git_branch], tree_path, &mut logs).await?;
get_output_logged(
"git",
&["checkout", &job.git_branch],
tree_path,
&mut logs,
write.clone(),
)
.await?;

// switch to the commit by sha
// to avoid race condition, resolve branch name to sha in server
Expand All @@ -141,17 +202,32 @@ async fn build(
&["reset", &job.git_sha, "--hard"],
tree_path,
&mut logs,
write.clone(),
)
.await?;

if output.status.success() {
// update container
get_output_logged("ciel", &["update-os"], &args.ciel_path, &mut logs).await?;
get_output_logged(
"ciel",
&["update-os"],
&args.ciel_path,
&mut logs,
write.clone(),
)
.await?;

// build packages
let mut ciel_args = vec!["build", "-i", &args.ciel_instance];
ciel_args.extend(job.packages.split(','));
let output = get_output_logged("ciel", &ciel_args, &args.ciel_path, &mut logs).await?;
let output = get_output_logged(
"ciel",
&ciel_args,
&args.ciel_path,
&mut logs,
write.clone(),
)
.await?;

build_success = output.status.success();

Expand Down Expand Up @@ -216,8 +292,14 @@ async fn build(
// allow force push if noarch and non stable
args.insert(0, "--force-push-noarch-package");
}
pushpkg_success =
run_logged_with_retry("pushpkg", &args, &output_path, &mut logs).await?;
pushpkg_success = run_logged_with_retry(
"pushpkg",
&args,
&output_path,
&mut logs,
write.clone(),
)
.await?;
}
}
}
Expand Down Expand Up @@ -248,6 +330,7 @@ async fn build(
],
&tree_path,
&mut scp_log,
write,
)
.await?
{
Expand Down Expand Up @@ -288,6 +371,10 @@ async fn build(
}

async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
let ws = &args.websocket;
let (ws_stream, _) = connect_async(ws).await?;
let (write, _) = ws_stream.split();
let write = Arc::new(Mutex::new(write));
let mut tree_path = args.ciel_path.clone();
tree_path.push("TREE");

Expand Down Expand Up @@ -317,7 +404,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
{
info!("Processing job {:?}", job);

match build(&job, &tree_path, args).await {
match build(&job, &tree_path, args, write.clone()).await {
Ok(result) => {
// post result
info!("Finished to run job {:?} with result {:?}", job, result);
Expand Down
4 changes: 4 additions & 0 deletions worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ pub struct Args {
/// Performance number of the worker (smaller is better)
#[arg(short = 'p', long, env = "BUILDIT_WORKER_PERFORMANCE")]
pub worker_performance: Option<i64>,

/// Websocket uri
#[arg(short = 'w', long, env = "BUILDIT_WS")]
pub websocket: String,
}

pub fn get_memory_bytes() -> i64 {
Expand Down

0 comments on commit 6346880

Please sign in to comment.