Skip to content

Commit

Permalink
Fix ObjectMeta::size for range requests (#5272) (#5276)
Browse files Browse the repository at this point in the history
* Fix ObjectMeta::size for range requests (#5272)

* Docs

* Update object_store/src/lib.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Add tests

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
tustvold and alamb authored Jan 3, 2024
1 parent 2460c88 commit 5a67f1f
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 24 deletions.
243 changes: 222 additions & 21 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use crate::{Error, GetOptions, GetResult, GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use hyper::header::CONTENT_RANGE;
use hyper::StatusCode;
use reqwest::header::ToStrError;
use reqwest::Response;
use snafu::{ensure, OptionExt, ResultExt, Snafu};

/// A client that can perform a get request
#[async_trait]
Expand All @@ -45,25 +50,221 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})?;

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range: range.unwrap_or(0..meta.size),
payload: GetResultPayload::Stream(stream),
meta,
get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
store: T::STORE,
source: Box::new(e),
})
}
}

/// A parsed HTTP `Content-Range` response header.
struct ContentRange {
    /// The range of the object returned
    range: Range<usize>,
    /// The total size of the object being requested
    size: usize,
}

impl ContentRange {
    /// Parse a content range of the form `bytes <range-start>-<range-end>/<size>`
    ///
    /// The end offset in the header is inclusive, whereas [`Range`] is half-open,
    /// so the returned `range` ends one past the parsed end offset.
    ///
    /// Returns `None` if the value does not start with `bytes `, lacks the `/` or
    /// `-` separators, contains a component that is not a valid `usize` (this
    /// includes the unknown-size form `bytes 2-2/*`), or has an end offset so large
    /// that the exclusive end cannot be represented.
    ///
    /// <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range>
    fn from_str(s: &str) -> Option<Self> {
        let rem = s.trim().strip_prefix("bytes ")?;
        let (range, size) = rem.split_once('/')?;
        let size = size.parse().ok()?;

        let (start_s, end_s) = range.split_once('-')?;

        let start = start_s.parse().ok()?;
        let end: usize = end_s.parse().ok()?;

        // The header comes from the remote server, so guard the inclusive ->
        // exclusive conversion: `usize::MAX + 1` would panic in debug builds and
        // silently wrap to 0 in release builds.
        let end_exclusive = end.checked_add(1)?;

        Some(Self {
            size,
            range: start..end_exclusive,
        })
    }
}

/// A specialized `Error` for get-related errors
///
/// Produced by `get_result` while validating a response against the
/// range that was requested.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum GetResultError {
/// Error from extracting object metadata out of the response headers.
///
/// `context(false)` makes snafu generate a `From` impl, so the header error
/// converts directly when propagated with `?`.
#[snafu(context(false))]
Header {
source: crate::client::header::Error,
},

/// A range was requested but the response status was not `206 Partial Content`.
#[snafu(display("Received non-partial response when range requested"))]
NotPartial,

/// A range was requested but the response carried no `Content-Range` header.
#[snafu(display("Content-Range header not present in partial response"))]
NoContentRange,

/// The `Content-Range` header value could not be parsed; `value` is the raw text.
#[snafu(display("Failed to parse value for CONTENT_RANGE header: \"{value}\""))]
ParseContentRange { value: String },

/// The `Content-Range` header value could not be converted to a string.
#[snafu(display("Content-Range header contained non UTF-8 characters"))]
InvalidContentRange { source: ToStrError },

/// The range reported by the response differs from the range that was requested.
#[snafu(display("Requested {expected:?}, got {actual:?}"))]
UnexpectedRange {
expected: Range<usize>,
actual: Range<usize>,
},
}

fn get_result<T: GetClient>(
location: &Path,
range: Option<Range<usize>>,
response: Response,
) -> Result<GetResult, GetResultError> {
let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;

// ensure that we receive the range we asked for
let range = if let Some(expected) = range {
ensure!(
response.status() == StatusCode::PARTIAL_CONTENT,
NotPartialSnafu
);
let val = response
.headers()
.get(CONTENT_RANGE)
.context(NoContentRangeSnafu)?;

let value = val.to_str().context(InvalidContentRangeSnafu)?;
let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?;
let actual = value.range;

ensure!(
actual == expected,
UnexpectedRangeSnafu { expected, actual }
);

// Update size to reflect full size of object (#5272)
meta.size = value.size;
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range,
meta,
payload: GetResultPayload::Stream(stream),
})
}

