Skip to content

Commit

Permalink
add rpc_and_local_top_level to reduce code duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
dwrensha committed Sep 9, 2024
1 parent 216bff4 commit 6647fb3
Showing 1 changed file with 16 additions and 101 deletions.
117 changes: 16 additions & 101 deletions capnp-rpc/test/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ fn disconnector_disconnects_2() {
});
}

/// Sets up a test_capnp::bootstrap capability on the remote side of a
/// two-party RPC connection and provides a local reference to it, along
/// with a handle that supports spawning of tasks.
fn rpc_top_level<F, G>(main: F)
where
F: FnOnce(futures::executor::LocalSpawner, test_capnp::bootstrap::Client) -> G,
Expand Down Expand Up @@ -294,6 +297,17 @@ where
pool.run_until(main(spawner, client)).unwrap();
}

/// Runs both rpc_top_level() and local_top_level() on the function `main`.
fn rpc_and_local_top_level<F, G>(main: F)
where
F: FnOnce(futures::executor::LocalSpawner, test_capnp::bootstrap::Client) -> G,
F: Send + 'static + Clone,
G: Future<Output = Result<(), Error>> + 'static,
{
rpc_top_level(main.clone());
local_top_level(main);
}

#[test]
fn do_nothing() {
rpc_top_level(|_spawner, _client| async { Ok(()) });
Expand Down Expand Up @@ -338,7 +352,7 @@ fn basic_rpc_calls() {

#[test]
fn basic_pipelining() {
rpc_top_level(|_spawner, client| async move {
rpc_and_local_top_level(|_spawner, client| async move {
let response = client.test_pipeline_request().send().promise.await?;
let client = response.get()?.get_cap()?;

Expand Down Expand Up @@ -387,59 +401,6 @@ fn basic_pipelining() {
});
}

#[test]
fn basic_pipelining_local() {
// Like the basic_pipelining test, but no RPC connection -- everything is local.
local_top_level(|_spawner, client| async move {
let response = client.test_pipeline_request().send().promise.await?;
let client = response.get()?.get_cap()?;

let mut request = client.get_cap_request();

request.get().set_n(234);
let server = impls::TestInterface::new();
let chained_call_count = server.get_call_count();
request.get().set_in_cap(capnp_rpc::new_client(server));

let promise = request.send();

// This is just here to check that code generation for pipelines
// inside of groups works correctly.
let _ = promise.pipeline.get_out_box().get_foo().get_cap_in_group();

let mut pipeline_request = promise.pipeline.get_out_box().get_cap().foo_request();
pipeline_request.get().set_i(321);
let pipeline_promise = pipeline_request.send();

let pipeline_request2 = {
let extends_client = crate::test_capnp::test_extends::Client {
client: promise.pipeline.get_out_box().get_cap().client,
};
extends_client.grault_request()
};
let pipeline_promise2 = pipeline_request2.send();

drop(promise); // Just to be annoying, drop the original promise.

if chained_call_count.get() != 0 {
return Err(Error::failed(
"expected chained_call_count to equal 0".to_string(),
));
}

let response = pipeline_promise.promise.await?;

if response.get()?.get_x()? != "bar" {
return Err(Error::failed("expected x to equal 'bar'".to_string()));
}

let response2 = pipeline_promise2.promise.await?;
crate::test_util::CheckTestMessage::check_test_message(response2.get()?);
assert_eq!(chained_call_count.get(), 1);
Ok::<(), capnp::Error>(())
});
}

#[test]
fn pipelining_return_null() {
rpc_top_level(|_spawner, client| async move {
Expand Down Expand Up @@ -509,53 +470,7 @@ impl Future for WaitNTicks {
fn set_pipeline() {
use std::cell::Cell;
use std::rc::Rc;
rpc_top_level(|mut spawner, client| async move {
let response = client.test_pipeline_request().send().promise.await?;
let client = response.get()?.get_cap()?;
let capnp::capability::RemotePromise { promise, pipeline } =
client.get_cap_pipeline_only_request().send();
let promise_completed = Rc::new(Cell::new(false));
let promise_completed2 = promise_completed.clone();
spawn(
&mut spawner,
promise.map(move |_| {
promise_completed2.set(true);
Ok(())
}),
);

let mut pipeline_request = pipeline.get_out_box().get_cap().foo_request();
pipeline_request.get().set_i(321);
let pipeline_promise = pipeline_request.send().promise;

let pipeline_request2 = pipeline
.get_out_box()
.get_cap()
.cast_to::<test_capnp::test_extends::Client>()
.grault_request();
let pipeline_promise2 = pipeline_request2.send().promise;

let response = pipeline_promise.await?;
assert_eq!(response.get()?.get_x()?, "bar");

let response2 = pipeline_promise2.await?;
crate::test_util::CheckTestMessage::check_test_message(response2.get()?);

// Give the original promise an opportunity to complete.
WaitNTicks::new(5).await;

// The original promise never completed.
assert!(!promise_completed.get());
Ok(())
});
}

#[test]
fn set_pipeline_local() {
// like set_pipeline(), but with everything in the local vat.
use std::cell::Cell;
use std::rc::Rc;
local_top_level(|mut spawner, client| async move {
rpc_and_local_top_level(|mut spawner, client| async move {
let response = client.test_pipeline_request().send().promise.await?;
let client = response.get()?.get_cap()?;
let capnp::capability::RemotePromise { promise, pipeline } =
Expand Down

0 comments on commit 6647fb3

Please sign in to comment.