Skip to content

Commit

Permalink
refactor: retry websocket connect
Browse files Browse the repository at this point in the history
  • Loading branch information
eatradish committed Jun 18, 2024
1 parent 83c0889 commit 51af280
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 20 deletions.
26 changes: 25 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ sysinfo = "0.30.5"
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "fs"] }
tokio-tungstenite = "0.23.1"
futures-util = "0.3.30"
futures-channel = "0.3.30"
flume = "0.11.0"

[build-dependencies]
vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] }
56 changes: 39 additions & 17 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::{get_memory_bytes, Args};
use anyhow::Context;
use chrono::Local;
use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{SinkExt, StreamExt};
use flume::{unbounded, Receiver, Sender};
use futures_util::StreamExt;
use log::{error, info, warn};
use std::{
path::Path,
Expand All @@ -23,7 +23,7 @@ async fn get_output_logged(
args: &[&str],
cwd: &Path,
logs: &mut Vec<u8>,
mut tx: UnboundedSender<Message>,
tx: Sender<Message>,
) -> anyhow::Result<Output> {
let begin = Instant::now();
let msg = format!(
Expand All @@ -49,13 +49,13 @@ async fn get_output_logged(
let mut stdout_reader = BufReader::new(stdout).lines();
let stderr = output.stderr.take().context("Failed to get stderr")?;

let mut txc = tx.clone();
let txc = tx.clone();

let stderr_task = tokio::spawn(async move {
let mut res = vec![];
let mut stderr_reader = BufReader::new(stderr).lines();
while let Ok(Some(v)) = stderr_reader.next_line().await {
let _ = txc.send(Message::Text(v.clone())).await;
let _ = txc.clone().into_send_async(Message::Text(v.clone())).await;
res.push(v);
}

Expand All @@ -64,7 +64,7 @@ async fn get_output_logged(

let mut stdout_out = vec![];
while let Ok(Some(v)) = stdout_reader.next_line().await {
tx.send(Message::Text(v.clone())).await?;
tx.clone().into_send_async(Message::Text(v.clone())).await?;
stdout_out.push(v);
}

Expand Down Expand Up @@ -95,7 +95,7 @@ async fn run_logged_with_retry(
args: &[&str],
cwd: &Path,
logs: &mut Vec<u8>,
tx: UnboundedSender<Message>,
tx: Sender<Message>,
) -> anyhow::Result<bool> {
for i in 0..5 {
if i > 0 {
Expand Down Expand Up @@ -128,7 +128,7 @@ async fn build(
job: &WorkerPollResponse,
tree_path: &Path,
args: &Args,
tx: UnboundedSender<Message>,
tx: Sender<Message>,
) -> anyhow::Result<WorkerJobUpdateRequest> {
let begin = Instant::now();
let mut successful_packages = vec![];
Expand Down Expand Up @@ -352,13 +352,7 @@ async fn build(
Ok(result)
}

async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
let ws = &args.websocket;
let (ws_stream, _) = connect_async(ws).await?;
let (write, _) = ws_stream.split();

let (tx, rx) = unbounded();

async fn build_worker_inner(args: Args) -> anyhow::Result<()> {
let mut tree_path = args.ciel_path.clone();
tree_path.push("TREE");

Expand All @@ -377,8 +371,15 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
logical_cores: num_cpus::get() as i32,
};

tokio::spawn(async move { rx.map(Ok).forward(write) });
let argc = args.clone();

let (tx, rx) = unbounded();

tokio::spawn(async move {
websocket_connect(rx, argc).await;
});

let args = &args;
loop {
if let Some(job) = client
.post(format!("{}/api/worker/poll", args.server))
Expand Down Expand Up @@ -423,9 +424,30 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
pub async fn build_worker(args: Args) -> ! {
loop {
info!("Starting build worker");
if let Err(err) = build_worker_inner(&args).await {
if let Err(err) = build_worker_inner(args.clone()).await {
warn!("Got error running heartbeat worker: {}", err);
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

pub async fn websocket_connect(rx: Receiver<Message>, args: Args) -> ! {
let ws = &args.websocket;
loop {
info!("Starting websocket connect");
match connect_async(ws).await {
Ok((ws_stream, _)) => {
let (write, _) = ws_stream.split();
let rx_iter = rx.clone().into_stream();
if let Err(e) = rx_iter.map(Ok).forward(write).await {
warn!("{e}");
}
}
Err(err) => {
warn!("Got error connecting to websocket: {}", err);
}
}

tokio::time::sleep(Duration::from_secs(5)).await;
}
}
1 change: 0 additions & 1 deletion worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ async fn main() -> anyhow::Result<()> {
s.refresh_memory();

tokio::spawn(heartbeat_worker(args.clone()));

build_worker(args.clone()).await;
Ok(())
}

0 comments on commit 51af280

Please sign in to comment.