Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into haoyu/sindri_tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
lispc committed Oct 24, 2024
2 parents 6f21a8c + eae0217 commit d717a1f
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 2 deletions.
68 changes: 68 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ clap = { version = "4.5", features = ["derive"] }
ctor = "0.2.8"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
axum = "0.6.0"
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ pub struct Config {
pub coordinator: CoordinatorConfig,
pub l2geth: Option<L2GethConfig>,
pub prover: ProverConfig,
#[serde(default = "default_health_listener_addr")]
pub health_listener_addr: String,
}

fn default_health_listener_addr() -> String {
"0.0.0.0:80".to_string()
}

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion src/coordinator_handler/coordinator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl CoordinatorClient {
///
/// If the token is expired, `force_relogin` is set to `true`, or a login was never performed
/// before, it will authenticate and fetch a new token.
async fn get_token(&self, force_relogin: bool) -> anyhow::Result<String> {
pub async fn get_token(&self, force_relogin: bool) -> anyhow::Result<String> {
let token_guard = self.token.lock().await;

match *token_guard {
Expand Down
1 change: 1 addition & 0 deletions src/prover/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl ProverBuilder {
l2geth_client,
proving_service: self.proving_service.unwrap(),
n_workers: self.cfg.prover.n_workers,
health_listener_addr: self.cfg.health_listener_addr,
})
}
}
23 changes: 22 additions & 1 deletion src/prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use crate::{
},
tracing_handler::L2gethClient,
};
use axum::{routing::get, Router};
use proving_service::{ProveRequest, QueryTaskRequest, TaskStatus};
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};
use tracing::{error, info, instrument};

pub use {builder::ProverBuilder, proving_service::ProvingService, types::*};

const WORKER_SLEEP_SEC: u64 = 20;
Expand All @@ -23,6 +27,7 @@ pub struct Prover {
l2geth_client: Option<L2gethClient>,
proving_service: Box<dyn ProvingService + Send + Sync>,
n_workers: usize,
health_listener_addr: String,
}

impl Prover {
Expand All @@ -32,6 +37,19 @@ impl Prover {
assert!(self.l2geth_client.is_some());
}

// Use the first coordinator client to test the connection
match self.coordinator_clients[0].get_token(true).await {
Ok(_) => {}
Err(e) => {
panic!("Failed to login: {:?}", e);
}
};

let app = Router::new().route("/", get(|| async { "OK" }));
let addr = SocketAddr::from_str(&self.health_listener_addr).expect("Failed to parse socket address");
let server = axum::Server::bind(&addr).serve(app.into_make_service());
let health_check_server_task = tokio::spawn(server);

let mut provers = JoinSet::new();
let self_arc = std::sync::Arc::new(self);
for i in 0..self_arc.n_workers {
Expand All @@ -41,7 +59,10 @@ impl Prover {
});
}

while provers.join_next().await.is_some() {}
tokio::select! {
_ = health_check_server_task => {},
_ = async { while provers.join_next().await.is_some() {} } => {},
}
}

#[instrument(skip(self))]
Expand Down

0 comments on commit d717a1f

Please sign in to comment.