Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ObjectMeta::size for range requests (#5272) #5276

Merged
merged 4 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 222 additions & 21 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use crate::{Error, GetOptions, GetResult, GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use hyper::header::CONTENT_RANGE;
use hyper::StatusCode;
use reqwest::header::ToStrError;
use reqwest::Response;
use snafu::{ensure, OptionExt, ResultExt, Snafu};

/// A client that can perform a get request
#[async_trait]
Expand All @@ -45,25 +50,221 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})?;

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range: range.unwrap_or(0..meta.size),
payload: GetResultPayload::Stream(stream),
meta,
get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
store: T::STORE,
source: Box::new(e),
})
}
}

struct ContentRange {
/// The range of the object returned
range: Range<usize>,
/// The total size of the object being requested
size: usize,
}

impl ContentRange {
/// Parse a content range of the form `bytes <range-start>-<range-end>/<size>`
///
/// <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range>
fn from_str(s: &str) -> Option<Self> {
let rem = s.trim().strip_prefix("bytes ")?;
let (range, size) = rem.split_once('/')?;
let size = size.parse().ok()?;

let (start_s, end_s) = range.split_once('-')?;

let start = start_s.parse().ok()?;
let end: usize = end_s.parse().ok()?;

Some(Self {
size,
range: start..end + 1,
})
}
}

/// A specialized `Error` for get-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum GetResultError {
#[snafu(context(false))]
Header {
source: crate::client::header::Error,
},

#[snafu(display("Received non-partial response when range requested"))]
NotPartial,

#[snafu(display("Content-Range header not present in partial response"))]
NoContentRange,

#[snafu(display("Failed to parse value for CONTENT_RANGE header: \"{value}\""))]
ParseContentRange { value: String },

#[snafu(display("Content-Range header contained non UTF-8 characters"))]
InvalidContentRange { source: ToStrError },

#[snafu(display("Requested {expected:?}, got {actual:?}"))]
UnexpectedRange {
expected: Range<usize>,
actual: Range<usize>,
},
}

fn get_result<T: GetClient>(
location: &Path,
range: Option<Range<usize>>,
response: Response,
) -> Result<GetResult, GetResultError> {
let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;

// ensure that we receive the range we asked for
let range = if let Some(expected) = range {
ensure!(
response.status() == StatusCode::PARTIAL_CONTENT,
NotPartialSnafu
);
let val = response
.headers()
.get(CONTENT_RANGE)
.context(NoContentRangeSnafu)?;

let value = val.to_str().context(InvalidContentRangeSnafu)?;
let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?;
let actual = value.range;

ensure!(
actual == expected,
UnexpectedRangeSnafu { expected, actual }
);

// Update size to reflect full size of object (#5272)
meta.size = value.size;
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range,
meta,
payload: GetResultPayload::Stream(stream),
})
}
alamb marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(test)]
mod tests {
use super::*;
use hyper::http;
use hyper::http::header::*;

struct TestClient {}

#[async_trait]
impl GetClient for TestClient {
const STORE: &'static str = "TEST";

const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
version_header: None,
};

async fn get_request(&self, _: &Path, _: GetOptions) -> Result<Response> {
unimplemented!()
}
}

fn make_response(
object_size: usize,
range: Option<Range<usize>>,
status: StatusCode,
content_range: Option<&str>,
) -> Response {
let mut builder = http::Response::builder();
if let Some(range) = content_range {
builder = builder.header(CONTENT_RANGE, range);
}

let body = match range {
Some(range) => vec![0_u8; range.end - range.start],
None => vec![0_u8; object_size],
};

builder
.status(status)
.header(CONTENT_LENGTH, object_size)
.body(body)
.unwrap()
.into()
}

#[tokio::test]
async fn test_get_result() {
let path = Path::from("test");

let resp = make_response(12, None, StatusCode::OK, None);
let res = get_result::<TestClient>(&path, None, resp).unwrap();
assert_eq!(res.meta.size, 12);
assert_eq!(res.range, 0..12);
let bytes = res.bytes().await.unwrap();
assert_eq!(bytes.len(), 12);

let resp = make_response(
12,
Some(2..3),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-2/12"),
);
let res = get_result::<TestClient>(&path, Some(2..3), resp).unwrap();
assert_eq!(res.meta.size, 12);
assert_eq!(res.range, 2..3);
let bytes = res.bytes().await.unwrap();
assert_eq!(bytes.len(), 1);

let resp = make_response(12, Some(2..3), StatusCode::OK, None);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
assert_eq!(
err.to_string(),
"Received non-partial response when range requested"
);

let resp = make_response(
12,
Some(2..3),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-3/12"),
);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
assert_eq!(err.to_string(), "Requested 2..3, got 2..4");

let resp = make_response(
12,
Some(2..3),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-2/*"),
);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
assert_eq!(
err.to_string(),
"Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\""
);

let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, None);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
assert_eq!(
err.to_string(),
"Content-Range header not present in partial response"
);
}
}
17 changes: 14 additions & 3 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,12 +1303,23 @@ mod tests {
let range = 3..7;
let range_result = storage.get_range(&location, range.clone()).await;

let bytes = range_result.unwrap();
assert_eq!(bytes, expected_data.slice(range.clone()));

let opts = GetOptions {
range: Some(2..5),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
// Data is `"arbitrary data"`, length 14 bytes
assert_eq!(result.meta.size, 14); // Should return full object size (#5272)
tustvold marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(result.range, 2..5);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"bit".as_ref());

let out_of_range = 200..300;
let out_of_range_result = storage.get_range(&location, out_of_range).await;

let bytes = range_result.unwrap();
assert_eq!(bytes, expected_data.slice(range));

// Should be a non-fatal error
out_of_range_result.unwrap_err();

Expand Down
Loading