Skip to content

Commit

Permalink
Implement streaming
Browse files Browse the repository at this point in the history
Updates code generation for methods declared with `-> stream`.
For Server traits, these methods have no `Results` parameter.
For `Client` objects, the methods go through a new
`StreamingRequest` object.

Twoparty RPC connections get a fixed window size that can
be configured via `twoparty::VatNetwork::set_window_size()`.

The implementation closely follows capnproto-c++. One important
difference: in capnproto-c++ streaming methods
are delivered one-by-one, without overlap. In Rust, overlapping
method calls are less error-prone, so we don't bother with
that extra built-in logic.

Adds a new example `streaming` that performs streaming method calls
on a capability returned early via set_pipeline().
  • Loading branch information
dwrensha committed Sep 16, 2024
1 parent 418876d commit be75e95
Show file tree
Hide file tree
Showing 23 changed files with 948 additions and 64 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ members = [
"capnp-rpc/examples/hello-world",
"capnp-rpc/examples/calculator",
"capnp-rpc/examples/pubsub",
"capnp-rpc/examples/streaming",
"capnp-rpc/test",
"example/addressbook",
"example/addressbook_send",
Expand Down
24 changes: 24 additions & 0 deletions capnp-rpc/examples/streaming/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "streaming"
version = "0.1.0"
edition = "2021"

build = "build.rs"

[[bin]]
name = "streaming"
path = "main.rs"

[build-dependencies]
capnpc = { path = "../../../capnpc" }

[dependencies]
capnp = { path = "../../../capnp" }
futures = "0.3.0"
rand = "0.8.5"
sha2 = { version = "0.10.8" }
tokio = { version = "1.0.0", features = ["net", "rt", "macros"]}
tokio-util = { version = "0.7.4", features = ["compat"] }

[dependencies.capnp-rpc]
path = "../.."
6 changes: 6 additions & 0 deletions capnp-rpc/examples/streaming/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
capnpc::CompilerCommand::new()
.file("streaming.capnp")
.run()?;
Ok(())
}
70 changes: 70 additions & 0 deletions capnp-rpc/examples/streaming/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::streaming_capnp::receiver;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};

use futures::AsyncReadExt;
use rand::Rng;
use sha2::{Digest, Sha256};

pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
use std::net::ToSocketAddrs;
let args: Vec<String> = ::std::env::args().collect();
if args.len() != 5 {
println!(
"usage: {} client HOST:PORT STREAM_SIZE WINDOW_SIZE",
args[0]
);
return Ok(());
}

let stream_size: usize = str::parse(&args[3]).unwrap();
let window_size: usize = str::parse(&args[4]).unwrap();

let addr = args[2]
.to_socket_addrs()?
.next()
.expect("could not parse address");

tokio::task::LocalSet::new()
.run_until(async move {
let stream = tokio::net::TcpStream::connect(&addr).await?;
stream.set_nodelay(true)?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let mut rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
rpc_network.set_window_size(window_size);
let mut rpc_system = RpcSystem::new(rpc_network, None);
let receiver: receiver::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
tokio::task::spawn_local(rpc_system);

let capnp::capability::RemotePromise { promise, pipeline } =
receiver.write_stream_request().send();

let mut rng = rand::thread_rng();
let mut hasher = Sha256::new();
let bytestream = pipeline.get_stream();
let mut bytes_written: u32 = 0;
const CHUNK_SIZE: u32 = 4096;
while bytes_written < stream_size as u32 {
let mut request = bytestream.write_request();
let body = request.get();
let buf = body.init_bytes(CHUNK_SIZE);
rng.fill(buf);
hasher.update(buf);
request.send().await?;
bytes_written += CHUNK_SIZE;
}
bytestream.end_request().send().promise.await?;
let response = promise.await?;

let sha256 = response.get()?.get_sha256()?;
let local_sha256 = hasher.finalize();
assert_eq!(sha256, &local_sha256[..]);
Ok(())
})
.await
}
21 changes: 21 additions & 0 deletions capnp-rpc/examples/streaming/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
pub mod streaming_capnp {
include!(concat!(env!("OUT_DIR"), "/streaming_capnp.rs"));
}

