Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Socket read/write cleanup #4112

Merged
merged 3 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 25 additions & 30 deletions src/shims/unix/linux_like/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,10 @@ impl FileDescription for Event {
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
}

// eventfd read at the size of u64.
// Turn the pointer into a place at the right type.
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);

let weak_eventfd = self_ref.downgrade();
eventfd_read(buf_place, dest, weak_eventfd, ecx)
eventfd_read(buf_place, dest, self_ref, ecx)
}

/// A write call adds the 8-byte integer value supplied in
Expand Down Expand Up @@ -97,18 +96,10 @@ impl FileDescription for Event {
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
}

// Read the user-supplied value from the pointer.
// Turn the pointer into a place at the right type.
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
let num = ecx.read_scalar(&buf_place)?.to_u64()?;

// u64::MAX as input is invalid because the maximum value of counter is u64::MAX - 1.
if num == u64::MAX {
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
}
// If the addition does not push the counter past the maximum value, update the counter.
// Else, block.
let weak_eventfd = self_ref.downgrade();
eventfd_write(num, buf_place, dest, weak_eventfd, ecx)
eventfd_write(buf_place, dest, self_ref, ecx)
}

fn as_unix(&self) -> &dyn UnixFileDescription {
Expand Down Expand Up @@ -193,20 +184,22 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
/// Block the thread if the addition would push the counter past u64::MAX - 1,
/// else just add the user-supplied value to the current counter.
fn eventfd_write<'tcx>(
num: u64,
buf_place: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
eventfd_ref: &FileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

// Figure out which value we should add.
let num = ecx.read_scalar(&buf_place)?.to_u64()?;
// u64::MAX as input is invalid because the maximum value of counter is u64::MAX - 1.
if num == u64::MAX {
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
}

match eventfd.counter.get().checked_add(num) {
Some(new_count @ 0..=MAX_COUNTER) => {
// Future `read` calls will synchronize with this write, so update the FD clock.
Expand All @@ -219,7 +212,7 @@ fn eventfd_write<'tcx>(

// The state changed; we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(&eventfd_ref)?;
ecx.check_and_update_readiness(eventfd_ref)?;

// Unblock *all* threads previously blocked on `read`.
// We need to take out the blocked thread ids and unblock them together,
Expand All @@ -244,6 +237,7 @@ fn eventfd_write<'tcx>(

eventfd.blocked_write_tid.borrow_mut().push(ecx.active_thread());

let weak_eventfd = eventfd_ref.downgrade();
ecx.block_thread(
BlockReason::Eventfd,
None,
Expand All @@ -255,8 +249,10 @@ fn eventfd_write<'tcx>(
weak_eventfd: WeakFileDescriptionRef,
}
@unblock = |this| {
// When we get unblocked, try again.
eventfd_write(num, buf_place, &dest, weak_eventfd, this)
// When we get unblocked, try again. We know the ref is still valid,
// otherwise there couldn't be a `read` that unblocks us.
let eventfd_ref = weak_eventfd.upgrade().unwrap();
eventfd_write(buf_place, &dest, &eventfd_ref, this)
}
),
);
Expand All @@ -270,13 +266,9 @@ fn eventfd_write<'tcx>(
fn eventfd_read<'tcx>(
buf_place: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
eventfd_ref: &FileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref to the callback function, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();
Expand All @@ -293,6 +285,7 @@ fn eventfd_read<'tcx>(

eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread());

let weak_eventfd = eventfd_ref.downgrade();
ecx.block_thread(
BlockReason::Eventfd,
None,
Expand All @@ -303,8 +296,10 @@ fn eventfd_read<'tcx>(
weak_eventfd: WeakFileDescriptionRef,
}
@unblock = |this| {
// When we get unblocked, try again.
eventfd_read(buf_place, &dest, weak_eventfd, this)
// When we get unblocked, try again. We know the ref is still valid,
// otherwise there couldn't be a `write` that unblocks us.
let eventfd_ref = weak_eventfd.upgrade().unwrap();
eventfd_read(buf_place, &dest, &eventfd_ref, this)
}
),
);
Expand All @@ -317,7 +312,7 @@ fn eventfd_read<'tcx>(

// The state changed; we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(&eventfd_ref)?;
ecx.check_and_update_readiness(eventfd_ref)?;

// Unblock *all* threads previously blocked on `write`.
// We need to take out the blocked thread ids and unblock them together,
Expand Down
158 changes: 72 additions & 86 deletions src/shims/unix/unnamed_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,26 +96,7 @@ impl FileDescription for AnonSocket {
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
// Always succeed on read size 0.
if len == 0 {
return ecx.return_read_success(ptr, &[], 0, dest);
}

let Some(readbuf) = &self.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("reading from the write end of a pipe");
};

if readbuf.borrow().buf.is_empty() && self.is_nonblock {
// Non-blocking socketpair with writer and empty buffer.
// https://linux.die.net/man/2/read
// EAGAIN or EWOULDBLOCK can be returned for socket,
// POSIX.1-2001 allows either error to be returned for this case.
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx)
anonsocket_read(self_ref, len, ptr, dest, ecx)
}

