diff --git a/blog/_drafts/0.20-release.md b/blog/_drafts/0.20-release.md
new file mode 100644
index 000000000..8b71923f6
--- /dev/null
+++ b/blog/_drafts/0.20-release.md
@@ -0,0 +1,256 @@
---
layout: post
title: 0.20 — streaming with backpressure
author: dwrensha
---

Version 0.20 of [capnproto-rust](https://github.com/capnproto/capnproto-rust)
is now [available on crates.io](https://crates.io/crates/capnp).
In this release, the library has new built-in support for
[streaming](https://capnproto.org/news/2020-04-23-capnproto-0.8.html#multi-stream-flow-control).

## the `stream` keyword

Suppose we have the following interface defined
in a capnp [schema file](https://capnproto.org/language.html):

```
interface ByteSink {
  # A destination for a sequence of bytes.

  write @0 (chunk :Data);
  # Writes a chunk of bytes.

  end @1 ();
  # Indicates that no more chunks will be written.
}
```

In Rust code, we can invoke methods on a (possibly-remote) `ByteSink` object
through handles of type `byte_sink::Client`. For example:

```rust
/// Writes all of `bytes` to `sink` in a call to `write()`.
async fn write_bytes_all_at_once(
    bytes: &[u8],
    sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
    // Construct an RPC request for ByteSink.write().
    let mut request = sink.write_request();

    // Copy all of the bytes into the request.
    request.get().set_chunk(bytes);

    // Send the request and wait for the response.
    request.send().promise.await?;

    // Send a ByteSink.end() request and wait for the response.
    sink.end_request().send().promise.await?;

    Ok(())
}
```

However, if `bytes` is too large, we might
observe congestion, spikes in memory usage, or
an error like this:

```
remote exception: Failed: Message has 8750020 words, which is too large.
```

Fortunately, the `ByteSink` interface lets us avoid such problems. We can split
the bytes into chunks over multiple `ByteSink.write()` calls:

```rust
const CHUNK_SIZE: usize = 8192;

/// Writes all of `bytes` to `sink` in chunks of size `CHUNK_SIZE`.
async fn write_bytes_chunked(
    mut bytes: &[u8],
    sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
    // Loop until all bytes have been written.
    while bytes.len() > 0 {
        // Compute the end index.
        let end = usize::min(CHUNK_SIZE, bytes.len());

        // Construct the write() request.
        let mut request = sink.write_request();
        request.get().set_chunk(&bytes[..end]);

        // Send the request and wait for a response.
        request.send().promise.await?;

        // Update `bytes` to point to the remaining bytes.
        bytes = &bytes[end..];
    }

    // Send `ByteSink.end()`.
    sink.end_request().send().promise.await?;

    Ok(())
}
```

Now the bytes should transfer much more smoothly,
but we've introduced a new problem: latency.
After sending each `ByteSink.write()` request,
we're waiting for the server to send back its response
before we start the next request.
That will likely slow things down considerably!

Here's a naive, bad way to avoid such latency:

```rust
use futures::stream::{FuturesOrdered, StreamExt};

/// Like write_bytes_chunked(), but sends all write() requests
/// immediately.
async fn write_bytes_chunked_in_parallel(
    mut bytes: &[u8],
    sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
    // Construct an object that will manage multiple futures
    // executing at once.
    let mut responses = FuturesOrdered::new();

    // Loop until all writes have been enqueued.
    while bytes.len() > 0 {
        let end = usize::min(CHUNK_SIZE, bytes.len());

        // Construct the write() request.
        let mut request = sink.write_request();
        request.get().set_chunk(&bytes[..end]);

        // Send the write() request and collect its response
        // future (but don't wait on it yet).
        responses.push_back(request.send().promise);

        bytes = &bytes[end..];
    }

    // Wait for all of the responses.
    while let Some(x) = responses.next().await {
        x?;
    }

    // Send `ByteSink.end()`.
    sink.end_request().send().promise.await?;

    Ok(())
}
```

The problem is that, because we immediately send all the requests,
now we are back to causing memory spikes and congestion,
even if each individual message is small.

We need to add some way to limit the number of in-flight requests.
That's going to make the code significantly more complicated!
Perhaps you'd like to try it as an exercise?
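For the curious, here is roughly what one solution might look like.
This sketch is not part of capnproto-rust: the function name and the
`MAX_IN_FLIGHT` constant are made up for illustration, it reuses
`CHUNK_SIZE` and the `futures` imports from above, and it caps the
number of outstanding requests rather than the number of outstanding
bytes, so it only approximates real flow control.

```rust
/// Maximum number of write() calls allowed to be in flight at once.
/// (An arbitrary choice for this sketch.)
const MAX_IN_FLIGHT: usize = 8;

/// Like write_bytes_chunked_in_parallel(), but never allows more than
/// MAX_IN_FLIGHT write() requests to be outstanding at the same time.
async fn write_bytes_chunked_with_limit(
    mut bytes: &[u8],
    sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
    let mut responses = FuturesOrdered::new();

    while bytes.len() > 0 {
        // If we are at the limit, wait for the oldest outstanding
        // request to finish before sending another one.
        if responses.len() >= MAX_IN_FLIGHT {
            if let Some(response) = responses.next().await {
                response?;
            }
        }

        // Construct and send the next write() request.
        let end = usize::min(CHUNK_SIZE, bytes.len());
        let mut request = sink.write_request();
        request.get().set_chunk(&bytes[..end]);
        responses.push_back(request.send().promise);

        bytes = &bytes[end..];
    }

    // Wait for the remaining responses.
    while let Some(response) = responses.next().await {
        response?;
    }

    // Send `ByteSink.end()`.
    sink.end_request().send().promise.await?;

    Ok(())
}
```

It works, but it adds a draining loop and a tuning knob, and a more
realistic version would track bytes rather than request counts.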
The good news is that capnproto-rust can now automatically
perform such bookkeeping, and our code can remain simple.
To enable the new functionality, we declare our
method with the `stream` keyword:

```
interface StreamingByteSink {
  write @0 (chunk :Data) -> stream;
  # Writes a chunk of bytes.

  end @1 ();
  # Indicates that no more chunks will be written.
}
```

Now we can write our code in the following direct style:

```rust
/// Like write_bytes_chunked(), but the library
/// automatically handles backpressure.
async fn write_bytes_streaming(
    mut bytes: &[u8],
    sink: streaming_byte_sink::Client,
) -> Result<(), capnp::Error> {
    // Loop until all bytes have been enqueued.
    while bytes.len() > 0 {
        let end = usize::min(CHUNK_SIZE, bytes.len());

        // Construct the write() request.
        let mut request = sink.write_request();
        request.get().set_chunk(&bytes[..end]);

        // Send the request and wait until the RPC system determines that
        // we are clear to send another chunk.
        request.send().await?;

        bytes = &bytes[end..];
    }

    // The end() method is not streaming. It does not return until all of
    // the streaming calls have completed. If any streaming call triggered
    // an error, that error will be returned here.
    sink.end_request().send().promise.await?;

    Ok(())
}
```

Behind the scenes, capnproto-rust maintains a limit on how many bytes
are in flight for this object at any given time.
Currently it uses a fixed value
that can be configured via `twoparty::VatNetwork::set_window_size()`.
In the future, the library could potentially add sophisticated dynamic
flow control to optimize throughput, without requiring
any code change from users.

## implementing streaming methods

To create a `StreamingByteSink` object in Rust,
we need to implement the `streaming_byte_sink::Server` trait:

```rust
impl streaming_byte_sink::Server for ByteStreamImpl {
    fn write(
        &mut self,
        params: streaming_byte_sink::WriteParams,
    ) -> Promise<(), Error> {
        todo!()
    }

    fn end(
        &mut self,
        params: streaming_byte_sink::EndParams,
        results: streaming_byte_sink::EndResults,
    ) -> Promise<(), Error> {
        todo!()
    }
}
```

Because it was declared with `-> stream`, the `write()` method
does not have a `Results` parameter,
unlike `end()` and other non-streaming methods.
Streaming methods never have any values to return.
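To make this concrete, here is one way the `todo!()`s above might be
filled in. This is a minimal sketch, not code from the library or from
the linked example: it assumes a `ByteStreamImpl` struct with a single
`Vec<u8>` field and simply accumulates the chunks in memory.

```rust
use capnp::capability::Promise;
use capnp::Error;

/// An in-memory sink that accumulates every chunk it receives.
/// (This struct definition is assumed for the sketch.)
struct ByteStreamImpl {
    buffer: Vec<u8>,
}

impl streaming_byte_sink::Server for ByteStreamImpl {
    fn write(
        &mut self,
        params: streaming_byte_sink::WriteParams,
    ) -> Promise<(), Error> {
        // Read the chunk out of the call's parameters.
        let chunk = match params.get().and_then(|p| p.get_chunk()) {
            Ok(chunk) => chunk,
            Err(e) => return Promise::err(e),
        };

        // Append it to the buffer.
        self.buffer.extend_from_slice(chunk);
        Promise::ok(())
    }

    fn end(
        &mut self,
        _params: streaming_byte_sink::EndParams,
        _results: streaming_byte_sink::EndResults,
    ) -> Promise<(), Error> {
        // All chunks have arrived. A real implementation might flush
        // self.buffer to its final destination here.
        Promise::ok(())
    }
}
```

Because each `write()` here finishes synchronously, this version does
not need to care about the delivery-ordering question discussed next.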
### difference from capnproto-c++

[In the C++ implementation](https://github.com/capnproto/capnproto/pull/825),
streaming method calls are delivered one-by-one
to the implementing object, with no overlap.
This is intended "as a convenience" and to
make it easier to interface with `kj::AsyncOutputStream` objects.
In Rust, async I/O objects are rather different
and enjoy good type system support for preventing misuse,
so I opted not to implement the one-by-one delivery guarantee
in capnproto-rust.

## example

For a working example, see
the [capnp-rpc/examples/streaming](https://github.com/capnproto/capnproto-rust/tree/master/capnp-rpc/examples/streaming)
directory.