pub mod client;
pub mod server;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = ::std::env::args().collect();
if args.len() >= 2 {
match &args[1][..] {
"client" => return client::main().await,
"server" => return server::main().await,
_ => (),
}
}

println!("usage: {} [client | server] ADDRESS", args[0]);
Ok(())
}
112 changes: 112 additions & 0 deletions capnp-rpc/examples/streaming/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::net::ToSocketAddrs;

use crate::streaming_capnp::{byte_stream, receiver};
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};

use capnp::capability::Promise;
use capnp::Error;

use futures::channel::oneshot;
use futures::AsyncReadExt;
use sha2::{Digest, Sha256};

struct ByteStreamImpl {
hasher: Sha256,
hash_sender: Option<oneshot::Sender<Vec<u8>>>,
}

impl ByteStreamImpl {
fn new(hash_sender: oneshot::Sender<Vec<u8>>) -> Self {
Self {
hasher: Sha256::new(),
hash_sender: Some(hash_sender),
}
}
}

impl byte_stream::Server for ByteStreamImpl {
fn write(&mut self, params: byte_stream::WriteParams) -> Promise<(), Error> {
let bytes = pry!(pry!(params.get()).get_bytes());
self.hasher.update(bytes);
Promise::ok(())
}

fn end(
&mut self,
_params: byte_stream::EndParams,
_results: byte_stream::EndResults,
) -> Promise<(), Error> {
let hasher = std::mem::take(&mut self.hasher);
if let Some(sender) = self.hash_sender.take() {
let _ = sender.send(hasher.finalize()[..].to_vec());
}
Promise::ok(())
}
}

struct ReceiverImpl {}

impl ReceiverImpl {
fn new() -> Self {
Self {}
}
}

impl receiver::Server for ReceiverImpl {
fn write_stream(
&mut self,
_params: receiver::WriteStreamParams,
mut results: receiver::WriteStreamResults,
) -> Promise<(), Error> {
let (snd, rcv) = oneshot::channel();
let client: byte_stream::Client = capnp_rpc::new_client(ByteStreamImpl::new(snd));
results.get().set_stream(client);
pry!(results.set_pipeline());
Promise::from_future(async move {
match rcv.await {
Ok(v) => {
results.get().set_sha256(&v[..]);
Ok(())
}
Err(_) => Err(Error::failed("failed to get hash".into())),
}
})
}
}

pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = ::std::env::args().collect();
if args.len() != 3 {
println!("usage: {} server ADDRESS[:PORT]", args[0]);
return Ok(());
}

let addr = args[2]
.to_socket_addrs()?
.next()
.expect("could not parse address");

tokio::task::LocalSet::new()
.run_until(async move {
let listener = tokio::net::TcpListener::bind(&addr).await?;
let client: receiver::Client = capnp_rpc::new_client(ReceiverImpl::new());

loop {
let (stream, _) = listener.accept().await?;
stream.set_nodelay(true)?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let network = twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Server,
Default::default(),
);

let rpc_system = RpcSystem::new(Box::new(network), Some(client.clone().client));

tokio::task::spawn_local(rpc_system);
}
})
.await
}
16 changes: 16 additions & 0 deletions capnp-rpc/examples/streaming/streaming.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
@0x9fedc87e438cde81;

interface ByteStream {
write @0 (bytes :Data) -> stream;
# Writes a chunk.

end @1 ();
# Ends the stream.
}

interface Receiver {
writeStream @0 () -> (stream :ByteStream, sha256 :Data);
# Uses set_pipeline() to set up `stream` immediately.
# Actually returns when `end()` is called on that stream.
# `sha256` is the SHA256 checksum of the received data.
}
3 changes: 3 additions & 0 deletions capnp-rpc/src/broken.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ impl RequestHook for Request {
pipeline: any_pointer::Pipeline::new(Box::new(pipeline)),
}
}
fn send_streaming(self: Box<Self>) -> Promise<(), Error> {
Promise::err(self.error)
}
fn tail_send(self: Box<Self>) -> Option<(u32, Promise<(), Error>, Box<dyn PipelineHook>)> {
None
}
Expand Down
Loading

0 comments on commit be75e95

Please sign in to comment.