Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Results::set_pipeline() #515

Merged
merged 5 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions capnp-rpc/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,19 @@ struct Results {
message: Option<message::Builder<message::HeapAllocator>>,
cap_table: Vec<Option<Box<dyn ClientHook>>>,
results_done_fulfiller: Option<oneshot::Sender<Box<dyn ResultsDoneHook>>>,
pipeline_sender: Option<crate::queued::PipelineInnerSender>,
}

impl Results {
fn new(fulfiller: oneshot::Sender<Box<dyn ResultsDoneHook>>) -> Self {
fn new(
fulfiller: oneshot::Sender<Box<dyn ResultsDoneHook>>,
pipeline_sender: crate::queued::PipelineInnerSender,
) -> Self {
Self {
message: Some(::capnp::message::Builder::new_default()),
cap_table: Vec::new(),
results_done_fulfiller: Some(fulfiller),
pipeline_sender: Some(pipeline_sender),
}
}
}
Expand Down Expand Up @@ -126,6 +131,25 @@ impl ResultsHook for Results {
}
}

fn set_pipeline(&mut self) -> capnp::Result<()> {
use ::capnp::traits::ImbueMut;
let root = self.get()?;
let size = root.target_size()?;
let mut message2 = capnp::message::Builder::new(
capnp::message::HeapAllocator::new().first_segment_words(size.word_count as u32 + 1),
);
let mut root2: capnp::any_pointer::Builder = message2.init_root();
let mut cap_table2 = vec![];
root2.imbue_mut(&mut cap_table2);
root2.set_as(root.into_reader())?;
let hook = Box::new(ResultsDone::new(message2, cap_table2)) as Box<dyn ResultsDoneHook>;
let Some(sender) = self.pipeline_sender.take() else {
return Err(Error::failed("set_pipeline() called twice".into()));
};
sender.complete(Box::new(Pipeline::new(hook)));
Ok(())
}

fn tail_call(self: Box<Self>, _request: Box<dyn RequestHook>) -> Promise<(), Error> {
unimplemented!()
}
Expand All @@ -147,12 +171,12 @@ struct ResultsDoneInner {
cap_table: Vec<Option<Box<dyn ClientHook>>>,
}

