Skip to content

Commit

Permalink
refactor: optimize file reads in fs module
Browse files Browse the repository at this point in the history
By implementing a file stream and prefering std::fs::File instead of
its tokio async variant, the fs module performance improves significantly
when reading files resulting in ~79% more req/sec using ~57 less
memory in Linux.
A similar result should also be expected in Unix targets.
  • Loading branch information
joseluisq committed Oct 19, 2023
1 parent efe8548 commit 9b17c4e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 93 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ compression-gzip = ["async-compression/deflate", "async-compression/gzip"]
[profile.release]
codegen-units = 1
incremental = false
lto = "fat"
strip = true

[profile.bench]
codegen-units = 1
Expand Down Expand Up @@ -98,7 +100,9 @@ required-features = ["websocket"]
[[example]]
name = "query_string"


[[example]]
name = "multipart"
required-features = ["multipart"]

[[example]]
name = "file"
138 changes: 46 additions & 92 deletions src/filters/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@
use std::cmp;
use std::convert::Infallible;
use std::fs::Metadata;
use std::fs::{File as StdFile, Metadata};
use std::future::Future;
use std::io;
use std::io::{self, BufReader, Error, Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::task::{Context, Poll};

use bytes::{Bytes, BytesMut};
use futures_util::future::Either;
use futures_util::{future, ready, stream, FutureExt, Stream, StreamExt, TryFutureExt};
use futures_util::{future, Stream, TryFutureExt};
use headers::{
AcceptRanges, ContentLength, ContentRange, ContentType, HeaderMapExt, IfModifiedSince, IfRange,
IfUnmodifiedSince, LastModified, Range,
Expand All @@ -21,9 +21,6 @@ use http::StatusCode;
use hyper::Body;
use mime_guess;
use percent_encoding::percent_decode_str;
use tokio::fs::File as TkFile;
use tokio::io::AsyncSeekExt;
use tokio_util::io::poll_read_buf;

use crate::filter::{Filter, FilterClone, One};
use crate::reject::{self, Rejection};
Expand Down Expand Up @@ -93,8 +90,7 @@ fn path_from_tail(
) -> impl FilterClone<Extract = One<ArcPath>, Error = Rejection> {
crate::path::tail().and_then(move |tail: crate::path::Tail| {
future::ready(sanitize_path(base.as_ref(), tail.as_str())).and_then(|mut buf| async {
let is_dir = tokio::fs::metadata(buf.clone())
.await
let is_dir = std::fs::metadata(buf.clone())
.map(|m| m.is_dir())
.unwrap_or(false);

Expand Down Expand Up @@ -265,7 +261,7 @@ fn file_reply(
path: ArcPath,
conditionals: Conditionals,
) -> impl Future<Output = Result<File, Rejection>> + Send {
TkFile::open(path.clone()).then(move |res| match res {
match StdFile::open(path.clone()) {
Ok(f) => Either::Left(file_conditional(f, path, conditionals)),
Err(err) => {
let rej = match err.kind() {
Expand All @@ -288,11 +284,11 @@ fn file_reply(
};
Either::Right(future::err(rej))
}
})
}
}

async fn file_metadata(f: TkFile) -> Result<(TkFile, Metadata), Rejection> {
match f.metadata().await {
async fn file_metadata(f: StdFile) -> Result<(StdFile, Metadata), Rejection> {
match f.metadata() {
Ok(meta) => Ok((f, meta)),
Err(err) => {
tracing::debug!("file metadata error: {}", err);
Expand All @@ -302,22 +298,27 @@ async fn file_metadata(f: TkFile) -> Result<(TkFile, Metadata), Rejection> {
}

fn file_conditional(
f: TkFile,
f: StdFile,
path: ArcPath,
conditionals: Conditionals,
) -> impl Future<Output = Result<File, Rejection>> + Send {
file_metadata(f).map_ok(move |(file, meta)| {
file_metadata(f).map_ok(move |(mut file, meta)| {
let mut len = meta.len();
let modified = meta.modified().ok().map(LastModified::from);

let resp = match conditionals.check(modified) {
Cond::NoBody(resp) => resp,
Cond::WithBody(range) => {
let buf_size = optimal_buf_size(&meta);
bytes_range(range, len)
.map(|(start, end)| {
file.seek(SeekFrom::Start(start))
.expect("error while seeking the file to the specified offset");

let sub_len = end - start;
let buf_size = optimal_buf_size(&meta);
let stream = file_stream(file, buf_size, (start, end));
let reader = BufReader::new(file).take(sub_len);
let stream = FileStream { reader, buf_size };

let body = Body::wrap_stream(stream);

let mut resp = Response::new(body);
Expand Down Expand Up @@ -403,68 +404,10 @@ fn bytes_range(range: Option<Range>, max_len: u64) -> Result<(u64, u64), BadRang
ret
}

fn file_stream(
mut file: TkFile,
buf_size: usize,
(start, end): (u64, u64),
) -> impl Stream<Item = Result<Bytes, io::Error>> + Send {
use std::io::SeekFrom;

let seek = async move {
if start != 0 {
file.seek(SeekFrom::Start(start)).await?;
}
Ok(file)
};

seek.into_stream()
.map(move |result| {
let mut buf = BytesMut::new();
let mut len = end - start;
let mut f = match result {
Ok(f) => f,
Err(f) => return Either::Left(stream::once(future::err(f))),
};

Either::Right(stream::poll_fn(move |cx| {
if len == 0 {
return Poll::Ready(None);
}
reserve_at_least(&mut buf, buf_size);

let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) {
Ok(n) => n as u64,
Err(err) => {
tracing::debug!("file read error: {}", err);
return Poll::Ready(Some(Err(err)));
}
};

if n == 0 {
tracing::debug!("file read found EOF before expected length");
return Poll::Ready(None);
}

let mut chunk = buf.split().freeze();
if n > len {
chunk = chunk.split_to(len as usize);
len = 0;
} else {
len -= n;
}

Poll::Ready(Some(Ok(chunk)))
}))
})
.flatten()
}

fn reserve_at_least(buf: &mut BytesMut, cap: usize) {
if buf.capacity() - buf.len() < cap {
buf.reserve(cap);
}
}
#[cfg(unix)]
const DEFAULT_READ_BUF_SIZE: usize = 4_096;

#[cfg(not(unix))]
const DEFAULT_READ_BUF_SIZE: usize = 8_192;

fn optimal_buf_size(metadata: &Metadata) -> usize {
Expand All @@ -490,6 +433,31 @@ fn get_block_size(_metadata: &Metadata) -> usize {
DEFAULT_READ_BUF_SIZE
}

#[derive(Debug)]
struct FileStream<T> {
reader: T,
buf_size: usize,
}

impl<T: Read + Unpin> Stream for FileStream<T> {
type Item = Result<Bytes, Error>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buf = BytesMut::zeroed(self.buf_size);
match Pin::into_inner(self).reader.read(&mut buf[..]) {
Ok(n) => {
if n == 0 {
Poll::Ready(None)
} else {
buf.truncate(n);
Poll::Ready(Some(Ok(buf.freeze())))
}
}
Err(err) => Poll::Ready(Some(Err(err))),
}
}
}

// ===== Rejections =====

unit_error! {
Expand All @@ -503,7 +471,6 @@ unit_error! {
#[cfg(test)]
mod tests {
use super::sanitize_path;
use bytes::BytesMut;

#[test]
fn test_sanitize_path() {
Expand All @@ -523,17 +490,4 @@ mod tests {

sanitize_path(base, "/C:\\/foo.html").expect_err("C:\\");
}

#[test]
fn test_reserve_at_least() {
let mut buf = BytesMut::new();
let cap = 8_192;

assert_eq!(buf.len(), 0);
assert_eq!(buf.capacity(), 0);

super::reserve_at_least(&mut buf, cap);
assert_eq!(buf.len(), 0);
assert_eq!(buf.capacity(), cap);
}
}

0 comments on commit 9b17c4e

Please sign in to comment.