diff --git a/Cargo.lock b/Cargo.lock index 28fd4fa..f238c51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1030,6 +1030,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2316,6 +2328,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -3534,6 +3555,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "strsim" @@ -4674,8 +4698,8 @@ dependencies = [ "common", "dotenv", "env_logger", + "flume", "fs2", - "futures-channel", "futures-util", "gethostname", "log", diff --git a/worker/Cargo.toml b/worker/Cargo.toml index e1fa733..f92e331 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -22,7 +22,7 @@ sysinfo = "0.30.5" tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "fs"] } tokio-tungstenite = "0.23.1" futures-util = "0.3.30" -futures-channel = "0.3.30" +flume = "0.11.0" [build-dependencies] vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] } diff --git a/worker/src/build.rs b/worker/src/build.rs index 8b32465..0ebd824 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -2,8 +2,8 @@ use crate::{get_memory_bytes, Args}; use anyhow::Context; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; -use futures_channel::mpsc::{unbounded, UnboundedSender}; -use futures_util::{SinkExt, StreamExt}; +use flume::{unbounded, Receiver, Sender}; +use futures_util::StreamExt; use log::{error, info, warn}; use std::{ path::Path, @@ -23,7 +23,7 @@ async fn get_output_logged( args: &[&str], cwd: &Path, logs: &mut Vec, - mut tx: UnboundedSender, + tx: Sender, ) -> anyhow::Result { let begin = Instant::now(); let msg = format!( @@ -49,13 +49,13 @@ async fn get_output_logged( let mut stdout_reader = BufReader::new(stdout).lines(); let stderr = output.stderr.take().context("Failed to get stderr")?; - let mut txc = tx.clone(); + let txc = tx.clone(); let stderr_task = tokio::spawn(async move { let mut res = vec![]; let mut stderr_reader = BufReader::new(stderr).lines(); while let Ok(Some(v)) = stderr_reader.next_line().await { - let _ = txc.send(Message::Text(v.clone())).await; + let _ = txc.clone().into_send_async(Message::Text(v.clone())).await; res.push(v); } @@ -64,7 +64,7 @@ async fn get_output_logged( let mut stdout_out = vec![]; while let Ok(Some(v)) = stdout_reader.next_line().await { - tx.send(Message::Text(v.clone())).await?; + tx.clone().into_send_async(Message::Text(v.clone())).await?; stdout_out.push(v); } @@ -95,7 +95,7 @@ async fn run_logged_with_retry( args: &[&str], cwd: &Path, logs: &mut Vec, - tx: UnboundedSender, + tx: Sender, ) -> anyhow::Result { for i in 0..5 { if i > 0 { @@ -128,7 +128,7 @@ async fn build( job: &WorkerPollResponse, tree_path: &Path, args: &Args, - tx: UnboundedSender, + tx: Sender, ) -> anyhow::Result { let begin = Instant::now(); let mut successful_packages = vec![]; @@ -352,13 +352,7 @@ async fn build( Ok(result) } -async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { - let ws = &args.websocket; - let (ws_stream, _) = connect_async(ws).await?; - let (write, _) = ws_stream.split(); - - let (tx, rx) = unbounded(); - +async fn build_worker_inner(args: Args) -> anyhow::Result<()> { let mut tree_path = args.ciel_path.clone(); tree_path.push("TREE"); @@ -377,8 +371,15 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { logical_cores: num_cpus::get() as i32, }; - tokio::spawn(async move { rx.map(Ok).forward(write) }); + let argc = args.clone(); + + let (tx, rx) = unbounded(); + + tokio::spawn(async move { + websocket_connect(rx, argc).await; + }); + let args = &args; loop { if let Some(job) = client .post(format!("{}/api/worker/poll", args.server)) @@ -423,9 +424,30 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { pub async fn build_worker(args: Args) -> ! { loop { info!("Starting build worker"); - if let Err(err) = build_worker_inner(&args).await { + if let Err(err) = build_worker_inner(args.clone()).await { warn!("Got error running heartbeat worker: {}", err); } tokio::time::sleep(Duration::from_secs(5)).await; } } + +pub async fn websocket_connect(rx: Receiver, args: Args) -> ! { + let ws = &args.websocket; + loop { + info!("Starting websocket connect"); + match connect_async(ws).await { + Ok((ws_stream, _)) => { + let (write, _) = ws_stream.split(); + let rx_iter = rx.clone().into_stream(); + if let Err(e) = rx_iter.map(Ok).forward(write).await { + warn!("{e}"); + } + } + Err(err) => { + warn!("Got error connecting to websocket: {}", err); + } + } + + tokio::time::sleep(Duration::from_secs(5)).await; + } +} diff --git a/worker/src/main.rs b/worker/src/main.rs index 03dc7b9..b6d87f9 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -15,7 +15,6 @@ async fn main() -> anyhow::Result<()> { s.refresh_memory(); tokio::spawn(heartbeat_worker(args.clone())); - build_worker(args.clone()).await; Ok(()) }