-
Notifications
You must be signed in to change notification settings - Fork 224
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
256 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,256 @@ | ||
--- | ||
layout: post | ||
title: 0.20 — streaming with backpressure | ||
author: dwrensha | ||
--- | ||
|
||
Version 0.20 of [capnproto-rust](https://github.com/capnproto/capnproto-rust) | ||
is now [available on crates.io](https://crates.io/crates/capnp). | ||
In this release, the library has new built-in support for | ||
[streaming](https://capnproto.org/news/2020-04-23-capnproto-0.8.html#multi-stream-flow-control). | ||
|
||
## the `stream` keyword | ||
|
||
Suppose we have the following interface defined | ||
in a capnp [schema file](https://capnproto.org/language.html): | ||
|
||
``` | ||
interface ByteSink { | ||
# A destination for a sequence of bytes. | ||
write @0 (chunk :Data); | ||
# Writes a chunk of bytes. | ||
end @1 (); | ||
# Indicates that no more chunks will be written. | ||
} | ||
``` | ||
|
||
In Rust code, we can invoke methods on a (possibly-remote) `ByteSink` object | ||
through handles of type `byte_sink::Client`. For example: | ||
|
||
```rust | ||
/// Writes all of `bytes` to `sink` in a call to `write()`. | ||
async fn write_bytes_all_at_once( | ||
bytes: &[u8], | ||
sink: byte_sink::Client, | ||
) -> Result<(), capnp::Error> { | ||
// Construct an RPC request for ByteSink.write(). | ||
let mut request = sink.write_request(); | ||
|
||
// Copy all of the bytes into the request. | ||
request.get().set_chunk(bytes); | ||
|
||
// Send the request and wait for the response. | ||
request.send().promise.await?; | ||
|
||
// Send a ByteSink::end() request and wait for the response. | ||
sink.end_request().send().promise.await?; | ||
|
||
Ok(()) | ||
} | ||
``` | ||
|
||
However, if `bytes` is too large, we might | ||
observe congestion, spikes in memory usage, or | ||
an error like this: | ||
|
||
``` | ||
remote exception: Failed: Message has 8750020 words, which is too large. | ||
``` | ||
|
||
Fortunately, the `ByteSink` interface lets us avoid such problems. We can split | ||
the bytes into chunks over multiple `ByteSink.write()` calls: | ||
|
||
|
||
```rust | ||
const CHUNK_SIZE: usize = 8192; | ||
|
||
/// Writes all of `bytes` to `sink` in chunks of size `CHUNK_SIZE`. | ||
async fn write_bytes_chunked( | ||
mut bytes: &[u8], | ||
sink: byte_sink::Client, | ||
) -> Result<(), capnp::Error> { | ||
// Loop until all bytes have been written. | ||
while bytes.len() > 0 { | ||
// Compute the end index. | ||
let end = usize::min(CHUNK_SIZE, bytes.len()); | ||
|
||
// Construct the write() request. | ||
let mut request = sink.write_request(); | ||
request.get().set_chunk(&bytes[..end]); | ||
|
||
// Send the request and wait for a response. | ||
request.send().promise.await?; | ||
|
||
// Update `bytes` to point to the remaining bytes. | ||
bytes = &bytes[end..]; | ||
} | ||
|
||
// Send `ByteSink.end()`. | ||
sink.end_request().send().promise.await?; | ||
|
||
Ok(()) | ||
} | ||
``` | ||
|
||
Now the bytes should transfer much more smoothly, | ||
but we've introduced a new problem: latency. | ||
After sending each `ByteStream.write()` request, | ||
we're waiting for the server to send back its response | ||
before we start the next request. | ||
That will likely slow things down considerably! | ||
|
||
Here's a naive, bad way to avoid such latency: | ||
|
||
```rust | ||
/// Like write_bytes_chunked(), but sends all write() requests | ||
/// immediately. | ||
async fn write_bytes_chunked_in_parallel( | ||
mut bytes: &[u8], | ||
sink: byte_sink::Client, | ||
) -> Result<(), capnp::Error> { | ||
// Construct an object that will manage multiple futures | ||
// executing at once. | ||
let mut responses = FuturesOrdered::new(); | ||
|
||
// Loop until all writes have been enqueued. | ||
while bytes.len() > 0 { | ||
let end = usize::min(CHUNK_SIZE, bytes.len()); | ||
|
||
// Construct the write() request. | ||
let mut request = sink.write_request(); | ||
request.get().set_chunk(&bytes[..end]); | ||
|
||
// Send the write() request and collect its response | ||
// future (but don't wait on it yet). | ||
responses.push_back(request.send().promise); | ||
|
||
bytes = &bytes[end..]; | ||
} | ||
// Wait for all of the responses. | ||
while let Some(x) = responses.next().await { | ||
x?; | ||
} | ||
|
||
// Send `ByteSink.end()`. | ||
sink.end_request().send().promise.await?; | ||
|
||
Ok(()) | ||
} | ||
``` | ||
|
||
The problem is that, because we immediately send all the requests, | ||
now we are back to causing memory spikes and congestion, | ||
even if each individual message is small. | ||
|
||
We need to add some way to limit the number of in-flight requests. | ||
That's going to make the code significantly more complicated! | ||
Perhaps you'd like to try it as an exercise? | ||
|
||
|
||
The good news is that capnproto-rust can now automatically | ||
perform such bookkeeping, and our code can remain simple. | ||
To enable the new functionality, we define our | ||
method with the `stream` keyword: | ||
|
||
|
||
``` | ||
interface StreamingByteSink { | ||
write @0 (chunk :Data) -> stream; | ||
# Writes a chunk of bytes. | ||
end @1 (); | ||
# Indicates that no more chunks will be written. | ||
} | ||
``` | ||
|
||
Now we can write our code in the following direct style: | ||
|
||
```rust | ||
|
||
/// Like write_bytes_chunked(), but the library | ||
/// automatically handles backpressure. | ||
async fn write_bytes_streaming( | ||
mut bytes: &[u8], | ||
sink: streaming_byte_sink::Client, | ||
) -> Result<(), capnp::Error> { | ||
// Loop until all bytes have been enqueued. | ||
while bytes.len() > 0 { | ||
let end = usize::min(CHUNK_SIZE, bytes.len()); | ||
|
||
// Construct the write() request. | ||
let mut request = sink.write_request(); | ||
request.get().set_chunk(&bytes[..end]); | ||
|
||
// Send the request and wait until the RPC system determines that | ||
// we are clear to send another chunk. | ||
request.send().await?; | ||
|
||
bytes = &bytes[end..]; | ||
} | ||
|
||
// The end() method is not streaming. It does not return until all of | ||
// the streaming calls have completed. If any streaming call triggered | ||
// an error, that error will be returned here. | ||
sink.end_request().send().promise.await?; | ||
|
||
Ok(()) | ||
} | ||
``` | ||
|
||
Behind the scenes, capnproto-rust maintains a limit on how many bytes | ||
are in flight for this object at any given time. | ||
Currently it uses a fixed value | ||
that can be configured via `twoparty::VatNetwork::set_window_size()`. | ||
In the future, the library could potentially add sophisticated dynamic | ||
flow control to optimize throughput, without requiring | ||
any code change from users. | ||
|
||
## implementing streaming methods | ||
|
||
To create an instance of a `ByteSinkStreaming` object in Rust, | ||
we need to implement the `byte_sink_streaming::Server` trait: | ||
|
||
```rust | ||
impl byte_sink_streaming::Server for ByteStreamImpl { | ||
fn write( | ||
&mut self, | ||
params: byte_sink_streaming::WriteParams | ||
) -> Promise<(), Error> { | ||
todo!() | ||
} | ||
|
||
fn end( | ||
&mut self, | ||
params: byte_sink_streaming::EndParams, | ||
results: byte_sink_streaming::EndResults, | ||
) -> Promise<(), Error> { | ||
todo!() | ||
} | ||
|
||
} | ||
``` | ||
|
||
Because it was declared with `-> stream`, the `write()` method | ||
does not have a `Results` parameter, | ||
unlike `end()` and other non-streaming methods. | ||
Streaming methods never have any values to return. | ||
|
||
### difference from capnproto-c++ | ||
|
||
[In the C++ implementation](https://github.com/capnproto/capnproto/pull/825), | ||
streaming method calls are delivered one-by-one | ||
to the implementing object, with no overlap. | ||
This is intended "as a convenience" and to | ||
make it easier to interface with `kj::AsyncOutputStream` objects. | ||
In Rust, async I/O objects are rather different | ||
and enjoy good type system support for preventing misuse, | ||
so I opted not to implement the one-by-one delivery guarantee | ||
in capnproto-rust. | ||
|
||
## example | ||
|
||
For a working example, see | ||
the [capnp-rpc/examples/streaming](https://github.com/capnproto/capnproto-rust/tree/master/capnp-rpc/examples/streaming) | ||
directory. |