Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callback support to FileDescription::read #4110

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions src/shims/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl<T: FileDescription + 'static> FileDescriptionExt for T {

pub type DynFileDescriptionRef = FileDescriptionRef<dyn FileDescription>;

/// Callback type invoked when a file-description I/O operation finishes.
///
/// The callback argument is a `Result<u64, IoError>`: `Ok` carries the number of bytes
/// that were read, `Err` carries the `IoError` reported for the failed operation.
pub type DynFileDescriptionCallback<'tcx> = DynMachineCallback<'tcx, Result<u64, IoError>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DynFileOpFinishCallback would be a better name IMO.


impl FileDescriptionRef<dyn FileDescription> {
pub fn downcast<T: FileDescription + 'static>(self) -> Option<FileDescriptionRef<T>> {
let inner = self.into_rc_any().downcast::<FdIdWith<T>>().ok()?;
Expand All @@ -134,13 +138,14 @@ pub trait FileDescription: std::fmt::Debug + FileDescriptionExt {

/// Reads as much as possible into the given buffer `ptr`.
/// `len` indicates how many bytes we should try to read.
/// `dest` is where the return value should be stored: number of bytes read, or `-1` in case of error.
/// `finish` is the callback invoked when the operation completes, with the number of bytes read or an error.
#[allow(dead_code)]
fn read<'tcx>(
self: FileDescriptionRef<Self>,
_communicate_allowed: bool,
_ptr: Pointer,
_len: usize,
_dest: &MPlaceTy<'tcx>,
_finish: DynFileDescriptionCallback<'tcx>,
_ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
throw_unsup_format!("cannot read from {}", self.name());
Expand Down Expand Up @@ -207,18 +212,32 @@ impl FileDescription for io::Stdin {
communicate_allowed: bool,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
finish: DynFileDescriptionCallback<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let mut bytes = vec![0; len];
// First handle isolation mode check
if !communicate_allowed {
// We want isolation mode to be deterministic, so we have to disallow all reads, even stdin.
helpers::isolation_abort_error("`read` from stdin")?;
}
let result = Read::read(&mut &*self, &mut bytes);
match result {
Ok(read_size) => ecx.return_read_success(ptr, &bytes, read_size, dest),
Err(e) => ecx.set_last_error_and_return(e, dest),

let mut bytes = vec![0; len];

match Read::read(&mut &*self, &mut bytes) {
Ok(actual_read_size) => {
// Write the successfully read bytes to the destination pointer
ecx.write_bytes_ptr(ptr, bytes[..actual_read_size].iter().copied())?;

let Ok(read_size) = u64::try_from(actual_read_size) else {
throw_unsup_format!(
"Read operation returned size {} which exceeds maximum allowed value",
actual_read_size
)
};
Comment on lines +228 to +236
Copy link
Member

@RalfJung RalfJung Jan 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep using return_read_success; that function should now take the callback instead of dest.

The PR shouldn't change any of the logic nor add new logic; this kind of reshuffling of code just makes it unnecessarily hard to review. For instance, why did you move the let mut bytes around, or add a new comment in code that you didn't change?


finish.call(ecx, Ok(read_size))
}
Err(e) => finish.call(ecx, Err(e.into())),
}
}

Expand Down
27 changes: 23 additions & 4 deletions src/shims/unix/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::io::ErrorKind;
use rustc_abi::Size;

use crate::helpers::check_min_arg_count;
use crate::shims::files::FileDescription;
use crate::shims::files::{DynFileDescriptionCallback, FileDescription};
use crate::shims::unix::linux_like::epoll::EpollReadyEvents;
use crate::shims::unix::*;
use crate::*;
Expand Down Expand Up @@ -203,7 +203,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
interp_ok(Scalar::from_i32(this.try_unwrap_io_result(result)?))
}

/// Read data from `fd` into buffer specified by `buf` and `count`.
/// Reads data from a file descriptor using callback-based completion.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change this comment?

///
/// If `offset` is `None`, reads data from current cursor position associated with `fd`
/// and updates cursor position on completion. Otherwise, reads from the specified offset
Expand Down Expand Up @@ -239,13 +239,32 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
return this.set_last_error_and_return(LibcError("EBADF"), dest);
};

trace!("read: FD mapped to {fd:?}");
trace!("Reading from FD {}, size {}, offset {:?}", fd_num, count, offset);
// We want to read at most `count` bytes. We are sure that `count` is not negative
// because it was a target's `usize`. Also we are sure that it is smaller than
// `usize::MAX` because it is bounded by the host's `isize`.

// Clone the result destination for use in the completion callback
let result_destination = dest.clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please just call this dest as we usually do.


let completion_callback: DynFileDescriptionCallback<'tcx> = callback!(
@capture<'tcx> {
result_destination: MPlaceTy<'tcx>,
}
|this, read_result: Result<u64, IoError>| {
match read_result {
Ok(read_size) => {
this.write_int(read_size, &result_destination)
}
Err(_err_code) => {
this.set_last_error_and_return(LibcError("EIO"), &result_destination)
Comment on lines +259 to +260
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why EIO? And why are you ignoring err_code here?

}
}
}
);

