Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): support user defined metadata for oss #4881

Merged
merged 4 commits into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ mod tests {
#[test]
fn assert_size() {
assert_eq!(40, size_of::<Operator>());
assert_eq!(256, size_of::<Entry>());
assert_eq!(232, size_of::<Metadata>());
assert_eq!(304, size_of::<Entry>());
assert_eq!(280, size_of::<Metadata>());
assert_eq!(1, size_of::<EntryMode>());
assert_eq!(24, size_of::<Scheme>());
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! access raw APIs.
//! - Raw APIs should only be accessed via `opendal::raw::Xxxx`, any public
//! API should never expose raw API directly.
//! - Raw APIs are far more less stable than public API, please don't rely on
//! - Raw APIs are far less stable than public API, please don't rely on
//! them whenever possible.

mod accessor;
Expand Down
21 changes: 17 additions & 4 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//!
//! By using ops, users can add more context for operation.

use std::collections::HashMap;
use std::time::Duration;

use flagset::FlagSet;
Expand Down Expand Up @@ -537,24 +538,24 @@ impl OpStat {
self.override_content_disposition.as_deref()
}

/// Sets the cache-control header that should be send back by the remote read operation.
/// Sets the cache-control header that should be sent back by the remote read operation.
pub fn with_override_cache_control(mut self, cache_control: &str) -> Self {
self.override_cache_control = Some(cache_control.into());
self
}

/// Returns the cache-control header that should be send back by the remote read operation.
/// Returns the cache-control header that should be sent back by the remote read operation.
pub fn override_cache_control(&self) -> Option<&str> {
self.override_cache_control.as_deref()
}

/// Sets the content-type header that should be send back by the remote read operation.
/// Sets the content-type header that should be sent back by the remote read operation.
pub fn with_override_content_type(mut self, content_type: &str) -> Self {
self.override_content_type = Some(content_type.into());
self
}

/// Returns the content-type header that should be send back by the remote read operation.
/// Returns the content-type header that should be sent back by the remote read operation.
pub fn override_content_type(&self) -> Option<&str> {
self.override_content_type.as_deref()
}
Expand All @@ -580,6 +581,7 @@ pub struct OpWrite {
content_disposition: Option<String>,
cache_control: Option<String>,
executor: Option<Executor>,
user_metadata: Option<HashMap<String, String>>,
}

impl OpWrite {
Expand Down Expand Up @@ -677,6 +679,17 @@ impl OpWrite {
}
self
}

/// Set the user defined metadata of the op
pub fn with_user_metadata(mut self, metadata: HashMap<String, String>) -> Self {
self.user_metadata = Some(metadata);
self
}

/// Get the user defined metadata from the op
pub fn user_metadata(&self) -> Option<&HashMap<String, String>> {
self.user_metadata.as_ref()
}
}

/// Args for `writer` operation.
Expand Down
7 changes: 5 additions & 2 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl OssBuilder {
self
}

/// Set a endpoint for generating presigned urls.
/// Set an endpoint for generating presigned urls.
///
/// You can offer a public endpoint like <https://oss-cn-beijing.aliyuncs.com> to return a presinged url for
/// public accessors, along with an internal endpoint like <https://oss-cn-beijing-internal.aliyuncs.com>
Expand Down Expand Up @@ -416,6 +416,7 @@ impl Access for OssBackend {
} else {
Some(usize::MAX)
},
write_with_user_metadata: true,

delete: true,
copy: true,
Expand Down Expand Up @@ -447,7 +448,9 @@ impl Access for OssBackend {
match status {
StatusCode::OK => {
let headers = resp.headers();
let mut meta = parse_into_metadata(path, headers)?;
let mut meta =
self.core
.parse_metadata(path, constants::X_OSS_META_PREFIX, resp.headers())?;

if let Some(v) = parse_header_to_str(headers, "x-oss-version-id")? {
meta.set_version(v);
Expand Down
119 changes: 90 additions & 29 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
Expand All @@ -28,10 +29,10 @@ use http::header::CONTENT_TYPE;
use http::header::IF_MATCH;
use http::header::IF_NONE_MATCH;
use http::header::RANGE;
use http::HeaderName;
use http::HeaderValue;
use http::Request;
use http::Response;
use http::{HeaderMap, HeaderName};
use reqsign::AliyunCredential;
use reqsign::AliyunLoader;
use reqsign::AliyunOssSigner;
Expand All @@ -41,14 +42,16 @@ use serde::Serialize;
use crate::raw::*;
use crate::*;

mod constants {
pub mod constants {
pub const X_OSS_SERVER_SIDE_ENCRYPTION: &str = "x-oss-server-side-encryption";

pub const X_OSS_SERVER_SIDE_ENCRYPTION_KEY_ID: &str = "x-oss-server-side-encryption-key-id";

pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition";

pub const OSS_QUERY_VERSION_ID: &str = "versionId";

pub const X_OSS_META_PREFIX: &str = "x-oss-meta-";
}

pub struct OssCore {
Expand Down Expand Up @@ -156,6 +159,88 @@ impl OssCore {
}
req
}

fn insert_metadata_headers(
&self,
mut req: http::request::Builder,
size: Option<u64>,
args: &OpWrite,
) -> Result<http::request::Builder> {
req = req.header(CONTENT_LENGTH, size.unwrap_or_default());

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime);
}

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control);
}

if let Some(user_metadata) = args.user_metadata() {
for (key, value) in user_metadata {
// before insert user defined metadata header, add prefix to the header name
if !self.check_user_metadata_key(key) {
return Err(Error::new(
ErrorKind::Unsupported,
"the format of the user metadata key is invalid, please refer the document",
));
}
req = req.header(format!("{}{}", constants::X_OSS_META_PREFIX, key), value)
}
}

Ok(req)
}

// According to https://help.aliyun.com/zh/oss/developer-reference/putobject
// there are some limits in user defined metadata key
fn check_user_metadata_key(&self, key: &str) -> bool {
key.chars().all(|c| c.is_ascii_alphanumeric() || c == '-')
}

/// parse_metadata will parse http headers(including standards http headers
/// and user defined metadata header) into Metadata.
///
/// # Arguments
///
/// * `user_metadata_prefix` is the prefix of user defined metadata key
///
/// # Notes
///
/// before return the user defined metadata, we'll strip the user_metadata_prefix from the key
pub fn parse_metadata(
&self,
path: &str,
user_metadata_prefix: &str,
headers: &HeaderMap,
) -> Result<Metadata> {
let mut m = parse_into_metadata(path, headers)?;

let data: HashMap<String, String> = headers
.iter()
.filter_map(|(key, _)| {
if key.as_str().starts_with(user_metadata_prefix) {
if let Ok(Some(value)) = parse_header_to_str(headers, key) {
let key_str = key.to_string();
let stripped_key = key_str
.strip_prefix(user_metadata_prefix)
.expect("strip prefix must succeed");
return Some((stripped_key.to_string(), value.to_string()));
}
}
None
})
.collect();
if !data.is_empty() {
m.with_user_metadata(data);
}

Ok(m)
}
}

impl OssCore {
Expand All @@ -174,19 +259,7 @@ impl OssCore {

let mut req = Request::put(&url);

req = req.header(CONTENT_LENGTH, size.unwrap_or_default());

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime);
}

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control)
}
req = self.insert_metadata_headers(req, size, args)?;

