Skip to content

Commit

Permalink
fix: await oneshot instead of handler join when waiting workers
Browse files Browse the repository at this point in the history
  • Loading branch information
ihciah committed Sep 8, 2023
1 parent 176dcd9 commit 64e78a4
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
2 changes: 1 addition & 1 deletion monolake-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ openid = []
proxy-protocol = []

[dependencies]
monoio = { workspace = true, features = ['splice'] }
monoio = { workspace = true, features = ["splice", "sync"] }
monoio-http = { workspace = true }
service-async = { workspace = true }

Expand Down
15 changes: 12 additions & 3 deletions monolake-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ where
/// Start workers according to runtime config.
/// Threads JoinHandle will be returned and each factory Sender will
/// be saved for config updating.
pub fn spawn_workers<A>(&mut self) -> Vec<std::thread::JoinHandle<()>>
pub fn spawn_workers<A>(
&mut self,
) -> Vec<(
std::thread::JoinHandle<()>,
futures_channel::oneshot::Sender<()>,
)>
where
Command<F, LF>: Execute<A, F::Service>,
{
Expand All @@ -45,6 +50,7 @@ where
.map(|worker_id| {
let (tx, rx) = channel(128);
let runtime_config = runtime_config.clone();
let (finish_tx, mut finish_rx) = futures_channel::oneshot::channel::<()>();
let handler = std::thread::Builder::new()
.name(format!("monolake-worker-{worker_id}"))
.spawn(move || {
Expand All @@ -56,11 +62,14 @@ where
}
}
let mut runtime = RuntimeWrapper::from(runtime_config.as_ref());
runtime.block_on(worker_controller.run_controller(rx));
runtime.block_on(async move {
worker_controller.run_controller(rx).await;
finish_rx.close();
});
})
.expect("start worker thread {worker_id} failed");
self.workers.push(tx);
handler
(handler, finish_tx)
})
.collect()
}
Expand Down
6 changes: 3 additions & 3 deletions monolake/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ async fn main() -> Result<()> {

// TODO(ihciah): run update task or api server to do config update, maybe in xDS protocol
// Wait for workers
join_handlers.into_iter().for_each(|h| {
h.join().unwrap();
});
for (_, mut close) in join_handlers.into_iter() {
close.cancellation().await;
}
Ok(())
}

0 comments on commit 64e78a4

Please sign in to comment.