match offset {
None => fd.read(communicate, buf, count, dest, this)?,
None => fd.read(communicate, buf, count, completion_callback, this)?,
Some(offset) => {
let Ok(offset) = u64::try_from(offset) else {
return this.set_last_error_and_return(LibcError("EINVAL"), dest);
Expand Down
146 changes: 135 additions & 11 deletions src/shims/unix/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use rustc_data_structures::fx::FxHashMap;

use self::shims::time::system_time_to_duration;
use crate::helpers::check_min_arg_count;
use crate::shims::files::{EvalContextExt as _, FileDescription, FileDescriptionRef};
use crate::shims::files::{
DynFileDescriptionCallback, EvalContextExt as _, FileDescription, FileDescriptionRef,
WeakFileDescriptionRef,
};
use crate::shims::os_str::bytes_to_os_str;
use crate::shims::unix::fd::{FlockOp, UnixFileDescription};
use crate::*;
Expand All @@ -23,6 +26,92 @@ use crate::*;
struct FileHandle {
file: File,
writable: bool,
/// Mutex for synchronizing file access across threads.
file_lock: MutexRef,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh, what is this? No mutex should be added, please undo all that.

}

/// Provenance visitor for `FileHandle`; this implementation visits nothing.
impl VisitProvenance for FileHandle {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// Intentionally empty: `_visit` is never called, so no field of `FileHandle` is reported.
// The impl exists to satisfy the trait requirement.
// NOTE(review): assumes none of `file`, `writable`, `file_lock` carries provenance;
// confirm this holds for `MutexRef`.
}
}

impl FileHandle {
    /// Builds a `FileHandle` from the host `file`, its `writable` flag, and the mutex that
    /// is stored as `file_lock`.
    fn new(file: File, writable: bool, file_lock: MutexRef) -> Self {
        Self { file, writable, file_lock }
    }

    /// Duplicates this handle: the host file is cloned via `File::try_clone`, while
    /// `writable` is copied and `file_lock` is cloned from `self`.
    ///
    /// # Errors
    /// Throws an "unsupported" interpreter error if the host refuses to clone the file.
    fn try_clone<'tcx>(&self) -> InterpResult<'tcx, FileHandle> {
        let cloned_file = self
            .file
            .try_clone()
            .map_err(|e| err_unsup_format!("Failed to clone file handle: {}", e))?;

        interp_ok(FileHandle {
            file: cloned_file,
            writable: self.writable,
            file_lock: self.file_lock.clone(),
        })
    }

    /// Reads up to `length` bytes from `file_handle` into interpreter memory at `buffer_ptr`
    /// while holding `file_handle.file_lock`, then reports the outcome through `finish`.
    ///
    /// * `this` - interpreter context used for locking, memory writes and the callback.
    /// * `finish` - called with `Ok(bytes_read)` on success or `Err(..)` if the host read fails.
    /// * `file_handle` - handle whose `file` is read and whose `file_lock` is taken.
    /// * `weak_fd` - weak reference to the originating file description; the read is refused
    ///   if it can no longer be upgraded.
    /// * `buffer_ptr` / `length` - destination pointer and maximum number of bytes to read.
    ///
    /// # Errors
    /// * Throws "unsupported" if `weak_fd` can no longer be upgraded.
    /// * Propagates, unchanged, any error raised while writing the bytes to `buffer_ptr`.
    /// * Propagates errors returned by `finish` or by unlocking the mutex.
    fn perform_read<'tcx>(
        this: &mut MiriInterpCx<'tcx>,
        finish: DynFileDescriptionCallback<'tcx>,
        mut file_handle: FileHandle,
        weak_fd: WeakFileDescriptionRef<FileHandle>,
        buffer_ptr: Pointer,
        length: usize,
    ) -> InterpResult<'tcx> {
        // Verify the file descriptor is still valid *before* taking the lock, so that this
        // failure path never leaves `file_lock` held.
        if weak_fd.upgrade().is_none() {
            throw_unsup_format!("file got closed while blocking")
        }

        this.mutex_lock(&file_handle.file_lock);

        let mut bytes = vec![0; length];
        let result = match file_handle.file.read(&mut bytes) {
            Ok(read_size) => {
                // Propagate a failed memory write as-is instead of re-labelling it as
                // "unsupported"; the original error kind is what the user needs to see.
                this.write_bytes_ptr(buffer_ptr, bytes[..read_size].iter().copied())?;

                let Ok(read_size) = u64::try_from(read_size) else {
                    throw_unsup_format!(
                        "Read operation returned size {} which exceeds maximum allowed value",
                        read_size
                    )
                };

                finish.call(this, Ok(read_size))
            }
            Err(err_code) => finish.call(this, Err(err_code.into())),
        };

        // Release the lock once the read and the completion callback have run, whether or
        // not the callback succeeded. Errors raised via `?` / `throw_*` above return
        // immediately and do not reach this point.
        this.mutex_unlock(&file_handle.file_lock)?;

        result
    }
}

