Skip to content

Commit

Permalink
Add new capture stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
milesj committed Jan 12, 2025
1 parent 3d95a02 commit b8c65b0
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,8 @@ thiserror = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true, features = ["io-util"] }

[target.'cfg(unix)'.dependencies]
libc = "0.2.169"

[lints]
workspace = true
102 changes: 101 additions & 1 deletion crates/process/src/exec_command.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::command::Command;
use crate::command_line::CommandLine;
use crate::output_stream::capture_stream;
use crate::output_to_error;
use crate::process_error::ProcessError;
use moon_common::color;
Expand Down Expand Up @@ -96,7 +97,7 @@ impl Command {
Ok(output)
}

pub async fn exec_stream_and_capture_output(&mut self) -> miette::Result<Output> {
pub async fn exec_stream_and_capture_output_old(&mut self) -> miette::Result<Output> {
let (mut command, line) = self.create_async_command();

let mut child = command
Expand Down Expand Up @@ -209,6 +210,105 @@ impl Command {
Ok(output)
}

pub async fn exec_stream_and_capture_output(&mut self) -> miette::Result<Output> {
let (mut command, line) = self.create_async_command();

let mut child = command
.stdin(if self.should_pass_stdin() {
Stdio::piped()
} else {
Stdio::inherit()
})
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map_err(|error| ProcessError::StreamCapture {
bin: self.get_bin_name(),
error: Box::new(error),
})?;

self.current_id = child.id();

if self.should_pass_stdin() {
self.write_input_to_child(&mut child, &line).await?;
}

// Stream and attempt to capture the output
let stderr = child.stderr.take().unwrap();
let mut stderr_buffer = Vec::new();
let mut stderr_pos = 0;

let stdout = child.stdout.take().unwrap();
let mut stdout_buffer = Vec::new();
let mut stdout_pos = 0;

let prefix = self.get_prefix();
let console = self
.console
.as_ref()
.expect("A console is required when streaming output!");

capture_stream(stdout, stderr, &mut |is_out, data, eof| {
let (pos, buf) = if is_out {
(&mut stdout_pos, &mut stdout_buffer)
} else {
(&mut stderr_pos, &mut stderr_buffer)
};

let idx = if eof {
data.len()
} else {
match data[*pos..].iter().rposition(|b| *b == b'\n') {
Some(i) => *pos + i + 1,
None => {
*pos = data.len();
return;
}
}
};

let new_lines = &data[..idx];

for line in String::from_utf8_lossy(new_lines).lines() {
let stream = if is_out { &console.out } else { &console.err };

let _ = if let Some(p) = &prefix {
stream.write_line_with_prefix(line.trim(), p)
} else {
stream.write_line(line.trim())
};
}

buf.extend(new_lines);
data.drain(..idx);
*pos = 0;
})
.await
.map_err(|error| ProcessError::StreamCapture {
bin: self.get_bin_name(),
error: Box::new(error),
})?;

// Attempt to create the child output
let status = child
.wait()
.await
.map_err(|error| ProcessError::StreamCapture {
bin: self.get_bin_name(),
error: Box::new(error),
})?;

let output = Output {
status,
stdout: stdout_buffer,
stderr: stderr_buffer,
};

self.handle_nonzero_status(&output, true)?;

Ok(output)
}

fn create_async_command(&self) -> (AsyncCommand, CommandLine) {
let command_line = self.create_command_line();

Expand Down
1 change: 1 addition & 0 deletions crates/process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod command;
mod command_line;
mod exec_command;
mod output;
mod output_stream;
mod process_error;
mod shell;

Expand Down
203 changes: 203 additions & 0 deletions crates/process/src/output_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// This code is copied from Cargo, but rewritten to support tokio/async.
// Original copyright belongs to them!
// https://github.com/rust-lang/cargo/blob/master/crates/cargo-util/src/read2.rs

pub use self::imp::capture_stream;

#[cfg(unix)]
mod imp {
use libc::{c_int, fcntl, F_GETFL, F_SETFL, O_NONBLOCK};
use std::io;
use std::mem;
use std::os::unix::prelude::*;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{ChildStderr, ChildStdout};

fn set_nonblock(fd: c_int) -> io::Result<()> {
let flags = unsafe { fcntl(fd, F_GETFL) };

if flags == -1 || unsafe { fcntl(fd, F_SETFL, flags | O_NONBLOCK) } == -1 {
return Err(io::Error::last_os_error());
}

Ok(())
}

pub async fn capture_stream(
out_pipe: ChildStdout,
err_pipe: ChildStderr,
data: &mut (dyn FnMut(bool, &mut Vec<u8>, bool) + Send),
) -> io::Result<()> {
let out_fd = out_pipe.as_raw_fd();
let err_fd = err_pipe.as_raw_fd();

set_nonblock(out_fd)?;
set_nonblock(err_fd)?;

let mut out_buf = BufReader::new(out_pipe);
let mut err_buf = BufReader::new(err_pipe);
let mut out_done = false;
let mut err_done = false;
let mut out = String::new();
let mut err = String::new();

let mut fds: [libc::pollfd; 2] = unsafe { mem::zeroed() };
fds[0].fd = out_fd;
fds[0].events = libc::POLLIN;
fds[1].fd = err_fd;
fds[1].events = libc::POLLIN;
let mut nfds = 2;
let mut errfd = 1;

while nfds > 0 {
// Wait for either pipe to become readable using `poll`
let r = unsafe { libc::poll(fds.as_mut_ptr(), nfds, -1) };

if r == -1 {
let err = io::Error::last_os_error();

if err.kind() == io::ErrorKind::Interrupted {
continue;
}

return Err(err);
}

// Read as much as we can from each pipe, ignoring EWOULDBLOCK or
// EAGAIN. If we hit EOF, then this will happen because the underlying
// reader will return Ok(0), in which case we'll see `Ok` ourselves. In
// this case we flip the other fd back into blocking mode and read
// whatever's leftover on that file descriptor.
let handle = |res: io::Result<usize>| match res {
Ok(size) => Ok(size == 0),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(false)
} else {
Err(e)
}
}
};

if !err_done && fds[errfd].revents != 0 && handle(err_buf.read_line(&mut err).await)? {
err_done = true;
nfds -= 1;
}

data(false, &mut mem::take(&mut err).into_bytes(), err_done);

if !out_done && fds[0].revents != 0 && handle(out_buf.read_line(&mut out).await)? {
out_done = true;
fds[0].fd = err_fd;
errfd = 0;
nfds -= 1;
}

data(true, &mut mem::take(&mut out).into_bytes(), out_done);
}
Ok(())
}
}

#[cfg(windows)]
mod imp {
use std::io;
use std::os::windows::prelude::*;
use std::process::{ChildStderr, ChildStdout};
use std::slice;

use miow::iocp::{CompletionPort, CompletionStatus};
use miow::pipe::NamedPipe;
use miow::Overlapped;
use windows_sys::Win32::Foundation::ERROR_BROKEN_PIPE;

struct Pipe<'a> {
dst: &'a mut Vec<u8>,
overlapped: Overlapped,
pipe: NamedPipe,
done: bool,
}

pub fn read2(
out_pipe: ChildStdout,
err_pipe: ChildStderr,
data: &mut dyn FnMut(bool, &mut Vec<u8>, bool),
) -> io::Result<()> {
let mut out = Vec::new();
let mut err = Vec::new();

let port = CompletionPort::new(1)?;
port.add_handle(0, &out_pipe)?;
port.add_handle(1, &err_pipe)?;

unsafe {
let mut out_pipe = Pipe::new(out_pipe, &mut out);
let mut err_pipe = Pipe::new(err_pipe, &mut err);

out_pipe.read()?;
err_pipe.read()?;

let mut status = [CompletionStatus::zero(), CompletionStatus::zero()];

while !out_pipe.done || !err_pipe.done {
for status in port.get_many(&mut status, None)? {
if status.token() == 0 {
out_pipe.complete(status);
data(true, out_pipe.dst, out_pipe.done);
out_pipe.read()?;
} else {
err_pipe.complete(status);
data(false, err_pipe.dst, err_pipe.done);
err_pipe.read()?;
}
}
}

Ok(())
}
}

impl<'a> Pipe<'a> {
unsafe fn new<P: IntoRawHandle>(p: P, dst: &'a mut Vec<u8>) -> Pipe<'a> {
Pipe {
dst,
pipe: NamedPipe::from_raw_handle(p.into_raw_handle()),
overlapped: Overlapped::zero(),
done: false,
}
}

unsafe fn read(&mut self) -> io::Result<()> {
let dst = slice_to_end(self.dst);
match self.pipe.read_overlapped(dst, self.overlapped.raw()) {
Ok(_) => Ok(()),
Err(e) => {
if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) {
self.done = true;
Ok(())
} else {
Err(e)
}
}
}
}

unsafe fn complete(&mut self, status: &CompletionStatus) {
let prev = self.dst.len();
self.dst.set_len(prev + status.bytes_transferred() as usize);
if status.bytes_transferred() == 0 {
self.done = true;
}
}
}

unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
if v.capacity() == 0 {
v.reserve(16);
}
if v.capacity() == v.len() {
v.reserve(1);
}
slice::from_raw_parts_mut(v.as_mut_ptr().add(v.len()), v.capacity() - v.len())
}
}

0 comments on commit b8c65b0

Please sign in to comment.