diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index f9b4950e58db..a30dfe8935be 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -25,7 +25,7 @@ use crate::client::retry::RetryExt; use crate::client::GetOptionsExt; use crate::multipart::PartId; use crate::path::DELIMITER; -use crate::util::deserialize_rfc1123; +use crate::util::{deserialize_rfc1123, GetRange}; use crate::{ ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutResult, Result, RetryConfig, @@ -99,9 +99,6 @@ pub(crate) enum Error { source: crate::client::header::Error, }, - #[snafu(display("Operation not supported by this store: {reason}"))] - NotSupported { reason: &'static str }, - #[snafu(display("ETag required for conditional update"))] MissingETag, } @@ -112,9 +109,6 @@ impl From for crate::Error { Error::GetRequest { source, path } | Error::DeleteRequest { source, path } | Error::PutRequest { source, path } => source.error(STORE, path), - Error::NotSupported { .. } => Self::NotSupported { - source: Box::new(err), - }, _ => Self::Generic { store: STORE, source: Box::new(err), @@ -364,15 +358,10 @@ impl GetClient for AzureClient { async fn get_request(&self, path: &Path, options: GetOptions) -> Result { // As of 2024-01-02, Azure does not support suffix requests, // so we should fail fast here rather than sending one - if let Some(r) = options.range.as_ref() { - match r { - crate::util::GetRange::Suffix(_) => { - Err(Error::NotSupported { - reason: "suffix request", - })?; - } - _ => (), - } + if let Some(GetRange::Suffix(_)) = options.range.as_ref() { + return Err(crate::Error::NotSupported { + source: "Azure does not support suffix range requests".into(), + }); } let credential = self.get_credential().await?; diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs index fb27d4eccbaa..effca0dffb4c 100644 --- a/object_store/src/client/get.rs +++ b/object_store/src/client/get.rs @@ -19,58 +19,49 @@ use std::ops::Range; use crate::client::header::{header_meta, HeaderConfig}; use crate::path::Path; -use crate::util::{as_generic_err, response_range}; -use crate::{GetOptions, GetResult}; -use crate::{GetResultPayload, Result}; +use crate::{GetOptions, GetRange, GetResult, GetResultPayload}; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; +use hyper::header::CONTENT_RANGE; +use hyper::StatusCode; +use reqwest::header::ToStrError; use reqwest::Response; -use snafu::{ResultExt, Snafu}; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; /// A specialized `Error` for get-related errors #[derive(Debug, Snafu)] #[allow(missing_docs)] pub(crate) enum Error { - #[snafu(display("Could not extract metadata from response headers"))] + #[snafu(context(false))] Header { - store: &'static str, source: crate::client::header::Error, }, - #[snafu(display("Requested an invalid range"))] + #[snafu(context(false))] InvalidRangeRequest { - store: &'static str, source: crate::util::InvalidGetRange, }, - #[snafu(display("Got an invalid range response"))] - InvalidRangeResponse { - store: &'static str, - source: crate::util::InvalidRangeResponse, - }, + #[snafu(display("Received non-partial response when range requested"))] + NotPartial, + + #[snafu(display("Content-Range header not present"))] + NoContentRange, + + #[snafu(display("Failed to parse value for CONTENT_RANGE header: {value}"))] + ParseContentRange { value: String }, + + #[snafu(display("Content-Range header contained non UTF-8 characters"))] + InvalidContentRange { source: ToStrError }, #[snafu(display("Requested {expected:?}, got {actual:?}"))] UnexpectedRange { - store: &'static str, expected: Range, actual: Range, }, } -impl From for crate::Error { - fn from(err: Error) -> Self { - let store = match err { - Error::Header { store, .. } => store, - Error::InvalidRangeRequest { store, .. } => store, - Error::InvalidRangeResponse { store, .. } => store, - Error::UnexpectedRange { store, .. } => store, - }; - Self::Generic { - store: store, - source: Box::new(err), - } - } -} +pub(crate) type Result = std::result::Result; /// A client that can perform a get request #[async_trait] @@ -80,53 +71,79 @@ pub trait GetClient: Send + Sync + 'static { /// Configure the [`HeaderConfig`] for this client const HEADER_CONFIG: HeaderConfig; - async fn get_request(&self, path: &Path, options: GetOptions) -> Result; + async fn get_request(&self, path: &Path, options: GetOptions) -> crate::Result; } /// Extension trait for [`GetClient`] that adds common retrieval functionality #[async_trait] pub trait GetClientExt { - async fn get_opts(&self, location: &Path, options: GetOptions) -> Result; + async fn get_opts(&self, location: &Path, options: GetOptions) -> crate::Result; } #[async_trait] impl GetClientExt for T { - async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + async fn get_opts(&self, location: &Path, options: GetOptions) -> crate::Result { let range = options.range.clone(); let response = self.get_request(location, options).await?; - let meta = header_meta(location, response.headers(), T::HEADER_CONFIG) - .context(HeaderSnafu { store: T::STORE })?; - - // ensure that we receive the range we asked for - let out_range = if let Some(r) = range { - let actual = r - .as_range(meta.size) - .context(InvalidRangeRequestSnafu { store: T::STORE })?; - - let expected = - response_range(&response).context(InvalidRangeResponseSnafu { store: T::STORE })?; - - if actual != expected { - Err(Error::UnexpectedRange { - store: T::STORE, - expected, - actual: actual.clone(), - })?; - } - actual - } else { - 0..meta.size - }; - - let stream = response - .bytes_stream() - .map_err(|source| as_generic_err(T::STORE, source)) - .boxed(); - - Ok(GetResult { - range: out_range, - payload: GetResultPayload::Stream(stream), - meta, + get_result::(location, range, response).map_err(|e| crate::Error::Generic { + store: T::STORE, + source: Box::new(e), }) } } +fn parse_content_range(s: &str) -> Option> { + let rem = s.trim().strip_prefix("bytes ")?; + let (range, _) = rem.split_once('/')?; + let (start_s, end_s) = range.split_once('-')?; + + let start = start_s.parse().ok()?; + let end: usize = end_s.parse().ok()?; + + Some(start..(end + 1)) +} + +fn get_result( + location: &Path, + range: Option, + response: Response, +) -> Result { + let meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?; + + // ensure that we receive the range we asked for + let out_range = if let Some(r) = range { + let expected = r.as_range(meta.size)?; + + ensure!( + response.status() == StatusCode::PARTIAL_CONTENT, + NotPartialSnafu + ); + let val = response + .headers() + .get(CONTENT_RANGE) + .context(NoContentRangeSnafu)?; + + let value = val.to_str().context(InvalidContentRangeSnafu)?; + let actual = parse_content_range(value).context(ParseContentRangeSnafu { value })?; + ensure!( + actual == expected, + UnexpectedRangeSnafu { expected, actual } + ); + actual + } else { + 0..meta.size + }; + + let stream = response + .bytes_stream() + .map_err(|source| crate::Error::Generic { + store: T::STORE, + source: Box::new(source), + }) + .boxed(); + + Ok(GetResult { + range: out_range, + payload: GetResultPayload::Stream(stream), + meta, + }) +} diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index cc96808bd894..9dfbb3c6aadb 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -497,7 +497,7 @@ mod parse; mod util; pub use parse::{parse_url, parse_url_opts}; -use util::GetRange; +pub use util::GetRange; use crate::path::Path; #[cfg(not(target_arch = "wasm32"))] @@ -1889,7 +1889,7 @@ mod tests { // We can abort an in-progress write let (upload_id, mut writer) = storage.put_multipart(&location).await.unwrap(); - if let Some(chunk) = data.get(0) { + if let Some(chunk) = data.first() { writer.write_all(chunk).await.unwrap(); let _ = writer.write(chunk).await.unwrap(); } diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 4dd4647b687e..b62122c772ff 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -422,10 +422,9 @@ impl ObjectStore for LocalFileSystem { let meta = convert_metadata(metadata, location)?; options.check_preconditions(&meta)?; - let range = if let Some(r) = options.range { - r.as_range(meta.size).context(InvalidRangeSnafu)? - } else { - 0..meta.size + let range = match options.range { + Some(r) => r.as_range(meta.size).context(InvalidRangeSnafu)?, + None => 0..meta.size, }; Ok(GetResult { diff --git a/object_store/src/util.rs b/object_store/src/util.rs index a12630ec7ef7..fa3edf69d270 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -19,16 +19,11 @@ use std::{ fmt::Display, ops::{Range, RangeBounds}, - str::from_utf8, }; -use crate::Error; - use super::Result; use bytes::Bytes; use futures::{stream::StreamExt, Stream, TryStreamExt}; -use hyper::{header::CONTENT_RANGE, http::HeaderValue, StatusCode}; -use reqwest::Response; use snafu::Snafu; #[cfg(any(feature = "azure", feature = "http"))] @@ -109,12 +104,12 @@ pub const OBJECT_STORE_COALESCE_PARALLEL: usize = 10; /// * Make multiple `fetch` requests in parallel (up to maximum of 10) /// pub async fn coalesce_ranges( - ranges: &[std::ops::Range], + ranges: &[Range], fetch: F, coalesce: usize, ) -> Result, E> where - F: Send + FnMut(std::ops::Range) -> Fut, + F: Send + FnMut(Range) -> Fut, E: Send, Fut: std::future::Future> + Send, { @@ -141,7 +136,7 @@ where } /// Returns a sorted list of ranges that cover `ranges` -fn merge_ranges(ranges: &[std::ops::Range], coalesce: usize) -> Vec> { +fn merge_ranges(ranges: &[Range], coalesce: usize) -> Vec> { if ranges.is_empty() { return vec![]; } @@ -183,11 +178,11 @@ fn merge_ranges(ranges: &[std::ops::Range], coalesce: usize) -> Vec] if valid. + /// Convert to a [`Range`] if valid. pub(crate) fn as_range(&self, len: usize) -> Result, InvalidGetRange> { match self { Self::Bounded(r) => { @@ -263,9 +258,9 @@ impl GetRange { impl Display for GetRange { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - GetRange::Bounded(r) => f.write_fmt(format_args!("{}-{}", r.start, r.end - 1)), - GetRange::Offset(o) => f.write_fmt(format_args!("{o}-")), - GetRange::Suffix(n) => f.write_fmt(format_args!("-{n}")), + Self::Bounded(r) => f.write_fmt(format_args!("{}-{}", r.start, r.end - 1)), + Self::Offset(o) => f.write_fmt(format_args!("{o}-")), + Self::Suffix(n) => f.write_fmt(format_args!("-{n}")), } } } @@ -279,66 +274,13 @@ impl> From for GetRange { Unbounded => 0, }; match value.end_bound() { - Included(i) => GetRange::Bounded(first..(i + 1)), - Excluded(i) => GetRange::Bounded(first..*i), - Unbounded => GetRange::Offset(first), + Included(i) => Self::Bounded(first..(i + 1)), + Excluded(i) => Self::Bounded(first..*i), + Unbounded => Self::Offset(first), } } } -#[derive(Debug, Snafu)] -pub enum InvalidRangeResponse { - #[snafu(display("Response was not PARTIAL_CONTENT; length {length:?}"))] - NotPartial { length: Option }, - #[snafu(display("Content-Range header not present"))] - NoContentRange, - #[snafu(display("Content-Range header could not be parsed: {value:?}"))] - InvalidContentRange { value: Vec }, -} - -fn parse_content_range(val: &HeaderValue) -> Result, InvalidRangeResponse> { - let bts = val.as_bytes(); - - let err = || InvalidRangeResponse::InvalidContentRange { - value: bts.to_vec(), - }; - let s = from_utf8(bts).map_err(|_| err())?; - let rem = s.trim().strip_prefix("bytes ").ok_or_else(err)?; - let (range, _) = rem.split_once("/").ok_or_else(err)?; - let (start_s, end_s) = range.split_once("-").ok_or_else(err)?; - - let start = start_s.parse().map_err(|_| err())?; - let end: usize = end_s.parse().map_err(|_| err())?; - - Ok(start..(end + 1)) -} - -/// Ensure that the given [Response] contains Partial Content with a single byte range, -/// and get that range range. -pub(crate) fn response_range(r: &Response) -> Result, InvalidRangeResponse> { - use InvalidRangeResponse::*; - - if r.status() != StatusCode::PARTIAL_CONTENT { - return Err(NotPartial { - length: r.content_length().map(|s| s as usize), - }); - } - - let val = r.headers().get(CONTENT_RANGE).ok_or(NoContentRange)?; - - parse_content_range(val) -} - -pub(crate) fn as_generic_err( - store: &'static str, - source: E, -) -> Error { - Error::Generic { - store, - source: Box::new(source), - } -} - #[cfg(test)] mod tests { use crate::Error;