Skip to content

Commit

Permalink
Optimize tokio::child::unix::ChildImp::wait
Browse files Browse the repository at this point in the history
Try avoiding the blocking thread if possible.

Signed-off-by: Jiahao XU <[email protected]>
  • Loading branch information
NobodyXu committed Nov 12, 2023
1 parent 6c628a1 commit 5422b05
Showing 1 changed file with 25 additions and 15 deletions.
40 changes: 25 additions & 15 deletions src/tokio/child/unix.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
convert::TryInto,
io::{Error, Result},
ops::ControlFlow,
os::unix::process::ExitStatusExt,
process::ExitStatus,
};
Expand Down Expand Up @@ -69,7 +70,7 @@ impl ChildImp {
self.inner.id()
}

fn wait_imp(pgid: i32, flag: WaitPidFlag) -> Result<Option<ExitStatus>> {
fn wait_imp(pgid: i32, flag: WaitPidFlag) -> Result<ControlFlow<Option<ExitStatus>>> {
// Wait for processes in a loop until every process in this
// process group has exited (this ensures that we reap any
// zombies that may have been created if the parent exited after
Expand All @@ -85,14 +86,14 @@ impl ChildImp {
0 => {
// Zero should only happen if WNOHANG was passed in,
// and means that no processes have yet to exit.
return Ok(None);
return Ok(ControlFlow::Continue(()));
}
-1 => {
match Errno::last() {
Errno::ECHILD => {
// No more children to reap; this is a
// graceful exit.
return Ok(parent_exit_status);
return Ok(ControlFlow::Break(parent_exit_status));
}
errno => {
return Err(Error::from(errno));
Expand All @@ -115,24 +116,33 @@ impl ChildImp {
}

pub async fn wait(&mut self) -> Result<ExitStatus> {
if let Some(status) = self.try_wait()? {
return Ok(status);
}
const MAX_RETRY_ATTEMPT: usize = 10;

// Always wait for parent to exit first.
//
// It's likely that all its children has already exited and reaped by
// the time the parent exits.
let status = self.inner.wait().await?;

let pgid = self.pgid.as_raw();
match spawn_blocking(move || Self::wait_imp(pgid, WaitPidFlag::empty()))
.await?
.transpose()
{
None => self.inner.wait().await,
Some(status) => status,

// Try reaping all children, if there are some that are still alive after
// several attempts, then spawn a blocking task to reap them.
for retry_attempt in 1..=MAX_RETRY_ATTEMPT {
if Self::wait_imp(pgid, WaitPidFlag::WNOHANG)?.is_break() {
break;
} else if retry_attempt == MAX_RETRY_ATTEMPT {
spawn_blocking(move || Self::wait_imp(pgid, WaitPidFlag::empty())).await??;
}
}

Ok(status)
}

pub fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
match Self::wait_imp(self.pgid.as_raw(), WaitPidFlag::WNOHANG) {
Ok(None) => self.inner.try_wait(),
otherwise => otherwise,
match Self::wait_imp(self.pgid.as_raw(), WaitPidFlag::WNOHANG)? {
ControlFlow::Break(res) => Ok(res),
ControlFlow::Continue(()) => self.inner.try_wait(),
}
}
}
Expand Down

0 comments on commit 5422b05

Please sign in to comment.