// set sse headers
req = self.insert_sse_headers(req);
Expand Down Expand Up @@ -214,19 +287,7 @@ impl OssCore {

let mut req = Request::post(&url);

req = req.header(CONTENT_LENGTH, size);

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime);
}

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control)
}
req = self.insert_metadata_headers(req, Some(size), args)?;

// set sse headers
req = self.insert_sse_headers(req);
Expand Down Expand Up @@ -593,7 +654,7 @@ impl OssCore {
self.send(req).await
}

/// Abort an on-going multipart upload.
/// Abort an ongoing multipart upload.
/// reference docs https://www.alibabacloud.com/help/zh/oss/developer-reference/abortmultipartupload
pub async fn oss_abort_multipart_upload(
&self,
Expand Down
2 changes: 2 additions & 0 deletions core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ pub struct Capability {
pub write_with_content_disposition: bool,
/// If operator supports write with cache control.
pub write_with_cache_control: bool,
/// If operator supports write with user defined metadata
pub write_with_user_metadata: bool,
/// write_multi_max_size is the max size that services support in write_multi.
///
/// For example, AWS S3 supports 5GiB as max in write_multi.
Expand Down
4 changes: 2 additions & 2 deletions core/src/types/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ pub struct Lister {
/// use opendal::Entry;
/// use opendal::Result;
///
/// assert_eq!(256, size_of::<(String, Result<opendal::raw::RpStat>)>());
/// assert_eq!(256, size_of::<Option<Entry>>());
/// assert_eq!(304, size_of::<(String, Result<opendal::raw::RpStat>)>());
/// assert_eq!(304, size_of::<Option<Entry>>());
/// ```
///
/// So let's ignore this lint:
Expand Down
22 changes: 21 additions & 1 deletion core/src/types/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use chrono::prelude::*;
use flagset::flags;
use flagset::FlagSet;
use std::collections::HashMap;

use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -45,6 +46,7 @@ pub struct Metadata {
etag: Option<String>,
last_modified: Option<DateTime<Utc>>,
version: Option<String>,
user_metadata: Option<HashMap<String, String>>,
}

impl Metadata {
Expand All @@ -71,6 +73,7 @@ impl Metadata {
etag: None,
content_disposition: None,
version: None,
user_metadata: None,
}
}

Expand All @@ -87,7 +90,7 @@ impl Metadata {
self
}

/// Check if there metadata already contains given metakey.
/// Check if the metadata already contains given metakey.
pub(crate) fn contains_metakey(&self, metakey: impl Into<FlagSet<Metakey>>) -> bool {
let input_metakey = metakey.into();

Expand Down Expand Up @@ -507,6 +510,21 @@ impl Metadata {
self.metakey |= Metakey::Version;
self
}

/// User defined metadata of this entry
///
/// The prefix of the user defined metadata key(for example: in oss, it's x-oss-meta-)
/// is remove from the key
pub fn user_metadata(&self) -> Option<&HashMap<String, String>> {
self.user_metadata.as_ref()
}

/// Set user defined metadata of this entry
pub fn with_user_metadata(&mut self, data: HashMap<String, String>) -> &mut Self {
self.user_metadata = Some(data);
self.metakey |= Metakey::UserMetaData;
self
}
}

flags! {
Expand Down Expand Up @@ -548,5 +566,7 @@ flags! {
LastModified,
/// Key for version.
Version,
/// Key for user metadata
UserMetaData,
}
}
Loading
Loading