struct ResultsDone {
pub(crate) struct ResultsDone {
inner: Rc<ResultsDoneInner>,
}

impl ResultsDone {
fn new(
pub(crate) fn new(
message: message::Builder<message::HeapAllocator>,
cap_table: Vec<Option<Box<dyn ClientHook>>>,
) -> Self {
Expand Down Expand Up @@ -181,6 +205,8 @@ pub struct Request {
interface_id: u64,
method_id: u16,
client: Box<dyn ClientHook>,
pipeline: crate::queued::Pipeline,
pipeline_sender: crate::queued::PipelineInnerSender,
}

impl Request {
Expand All @@ -190,12 +216,15 @@ impl Request {
_size_hint: Option<::capnp::MessageSize>,
client: Box<dyn ClientHook>,
) -> Self {
let (pipeline_sender, pipeline) = crate::queued::Pipeline::new();
Self {
message: message::Builder::new_default(),
cap_table: Vec::new(),
interface_id,
method_id,
client,
pipeline,
pipeline_sender,
}
}
}
Expand All @@ -217,17 +246,17 @@ impl RequestHook for Request {
interface_id,
method_id,
client,
mut pipeline,
pipeline_sender,
} = tmp;
let params = Params::new(message, cap_table);

let (results_done_fulfiller, results_done_promise) =
oneshot::channel::<Box<dyn ResultsDoneHook>>();
let results_done_promise = results_done_promise.map_err(crate::canceled_to_error);
let results = Results::new(results_done_fulfiller);
let results = Results::new(results_done_fulfiller, pipeline_sender.weak_clone());
let promise = client.call(interface_id, method_id, Box::new(params), Box::new(results));

let (pipeline_sender, mut pipeline) = crate::queued::Pipeline::new();

let p = futures::future::try_join(promise, results_done_promise).and_then(
move |((), results_done_hook)| {
pipeline_sender
Expand Down
50 changes: 40 additions & 10 deletions capnp-rpc/src/queued.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ pub struct PipelineInner {

impl PipelineInner {
fn resolve(this: &Rc<RefCell<Self>>, result: Result<Box<dyn PipelineHook>, Error>) {
assert!(this.borrow().redirect.is_none());
if this.borrow().redirect.is_some() {
// Already resolved, probably by set_pipeline().
return;
}

let pipeline = match result {
Ok(pipeline_hook) => pipeline_hook,
Err(e) => Box::new(broken::Pipeline::new(e)),
Expand All @@ -66,18 +70,30 @@ impl PipelineInner {

pub struct PipelineInnerSender {
inner: Option<Weak<RefCell<PipelineInner>>>,
resolve_on_drop: bool,
}

impl PipelineInnerSender {
pub(crate) fn weak_clone(&self) -> Self {
Self {
inner: self.inner.clone(),
resolve_on_drop: false,
}
}
}

impl Drop for PipelineInnerSender {
fn drop(&mut self) {
if let Some(weak_queued) = self.inner.take() {
if let Some(pipeline_inner) = weak_queued.upgrade() {
PipelineInner::resolve(
&pipeline_inner,
Ok(Box::new(crate::broken::Pipeline::new(Error::failed(
"PipelineInnerSender was canceled".into(),
)))),
);
if self.resolve_on_drop {
if let Some(weak_queued) = self.inner.take() {
if let Some(pipeline_inner) = weak_queued.upgrade() {
PipelineInner::resolve(
&pipeline_inner,
Ok(Box::new(crate::broken::Pipeline::new(Error::failed(
"PipelineInnerSender was canceled".into(),
)))),
);
}
}
}
}
Expand Down Expand Up @@ -108,6 +124,7 @@ impl Pipeline {
(
PipelineInnerSender {
inner: Some(Rc::downgrade(&inner)),
resolve_on_drop: true,
},
Self { inner },
)
Expand Down Expand Up @@ -271,9 +288,22 @@ impl ClientHook for Client {
.attach(inner_clone)
.and_then(|x| x);

// We need to drive `promise_to_drive` until we have a result.
match self.inner.borrow().promise_to_drive {
Some(ref p) => {
Promise::from_future(futures::future::try_join(p.clone(), promise).map_ok(|v| v.1))
let p1 = p.clone();
Promise::from_future(async move {
match futures::future::select(p1, promise).await {
futures::future::Either::Left((Ok(()), promise)) => promise.await,
futures::future::Either::Left((Err(e), _)) => Err(e),
futures::future::Either::Right((r, _)) => {
// Don't bother waiting for `promise_to_drive` to resolve.
// If we're here because set_pipeline() was called, then
// `promise_to_drive` might in fact never resolve.
r
}
}
})
}
None => Promise::from_future(promise),
}
Expand Down
30 changes: 29 additions & 1 deletion capnp-rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,12 +938,15 @@ impl<VatId> ConnectionState<VatId> {

let (results_inner_fulfiller, results_inner_promise) = oneshot::channel();
let results_inner_promise = results_inner_promise.map_err(crate::canceled_to_error);

let (pipeline_sender, mut pipeline) = queued::Pipeline::new();
let results = Results::new(
&connection_state,
question_id,
redirect_results,
results_inner_fulfiller,
answer.received_finish.clone(),
Some(pipeline_sender.weak_clone()),
);

let (redirected_results_done_promise, redirected_results_done_fulfiller) =
Expand All @@ -965,7 +968,6 @@ impl<VatId> ConnectionState<VatId> {

let call_promise =
capability.call(interface_id, method_id, Box::new(params), Box::new(results));
let (pipeline_sender, mut pipeline) = queued::Pipeline::new();

let promise = call_promise
.then(move |call_result| {
Expand Down Expand Up @@ -2141,6 +2143,7 @@ where
redirect_results: bool,
answer_id: AnswerId,
finish_received: Rc<Cell<bool>>,
pipeline_sender: Option<queued::PipelineInnerSender>,
}

impl<VatId> ResultsInner<VatId>
Expand Down Expand Up @@ -2195,6 +2198,7 @@ where
redirect_results: bool,
fulfiller: oneshot::Sender<ResultsInner<VatId>>,
finish_received: Rc<Cell<bool>>,
pipeline_sender: Option<queued::PipelineInnerSender>,
) -> Self {
Self {
inner: Some(ResultsInner {
Expand All @@ -2203,6 +2207,7 @@ where
redirect_results,
answer_id,
finish_received,
pipeline_sender,
}),
results_done_fulfiller: Some(fulfiller),
}
Expand Down Expand Up @@ -2250,6 +2255,29 @@ impl<VatId> ResultsHook for Results<VatId> {
}
}

fn set_pipeline(&mut self) -> ::capnp::Result<()> {
use ::capnp::traits::ImbueMut;
let root = self.get()?;
let size = root.target_size()?;
let mut message2 = capnp::message::Builder::new(
capnp::message::HeapAllocator::new().first_segment_words(size.word_count as u32 + 1),
);
let mut root2: capnp::any_pointer::Builder = message2.init_root();
let mut cap_table2 = vec![];
root2.imbue_mut(&mut cap_table2);
root2.set_as(root.into_reader())?;
let hook =
Box::new(local::ResultsDone::new(message2, cap_table2)) as Box<dyn ResultsDoneHook>;
let Some(ref mut inner) = self.inner else {
unreachable!();
};
let Some(sender) = inner.pipeline_sender.take() else {
return Err(Error::failed("set_pipeline() called twice".into()));
};
sender.complete(Box::new(local::Pipeline::new(hook)));
Ok(())
}

fn tail_call(self: Box<Self>, _request: Box<dyn RequestHook>) -> Promise<(), Error> {
unimplemented!()
}
Expand Down
13 changes: 13 additions & 0 deletions capnp-rpc/test/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,19 @@ impl test_pipeline::Server for TestPipeline {
) -> Promise<(), Error> {
Promise::ok(())
}

fn get_cap_pipeline_only(
&mut self,
_params: test_pipeline::GetCapPipelineOnlyParams,
mut results: test_pipeline::GetCapPipelineOnlyResults,
) -> Promise<(), Error> {
results
.get()
.init_out_box()
.set_cap(capnp_rpc::new_client::<test_extends::Client, _>(TestExtends).cast_to());
pry!(results.set_pipeline());
Promise::from_future(::futures::future::pending())
}
}

#[derive(Default)]
Expand Down
3 changes: 3 additions & 0 deletions capnp-rpc/test/test.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ interface TestPipeline {
getNullCap @1 () -> (cap :TestInterface);
testPointers @2 (cap :TestInterface, obj :AnyPointer, list :List(TestInterface)) -> ();

getCapPipelineOnly @3 () -> (outBox :Box);
# Never returns, but uses setPipeline() to make the pipeline work.

struct Box {
cap @0 :TestInterface;

Expand Down
Loading
Loading