Skip to content

Commit

Permalink
add new_deferred_client() and deprecate new_promise_client()
Browse files Browse the repository at this point in the history
  • Loading branch information
dwrensha committed Sep 22, 2024
1 parent 47ee92a commit b959f35
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 21 deletions.
24 changes: 23 additions & 1 deletion capnp-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ where

/// Converts a promise for a client into a client that queues up any calls that arrive
/// before the promise resolves.
// TODO: figure out a better way to allow construction of promise clients.
#[deprecated(since = "0.20.2", note = "use `new_deferred_client()` instead")]
pub fn new_promise_client<T, F>(client_promise: F) -> T
where
T: ::capnp::capability::FromClientHook,
Expand All @@ -482,6 +482,28 @@ where
T::new(Box::new(queued_client))
}

/// Creates a `Client` from a future that resolves to a `Client`.
///
/// Any calls that arrive before the resolution are accumulated in a queue.
pub fn new_deferred_client<T>(
client_future: impl ::futures::Future<Output = Result<T, Error>> + 'static,
) -> T
where
T: ::capnp::capability::FromClientHook,
{
let mut queued_client = crate::queued::Client::new(None);
let weak_client = Rc::downgrade(&queued_client.inner);

queued_client.drive(client_future.then(move |r| {
if let Some(queued_inner) = weak_client.upgrade() {
crate::queued::ClientInner::resolve(&queued_inner, r.map(|c| c.into_client_hook()));
}
Promise::ok(())
}));

T::new(Box::new(queued_client))
}

