Skip to content

Commit

Permalink
object_store: full HTTP range support
Browse files Browse the repository at this point in the history
- Support suffix and offset ranges in GetOptions and get_opts
- Ensure that, if a range is requested, the response contains exactly
  that range
  • Loading branch information
clbarnes committed Dec 18, 2023
1 parent df69ef5 commit 2c46bab
Show file tree
Hide file tree
Showing 7 changed files with 646 additions and 20 deletions.
1 change: 1 addition & 0 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] }
http-content-range = "0.1.2"

[target.'cfg(target_family="unix")'.dev-dependencies]
nix = { version = "0.27.1", features = ["fs"] }
Expand Down
44 changes: 33 additions & 11 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult};
use crate::util::{as_generic_err, response_range};
use crate::{GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use reqwest::Response;
use snafu::Snafu;

/// A client that can perform a get request
#[async_trait]
Expand All @@ -34,6 +38,13 @@ pub trait GetClient: Send + Sync + 'static {
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response>;
}

/// Error describing a mismatch between two byte ranges that should be equal.
///
/// Rendered as `Requested range {expected:?}, got {actual:?}`: `expected` is
/// printed as the requested range and `actual` as the range that was received.
///
/// NOTE(review): the `get_opts` construction site visible alongside this type
/// binds `actual` to the range derived from the request options and `expected`
/// to the range parsed from the response, which is the reverse of this
/// message. Confirm, and swap the bindings at the call site.
#[derive(Debug, Snafu)]
#[snafu(display("Requested range {expected:?}, got {actual:?}"))]
pub struct UnexpectedRange {
/// Range printed after "Requested range" in the error message.
expected: Range<usize>,
/// Range printed after "got" in the error message.
actual: Range<usize>,
}

/// Extension trait for [`GetClient`] that adds common retrieval functionality
#[async_trait]
pub trait GetClientExt {
Expand All @@ -45,23 +56,34 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG)
.map_err(|e| as_generic_err(T::STORE, e))?;

// ensure that we receive the range we asked for
let out_range = if let Some(r) = range {
let actual = r
.as_range(meta.size)
.map_err(|source| as_generic_err(T::STORE, source))?;
let expected =
response_range(&response).map_err(|source| as_generic_err(T::STORE, source))?;
if actual != expected {
return Err(as_generic_err(
T::STORE,
UnexpectedRange { expected, actual },
));
}
})?;
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.map_err(|source| as_generic_err(T::STORE, source))
.boxed();

Ok(GetResult {
range: range.unwrap_or(0..meta.size),
range: out_range,
payload: GetResultPayload::Stream(stream),
meta,
})
Expand Down
3 changes: 1 addition & 2 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,7 @@ impl GetOptionsExt for RequestBuilder {
use hyper::header::*;

if let Some(range) = options.range {
let range = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
self = self.header(RANGE, range);
self = self.header(RANGE, format!("bytes={range}"));
}

if let Some(tag) = options.if_match {
Expand Down
5 changes: 3 additions & 2 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ mod parse;
mod util;

pub use parse::{parse_url, parse_url_opts};
use util::GetRange;

use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -581,7 +582,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// in the given byte range
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let options = GetOptions {
range: Some(range.clone()),
range: Some(range.into()),
..Default::default()
};
self.get_opts(location, options).await?.bytes().await
Expand Down Expand Up @@ -911,7 +912,7 @@ pub struct GetOptions {
/// otherwise returning [`Error::NotModified`]
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
pub range: Option<Range<usize>>,
pub range: Option<GetRange>,
/// Request a particular object version
pub version: Option<String>,
/// Request transfer of no content
Expand Down
12 changes: 11 additions & 1 deletion object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
util::as_generic_err,
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutMode, PutOptions, PutResult, Result,
};
Expand All @@ -43,6 +44,8 @@ use tokio::io::AsyncWrite;
use url::Url;
use walkdir::{DirEntry, WalkDir};

/// Store name attached to generic errors raised by the local filesystem store.
// `const` items are implicitly `'static`, so the lifetime is not spelled out
// (clippy::redundant_static_lifetimes).
const STORE: &str = "LocalFileSystem";

/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -416,9 +419,16 @@ impl ObjectStore for LocalFileSystem {
let meta = convert_metadata(metadata, location)?;
options.check_preconditions(&meta)?;

let range = if let Some(r) = options.range {
r.as_range(meta.size)
.map_err(|e| as_generic_err(STORE, e))?
} else {
0..meta.size
};

Ok(GetResult {
payload: GetResultPayload::File(file, path),
range: options.range.unwrap_or(0..meta.size),
range,
meta,
})
})
Expand Down
24 changes: 21 additions & 3 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ enum Error {
#[snafu(display("Invalid range: {}..{}", range.start, range.end))]
BadRange { range: Range<usize> },

#[snafu(display("Invalid suffix: {} bytes", nbytes))]
BadSuffix { nbytes: usize },

#[snafu(display("Object already exists at that location: {path}"))]
AlreadyExists { path: String },

Expand Down Expand Up @@ -206,6 +209,8 @@ impl ObjectStore for InMemory {
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
use crate::util::GetRange::*;

let entry = self.entry(location).await?;
let e_tag = entry.e_tag.to_string();

Expand All @@ -221,9 +226,22 @@ impl ObjectStore for InMemory {
let (range, data) = match options.range {
Some(range) => {
let len = entry.data.len();
ensure!(range.end <= len, OutOfRangeSnafu { range, len });
ensure!(range.start <= range.end, BadRangeSnafu { range });
(range.clone(), entry.data.slice(range))
match range {
Bounded(r) => {
ensure!(r.end <= len, OutOfRangeSnafu { range: r, len });
ensure!(r.start <= r.end, BadRangeSnafu { range: r });
(r.clone(), entry.data.slice(r))
}
Offset(o) => {
ensure!(o < len, OutOfRangeSnafu { range: o..len, len });
(o..len, entry.data.slice(o..len))
}
Suffix(n) => {
ensure!(n < len, BadSuffixSnafu { nbytes: n });
let start = len - n;
(start..len, entry.data.slice(start..len))
}
}
}
None => (0..entry.data.len(), entry.data),
};
Expand Down
Loading

0 comments on commit 2c46bab

Please sign in to comment.