diff --git a/Cargo.lock b/Cargo.lock index b9b3782c22..c8862bb41a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4010,6 +4010,7 @@ name = "moon_process" version = "0.0.1" dependencies = [ "cached", + "libc", "miette 7.4.0", "moon_args", "moon_common", diff --git a/crates/process/Cargo.toml b/crates/process/Cargo.toml index f3b7cc3f28..90f8b7773d 100644 --- a/crates/process/Cargo.toml +++ b/crates/process/Cargo.toml @@ -22,5 +22,8 @@ thiserror = { workspace = true } tracing = { workspace = true } tokio = { workspace = true, features = ["io-util"] } +[target.'cfg(unix)'.dependencies] +libc = "0.2.169" + [lints] workspace = true diff --git a/crates/process/src/exec_command.rs b/crates/process/src/exec_command.rs index 76d3bdb3c1..dd16a1a15a 100644 --- a/crates/process/src/exec_command.rs +++ b/crates/process/src/exec_command.rs @@ -1,5 +1,6 @@ use crate::command::Command; use crate::command_line::CommandLine; +use crate::output_stream::capture_stream; use crate::output_to_error; use crate::process_error::ProcessError; use moon_common::color; @@ -96,7 +97,7 @@ impl Command { Ok(output) } - pub async fn exec_stream_and_capture_output(&mut self) -> miette::Result { + pub async fn exec_stream_and_capture_output_old(&mut self) -> miette::Result { let (mut command, line) = self.create_async_command(); let mut child = command @@ -209,6 +210,105 @@ impl Command { Ok(output) } + pub async fn exec_stream_and_capture_output(&mut self) -> miette::Result { + let (mut command, line) = self.create_async_command(); + + let mut child = command + .stdin(if self.should_pass_stdin() { + Stdio::piped() + } else { + Stdio::inherit() + }) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .map_err(|error| ProcessError::StreamCapture { + bin: self.get_bin_name(), + error: Box::new(error), + })?; + + self.current_id = child.id(); + + if self.should_pass_stdin() { + self.write_input_to_child(&mut child, &line).await?; + } + + // Stream and attempt to capture the output + let stderr = child.stderr.take().unwrap(); + let mut stderr_buffer = Vec::new(); + let mut stderr_pos = 0; + + let stdout = child.stdout.take().unwrap(); + let mut stdout_buffer = Vec::new(); + let mut stdout_pos = 0; + + let prefix = self.get_prefix(); + let console = self + .console + .as_ref() + .expect("A console is required when streaming output!"); + + capture_stream(stdout, stderr, &mut |is_out, data, eof| { + let (pos, buf) = if is_out { + (&mut stdout_pos, &mut stdout_buffer) + } else { + (&mut stderr_pos, &mut stderr_buffer) + }; + + let idx = if eof { + data.len() + } else { + match data[*pos..].iter().rposition(|b| *b == b'\n') { + Some(i) => *pos + i + 1, + None => { + *pos = data.len(); + return; + } + } + }; + + let new_lines = &data[..idx]; + + for line in String::from_utf8_lossy(new_lines).lines() { + let stream = if is_out { &console.out } else { &console.err }; + + let _ = if let Some(p) = &prefix { + stream.write_line_with_prefix(line.trim(), p) + } else { + stream.write_line(line.trim()) + }; + } + + buf.extend(new_lines); + data.drain(..idx); + *pos = 0; + }) + .await + .map_err(|error| ProcessError::StreamCapture { + bin: self.get_bin_name(), + error: Box::new(error), + })?; + + // Attempt to create the child output + let status = child + .wait() + .await + .map_err(|error| ProcessError::StreamCapture { + bin: self.get_bin_name(), + error: Box::new(error), + })?; + + let output = Output { + status, + stdout: stdout_buffer, + stderr: stderr_buffer, + }; + + self.handle_nonzero_status(&output, true)?; + + Ok(output) + } + fn create_async_command(&self) -> (AsyncCommand, CommandLine) { let command_line = self.create_command_line(); diff --git a/crates/process/src/lib.rs b/crates/process/src/lib.rs index 8521c6a1e3..77f3c7e1b7 100644 --- a/crates/process/src/lib.rs +++ b/crates/process/src/lib.rs @@ -2,6 +2,7 @@ mod command; mod command_line; mod exec_command; mod output; +mod output_stream; mod process_error; mod shell; diff --git a/crates/process/src/output_stream.rs b/crates/process/src/output_stream.rs new file mode 100644 index 0000000000..cfae225769 --- /dev/null +++ b/crates/process/src/output_stream.rs @@ -0,0 +1,203 @@ +// This code is copied from Cargo, but rewritten to support tokio/async. +// Original copyright belongs to them! +// https://github.com/rust-lang/cargo/blob/master/crates/cargo-util/src/read2.rs + +pub use self::imp::capture_stream; + +#[cfg(unix)] +mod imp { + use libc::{c_int, fcntl, F_GETFL, F_SETFL, O_NONBLOCK}; + use std::io; + use std::mem; + use std::os::unix::prelude::*; + use tokio::io::{AsyncBufReadExt, BufReader}; + use tokio::process::{ChildStderr, ChildStdout}; + + fn set_nonblock(fd: c_int) -> io::Result<()> { + let flags = unsafe { fcntl(fd, F_GETFL) }; + + if flags == -1 || unsafe { fcntl(fd, F_SETFL, flags | O_NONBLOCK) } == -1 { + return Err(io::Error::last_os_error()); + } + + Ok(()) + } + + pub async fn capture_stream( + out_pipe: ChildStdout, + err_pipe: ChildStderr, + data: &mut (dyn FnMut(bool, &mut Vec, bool) + Send), + ) -> io::Result<()> { + let out_fd = out_pipe.as_raw_fd(); + let err_fd = err_pipe.as_raw_fd(); + + set_nonblock(out_fd)?; + set_nonblock(err_fd)?; + + let mut out_buf = BufReader::new(out_pipe); + let mut err_buf = BufReader::new(err_pipe); + let mut out_done = false; + let mut err_done = false; + let mut out = String::new(); + let mut err = String::new(); + + let mut fds: [libc::pollfd; 2] = unsafe { mem::zeroed() }; + fds[0].fd = out_fd; + fds[0].events = libc::POLLIN; + fds[1].fd = err_fd; + fds[1].events = libc::POLLIN; + let mut nfds = 2; + let mut errfd = 1; + + while nfds > 0 { + // Wait for either pipe to become readable using `poll` + let r = unsafe { libc::poll(fds.as_mut_ptr(), nfds, -1) }; + + if r == -1 { + let err = io::Error::last_os_error(); + + if err.kind() == io::ErrorKind::Interrupted { + continue; + } + + return Err(err); + } + + // Read as much as we can from each pipe, ignoring EWOULDBLOCK or + // EAGAIN. If we hit EOF, then this will happen because the underlying + // reader will return Ok(0), in which case we'll see `Ok` ourselves. In + // this case we flip the other fd back into blocking mode and read + // whatever's leftover on that file descriptor. + let handle = |res: io::Result| match res { + Ok(size) => Ok(size == 0), + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + Ok(false) + } else { + Err(e) + } + } + }; + + if !err_done && fds[errfd].revents != 0 && handle(err_buf.read_line(&mut err).await)? { + err_done = true; + nfds -= 1; + } + + data(false, &mut mem::take(&mut err).into_bytes(), err_done); + + if !out_done && fds[0].revents != 0 && handle(out_buf.read_line(&mut out).await)? { + out_done = true; + fds[0].fd = err_fd; + errfd = 0; + nfds -= 1; + } + + data(true, &mut mem::take(&mut out).into_bytes(), out_done); + } + Ok(()) + } +} + +#[cfg(windows)] +mod imp { + use std::io; + use std::os::windows::prelude::*; + use std::process::{ChildStderr, ChildStdout}; + use std::slice; + + use miow::iocp::{CompletionPort, CompletionStatus}; + use miow::pipe::NamedPipe; + use miow::Overlapped; + use windows_sys::Win32::Foundation::ERROR_BROKEN_PIPE; + + struct Pipe<'a> { + dst: &'a mut Vec, + overlapped: Overlapped, + pipe: NamedPipe, + done: bool, + } + + pub fn read2( + out_pipe: ChildStdout, + err_pipe: ChildStderr, + data: &mut dyn FnMut(bool, &mut Vec, bool), + ) -> io::Result<()> { + let mut out = Vec::new(); + let mut err = Vec::new(); + + let port = CompletionPort::new(1)?; + port.add_handle(0, &out_pipe)?; + port.add_handle(1, &err_pipe)?; + + unsafe { + let mut out_pipe = Pipe::new(out_pipe, &mut out); + let mut err_pipe = Pipe::new(err_pipe, &mut err); + + out_pipe.read()?; + err_pipe.read()?; + + let mut status = [CompletionStatus::zero(), CompletionStatus::zero()]; + + while !out_pipe.done || !err_pipe.done { + for status in port.get_many(&mut status, None)? { + if status.token() == 0 { + out_pipe.complete(status); + data(true, out_pipe.dst, out_pipe.done); + out_pipe.read()?; + } else { + err_pipe.complete(status); + data(false, err_pipe.dst, err_pipe.done); + err_pipe.read()?; + } + } + } + + Ok(()) + } + } + + impl<'a> Pipe<'a> { + unsafe fn new(p: P, dst: &'a mut Vec) -> Pipe<'a> { + Pipe { + dst, + pipe: NamedPipe::from_raw_handle(p.into_raw_handle()), + overlapped: Overlapped::zero(), + done: false, + } + } + + unsafe fn read(&mut self) -> io::Result<()> { + let dst = slice_to_end(self.dst); + match self.pipe.read_overlapped(dst, self.overlapped.raw()) { + Ok(_) => Ok(()), + Err(e) => { + if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) { + self.done = true; + Ok(()) + } else { + Err(e) + } + } + } + } + + unsafe fn complete(&mut self, status: &CompletionStatus) { + let prev = self.dst.len(); + self.dst.set_len(prev + status.bytes_transferred() as usize); + if status.bytes_transferred() == 0 { + self.done = true; + } + } + } + + unsafe fn slice_to_end(v: &mut Vec) -> &mut [u8] { + if v.capacity() == 0 { + v.reserve(16); + } + if v.capacity() == v.len() { + v.reserve(1); + } + slice::from_raw_parts_mut(v.as_mut_ptr().add(v.len()), v.capacity() - v.len()) + } +}