struct SystemTaskReaper;
impl crate::task_set::TaskReaper<Error> for SystemTaskReaper {
fn task_failed(&mut self, error: Error) {
Expand Down
2 changes: 1 addition & 1 deletion capnp-rpc/src/queued.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl Client {

pub fn drive<F>(&mut self, promise: F)
where
F: Future<Output = Result<(), Error>> + 'static + Unpin,
F: Future<Output = Result<(), Error>> + 'static,
{
assert!(self.inner.borrow().promise_to_drive.is_none());
self.inner.borrow_mut().promise_to_drive = Some(Promise::from_future(promise).shared());
Expand Down
10 changes: 4 additions & 6 deletions capnp-rpc/test/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,9 +745,8 @@ impl test_promise_resolve::resolver::Server for ResolverImpl {
return Promise::err(Error::failed("no sender".into()));
};
let (snd, rcv) = oneshot::channel();
let _ = sender.send(capnp_rpc::new_promise_client(
rcv.map_err(|_| Error::failed("oneshot was canceled".to_string()))
.map_ok(|x: test_interface::Client| x.client),
let _ = sender.send(capnp_rpc::new_deferred_client(
rcv.map_err(|_| Error::failed("oneshot was canceled".to_string())),
));
self.sender = Some(snd);
Promise::ok(())
Expand Down Expand Up @@ -777,9 +776,8 @@ impl test_promise_resolve::Server for TestPromiseResolveImpl {
let (snd, rcv) = oneshot::channel();
let resolver = ResolverImpl { sender: Some(snd) };
let mut results_root = results.get();
results_root.set_cap(capnp_rpc::new_promise_client(
rcv.map_err(|_| Error::failed("oneshot was canceled".to_string()))
.map_ok(|x| x.client),
results_root.set_cap(capnp_rpc::new_deferred_client(
rcv.map_err(|_| Error::failed("oneshot was canceled".to_string())),
));
results_root.set_resolver(capnp_rpc::new_client(resolver));
Promise::ok(())
Expand Down
6 changes: 3 additions & 3 deletions capnp-rpc/test/reconnect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::rc::Rc;
use capnp::capability::{Promise, Response};
use capnp::Error;
use capnp_rpc::{
auto_reconnect, lazy_auto_reconnect, new_client, new_promise_client, pry, rpc_twoparty_capnp,
auto_reconnect, lazy_auto_reconnect, new_client, new_deferred_client, pry, rpc_twoparty_capnp,
twoparty, RpcSystem,
};
use futures::channel::oneshot;
Expand Down Expand Up @@ -314,8 +314,8 @@ fn auto_reconnect_rpc_call() {
do_autoconnect_test(&mut pool, |c| {
b.set_interface(c);
let req = client.test_interface_request();
new_promise_client(req.send().promise.map(|resp| match resp {
Ok(resp) => Ok(resp.get()?.get_cap()?.client),
new_deferred_client(req.send().promise.map(|resp| match resp {
Ok(resp) => Ok(resp.get()?.get_cap()?),
Err(err) => Err(err),
}))
})
Expand Down
18 changes: 8 additions & 10 deletions capnp-rpc/test/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ fn promise_resolve() {

let (paf_fulfiller, paf_promise) = oneshot::channel();
let cap: crate::test_capnp::test_interface::Client =
::capnp_rpc::new_promise_client(paf_promise.map_err(canceled_to_error));
::capnp_rpc::new_deferred_client(paf_promise.map_err(canceled_to_error));
request.get().set_cap(cap.clone());
request2.get().set_cap(cap);

Expand All @@ -620,9 +620,7 @@ fn promise_resolve() {
let _response = client2.get_call_sequence_request().send().promise.await?;

let server = impls::TestInterface::new();
let _ = paf_fulfiller.send(
capnp_rpc::new_client::<crate::test_capnp::test_interface::Client, _>(server).client,
);
let _ = paf_fulfiller.send(capnp_rpc::new_client(server));

let response = promise.await?;
if response.get()?.get_s()? != "bar" {
Expand Down Expand Up @@ -815,7 +813,7 @@ fn dont_hold() {

let (fulfiller, promise) = oneshot::channel();
let cap: crate::test_capnp::test_interface::Client =
::capnp_rpc::new_promise_client(promise.map_err(canceled_to_error));
::capnp_rpc::new_deferred_client(promise.map_err(canceled_to_error));

let mut request = client.dont_hold_request();
request.get().set_cap(cap.clone());
Expand Down Expand Up @@ -913,7 +911,7 @@ fn embargo_error() {

let (fulfiller, promise) = oneshot::channel();
let cap: crate::test_capnp::test_call_order::Client =
::capnp_rpc::new_promise_client(promise.map_err(canceled_to_error));
::capnp_rpc::new_deferred_client(promise.map_err(canceled_to_error));

let client2: crate::test_capnp::test_call_order::Client = client.clone().cast_to();
let early_call = client2.get_call_sequence_request().send();
Expand Down Expand Up @@ -958,7 +956,7 @@ fn echo_destruction() {

let (fulfiller, promise) = oneshot::channel();
let cap: crate::test_capnp::test_call_order::Client =
::capnp_rpc::new_promise_client(promise.map_err(canceled_to_error));
::capnp_rpc::new_deferred_client(promise.map_err(canceled_to_error));

let client2: crate::test_capnp::test_call_order::Client = client.clone().cast_to();
let early_call = client2.get_call_sequence_request().send();
Expand Down Expand Up @@ -1104,15 +1102,15 @@ fn capability_server_set() {
// Also works if the client is a promise.
let (fulfiller, promise) = oneshot::channel();
let client_promise: test_interface::Client =
::capnp_rpc::new_promise_client(promise.map_err(canceled_to_error));
::capnp_rpc::new_deferred_client(promise.map_err(canceled_to_error));

let client_promise2: test_interface::Client = client_promise.clone();

let (error_fulfiller, error_promise) = oneshot::channel();
let error_promise: test_interface::Client =
::capnp_rpc::new_promise_client(error_promise.map_err(canceled_to_error));
::capnp_rpc::new_deferred_client(error_promise.map_err(canceled_to_error));

assert!(fulfiller.send(client1.client).is_ok());
assert!(fulfiller.send(client1).is_ok());
let own_server1_again2 =
futures::executor::block_on(set1.get_local_server(&client_promise)).unwrap();
assert_eq!(
Expand Down

0 comments on commit b959f35

Please sign in to comment.