diff --git a/capnp-rpc/src/rpc.rs b/capnp-rpc/src/rpc.rs index 85637309d..86d39af67 100644 --- a/capnp-rpc/src/rpc.rs +++ b/capnp-rpc/src/rpc.rs @@ -154,6 +154,9 @@ where /// The local QuestionRef, set to None when it is destroyed. self_ref: Option>>>, + + /// If true, don't send a Finish message. + skip_finish: bool, } impl Question { @@ -163,6 +166,7 @@ impl Question { param_exports: Vec::new(), is_tail_call: false, self_ref: None, + skip_finish: false, } } } @@ -210,20 +214,22 @@ impl Drop for QuestionRef { unreachable!() }; if let Ok(ref mut c) = *self.connection_state.connection.borrow_mut() { - let mut message = c.new_outgoing_message(5); - { - let root: message::Builder = message.get_body().unwrap().init_as(); - let mut builder = root.init_finish(); - builder.set_question_id(self.id); - - // If we're still awaiting a return, then this request is being - // canceled, and we're going to ignore any capabilities in the return - // message, so set releaseResultCaps true. If we already received the - // return, then we've already built local proxies for the caps and will - // send Release messages when those are destroyed. - builder.set_release_result_caps(q.is_awaiting_return); + if !q.skip_finish { + let mut message = c.new_outgoing_message(5); + { + let root: message::Builder = message.get_body().unwrap().init_as(); + let mut builder = root.init_finish(); + builder.set_question_id(self.id); + + // If we're still awaiting a return, then this request is being + // canceled, and we're going to ignore any capabilities in the return + // message, so set releaseResultCaps true. If we already received the + // return, then we've already built local proxies for the caps and will + // send Release messages when those are destroyed. + builder.set_release_result_caps(q.is_awaiting_return); + } + let _ = message.send(); } - let _ = message.send(); } if q.is_awaiting_return { @@ -774,9 +780,10 @@ impl ConnectionState { let answers_slots = &mut connection_state.answers.borrow_mut().slots; match answers_slots.entry(answer_id) { hash_map::Entry::Vacant(_) => { - return Err(Error::failed(format!( - "Invalid question ID {answer_id} in Finish message." - ))); + // The `Finish` message targets a question ID that isn't present in our answer table. + // Probably, we sent a `Return` with `noFinishNeeded = true`, but the other side didn't + // recognize this hint and sent a `Finish` anyway, or the `Finish` was already in-flight at + // the time we sent the `Return`. We can silently ignore this. } hash_map::Entry::Occupied(mut entry) => { let answer = entry.get_mut(); @@ -1008,6 +1015,9 @@ impl ConnectionState { match questions.slots[question_id as usize] { Some(ref mut question) => { question.is_awaiting_return = false; + if ret.get_no_finish_needed() { + question.skip_finish = true; + } match question.self_ref { Some(ref question_ref) => match ret.which()? { return_::Results(results) => { @@ -1215,23 +1225,23 @@ impl ConnectionState { let promised_answer = promised_answer?; let question_id = promised_answer.get_question_id(); - match self.answers.borrow().slots.get(&question_id) { - None => Err(Error::failed( - "PromisedAnswer.questionId is not a current question.".to_string(), - )), + let pipeline = match self.answers.borrow().slots.get(&question_id) { + None => Box::new(broken::Pipeline::new(Error::failed( + "Pipeline call on a request that returned no capabilities or was already closed.".to_string(), + ))) as Box, Some(base) => { - let pipeline = match base.pipeline { + match base.pipeline { Some(ref pipeline) => pipeline.add_ref(), None => Box::new(broken::Pipeline::new(Error::failed( "Pipeline call on a request that returned not capabilities or was \ already closed." .to_string(), ))) as Box, - }; - let ops = to_pipeline_ops(promised_answer.get_transform()?)?; - Ok(pipeline.get_pipelined_cap(&ops)) + } } - } + }; + let ops = to_pipeline_ops(promised_answer.get_transform()?)?; + Ok(pipeline.get_pipelined_cap(&ops)) } } } @@ -2356,11 +2366,15 @@ impl ResultsDone { (false, Ok(())) => { let exports = { let root: message::Builder = message.get_body()?.get_as()?; - let message::Return(ret) = root.which()? else { + let message::Return(Ok(mut ret)) = root.which()? else { unreachable!() }; + if cap_table.is_empty() { + ret.set_no_finish_needed(true); + finish_received.set(true); + } let crate::rpc_capnp::return_::Results(Ok(payload)) = - ret?.which()? + ret.which()? else { unreachable!() }; diff --git a/capnp-rpc/test/test.rs b/capnp-rpc/test/test.rs index f6990ccab..e4e4376b9 100644 --- a/capnp-rpc/test/test.rs +++ b/capnp-rpc/test/test.rs @@ -384,7 +384,7 @@ fn pipelining_return_null() { let cap = request.send().pipeline.get_cap(); match cap.foo_request().send().promise.await { Err(ref e) => { - if e.extra.contains("Message contains null capability pointer") { + if e.extra.contains("Pipeline call on a request that returned no capabilities") { Ok(()) } else { Err(Error::failed(format!(