#[cfg(test)]
mod tests {
    use super::*;
    use hyper::http;
    use hyper::http::header::*;

    /// Stub client: only its associated constants are used by `get_result`.
    struct TestClient {}

    #[async_trait]
    impl GetClient for TestClient {
        const STORE: &'static str = "TEST";

        const HEADER_CONFIG: HeaderConfig = HeaderConfig {
            etag_required: false,
            last_modified_required: false,
            version_header: None,
        };

        async fn get_request(&self, _: &Path, _: GetOptions) -> Result<Response> {
            unimplemented!()
        }
    }

    /// Build a fake response for an object of `object_size` bytes.
    ///
    /// The body holds `range.end - range.start` zero bytes when `range` is set,
    /// otherwise `object_size` zero bytes. `Content-Length` is set to the length
    /// of the body actually sent, as it would be on a real partial response, so
    /// that the full object size can only be obtained from `Content-Range`.
    fn make_response(
        object_size: usize,
        range: Option<Range<usize>>,
        status: StatusCode,
        content_range: Option<&str>,
    ) -> Response {
        let mut builder = http::Response::builder();
        if let Some(range) = content_range {
            builder = builder.header(CONTENT_RANGE, range);
        }

        let body = match range {
            Some(range) => vec![0_u8; range.end - range.start],
            None => vec![0_u8; object_size],
        };

        builder
            .status(status)
            .header(CONTENT_LENGTH, body.len())
            .body(body)
            .unwrap()
            .into()
    }

    #[tokio::test]
    async fn test_get_result() {
        let path = Path::from("test");

        // Full object, no range requested
        let resp = make_response(12, None, StatusCode::OK, None);
        let res = get_result::<TestClient>(&path, None, resp).unwrap();
        assert_eq!(res.meta.size, 12);
        assert_eq!(res.range, 0..12);
        let bytes = res.bytes().await.unwrap();
        assert_eq!(bytes.len(), 12);

        // Valid partial response: size must come from Content-Range (#5272)
        let resp = make_response(
            12,
            Some(2..3),
            StatusCode::PARTIAL_CONTENT,
            Some("bytes 2-2/12"),
        );
        let res = get_result::<TestClient>(&path, Some(2..3), resp).unwrap();
        assert_eq!(res.meta.size, 12);
        assert_eq!(res.range, 2..3);
        let bytes = res.bytes().await.unwrap();
        assert_eq!(bytes.len(), 1);

        // Range requested but server replied with a non-partial status
        let resp = make_response(12, Some(2..3), StatusCode::OK, None);
        let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Received non-partial response when range requested"
        );

        // Server returned a different range than requested
        let resp = make_response(
            12,
            Some(2..3),
            StatusCode::PARTIAL_CONTENT,
            Some("bytes 2-3/12"),
        );
        let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
        assert_eq!(err.to_string(), "Requested 2..3, got 2..4");

        // Unknown total size is not supported
        let resp = make_response(
            12,
            Some(2..3),
            StatusCode::PARTIAL_CONTENT,
            Some("bytes 2-2/*"),
        );
        let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\""
        );

        // Partial status without a Content-Range header
        let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, None);
        let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Content-Range header not present in partial response"
        );
    }
}
17 changes: 14 additions & 3 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,12 +1303,23 @@ mod tests {
let range = 3..7;
let range_result = storage.get_range(&location, range.clone()).await;

let bytes = range_result.unwrap();
assert_eq!(bytes, expected_data.slice(range.clone()));

let opts = GetOptions {
range: Some(2..5),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
// Data is `"arbitrary data"`, length 14 bytes
assert_eq!(result.meta.size, 14); // Should return full object size (#5272)
assert_eq!(result.range, 2..5);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"bit".as_ref());

let out_of_range = 200..300;
let out_of_range_result = storage.get_range(&location, out_of_range).await;

let bytes = range_result.unwrap();
assert_eq!(bytes, expected_data.slice(range));

// Should be a non-fatal error
out_of_range_result.unwrap_err();

Expand Down

0 comments on commit 5a67f1f

Please sign in to comment.