Skip to content

Commit

Permalink
optimization: allow Finish message to be elided when not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
dwrensha committed Sep 3, 2024
1 parent 70d22d5 commit 8fbd3d0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 28 deletions.
68 changes: 41 additions & 27 deletions capnp-rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ where

/// The local QuestionRef, set to None when it is destroyed.
self_ref: Option<Weak<RefCell<QuestionRef<VatId>>>>,

/// If true, don't send a Finish message.
skip_finish: bool,
}

impl<VatId> Question<VatId> {
Expand All @@ -163,6 +166,7 @@ impl<VatId> Question<VatId> {
param_exports: Vec::new(),
is_tail_call: false,
self_ref: None,
skip_finish: false,
}
}
}
Expand Down Expand Up @@ -210,20 +214,22 @@ impl<VatId> Drop for QuestionRef<VatId> {
unreachable!()
};
if let Ok(ref mut c) = *self.connection_state.connection.borrow_mut() {
let mut message = c.new_outgoing_message(5);
{
let root: message::Builder = message.get_body().unwrap().init_as();
let mut builder = root.init_finish();
builder.set_question_id(self.id);

// If we're still awaiting a return, then this request is being
// canceled, and we're going to ignore any capabilities in the return
// message, so set releaseResultCaps true. If we already received the
// return, then we've already built local proxies for the caps and will
// send Release messages when those are destroyed.
builder.set_release_result_caps(q.is_awaiting_return);
if !q.skip_finish {
let mut message = c.new_outgoing_message(5);
{
let root: message::Builder = message.get_body().unwrap().init_as();
let mut builder = root.init_finish();
builder.set_question_id(self.id);

// If we're still awaiting a return, then this request is being
// canceled, and we're going to ignore any capabilities in the return
// message, so set releaseResultCaps true. If we already received the
// return, then we've already built local proxies for the caps and will
// send Release messages when those are destroyed.
builder.set_release_result_caps(q.is_awaiting_return);
}
let _ = message.send();
}
let _ = message.send();
}

if q.is_awaiting_return {
Expand Down Expand Up @@ -774,9 +780,10 @@ impl<VatId> ConnectionState<VatId> {
let answers_slots = &mut connection_state.answers.borrow_mut().slots;
match answers_slots.entry(answer_id) {
hash_map::Entry::Vacant(_) => {
return Err(Error::failed(format!(
"Invalid question ID {answer_id} in Finish message."
)));
// The `Finish` message targets a question ID that isn't present in our answer table.
// Probably, we sent a `Return` with `noFinishNeeded = true`, but the other side didn't
// recognize this hint and sent a `Finish` anyway, or the `Finish` was already in-flight at
// the time we sent the `Return`. We can silently ignore this.
}
hash_map::Entry::Occupied(mut entry) => {
let answer = entry.get_mut();
Expand Down Expand Up @@ -1008,6 +1015,9 @@ impl<VatId> ConnectionState<VatId> {
match questions.slots[question_id as usize] {
Some(ref mut question) => {
question.is_awaiting_return = false;
if ret.get_no_finish_needed() {
question.skip_finish = true;
}
match question.self_ref {
Some(ref question_ref) => match ret.which()? {
return_::Results(results) => {
Expand Down Expand Up @@ -1215,23 +1225,23 @@ impl<VatId> ConnectionState<VatId> {
let promised_answer = promised_answer?;
let question_id = promised_answer.get_question_id();

match self.answers.borrow().slots.get(&question_id) {
None => Err(Error::failed(
"PromisedAnswer.questionId is not a current question.".to_string(),
)),
let pipeline = match self.answers.borrow().slots.get(&question_id) {
None => Box::new(broken::Pipeline::new(Error::failed(
"Pipeline call on a request that returned no capabilities or was already closed.".to_string(),
))) as Box<dyn PipelineHook>,
Some(base) => {
let pipeline = match base.pipeline {
match base.pipeline {
Some(ref pipeline) => pipeline.add_ref(),
None => Box::new(broken::Pipeline::new(Error::failed(
"Pipeline call on a request that returned not capabilities or was \
already closed."
.to_string(),
))) as Box<dyn PipelineHook>,
};
let ops = to_pipeline_ops(promised_answer.get_transform()?)?;
Ok(pipeline.get_pipelined_cap(&ops))
}
}
}
};
let ops = to_pipeline_ops(promised_answer.get_transform()?)?;
Ok(pipeline.get_pipelined_cap(&ops))
}
}
}
Expand Down Expand Up @@ -2356,11 +2366,15 @@ impl ResultsDone {
(false, Ok(())) => {
let exports = {
let root: message::Builder = message.get_body()?.get_as()?;
let message::Return(ret) = root.which()? else {
let message::Return(Ok(mut ret)) = root.which()? else {
unreachable!()
};
if cap_table.is_empty() {
ret.set_no_finish_needed(true);
finish_received.set(true);
}
let crate::rpc_capnp::return_::Results(Ok(payload)) =
ret?.which()?
ret.which()?
else {
unreachable!()
};
Expand Down
2 changes: 1 addition & 1 deletion capnp-rpc/test/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ fn pipelining_return_null() {
let cap = request.send().pipeline.get_cap();
match cap.foo_request().send().promise.await {
Err(ref e) => {
if e.extra.contains("Message contains null capability pointer") {
if e.extra.contains("Pipeline call on a request that returned no capabilities") {
Ok(())
} else {
Err(Error::failed(format!(
Expand Down

0 comments on commit 8fbd3d0

Please sign in to comment.