Skip to content

Commit

Permalink
refactor: use forward to send channcel to websocket write
Browse files Browse the repository at this point in the history
  • Loading branch information
eatradish committed Jun 18, 2024
1 parent c2db307 commit 83c0889
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ sysinfo = "0.30.5"
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "fs"] }
tokio-tungstenite = "0.23.1"
futures-util = "0.3.30"
futures-channel = "0.3.30"

[build-dependencies]
vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] }
25 changes: 10 additions & 15 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{get_memory_bytes, Args};
use anyhow::Context;
use chrono::Local;
use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{SinkExt, StreamExt};
use log::{error, info, warn};
use std::{
Expand All @@ -13,18 +14,16 @@ use tokio::{
fs,
io::{AsyncBufReadExt, BufReader},
process::Command,
sync::mpsc::{unbounded_channel, UnboundedSender},
time::sleep,
};
use tokio_tungstenite::{connect_async, tungstenite::Message};


async fn get_output_logged(
cmd: &str,
args: &[&str],
cwd: &Path,
logs: &mut Vec<u8>,
tx: UnboundedSender<String>,
mut tx: UnboundedSender<Message>,
) -> anyhow::Result<Output> {
let begin = Instant::now();
let msg = format!(
Expand All @@ -50,13 +49,13 @@ async fn get_output_logged(
let mut stdout_reader = BufReader::new(stdout).lines();
let stderr = output.stderr.take().context("Failed to get stderr")?;

let txc = tx.clone();
let mut txc = tx.clone();

let stderr_task = tokio::spawn(async move {
let mut res = vec![];
let mut stderr_reader = BufReader::new(stderr).lines();
while let Ok(Some(v)) = stderr_reader.next_line().await {
let _ = txc.send(v.clone());
let _ = txc.send(Message::Text(v.clone())).await;
res.push(v);
}

Expand All @@ -65,7 +64,7 @@ async fn get_output_logged(

let mut stdout_out = vec![];
while let Ok(Some(v)) = stdout_reader.next_line().await {
tx.send(v.clone())?;
tx.send(Message::Text(v.clone())).await?;
stdout_out.push(v);
}

Expand Down Expand Up @@ -96,7 +95,7 @@ async fn run_logged_with_retry(
args: &[&str],
cwd: &Path,
logs: &mut Vec<u8>,
tx: UnboundedSender<String>,
tx: UnboundedSender<Message>,
) -> anyhow::Result<bool> {
for i in 0..5 {
if i > 0 {
Expand Down Expand Up @@ -129,7 +128,7 @@ async fn build(
job: &WorkerPollResponse,
tree_path: &Path,
args: &Args,
tx: UnboundedSender<String>,
tx: UnboundedSender<Message>,
) -> anyhow::Result<WorkerJobUpdateRequest> {
let begin = Instant::now();
let mut successful_packages = vec![];
Expand Down Expand Up @@ -356,9 +355,9 @@ async fn build(
async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
let ws = &args.websocket;
let (ws_stream, _) = connect_async(ws).await?;
let (mut write, _) = ws_stream.split();
let (write, _) = ws_stream.split();

let (tx, mut rx): (UnboundedSender<String>, _) = unbounded_channel();
let (tx, rx) = unbounded();

let mut tree_path = args.ciel_path.clone();
tree_path.push("TREE");
Expand All @@ -378,11 +377,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
logical_cores: num_cpus::get() as i32,
};

tokio::spawn(async move {
if let Some(v) = rx.recv().await {
let _ = write.send(Message::Text(v)).await;
}
});
tokio::spawn(async move { rx.map(Ok).forward(write) });

loop {
if let Some(job) = client
Expand Down

0 comments on commit 83c0889

Please sign in to comment.