stream-body

crates.io Documentation MIT

An HttpBody implementation with efficient streaming support for the Rust HTTP library hyper.

Docs

Motivation

The built-in Body type in hyper uses Bytes as its streaming chunk. When large amounts of data are streamed in real time, this leads to many buffer allocations and de-allocations, because every chunk has to be a Bytes value. StreamBody is designed for this situation: it implements HttpBody and uses a &[u8] slice as the streaming chunk, so the same buffer can be reused for every chunk instead of allocating a new one, which avoids the allocation/de-allocation overhead.

Likewise, the channel() method on hyper's Body returns a Sender and a Body as a pair. The Sender accepts Bytes as its data chunk, which again incurs allocation/de-allocation overhead. To solve this, StreamBody provides StreamBody::channel(), which returns an AsyncWrite paired with the StreamBody itself. Because the AsyncWrite accepts &[u8] instead of Bytes, data can be written from a single reusable buffer with no allocation/de-allocation overhead.
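The buffer-reuse idea described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: it uses synchronous std::io traits rather than AsyncWrite, and the helper name copy_with_reused_buffer is hypothetical. It shows one fixed buffer being filled repeatedly and lent to the writer as a &[u8] slice, so no per-chunk buffer is created.

```rust
use std::io::{self, Read, Write};

/// Copies everything from `reader` to `writer` through one fixed-size buffer
/// and returns the total number of bytes copied.
/// NOTE: hypothetical helper for illustration; not part of stream-body or hyper.
fn copy_with_reused_buffer<R: Read, W: Write>(
    reader: &mut R,
    writer: &mut W,
) -> io::Result<u64> {
    // Created once and reused for every chunk.
    let mut buf = [0_u8; 16 * 1024];
    let mut total = 0_u64;

    loop {
        let read_count = reader.read(&mut buf)?;
        if read_count == 0 {
            break; // end of input
        }
        // The writer only borrows the filled part of the buffer as a `&[u8]`.
        writer.write_all(&buf[..read_count])?;
        total += read_count as u64;
    }

    Ok(total)
}
```

Any destination that implements Write can be passed in. The destination may still allocate internally (a Vec<u8> grows as it is written to, for example); the point of the sketch is only that the chunk buffer itself is never re-created.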

Usage

First add this to your Cargo.toml:

[dependencies]
stream-body = "0.1"

An example of streaming a large file:

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::{convert::Infallible, net::SocketAddr};
use stream_body::StreamBody;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

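async fn handle(_: Request<Body>) -> Result<Response<StreamBody>, Infallible> {
    // `writer` implements AsyncWrite; `body` is returned as the response body.
    let (mut writer, body) = StreamBody::channel();

    // Feed the body from a background task so the response can be returned immediately.
    tokio::spawn(async move {
        let mut f = File::open("large-file").await.unwrap();

        // Reuse this buffer
        let mut buf = [0_u8; 1024 * 16];
        loop {
            let read_count = f.read(&mut buf).await.unwrap();
            // A read of 0 bytes means end of file.
            if read_count == 0 {
                break;
            }
            // Write only the bytes that were actually read.
            writer.write_all(&buf[..read_count]).await.unwrap();
        }
    });

    Ok(Response::builder().body(body).unwrap())
}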

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

    let server = Server::bind(&addr).serve(make_svc);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

Contributing

Your PRs and stars are always welcome.