Skip to content

Commit

Permalink
refactor: use UnboundChannel to replaced Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
eatradish committed Jun 18, 2024
1 parent 6346880 commit c2db307
Showing 1 changed file with 32 additions and 41 deletions.
73 changes: 32 additions & 41 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,29 @@ use crate::{get_memory_bytes, Args};
use anyhow::Context;
use chrono::Local;
use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse};
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use futures_util::{SinkExt, StreamExt};
use log::{error, info, warn};
use std::{
path::Path,
process::{Output, Stdio},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
fs,
io::{AsyncBufReadExt, BufReader},
net::TcpStream,
process::Command,
sync::Mutex,
sync::mpsc::{unbounded_channel, UnboundedSender},
time::sleep,
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::{connect_async, tungstenite::Message};

type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;

async fn get_output_logged(
cmd: &str,
args: &[&str],
cwd: &Path,
logs: &mut Vec<u8>,
write: Arc<Mutex<WsWriter>>,
tx: UnboundedSender<String>,
) -> anyhow::Result<Output> {
let begin = Instant::now();
let msg = format!(
Expand All @@ -53,14 +50,13 @@ async fn get_output_logged(
let mut stdout_reader = BufReader::new(stdout).lines();
let stderr = output.stderr.take().context("Failed to get stderr")?;

let wc = write.clone();
let txc = tx.clone();

let stderr_task = tokio::spawn(async move {
let mut res = vec![];
let mut stderr_reader = BufReader::new(stderr).lines();
while let Ok(Some(v)) = stderr_reader.next_line().await {
let mut lock = wc.lock().await;
let _ = lock.send(Message::Text(v.clone())).await;
let _ = txc.send(v.clone());
res.push(v);
}

Expand All @@ -69,8 +65,7 @@ async fn get_output_logged(

let mut stdout_out = vec![];
while let Ok(Some(v)) = stdout_reader.next_line().await {
let mut lock = write.lock().await;
lock.send(Message::Text(v.clone())).await?;
tx.send(v.clone())?;
stdout_out.push(v);
}

Expand Down Expand Up @@ -101,13 +96,13 @@ async fn run_logged_with_retry(
args: &[&str],
cwd: &Path,
logs: &mut Vec<u8>,
write: Arc<Mutex<WsWriter>>,
tx: UnboundedSender<String>,
) -> anyhow::Result<bool> {
for i in 0..5 {
if i > 0 {
info!("Attempt #{i} to run `{cmd} {}`", args.join(" "));
}
match get_output_logged(cmd, args, cwd, logs, write.clone()).await {
match get_output_logged(cmd, args, cwd, logs, tx.clone()).await {
Ok(output) => {
if output.status.success() {
return Ok(true);
Expand All @@ -134,7 +129,7 @@ async fn build(
job: &WorkerPollResponse,
tree_path: &Path,
args: &Args,
write: Arc<Mutex<WsWriter>>,
tx: UnboundedSender<String>,
) -> anyhow::Result<WorkerJobUpdateRequest> {
let begin = Instant::now();
let mut successful_packages = vec![];
Expand All @@ -148,14 +143,7 @@ async fn build(

// clear output directory
if output_path.exists() {
get_output_logged(
"rm",
&["-rf", "debs"],
&output_path,
&mut logs,
write.clone(),
)
.await?;
get_output_logged("rm", &["-rf", "debs"], &output_path, &mut logs, tx.clone()).await?;
}

// switch to git ref
Expand All @@ -168,7 +156,7 @@ async fn build(
],
tree_path,
&mut logs,
write.clone(),
tx.clone(),
)
.await?;

Expand All @@ -182,7 +170,7 @@ async fn build(
&["checkout", "-b", &job.git_branch],
tree_path,
&mut logs,
write.clone(),
tx.clone(),
)
.await?;
// checkout to branch
Expand All @@ -191,7 +179,7 @@ async fn build(
&["checkout", &job.git_branch],
tree_path,
&mut logs,
write.clone(),
tx.clone(),
)
.await?;

Expand All @@ -202,7 +190,7 @@ async fn build(
&["reset", &job.git_sha, "--hard"],
tree_path,
&mut logs,
write.clone(),
tx.clone(),
)
.await?;

Expand All @@ -213,21 +201,16 @@ async fn build(
&["update-os"],
&args.ciel_path,
&mut logs,
write.clone(),
tx.clone(),
)
.await?;

// build packages
let mut ciel_args = vec!["build", "-i", &args.ciel_instance];
ciel_args.extend(job.packages.split(','));
let output = get_output_logged(
"ciel",
&ciel_args,
&args.ciel_path,
&mut logs,
write.clone(),
)
.await?;
let output =
get_output_logged("ciel", &ciel_args, &args.ciel_path, &mut logs, tx.clone())
.await?;

build_success = output.status.success();

Expand Down Expand Up @@ -297,7 +280,7 @@ async fn build(
&args,
&output_path,
&mut logs,
write.clone(),
tx.clone(),
)
.await?;
}
Expand Down Expand Up @@ -330,7 +313,7 @@ async fn build(
],
&tree_path,
&mut scp_log,
write,
tx,
)
.await?
{
Expand Down Expand Up @@ -373,8 +356,10 @@ async fn build(
async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
let ws = &args.websocket;
let (ws_stream, _) = connect_async(ws).await?;
let (write, _) = ws_stream.split();
let write = Arc::new(Mutex::new(write));
let (mut write, _) = ws_stream.split();

let (tx, mut rx): (UnboundedSender<String>, _) = unbounded_channel();

let mut tree_path = args.ciel_path.clone();
tree_path.push("TREE");

Expand All @@ -393,6 +378,12 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
logical_cores: num_cpus::get() as i32,
};

tokio::spawn(async move {
if let Some(v) = rx.recv().await {
let _ = write.send(Message::Text(v)).await;
}
});

loop {
if let Some(job) = client
.post(format!("{}/api/worker/poll", args.server))
Expand All @@ -404,7 +395,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
{
info!("Processing job {:?}", job);

match build(&job, &tree_path, args, write.clone()).await {
match build(&job, &tree_path, args, tx.clone()).await {
Ok(result) => {
// post result
info!("Finished to run job {:?} with result {:?}", job, result);
Expand Down

0 comments on commit c2db307

Please sign in to comment.