-
Notifications
You must be signed in to change notification settings - Fork 225
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Updates code generation for methods declared with `-> stream`. For Server traits, these methods have no `Results` parameter. For `Client` objects, the methods go through a new `StreamingRequest` object. Twoparty RPC connections get a fixed window size that can be configured via `twoparty::VatNetwork::set_window_size()`. The implementation closely follows capnproto-c++. One important differnce: in capnproto-c++ streaming methods are delivered one-by-one, without overlap. In Rust, overlapping method calls are less error-prone, so we don't bother with that extra built-in logic. Adds a new example `streaming` that performs streaming method calls on a capability returned early via set_pipeline().
- Loading branch information
Showing
23 changed files
with
949 additions
and
64 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
[package] | ||
name = "streaming" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
build = "build.rs" | ||
|
||
[[bin]] | ||
name = "streaming" | ||
path = "main.rs" | ||
|
||
[build-dependencies] | ||
capnpc = { path = "../../../capnpc" } | ||
|
||
[dependencies] | ||
capnp = { path = "../../../capnp" } | ||
futures = "0.3.0" | ||
rand = "0.8.5" | ||
sha2 = { version = "0.10.8" } | ||
tokio = { version = "1.0.0", features = ["net", "rt", "macros"]} | ||
tokio-util = { version = "0.7.4", features = ["compat"] } | ||
|
||
[dependencies.capnp-rpc] | ||
path = "../.." |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
capnpc::CompilerCommand::new() | ||
.file("streaming.capnp") | ||
.run()?; | ||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
use crate::streaming_capnp::receiver; | ||
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; | ||
|
||
use futures::AsyncReadExt; | ||
use rand::Rng; | ||
use sha2::{Digest, Sha256}; | ||
|
||
pub async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
use std::net::ToSocketAddrs; | ||
let args: Vec<String> = ::std::env::args().collect(); | ||
if args.len() != 5 { | ||
println!( | ||
"usage: {} client HOST:PORT STREAM_SIZE WINDOW_SIZE", | ||
args[0] | ||
); | ||
return Ok(()); | ||
} | ||
|
||
let stream_size: usize = str::parse(&args[3]).unwrap(); | ||
let window_size: usize = str::parse(&args[4]).unwrap(); | ||
|
||
let addr = args[2] | ||
.to_socket_addrs()? | ||
.next() | ||
.expect("could not parse address"); | ||
|
||
tokio::task::LocalSet::new() | ||
.run_until(async move { | ||
let stream = tokio::net::TcpStream::connect(&addr).await?; | ||
stream.set_nodelay(true)?; | ||
let (reader, writer) = | ||
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split(); | ||
let mut rpc_network = Box::new(twoparty::VatNetwork::new( | ||
futures::io::BufReader::new(reader), | ||
futures::io::BufWriter::new(writer), | ||
rpc_twoparty_capnp::Side::Client, | ||
Default::default(), | ||
)); | ||
rpc_network.set_window_size(window_size); | ||
let mut rpc_system = RpcSystem::new(rpc_network, None); | ||
let receiver: receiver::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); | ||
tokio::task::spawn_local(rpc_system); | ||
|
||
let capnp::capability::RemotePromise { promise, pipeline } = | ||
receiver.write_stream_request().send(); | ||
|
||
let mut rng = rand::thread_rng(); | ||
let mut hasher = Sha256::new(); | ||
let bytestream = pipeline.get_stream(); | ||
let mut bytes_written: u32 = 0; | ||
const CHUNK_SIZE: u32 = 4096; | ||
while bytes_written < stream_size as u32 { | ||
let mut request = bytestream.write_request(); | ||
let body = request.get(); | ||
let buf = body.init_bytes(CHUNK_SIZE); | ||
rng.fill(buf); | ||
hasher.update(buf); | ||
request.send().await?; | ||
bytes_written += CHUNK_SIZE; | ||
} | ||
bytestream.end_request().send().promise.await?; | ||
let response = promise.await?; | ||
|
||
let sha256 = response.get()?.get_sha256()?; | ||
let local_sha256 = hasher.finalize(); | ||
assert_eq!(sha256, &local_sha256[..]); | ||
Ok(()) | ||
}) | ||
.await | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
pub mod streaming_capnp { | ||
include!(concat!(env!("OUT_DIR"), "/streaming_capnp.rs")); | ||
} | ||
|
||
pub mod client; | ||
pub mod server; | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
let args: Vec<String> = ::std::env::args().collect(); | ||
if args.len() >= 2 { | ||
match &args[1][..] { | ||
"client" => return client::main().await, | ||
"server" => return server::main().await, | ||
_ => (), | ||
} | ||
} | ||
|
||
println!("usage: {} [client | server] ADDRESS", args[0]); | ||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
use std::net::ToSocketAddrs; | ||
|
||
use crate::streaming_capnp::{byte_stream, receiver}; | ||
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; | ||
|
||
use capnp::capability::Promise; | ||
use capnp::Error; | ||
|
||
use futures::channel::oneshot; | ||
use futures::AsyncReadExt; | ||
use sha2::{Digest, Sha256}; | ||
|
||
struct ByteStreamImpl { | ||
hasher: Sha256, | ||
hash_sender: Option<oneshot::Sender<Vec<u8>>>, | ||
} | ||
|
||
impl ByteStreamImpl { | ||
fn new(hash_sender: oneshot::Sender<Vec<u8>>) -> Self { | ||
Self { | ||
hasher: Sha256::new(), | ||
hash_sender: Some(hash_sender), | ||
} | ||
} | ||
} | ||
|
||
impl byte_stream::Server for ByteStreamImpl { | ||
fn write(&mut self, params: byte_stream::WriteParams) -> Promise<(), Error> { | ||
let bytes = pry!(pry!(params.get()).get_bytes()); | ||
self.hasher.update(bytes); | ||
Promise::ok(()) | ||
} | ||
|
||
fn end( | ||
&mut self, | ||
_params: byte_stream::EndParams, | ||
_results: byte_stream::EndResults, | ||
) -> Promise<(), Error> { | ||
let hasher = std::mem::take(&mut self.hasher); | ||
if let Some(sender) = self.hash_sender.take() { | ||
let _ = sender.send(hasher.finalize()[..].to_vec()); | ||
} | ||
Promise::ok(()) | ||
} | ||
} | ||
|
||
struct ReceiverImpl {} | ||
|
||
impl ReceiverImpl { | ||
fn new() -> Self { | ||
Self {} | ||
} | ||
} | ||
|
||
impl receiver::Server for ReceiverImpl { | ||
fn write_stream( | ||
&mut self, | ||
_params: receiver::WriteStreamParams, | ||
mut results: receiver::WriteStreamResults, | ||
) -> Promise<(), Error> { | ||
let (snd, rcv) = oneshot::channel(); | ||
let client: byte_stream::Client = capnp_rpc::new_client(ByteStreamImpl::new(snd)); | ||
results.get().set_stream(client); | ||
pry!(results.set_pipeline()); | ||
Promise::from_future(async move { | ||
match rcv.await { | ||
Ok(v) => { | ||
results.get().set_sha256(&v[..]); | ||
Ok(()) | ||
} | ||
Err(_) => Err(Error::failed("failed to get hash".into())), | ||
} | ||
}) | ||
} | ||
} | ||
|
||
pub async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
let args: Vec<String> = ::std::env::args().collect(); | ||
if args.len() != 3 { | ||
println!("usage: {} server ADDRESS[:PORT]", args[0]); | ||
return Ok(()); | ||
} | ||
|
||
let addr = args[2] | ||
.to_socket_addrs()? | ||
.next() | ||
.expect("could not parse address"); | ||
|
||
tokio::task::LocalSet::new() | ||
.run_until(async move { | ||
let listener = tokio::net::TcpListener::bind(&addr).await?; | ||
let client: receiver::Client = capnp_rpc::new_client(ReceiverImpl::new()); | ||
|
||
loop { | ||
let (stream, _) = listener.accept().await?; | ||
stream.set_nodelay(true)?; | ||
let (reader, writer) = | ||
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split(); | ||
let network = twoparty::VatNetwork::new( | ||
futures::io::BufReader::new(reader), | ||
futures::io::BufWriter::new(writer), | ||
rpc_twoparty_capnp::Side::Server, | ||
Default::default(), | ||
); | ||
|
||
let rpc_system = RpcSystem::new(Box::new(network), Some(client.clone().client)); | ||
|
||
tokio::task::spawn_local(rpc_system); | ||
} | ||
}) | ||
.await | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
@0x9fedc87e438cde81; | ||
|
||
interface ByteStream { | ||
write @0 (bytes :Data) -> stream; | ||
# Writes a chunk. | ||
|
||
end @1 (); | ||
# Ends the stream. | ||
} | ||
|
||
interface Receiver { | ||
writeStream @0 () -> (stream :ByteStream, sha256 :Data); | ||
# Uses set_pipeline() to set up `stream` immediately. | ||
# Actually returns when `end()` is called on that stream. | ||
# `sha256` is the SHA256 checksum of the received data. | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.