impl FileDescription for FileHandle {
Expand All @@ -35,15 +124,42 @@ impl FileDescription for FileHandle {
communicate_allowed: bool,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
finish: DynFileDescriptionCallback<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let this = ecx;
assert!(communicate_allowed, "isolation should have prevented even opening a file");
let mut bytes = vec![0; len];
let result = (&mut &self.file).read(&mut bytes);
match result {
Ok(read_size) => ecx.return_read_success(ptr, &bytes, read_size, dest),
Err(e) => ecx.set_last_error_and_return(e, dest),

// Clone the underlying File.
let clone_file_handle = match self.try_clone().report_err() {
Ok(handle) => handle,
Err(ec) => throw_unsup_format!("unable to clone file discp {:#?}", ec),
};

let weak_fd = FileDescriptionRef::downgrade(&self);

if this.mutex_is_locked(&self.file_lock) {
this.block_thread(
BlockReason::Mutex,
None,
callback!(
@capture<'tcx> {
finish: DynFileDescriptionCallback<'tcx>,
clone_file_handle: FileHandle,
weak_fd: WeakFileDescriptionRef<FileHandle>,
ptr: Pointer,
len: usize,
}
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
FileHandle::perform_read(this, finish, clone_file_handle, weak_fd, ptr, len)
}
),
);

unreachable!()
} else {
FileHandle::perform_read(this, finish, clone_file_handle, weak_fd, ptr, len)
}
}

Expand Down Expand Up @@ -584,9 +700,13 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
return this.set_last_error_and_return_i32(ErrorKind::PermissionDenied);
}

let fd = options
.open(path)
.map(|file| this.machine.fds.insert_new(FileHandle { file, writable }));
let fd = options.open(path).map(|file| {
this.machine.fds.insert_new(FileHandle::new(
file,
writable,
this.machine.sync.mutex_create(),
))
});

interp_ok(Scalar::from_i32(this.try_unwrap_io_result(fd)?))
}
Expand Down Expand Up @@ -1645,7 +1765,11 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {

match file {
Ok(f) => {
let fd = this.machine.fds.insert_new(FileHandle { file: f, writable: true });
let fd = this.machine.fds.insert_new(FileHandle::new(
f,
true,
this.machine.sync.mutex_create(),
));
return interp_ok(Scalar::from_i32(fd));
}
Err(e) =>
Expand Down
Loading
Loading