Skip to content

Commit

Permalink
Merge pull request #801 from microsoft/enhancement-catmem-pipe-state
Browse files Browse the repository at this point in the history
[catmem] States For Pipes
  • Loading branch information
ppenna authored Jun 30, 2023
2 parents e627515 + 242974c commit 3def8be
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 10 deletions.
67 changes: 57 additions & 10 deletions src/rust/catmem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ use self::{
queue::CatmemQueue,
};
use crate::{
catmem::futures::{
close::{
close_coroutine,
push_eof,
catmem::{
futures::{
close::{
close_coroutine,
push_eof,
},
pop::pop_coroutine,
push::push_coroutine,
},
pop::pop_coroutine,
push::push_coroutine,
pipe::PipeState,
},
collections::shared_ring::SharedRingBuffer,
runtime::{
Expand Down Expand Up @@ -135,7 +138,16 @@ impl CatmemLibOS {
let mut qtable = self.qtable.borrow_mut();
match qtable.get_mut(&qd) {
Some(queue) => {
// Set pipe as closing.
let pipe: &mut Pipe = queue.get_mut_pipe();
pipe.set_state(PipeState::Closing);

queue.cancel_pending_ops(Fail::new(libc::ECANCELED, "this queue was closed"));

// Set pipe as closed.
let pipe: &mut Pipe = queue.get_mut_pipe();
pipe.set_state(PipeState::Closed);

qtable.free(&qd);
},
None => {
Expand All @@ -155,8 +167,15 @@ impl CatmemLibOS {
// Check if queue descriptor is valid.
match qtable.get_mut(&qd) {
Some(queue) => {
let pipe: &mut Pipe = queue.get_mut_pipe();

// Set pipe as closing.
pipe.set_state(PipeState::Closing);

let ring: Rc<SharedRingBuffer<u16>> = pipe.buffer();

// Attempt to push EoF.
let result: Result<(), Fail> = { push_eof(queue.get_pipe().buffer()) };
let result: Result<(), Fail> = { push_eof(ring) };
queue.cancel_pending_ops(Fail::new(libc::ECANCELED, "this queue was closed"));

// Release the queue descriptor, even if pushing EoF failed. This will prevent any further operations on the
Expand All @@ -180,7 +199,12 @@ impl CatmemLibOS {
// Check if queue descriptor is valid.
match qtable.get_mut(&qd) {
Some(queue) => {
let ring: Rc<SharedRingBuffer<u16>> = queue.get_pipe().buffer();
let pipe: &mut Pipe = queue.get_mut_pipe();

// Set pipe as closing.
pipe.set_state(PipeState::Closing);

let ring: Rc<SharedRingBuffer<u16>> = pipe.buffer();
let qtable_ptr: Rc<RefCell<IoQueueTable<CatmemQueue>>> = self.qtable.clone();
let yielder: Yielder = Yielder::new();
let coroutine: Pin<Box<Operation>> = Box::pin(async move {
Expand Down Expand Up @@ -254,7 +278,18 @@ impl CatmemLibOS {
// Issue push operation.
match self.qtable.borrow_mut().get_mut(&qd) {
Some(queue) => {
let pipe: &Pipe = queue.get_pipe();
let pipe: &mut Pipe = queue.get_mut_pipe();

// Check if the pipe is closing or closed.
if pipe.state() == PipeState::Closing {
let cause: String = format!("pipe is closing (qd={:?})", qd);
error!("push(): {}", cause);
return Err(Fail::new(libc::EBADF, &cause));
} else if pipe.state() == PipeState::Closed {
let cause: String = format!("pipe is closed (qd={:?})", qd);
error!("push(): {}", cause);
return Err(Fail::new(libc::EBADF, &cause));
}

// TODO: review the following code once that condition is enforced by the pipe abstraction.
// We do not check for EoF because pipes are unidirectional,
Expand Down Expand Up @@ -315,7 +350,19 @@ impl CatmemLibOS {
// Issue pop operation.
match self.qtable.borrow_mut().get_mut(&qd) {
Some(queue) => {
let pipe: &Pipe = queue.get_pipe();
let pipe: &mut Pipe = queue.get_mut_pipe();

// Check if the pipe is closing or closed.
if pipe.state() == PipeState::Closing {
let cause: String = format!("pipe is closing (qd={:?})", qd);
error!("push(): {}", cause);
return Err(Fail::new(libc::EBADF, &cause));
} else if pipe.state() == PipeState::Closed {
let cause: String = format!("pipe is closed (qd={:?})", qd);
error!("push(): {}", cause);
return Err(Fail::new(libc::EBADF, &cause));
}

let ring: Rc<SharedRingBuffer<u16>> = pipe.buffer();
let yielder: Yielder = Yielder::new();
let yielder_handle: YielderHandle = yielder.get_handle();
Expand Down
24 changes: 24 additions & 0 deletions src/rust/catmem/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,21 @@ use ::std::rc::Rc;
// Structures
//======================================================================================================================

/// Encodes the states of a pipe.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PipeState {
/// A pipe that is opened.
Opened,
/// A pipe that is closing.
Closing,
/// A pipe that is closed.
Closed,
}

/// A pipe.
pub struct Pipe {
/// State of the pipe.
state: PipeState,
/// Indicates end of file.
eof: bool,
/// Underlying buffer.
Expand All @@ -28,6 +41,7 @@ impl Pipe {
/// Creates a new pipe.
pub fn new(buffer: SharedRingBuffer<u16>) -> Self {
Self {
state: PipeState::Opened,
eof: false,
buffer: Rc::new(buffer),
}
Expand All @@ -43,6 +57,16 @@ impl Pipe {
self.eof
}

/// Gets the state of the pipe.
pub fn state(&self) -> PipeState {
self.state
}

/// Sets the state of the pipe.
pub fn set_state(&mut self, state: PipeState) {
self.state = state;
}

/// Gets a reference to the underlying buffer of the target pipe.
pub fn buffer(&self) -> Rc<SharedRingBuffer<u16>> {
self.buffer.clone()
Expand Down

0 comments on commit 3def8be

Please sign in to comment.