From 0fdb86aedeea874062f1e2c8a8c5e765c2f5128a Mon Sep 17 00:00:00 2001 From: eatradish Date: Sun, 30 Jun 2024 23:37:51 +0800 Subject: [PATCH] feat(worker): use backoff to control retry --- Cargo.lock | 24 ++++++++++++++++++ worker/Cargo.toml | 1 + worker/src/build.rs | 55 +++++++++++++++++++---------------------- worker/src/heartbeat.rs | 16 ++++++------ worker/src/main.rs | 2 +- 5 files changed, 59 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4bce97e..08f567c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -343,6 +343,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -2100,6 +2114,15 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -4703,6 +4726,7 @@ name = "worker" version = "0.1.0" dependencies = [ "anyhow", + "backoff", "chrono", "clap", "common", diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 173fc5e..d9e3af4 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -24,6 +24,7 @@ tokio-tungstenite = { version = "0.21.0", features = ["rustls", "rustls-tls-nati futures-util = "0.3.30" flume = "0.11.0" tungstenite = { version = "0.21.0", features = ["rustls"] } +backoff = { version = "0.4", features = ["tokio"] } [build-dependencies] vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] } diff --git a/worker/src/build.rs b/worker/src/build.rs index 8062743..e83e851 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -1,12 +1,13 @@ use crate::{get_memory_bytes, Args}; +use backoff::ExponentialBackoff; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; use flume::{unbounded, Receiver, Sender}; use futures_util::{future::try_join3, StreamExt}; use log::{error, info, warn}; -use reqwest::Url; +use reqwest::{Client, Url}; use std::{ - path::Path, + path::{Path, PathBuf}, process::{Output, Stdio}, time::{Duration, Instant}, }; @@ -386,7 +387,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { info!("Receiving new messages"); - let client = reqwest::Client::builder() + let client = Client::builder() .timeout(Duration::from_secs(30)) .build() .unwrap(); @@ -420,12 +421,12 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { } async fn poll_server( - client: reqwest::Client, + client: Client, args: &Args, req: WorkerPollRequest, - tree_path: std::path::PathBuf, + tree_path: PathBuf, tx: Sender, -) -> Result<(), anyhow::Error> { +) -> anyhow::Result<()> { loop { if let Some(job) = client .post(format!("{}/api/worker/poll", args.server)) @@ -467,32 +468,26 @@ async fn poll_server( } } -pub async fn build_worker(args: Args) -> ! { - loop { - info!("Starting build worker"); - if let Err(err) = build_worker_inner(&args).await { - warn!("Got error running heartbeat worker: {}", err); - } - tokio::time::sleep(Duration::from_secs(5)).await; - } +pub async fn build_worker(args: Args) -> anyhow::Result<()> { + backoff::future::retry(ExponentialBackoff::default(), || async { + info!("Starting build worker ..."); + Ok(build_worker_inner(&args).await?) + }) + .await } pub async fn websocket_connect(rx: Receiver, ws: Url) -> anyhow::Result<()> { - loop { - info!("Starting websocket connect to {:?}", ws); - match connect_async(ws.as_str()).await { - Ok((ws_stream, _)) => { - let (write, _) = ws_stream.split(); - let rx = rx.clone().into_stream(); - if let Err(e) = rx.map(Ok).forward(write).await { - warn!("{e}"); - } - } - Err(err) => { - warn!("Got error connecting to websocket: {}", err); - } - } + backoff::future::retry(ExponentialBackoff::default(), || async { + Ok(websocket_connect_inner(rx.clone(), ws.as_str()).await?) + }) + .await +} - tokio::time::sleep(Duration::from_secs(5)).await; - } +async fn websocket_connect_inner(rx: Receiver, ws: &str) -> anyhow::Result<()> { + info!("Starting websocket connect to {}", ws); + let (ws_stream, _) = connect_async(ws).await?; + let (write, _) = ws_stream.split(); + let rx = rx.clone().into_stream(); + + Ok(rx.map(Ok).forward(write).await?) } diff --git a/worker/src/heartbeat.rs b/worker/src/heartbeat.rs index 4a531ac..0780c62 100644 --- a/worker/src/heartbeat.rs +++ b/worker/src/heartbeat.rs @@ -1,4 +1,5 @@ use crate::{get_memory_bytes, Args}; +use backoff::ExponentialBackoff; use common::WorkerHeartbeatRequest; use log::{info, warn}; use std::{ @@ -52,13 +53,12 @@ pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> { } } -pub async fn heartbeat_worker(args: Args) -> ! { +pub async fn heartbeat_worker(args: Args) -> anyhow::Result<()> { tokio::spawn(internet_connectivity_worker()); - loop { - info!("Starting heartbeat worker"); - if let Err(err) = heartbeat_worker_inner(&args).await { - warn!("Got error running heartbeat worker: {}", err); - } - tokio::time::sleep(Duration::from_secs(5)).await; - } + + backoff::future::retry(ExponentialBackoff::default(), || async { + warn!("Retry send heartbeat ..."); + Ok(heartbeat_worker_inner(&args).await?) + }) + .await } diff --git a/worker/src/main.rs b/worker/src/main.rs index b6d87f9..19bd3e7 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -15,6 +15,6 @@ async fn main() -> anyhow::Result<()> { s.refresh_memory(); tokio::spawn(heartbeat_worker(args.clone())); - build_worker(args.clone()).await; + build_worker(args.clone()).await?; Ok(()) }