Skip to content

Commit

Permalink
object_store: review comments
Browse files Browse the repository at this point in the history
- Use idiomatic snafu error handling
- fast-fail on azure suffix requests
- remove unused GetRange utilities
  • Loading branch information
clbarnes committed Jan 2, 2024
1 parent 2c46bab commit 9845874
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 437 deletions.
1 change: 0 additions & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] }
http-content-range = "0.1.2"

[target.'cfg(target_family="unix")'.dev-dependencies]
nix = { version = "0.27.1", features = ["fs"] }
Expand Down
19 changes: 19 additions & 0 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ pub(crate) enum Error {
source: crate::client::header::Error,
},

#[snafu(display("Operation not supported by this store: {reason}"))]
NotSupported { reason: &'static str },

#[snafu(display("ETag required for conditional update"))]
MissingETag,
}
Expand All @@ -109,6 +112,9 @@ impl From<Error> for crate::Error {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
| Error::PutRequest { source, path } => source.error(STORE, path),
Error::NotSupported { .. } => Self::NotSupported {
source: Box::new(err),
},
_ => Self::Generic {
store: STORE,
source: Box::new(err),
Expand Down Expand Up @@ -356,6 +362,19 @@ impl GetClient for AzureClient {
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
// As of 2024-01-02, Azure does not support suffix requests,
// so we should fail fast here rather than sending one
if let Some(r) = options.range.as_ref() {
match r {
crate::util::GetRange::Suffix(_) => {
Err(Error::NotSupported {
reason: "suffix request",
})?;
}
_ => (),
}
}

let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match options.head {
Expand Down
71 changes: 56 additions & 15 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,52 @@ use crate::{GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use reqwest::Response;
use snafu::Snafu;
use snafu::{ResultExt, Snafu};

/// A specialized `Error` for get-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub(crate) enum Error {
#[snafu(display("Could not extract metadata from response headers"))]
Header {
store: &'static str,
source: crate::client::header::Error,
},

#[snafu(display("Requested an invalid range"))]
InvalidRangeRequest {
store: &'static str,
source: crate::util::InvalidGetRange,
},

#[snafu(display("Got an invalid range response"))]
InvalidRangeResponse {
store: &'static str,
source: crate::util::InvalidRangeResponse,
},

#[snafu(display("Requested {expected:?}, got {actual:?}"))]
UnexpectedRange {
store: &'static str,
expected: Range<usize>,
actual: Range<usize>,
},
}

impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
let store = match err {
Error::Header { store, .. } => store,
Error::InvalidRangeRequest { store, .. } => store,
Error::InvalidRangeResponse { store, .. } => store,
Error::UnexpectedRange { store, .. } => store,
};
Self::Generic {
store: store,
source: Box::new(err),
}
}
}

/// A client that can perform a get request
#[async_trait]
Expand All @@ -38,13 +83,6 @@ pub trait GetClient: Send + Sync + 'static {
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response>;
}

#[derive(Debug, Snafu)]
#[snafu(display("Requested range {expected:?}, got {actual:?}"))]
pub struct UnexpectedRange {
expected: Range<usize>,
actual: Range<usize>,
}

/// Extension trait for [`GetClient`] that adds common retrieval functionality
#[async_trait]
pub trait GetClientExt {
Expand All @@ -57,20 +95,23 @@ impl<T: GetClient> GetClientExt for T {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG)
.map_err(|e| as_generic_err(T::STORE, e))?;
.context(HeaderSnafu { store: T::STORE })?;

// ensure that we receive the range we asked for
let out_range = if let Some(r) = range {
let actual = r
.as_range(meta.size)
.map_err(|source| as_generic_err(T::STORE, source))?;
.context(InvalidRangeRequestSnafu { store: T::STORE })?;

let expected =
response_range(&response).map_err(|source| as_generic_err(T::STORE, source))?;
response_range(&response).context(InvalidRangeResponseSnafu { store: T::STORE })?;

if actual != expected {
return Err(as_generic_err(
T::STORE,
UnexpectedRange { expected, actual },
));
Err(Error::UnexpectedRange {
store: T::STORE,
expected,
actual: actual.clone(),
})?;
}
actual
} else {
Expand Down
12 changes: 7 additions & 5 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
util::as_generic_err,
util::InvalidGetRange,
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutMode, PutOptions, PutResult, Result,
};
Expand All @@ -44,8 +44,6 @@ use tokio::io::AsyncWrite;
use url::Url;
use walkdir::{DirEntry, WalkDir};

const STORE: &'static str = "LocalFileSystem";

/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -114,6 +112,11 @@ pub(crate) enum Error {
actual: usize,
},

#[snafu(display("Requested range was invalid"))]
InvalidRange {
source: InvalidGetRange,
},

#[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
UnableToCopyFile {
from: PathBuf,
Expand Down Expand Up @@ -420,8 +423,7 @@ impl ObjectStore for LocalFileSystem {
options.check_preconditions(&meta)?;

let range = if let Some(r) = options.range {
r.as_range(meta.size)
.map_err(|e| as_generic_err(STORE, e))?
r.as_range(meta.size).context(InvalidRangeSnafu)?
} else {
0..meta.size
};
Expand Down
Loading

0 comments on commit 9845874

Please sign in to comment.