Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 3, 2024
1 parent 9845874 commit e6b3dba
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 160 deletions.
21 changes: 5 additions & 16 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::deserialize_rfc1123;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutResult,
Result, RetryConfig,
Expand Down Expand Up @@ -99,9 +99,6 @@ pub(crate) enum Error {
source: crate::client::header::Error,
},

#[snafu(display("Operation not supported by this store: {reason}"))]
NotSupported { reason: &'static str },

#[snafu(display("ETag required for conditional update"))]
MissingETag,
}
Expand All @@ -112,9 +109,6 @@ impl From<Error> for crate::Error {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
| Error::PutRequest { source, path } => source.error(STORE, path),
Error::NotSupported { .. } => Self::NotSupported {
source: Box::new(err),
},
_ => Self::Generic {
store: STORE,
source: Box::new(err),
Expand Down Expand Up @@ -364,15 +358,10 @@ impl GetClient for AzureClient {
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
// As of 2024-01-02, Azure does not support suffix requests,
// so we should fail fast here rather than sending one
if let Some(r) = options.range.as_ref() {
match r {
crate::util::GetRange::Suffix(_) => {
Err(Error::NotSupported {
reason: "suffix request",
})?;
}
_ => (),
}
if let Some(GetRange::Suffix(_)) = options.range.as_ref() {
return Err(crate::Error::NotSupported {
source: "Azure does not support suffix range requests".into(),
});
}

let credential = self.get_credential().await?;
Expand Down
145 changes: 81 additions & 64 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,49 @@ use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::util::{as_generic_err, response_range};
use crate::{GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use crate::{GetOptions, GetRange, GetResult, GetResultPayload};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use hyper::header::CONTENT_RANGE;
use hyper::StatusCode;
use reqwest::header::ToStrError;
use reqwest::Response;
use snafu::{ResultExt, Snafu};
use snafu::{ensure, OptionExt, ResultExt, Snafu};

/// A specialized `Error` for get-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub(crate) enum Error {
#[snafu(display("Could not extract metadata from response headers"))]
#[snafu(context(false))]
Header {
store: &'static str,
source: crate::client::header::Error,
},

#[snafu(display("Requested an invalid range"))]
#[snafu(context(false))]
InvalidRangeRequest {
store: &'static str,
source: crate::util::InvalidGetRange,
},

#[snafu(display("Got an invalid range response"))]
InvalidRangeResponse {
store: &'static str,
source: crate::util::InvalidRangeResponse,
},
#[snafu(display("Received non-partial response when range requested"))]
NotPartial,

#[snafu(display("Content-Range header not present"))]
NoContentRange,

#[snafu(display("Failed to parse value for CONTENT_RANGE header: {value}"))]
ParseContentRange { value: String },

#[snafu(display("Content-Range header contained non UTF-8 characters"))]
InvalidContentRange { source: ToStrError },

#[snafu(display("Requested {expected:?}, got {actual:?}"))]
UnexpectedRange {
store: &'static str,
expected: Range<usize>,
actual: Range<usize>,
},
}

impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
let store = match err {
Error::Header { store, .. } => store,
Error::InvalidRangeRequest { store, .. } => store,
Error::InvalidRangeResponse { store, .. } => store,
Error::UnexpectedRange { store, .. } => store,
};
Self::Generic {
store: store,
source: Box::new(err),
}
}
}
pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

/// A client that can perform a get request
#[async_trait]
Expand All @@ -80,53 +71,79 @@ pub trait GetClient: Send + Sync + 'static {
/// Configure the [`HeaderConfig`] for this client
const HEADER_CONFIG: HeaderConfig;

async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response>;
async fn get_request(&self, path: &Path, options: GetOptions) -> crate::Result<Response>;
}

/// Extension trait for [`GetClient`] that adds common retrieval functionality
#[async_trait]
pub trait GetClientExt {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
async fn get_opts(&self, location: &Path, options: GetOptions) -> crate::Result<GetResult>;
}

#[async_trait]
impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
async fn get_opts(&self, location: &Path, options: GetOptions) -> crate::Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG)
.context(HeaderSnafu { store: T::STORE })?;

// ensure that we receive the range we asked for
let out_range = if let Some(r) = range {
let actual = r
.as_range(meta.size)
.context(InvalidRangeRequestSnafu { store: T::STORE })?;

let expected =
response_range(&response).context(InvalidRangeResponseSnafu { store: T::STORE })?;

if actual != expected {
Err(Error::UnexpectedRange {
store: T::STORE,
expected,
actual: actual.clone(),
})?;
}
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| as_generic_err(T::STORE, source))
.boxed();

Ok(GetResult {
range: out_range,
payload: GetResultPayload::Stream(stream),
meta,
get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
store: T::STORE,
source: Box::new(e),
})
}
}
fn parse_content_range(s: &str) -> Option<Range<usize>> {
let rem = s.trim().strip_prefix("bytes ")?;
let (range, _) = rem.split_once('/')?;
let (start_s, end_s) = range.split_once('-')?;

let start = start_s.parse().ok()?;
let end: usize = end_s.parse().ok()?;

Some(start..(end + 1))
}

fn get_result<T: GetClient>(
location: &Path,
range: Option<GetRange>,
response: Response,
) -> Result<GetResult> {
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;

// ensure that we receive the range we asked for
let out_range = if let Some(r) = range {
let expected = r.as_range(meta.size)?;

ensure!(
response.status() == StatusCode::PARTIAL_CONTENT,
NotPartialSnafu
);
let val = response
.headers()
.get(CONTENT_RANGE)
.context(NoContentRangeSnafu)?;

let value = val.to_str().context(InvalidContentRangeSnafu)?;
let actual = parse_content_range(value).context(ParseContentRangeSnafu { value })?;
ensure!(
actual == expected,
UnexpectedRangeSnafu { expected, actual }
);
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range: out_range,
payload: GetResultPayload::Stream(stream),
meta,
})
}
4 changes: 2 additions & 2 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ mod parse;
mod util;

pub use parse::{parse_url, parse_url_opts};
use util::GetRange;
pub use util::GetRange;

use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -1889,7 +1889,7 @@ mod tests {

// We can abort an in-progress write
let (upload_id, mut writer) = storage.put_multipart(&location).await.unwrap();
if let Some(chunk) = data.get(0) {
if let Some(chunk) = data.first() {
writer.write_all(chunk).await.unwrap();
let _ = writer.write(chunk).await.unwrap();
}
Expand Down
7 changes: 3 additions & 4 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,9 @@ impl ObjectStore for LocalFileSystem {
let meta = convert_metadata(metadata, location)?;
options.check_preconditions(&meta)?;

let range = if let Some(r) = options.range {
r.as_range(meta.size).context(InvalidRangeSnafu)?
} else {
0..meta.size
let range = match options.range {
Some(r) => r.as_range(meta.size).context(InvalidRangeSnafu)?,
None => 0..meta.size,
};

Ok(GetResult {
Expand Down
Loading

0 comments on commit e6b3dba

Please sign in to comment.