fn write<'tcx>(
Expand All @@ -127,31 +108,7 @@ impl FileDescription for AnonSocket {
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
// Always succeed on write size 0.
// ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
if len == 0 {
return ecx.return_write_success(0, dest);
}

// We are writing to our peer's readbuf.
let Some(peer_fd) = self.peer_fd().upgrade() else {
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
// closed.
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
};

let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe");
};
let available_space =
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
if available_space == 0 && self.is_nonblock {
// Non-blocking socketpair with a full buffer.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx)
anonsocket_write(self_ref, ptr, len, dest, ecx)
}

fn as_unix(&self) -> &dyn UnixFileDescription {
Expand All @@ -161,50 +118,65 @@ impl FileDescription for AnonSocket {

/// Write to AnonSocket based on the space available and return the written byte size.
fn anonsocket_write<'tcx>(
weak_self_ref: WeakFileDescriptionRef,
self_ref: &FileDescriptionRef,
ptr: Pointer,
len: usize,
dest: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(self_ref) = weak_self_ref.upgrade() else {
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
throw_unsup_format!("This will be a deadlock error in future")
};
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();

// Always succeed on write size 0.
// ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
if len == 0 {
return ecx.return_write_success(0, dest);
}

// We are writing to our peer's readbuf.
let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else {
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
// closed.
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest);
// closed. It is an error to write even if there would be space.
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
};

let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
throw_unsup_format!("writing to the reading end of a pipe")
// Writing to the read end of a pipe.
return ecx.set_last_error_and_return(IoError::LibcError("EBADF"), dest);
};

// Let's see if we can write.
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());

if available_space == 0 {
// Blocking socketpair with a full buffer.
let dest = dest.clone();
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
ecx.block_thread(
BlockReason::UnnamedSocket,
None,
callback!(
@capture<'tcx> {
weak_self_ref: WeakFileDescriptionRef,
ptr: Pointer,
len: usize,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
anonsocket_write(weak_self_ref, ptr, len, dest, this)
}
),
);
if self_anonsocket.is_nonblock {
// Non-blocking socketpair with a full buffer.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// Blocking socketpair with a full buffer.
// Block the current thread; only keep a weak ref for this.
let weak_self_ref = self_ref.downgrade();
let dest = dest.clone();
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
ecx.block_thread(
BlockReason::UnnamedSocket,
None,
callback!(
@capture<'tcx> {
weak_self_ref: WeakFileDescriptionRef,
ptr: Pointer,
len: usize,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
// If we got unblocked, then our peer successfully upgraded its weak
// ref to us. That means we can also upgrade our weak ref.
let self_ref = weak_self_ref.upgrade().unwrap();
anonsocket_write(&self_ref, ptr, len, &dest, this)
}
),
);
}
} else {
// There is space to write!
let mut writebuf = writebuf.borrow_mut();
// Remember this clock so `read` can synchronize with us.
ecx.release_clock(|clock| {
Expand All @@ -229,25 +201,26 @@ fn anonsocket_write<'tcx>(
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
}

return ecx.return_write_success(actual_write_size, &dest);
return ecx.return_write_success(actual_write_size, dest);
}
interp_ok(())
}

/// Read from AnonSocket and return the number of bytes read.
fn anonsocket_read<'tcx>(
weak_self_ref: WeakFileDescriptionRef,
self_ref: &FileDescriptionRef,
len: usize,
ptr: Pointer,
dest: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(self_ref) = weak_self_ref.upgrade() else {
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
throw_unsup_format!("This will be a deadlock error in future")
};
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();

// Always succeed on read size 0.
if len == 0 {
return ecx.return_read_success(ptr, &[], 0, dest);
}

let Some(readbuf) = &self_anonsocket.readbuf else {
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
// corresponding ErrorKind variant.
Expand All @@ -258,10 +231,19 @@ fn anonsocket_read<'tcx>(
if self_anonsocket.peer_fd().upgrade().is_none() {
// Socketpair with no peer and empty buffer.
// 0 bytes successfully read indicates end-of-file.
return ecx.return_read_success(ptr, &[], 0, &dest);
return ecx.return_read_success(ptr, &[], 0, dest);
} else if self_anonsocket.is_nonblock {
// Non-blocking socketpair with writer and empty buffer.
// https://linux.die.net/man/2/read
// EAGAIN or EWOULDBLOCK can be returned for socket,
// POSIX.1-2001 allows either error to be returned for this case.
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
// Blocking socketpair with writer and empty buffer.
let weak_self_ref = weak_self_ref.clone();
// Block the current thread; only keep a weak ref for this.
let weak_self_ref = self_ref.downgrade();
let dest = dest.clone();
self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread());
ecx.block_thread(
BlockReason::UnnamedSocket,
Expand All @@ -274,12 +256,16 @@ fn anonsocket_read<'tcx>(
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
anonsocket_read(weak_self_ref, len, ptr, dest, this)
// If we got unblocked, then our peer successfully upgraded its weak
// ref to us. That means we can also upgrade our weak ref.
let self_ref = weak_self_ref.upgrade().unwrap();
anonsocket_read(&self_ref, len, ptr, &dest, this)
}
),
);
}
} else {
// There's data to be read!
let mut bytes = vec![0; len];
let mut readbuf = readbuf.borrow_mut();
// Synchronize with all previous writes to this buffer.
Expand Down Expand Up @@ -313,7 +299,7 @@ fn anonsocket_read<'tcx>(
}
};

return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest);
return ecx.return_read_success(ptr, &bytes, actual_read_size, dest);
}
interp_ok(())
}
Expand Down
Loading
Loading