Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object_store: full HTTP range support #5222

Merged
merged 15 commits into from
Jan 5, 2024
21 changes: 5 additions & 16 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::deserialize_rfc1123;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutResult,
Result, RetryConfig,
Expand Down Expand Up @@ -99,9 +99,6 @@ pub(crate) enum Error {
source: crate::client::header::Error,
},

#[snafu(display("Operation not supported by this store: {reason}"))]
NotSupported { reason: &'static str },

#[snafu(display("ETag required for conditional update"))]
MissingETag,
}
Expand All @@ -112,9 +109,6 @@ impl From<Error> for crate::Error {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
| Error::PutRequest { source, path } => source.error(STORE, path),
Error::NotSupported { .. } => Self::NotSupported {
source: Box::new(err),
},
_ => Self::Generic {
store: STORE,
source: Box::new(err),
Expand Down Expand Up @@ -364,15 +358,10 @@ impl GetClient for AzureClient {
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
// As of 2024-01-02, Azure does not support suffix requests,
// so we should fail fast here rather than sending one
if let Some(r) = options.range.as_ref() {
match r {
crate::util::GetRange::Suffix(_) => {
Err(Error::NotSupported {
reason: "suffix request",
})?;
}
_ => (),
}
if let Some(GetRange::Suffix(_)) = options.range.as_ref() {
return Err(crate::Error::NotSupported {
source: "Azure does not support suffix range requests".into(),
});
}

let credential = self.get_credential().await?;
Expand Down
145 changes: 81 additions & 64 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,49 @@ use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::util::{as_generic_err, response_range};
use crate::{GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use crate::{GetOptions, GetRange, GetResult, GetResultPayload};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use hyper::header::CONTENT_RANGE;
use hyper::StatusCode;
use reqwest::header::ToStrError;
use reqwest::Response;
use snafu::{ResultExt, Snafu};
use snafu::{ensure, OptionExt, ResultExt, Snafu};

/// A specialized `Error` for get-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub(crate) enum Error {
#[snafu(display("Could not extract metadata from response headers"))]
#[snafu(context(false))]
Header {
store: &'static str,
source: crate::client::header::Error,
},

#[snafu(display("Requested an invalid range"))]
#[snafu(context(false))]
InvalidRangeRequest {
store: &'static str,
source: crate::util::InvalidGetRange,
},

#[snafu(display("Got an invalid range response"))]
InvalidRangeResponse {
store: &'static str,
source: crate::util::InvalidRangeResponse,
},
#[snafu(display("Received non-partial response when range requested"))]
NotPartial,

#[snafu(display("Content-Range header not present"))]
NoContentRange,

#[snafu(display("Failed to parse value for CONTENT_RANGE header: {value}"))]
ParseContentRange { value: String },

#[snafu(display("Content-Range header contained non UTF-8 characters"))]
InvalidContentRange { source: ToStrError },

#[snafu(display("Requested {expected:?}, got {actual:?}"))]
UnexpectedRange {
store: &'static str,
expected: Range<usize>,
actual: Range<usize>,
},
}

impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
let store = match err {
Error::Header { store, .. } => store,
Error::InvalidRangeRequest { store, .. } => store,
Error::InvalidRangeResponse { store, .. } => store,
Error::UnexpectedRange { store, .. } => store,
};
Self::Generic {
store: store,
source: Box::new(err),
}
}
}
pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

/// A client that can perform a get request
#[async_trait]
Expand All @@ -80,53 +71,79 @@ pub trait GetClient: Send + Sync + 'static {
/// Configure the [`HeaderConfig`] for this client
const HEADER_CONFIG: HeaderConfig;

async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response>;
async fn get_request(&self, path: &Path, options: GetOptions) -> crate::Result<Response>;
}

/// Extension trait for [`GetClient`] that adds common retrieval functionality
#[async_trait]
pub trait GetClientExt {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
async fn get_opts(&self, location: &Path, options: GetOptions) -> crate::Result<GetResult>;
}

#[async_trait]
impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
async fn get_opts(&self, location: &Path, options: GetOptions) -> crate::Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG)
.context(HeaderSnafu { store: T::STORE })?;

// ensure that we receive the range we asked for
let out_range = if let Some(r) = range {
let actual = r
.as_range(meta.size)
.context(InvalidRangeRequestSnafu { store: T::STORE })?;

let expected =
response_range(&response).context(InvalidRangeResponseSnafu { store: T::STORE })?;

if actual != expected {
Err(Error::UnexpectedRange {
store: T::STORE,
expected,
actual: actual.clone(),
})?;
}
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| as_generic_err(T::STORE, source))
.boxed();

Ok(GetResult {
range: out_range,
payload: GetResultPayload::Stream(stream),
meta,
get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
store: T::STORE,
source: Box::new(e),
})
}
}
fn parse_content_range(s: &str) -> Option<Range<usize>> {
let rem = s.trim().strip_prefix("bytes ")?;
let (range, _) = rem.split_once('/')?;
let (start_s, end_s) = range.split_once('-')?;

let start = start_s.parse().ok()?;
let end: usize = end_s.parse().ok()?;

Some(start..(end + 1))
}

fn get_result<T: GetClient>(
location: &Path,
range: Option<GetRange>,
response: Response,
) -> Result<GetResult> {
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;

// ensure that we receive the range we asked for
let out_range = if let Some(r) = range {
let expected = r.as_range(meta.size)?;

ensure!(
response.status() == StatusCode::PARTIAL_CONTENT,
NotPartialSnafu
);
let val = response
.headers()
.get(CONTENT_RANGE)
.context(NoContentRangeSnafu)?;

let value = val.to_str().context(InvalidContentRangeSnafu)?;
let actual = parse_content_range(value).context(ParseContentRangeSnafu { value })?;
ensure!(
actual == expected,
UnexpectedRangeSnafu { expected, actual }
);
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range: out_range,
payload: GetResultPayload::Stream(stream),
meta,
})
}
4 changes: 2 additions & 2 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ mod parse;
mod util;

pub use parse::{parse_url, parse_url_opts};
use util::GetRange;
pub use util::GetRange;

use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -1889,7 +1889,7 @@ mod tests {

// We can abort an in-progress write
let (upload_id, mut writer) = storage.put_multipart(&location).await.unwrap();
if let Some(chunk) = data.get(0) {
if let Some(chunk) = data.first() {
writer.write_all(chunk).await.unwrap();
let _ = writer.write(chunk).await.unwrap();
}
Expand Down
7 changes: 3 additions & 4 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,9 @@ impl ObjectStore for LocalFileSystem {
let meta = convert_metadata(metadata, location)?;
options.check_preconditions(&meta)?;

let range = if let Some(r) = options.range {
r.as_range(meta.size).context(InvalidRangeSnafu)?
} else {
0..meta.size
let range = match options.range {
Some(r) => r.as_range(meta.size).context(InvalidRangeSnafu)?,
None => 0..meta.size,
};

Ok(GetResult {
Expand Down
Loading
Loading