From 1d089426dfa74097fb8d71cd174604e676d17032 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Nov 2023 19:47:30 +0800 Subject: [PATCH] refactor(services/wasabi)!: Remove native support for wasabi services (#3455) * refactor(services/wasabi)!: Remove native support for wasabi services Signed-off-by: Xuanwo * Add docs Signed-off-by: Xuanwo * Remove docs Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- .github/workflows/service_test_wasabi.yml | 68 -- core/Cargo.toml | 9 +- core/src/docs/upgrade.md | 4 + core/src/services/mod.rs | 5 - core/src/services/wasabi/backend.rs | 951 ---------------------- core/src/services/wasabi/core.rs | 911 --------------------- core/src/services/wasabi/docs.md | 198 ----- core/src/services/wasabi/error.rs | 131 --- core/src/services/wasabi/mod.rs | 24 - core/src/services/wasabi/pager.rs | 231 ------ core/src/services/wasabi/writer.rs | 65 -- core/src/types/operator/builder.rs | 2 - core/src/types/scheme.rs | 6 - website/docs/services/wasabi.mdx | 70 -- 14 files changed, 8 insertions(+), 2667 deletions(-) delete mode 100644 .github/workflows/service_test_wasabi.yml delete mode 100644 core/src/services/wasabi/backend.rs delete mode 100644 core/src/services/wasabi/core.rs delete mode 100644 core/src/services/wasabi/docs.md delete mode 100644 core/src/services/wasabi/error.rs delete mode 100644 core/src/services/wasabi/mod.rs delete mode 100644 core/src/services/wasabi/pager.rs delete mode 100644 core/src/services/wasabi/writer.rs delete mode 100644 website/docs/services/wasabi.mdx diff --git a/.github/workflows/service_test_wasabi.yml b/.github/workflows/service_test_wasabi.yml deleted file mode 100644 index f8dd015af5b7..000000000000 --- a/.github/workflows/service_test_wasabi.yml +++ /dev/null @@ -1,68 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name: Service Test Wasabi - -on: - push: - branches: - - main - pull_request: - branches: - - main - paths: - - "core/src/**" - - "core/tests/**" - - "!core/src/docs/**" - - "!core/src/services/**" - - "core/src/services/wasabi/**" - - ".github/workflows/service_test_wasabi.yml" - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} - cancel-in-progress: true - -jobs: - wasabi: - runs-on: ubuntu-latest - if: (github.event_name == 'push' && github.repository == 'apache/incubator-opendal') || !github.event.pull_request.head.repo.fork - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup - with: - need-nextest: true - - - name: Load secret - id: op-load-secret - uses: 1password/load-secrets-action@v1 - with: - export-env: true - env: - OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }} - OPENDAL_WASABI_REGION: op://services/wasabi/region - OPENDAL_WASABI_BUCKET: op://services/wasabi/bucket - OPENDAL_WASABI_ENDPOINT: op://services/wasabi/endpoint - OPENDAL_WASABI_ACCESS_KEY_ID: op://services/wasabi/access_key_id - OPENDAL_WASABI_SECRET_ACCESS_KEY: op://services/wasabi/secret_access_key - - - name: Test - shell: bash - working-directory: core - run: cargo nextest run behavior --features tests,services-wasabi - env: - OPENDAL_TEST: wasabi diff --git a/core/Cargo.toml b/core/Cargo.toml index 8bed9d249c67..e76769d42f3b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -186,11 +186,10 @@ services-sqlite = ["dep:rusqlite", "dep:r2d2"] services-supabase = [] services-tikv = ["tikv-client"] services-vercel-artifacts = [] -services-wasabi = [ - "dep:reqsign", - "reqsign?/services-aws", - "reqsign?/reqwest_request", -] +# Deprecated +# wasabi services support has been removed. +# We will remove this feature in next version. +services-wasabi = [] services-webdav = [] services-webhdfs = [] diff --git a/core/src/docs/upgrade.md b/core/src/docs/upgrade.md index d66e574fcba5..5982790bac5b 100644 --- a/core/src/docs/upgrade.md +++ b/core/src/docs/upgrade.md @@ -19,6 +19,10 @@ OpenDAL bumps it's MSRV to 1.67.0. - The `enable_create_simulation` option has been removed. We add this option to allow ghac simulate create empty file, but it's could result in unexpected behavior when users create a file with content length `1`. So we remove it. +### Wasabi Service Removed + +`wasabi` service native support has been removed. Users who want to access wasabi can use our `s3` service instead. + # Upgrade to v0.41 There is no public API and raw API changes. diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 06e12933e2e5..feaf6746fdcf 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -164,11 +164,6 @@ mod supabase; #[cfg(feature = "services-supabase")] pub use supabase::Supabase; -#[cfg(feature = "services-wasabi")] -mod wasabi; -#[cfg(feature = "services-wasabi")] -pub use wasabi::Wasabi; - #[cfg(feature = "services-webdav")] mod webdav; #[cfg(feature = "services-webdav")] diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs deleted file mode 100644 index c2f746e77578..000000000000 --- a/core/src/services/wasabi/backend.rs +++ /dev/null @@ -1,951 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::HashMap; -use std::fmt::Debug; -use std::fmt::Formatter; -use std::fmt::Write; -use std::sync::Arc; - -use async_trait::async_trait; -use base64::prelude::BASE64_STANDARD; -use base64::Engine; -use bytes::Buf; -use http::StatusCode; -use log::debug; -use md5::Digest; -use md5::Md5; -use once_cell::sync::Lazy; -use reqsign::AwsConfig; -use reqsign::AwsCredentialLoad; -use reqsign::AwsDefaultLoader; -use reqsign::AwsV4Signer; - -use super::core::*; -use super::error::parse_error; -use super::error::parse_wasabi_error_code; -use super::pager::WasabiPager; -use super::writer::WasabiWriter; -use crate::raw::*; -use crate::*; - -/// Allow constructing correct region endpoint if user gives a global endpoint. -static ENDPOINT_TEMPLATES: Lazy> = Lazy::new(|| { - let mut m = HashMap::new(); - // AWS S3 Service. - m.insert( - "https://s3.wasabisys.com", - "https://s3.{region}.wasabisys.com", - ); - m -}); - -/// Wasabi (an aws S3 compatible service) support -#[doc = include_str!("docs.md")] -#[derive(Default)] -pub struct WasabiBuilder { - root: Option, - - bucket: String, - endpoint: Option, - region: Option, - role_arn: Option, - external_id: Option, - access_key_id: Option, - secret_access_key: Option, - server_side_encryption: Option, - server_side_encryption_aws_kms_key_id: Option, - server_side_encryption_customer_algorithm: Option, - server_side_encryption_customer_key: Option, - server_side_encryption_customer_key_md5: Option, - default_storage_class: Option, - - /// temporary credentials, check the official [doc](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html) for detail - security_token: Option, - - disable_config_load: bool, - disable_ec2_metadata: bool, - enable_virtual_host_style: bool, - - http_client: Option, - customed_credential_load: Option>, -} - -impl Debug for WasabiBuilder { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut d = f.debug_struct("Builder"); - - d.field("root", &self.root) - .field("bucket", &self.bucket) - .field("endpoint", &self.endpoint) - .field("region", &self.region); - - d.finish_non_exhaustive() - } -} - -impl WasabiBuilder { - /// Set root of this backend. - /// - /// All operations will happen under this root. - pub fn root(&mut self, root: &str) -> &mut Self { - self.root = if root.is_empty() { - None - } else { - Some(root.to_string()) - }; - - self - } - - /// Set bucket name of this backend. - pub fn bucket(&mut self, bucket: &str) -> &mut Self { - self.bucket = bucket.to_string(); - - self - } - - /// Set endpoint of this backend. - /// - /// Endpoint must be full uri, e.g. - /// - /// - `https://s3.wasabisys.com` or `https://s3.{region}.wasabisys.com` - /// - /// If user inputs endpoint without scheme like "s3.wasabisys.com", we - /// will prepend "https://" before it. - pub fn endpoint(&mut self, endpoint: &str) -> &mut Self { - if !endpoint.is_empty() { - // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/` - self.endpoint = Some(endpoint.trim_end_matches('/').to_string()) - } - - self - } - - /// Region represent the signing region of this endpoint. - /// - /// - If region is set, we will take user's input first. - /// - If not, the default `us-east-1` will be used. - pub fn region(&mut self, region: &str) -> &mut Self { - if !region.is_empty() { - self.region = Some(region.to_string()) - } - - self - } - - /// Set access_key_id of this backend. - /// - /// - If access_key_id is set, we will take user's input first. - /// - If not, we will try to load it from environment. - pub fn access_key_id(&mut self, v: &str) -> &mut Self { - if !v.is_empty() { - self.access_key_id = Some(v.to_string()) - } - - self - } - - /// Set secret_access_key of this backend. - /// - /// - If secret_access_key is set, we will take user's input first. - /// - If not, we will try to load it from environment. - pub fn secret_access_key(&mut self, v: &str) -> &mut Self { - if !v.is_empty() { - self.secret_access_key = Some(v.to_string()) - } - - self - } - - /// Set role_arn for this backend. - pub fn role_arn(&mut self, v: &str) -> &mut Self { - if !v.is_empty() { - self.role_arn = Some(v.to_string()) - } - - self - } - - /// Set external_id for this backend. - pub fn external_id(&mut self, v: &str) -> &mut Self { - if !v.is_empty() { - self.external_id = Some(v.to_string()) - } - - self - } - - /// Set default storage_class for this backend. - /// Unlike S3, wasabi only supports one single storage class, - /// which is most like standard S3 storage class, - /// check `https://docs.wasabi.com/docs/operations-on-objects` for more details. - /// - /// Available values: - /// - `STANDARD` - pub fn default_storage_class(&mut self, v: &str) -> &mut Self { - if !v.is_empty() { - self.default_storage_class = Some(v.to_string()) - } - - self - } - - /// Set server_side_encryption for this backend. - /// - /// Available values: `AES256`, `aws:kms`. - /// - /// # Note - /// - /// This function is the low-level setting for SSE related features. - /// - /// SSE related options should be set carefully to make them works. - /// Please use `server_side_encryption_with_*` helpers if even possible. - pub fn server_side_encryption(&mut self, v: &str) -> &mut Self { - if !v.is_empty() { - self.server_side_encryption = Some(v.to_string()) - } - - self - } - - /// Set server_side_encryption_aws_kms_key_id for this backend - /// - /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id` - /// is not set, S3 will use aws managed kms key to encrypt data. - /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id` - /// is a valid kms key id, S3 will use the provided kms key to encrypt data. - /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not found, an error will be - /// returned. - /// - If `server_side_encryption` is not `aws:kms`, setting `server_side_encryption_aws_kms_key_id` - /// is a noop. - /// - /// # Note - /// - /// This function is the low-level setting for SSE related features. - /// - /// SSE related options should be set carefully to make them works. - /// Please use `server_side_encryption_with_*` helpers if even possible. - pub fn server_side_encryption_aws_kms_key_id(&mut self, v: &str) -> &mut Self { - if !v.is_empty() { - self.server_side_encryption_aws_kms_key_id = Some(v.to_string()) - } - - self - } - - /// Set server_side_encryption_customer_algorithm for this backend. - /// - /// Available values: `AES256`. - /// - /// # Note - /// - /// This function is the low-level setting for SSE related features. - /// - /// SSE related options should be set carefully to make them works. - /// Please use `server_side_encryption_with_*` helpers if even possible. - pub fn server_side_encryption_customer_algorithm(&mut self, v: &str) -> &mut Self { - if !v.is_empty() { - self.server_side_encryption_customer_algorithm = Some(v.to_string()) - } - - self - } - - /// Set server_side_encryption_customer_key for this backend. - /// - /// # Args - /// - /// `v`: base64 encoded key that matches algorithm specified in - /// `server_side_encryption_customer_algorithm`. - /// - /// # Note - /// - /// This function is the low-level setting for SSE related features. - /// - /// SSE related options should be set carefully to make them works. - /// Please use `server_side_encryption_with_*` helpers if even possible. - pub fn server_side_encryption_customer_key(&mut self, v: &str) -> &mut Self { - if !v.is_empty() { - self.server_side_encryption_customer_key = Some(v.to_string()) - } - - self - } - - /// Set server_side_encryption_customer_key_md5 for this backend. - /// - /// # Args - /// - /// `v`: MD5 digest of key specified in `server_side_encryption_customer_key`. - /// - /// # Note - /// - /// This function is the low-level setting for SSE related features. - /// - /// SSE related options should be set carefully to make them works. - /// Please use `server_side_encryption_with_*` helpers if even possible. - pub fn server_side_encryption_customer_key_md5(&mut self, v: &str) -> &mut Self { - if !v.is_empty() { - self.server_side_encryption_customer_key_md5 = Some(v.to_string()) - } - - self - } - - /// Enable server side encryption with aws managed kms key - /// - /// As known as: SSE-KMS - /// - /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions. - pub fn server_side_encryption_with_aws_managed_kms_key(&mut self) -> &mut Self { - self.server_side_encryption = Some("aws:kms".to_string()); - self - } - - /// Enable server side encryption with customer managed kms key - /// - /// As known as: SSE-KMS - /// - /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions. - pub fn server_side_encryption_with_customer_managed_kms_key( - &mut self, - aws_kms_key_id: &str, - ) -> &mut Self { - self.server_side_encryption = Some("aws:kms".to_string()); - self.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string()); - self - } - - /// Enable server side encryption with s3 managed key - /// - /// As known as: SSE-S3 - /// - /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions. - pub fn server_side_encryption_with_s3_key(&mut self) -> &mut Self { - self.server_side_encryption = Some("AES256".to_string()); - self - } - - /// Enable server side encryption with customer key. - /// - /// As known as: SSE-C - /// - /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions. - pub fn server_side_encryption_with_customer_key( - &mut self, - algorithm: &str, - key: &[u8], - ) -> &mut Self { - self.server_side_encryption_customer_algorithm = Some(algorithm.to_string()); - self.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key)); - self.server_side_encryption_customer_key_md5 = - Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice())); - self - } - - /// Set temporary credential used in service connections - /// - /// # Warning - /// - /// security token's lifetime is short and requires users to refresh in time. - pub fn security_token(&mut self, token: &str) -> &mut Self { - if !token.is_empty() { - self.security_token = Some(token.to_string()); - } - self - } - - /// Disable config load so that opendal will not load config from - /// environment. - /// - /// For examples: - /// - /// - envs like `AWS_ACCESS_KEY_ID` - /// - files like `~/.aws/config` - pub fn disable_config_load(&mut self) -> &mut Self { - self.disable_config_load = true; - self - } - - /// Disable load credential from ec2 metadata. - /// - /// This option is used to disable the default behavior of opendal - /// to load credential from ec2 metadata, a.k.a, IMDSv2 - pub fn disable_ec2_metadata(&mut self) -> &mut Self { - self.disable_ec2_metadata = true; - self - } - - /// Enable virtual host style so that opendal will send API requests - /// in virtual host style instead of path style. - /// - /// - By default, opendal will send API to `https://s3.us-east-1.wasabisys.com/bucket_name` - /// - Enabled, opendal will send API to `https://bucket_name.s3.us-east-1.wasabisys.com` - pub fn enable_virtual_host_style(&mut self) -> &mut Self { - self.enable_virtual_host_style = true; - self - } - - /// Adding a customed credential load for service. - pub fn customed_credential_load(&mut self, cred: Box) -> &mut Self { - self.customed_credential_load = Some(cred); - self - } - - /// Specify the http client that used by this service. - /// - /// # Notes - /// - /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed - /// during minor updates. - pub fn http_client(&mut self, client: HttpClient) -> &mut Self { - self.http_client = Some(client); - self - } - - /// Check if `bucket` is valid - /// `bucket` must be not empty and if `enable_virtual_host_style` is true - /// it couldn't contain dot(.) character - fn is_bucket_valid(&self) -> bool { - if self.bucket.is_empty() { - return false; - } - // If enable virtual host style, `bucket` will reside in domain part, - // for example `https://bucket_name.s3.us-east-1.amazonaws.com`, - // so `bucket` with dot can't be recognized correctly for this format. - if self.enable_virtual_host_style && self.bucket.contains('.') { - return false; - } - true - } - - /// Build endpoint with given region. - fn build_endpoint(&self, region: &str) -> String { - let bucket = { - debug_assert!(self.is_bucket_valid(), "bucket must be valid"); - - self.bucket.as_str() - }; - - let mut endpoint = match &self.endpoint { - Some(endpoint) => { - if endpoint.starts_with("http") { - endpoint.to_string() - } else { - // Prefix https if endpoint doesn't start with scheme. - format!("https://{endpoint}") - } - } - None => "https://s3.wasabisys.com".to_string(), - }; - - // If endpoint contains bucket name, we should trim them. - endpoint = endpoint.replace(&format!("//{bucket}."), "//"); - - // Update with endpoint templates. - endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) { - template.replace("{region}", region) - } else { - // If we don't know where about this endpoint, just leave - // them as it. - endpoint.to_string() - }; - - // Apply virtual host style. - if self.enable_virtual_host_style { - endpoint = endpoint.replace("//", &format!("//{bucket}.")) - } else { - write!(endpoint, "/{bucket}").expect("write into string must succeed"); - }; - - endpoint - } -} - -impl Builder for WasabiBuilder { - const SCHEME: Scheme = Scheme::Wasabi; - type Accessor = WasabiBackend; - - fn from_map(map: HashMap) -> Self { - let mut builder = WasabiBuilder::default(); - - map.get("root").map(|v| builder.root(v)); - map.get("bucket").map(|v| builder.bucket(v)); - map.get("endpoint").map(|v| builder.endpoint(v)); - map.get("region").map(|v| builder.region(v)); - map.get("access_key_id").map(|v| builder.access_key_id(v)); - map.get("secret_access_key") - .map(|v| builder.secret_access_key(v)); - map.get("security_token").map(|v| builder.security_token(v)); - map.get("role_arn").map(|v| builder.role_arn(v)); - map.get("external_id").map(|v| builder.external_id(v)); - map.get("server_side_encryption") - .map(|v| builder.server_side_encryption(v)); - map.get("server_side_encryption_aws_kms_key_id") - .map(|v| builder.server_side_encryption_aws_kms_key_id(v)); - map.get("server_side_encryption_customer_algorithm") - .map(|v| builder.server_side_encryption_customer_algorithm(v)); - map.get("server_side_encryption_customer_key") - .map(|v| builder.server_side_encryption_customer_key(v)); - map.get("server_side_encryption_customer_key_md5") - .map(|v| builder.server_side_encryption_customer_key_md5(v)); - map.get("disable_config_load") - .filter(|v| *v == "on" || *v == "true") - .map(|_| builder.disable_config_load()); - map.get("disable_ec2_metadata") - .filter(|v| *v == "on" || *v == "true") - .map(|_| builder.disable_ec2_metadata()); - map.get("enable_virtual_host_style") - .filter(|v| *v == "on" || *v == "true") - .map(|_| builder.enable_virtual_host_style()); - map.get("default_storage_class") - .map(|v| builder.default_storage_class(v)); - - builder - } - - fn build(&mut self) -> Result { - debug!("backend build started: {:?}", &self); - - let root = normalize_root(&self.root.take().unwrap_or_default()); - debug!("backend use root {}", &root); - - // Handle bucket name. - let bucket = if self.is_bucket_valid() { - Ok(&self.bucket) - } else { - Err( - Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured") - .with_context("service", Scheme::S3), - ) - }?; - debug!("backend use bucket {}", &bucket); - - let default_storage_class = match &self.default_storage_class { - None => None, - Some(v) => Some( - build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?, - ), - }; - - let server_side_encryption = match &self.server_side_encryption { - None => None, - Some(v) => Some( - build_header_value(v) - .map_err(|err| err.with_context("key", "server_side_encryption"))?, - ), - }; - - let server_side_encryption_aws_kms_key_id = - match &self.server_side_encryption_aws_kms_key_id { - None => None, - Some(v) => Some(build_header_value(v).map_err(|err| { - err.with_context("key", "server_side_encryption_aws_kms_key_id") - })?), - }; - - let server_side_encryption_customer_algorithm = - match &self.server_side_encryption_customer_algorithm { - None => None, - Some(v) => Some(build_header_value(v).map_err(|err| { - err.with_context("key", "server_side_encryption_customer_algorithm") - })?), - }; - - let server_side_encryption_customer_key = - match &self.server_side_encryption_customer_key { - None => None, - Some(v) => Some(build_header_value(v).map_err(|err| { - err.with_context("key", "server_side_encryption_customer_key") - })?), - }; - - let server_side_encryption_customer_key_md5 = - match &self.server_side_encryption_customer_key_md5 { - None => None, - Some(v) => Some(build_header_value(v).map_err(|err| { - err.with_context("key", "server_side_encryption_customer_key_md5") - })?), - }; - - let client = if let Some(client) = self.http_client.take() { - client - } else { - HttpClient::new().map_err(|err| { - err.with_operation("Builder::build") - .with_context("service", Scheme::S3) - })? - }; - - let mut cfg = AwsConfig::default(); - if !self.disable_config_load { - cfg = cfg.from_profile(); - cfg = cfg.from_env(); - } - - // Setting all value from user input if available. - if let Some(v) = self.region.take() { - cfg.region = Some(v); - } - if let Some(v) = self.access_key_id.take() { - cfg.access_key_id = Some(v) - } - if let Some(v) = self.secret_access_key.take() { - cfg.secret_access_key = Some(v) - } - if let Some(v) = self.security_token.take() { - cfg.session_token = Some(v) - } - if let Some(v) = self.role_arn.take() { - cfg.role_arn = Some(v) - } - if let Some(v) = self.external_id.take() { - cfg.external_id = Some(v) - } - - if cfg.region.is_none() { - // region is required to make signer work. - // - // If we don't know region after loading from builder and env. - // We will use `us-east-1` as default. - cfg.region = Some("us-east-1".to_string()); - } - - let region = cfg.region.to_owned().unwrap(); - debug!("backend use region: {region}"); - - // Building endpoint. - let endpoint = self.build_endpoint(®ion); - debug!("backend use endpoint: {endpoint}"); - - let mut loader = AwsDefaultLoader::new(client.client(), cfg); - if self.disable_ec2_metadata { - loader = loader.with_disable_ec2_metadata(); - } - - let signer = AwsV4Signer::new("s3", ®ion); - - debug!("backend build finished"); - Ok(WasabiBackend { - core: Arc::new(WasabiCore { - bucket: bucket.to_string(), - endpoint, - root, - server_side_encryption, - server_side_encryption_aws_kms_key_id, - server_side_encryption_customer_algorithm, - server_side_encryption_customer_key, - server_side_encryption_customer_key_md5, - default_storage_class, - signer, - loader, - client, - }), - }) - } -} - -/// Backend for wasabi service. -#[derive(Debug, Clone)] -pub struct WasabiBackend { - core: Arc, -} - -#[async_trait] -impl Accessor for WasabiBackend { - type Reader = IncomingAsyncBody; - type BlockingReader = (); - type Writer = oio::OneShotWriter; - type BlockingWriter = (); - type Pager = WasabiPager; - type BlockingPager = (); - - fn info(&self) -> AccessorInfo { - let mut am = AccessorInfo::default(); - am.set_scheme(Scheme::Wasabi) - .set_root(&self.core.root) - .set_name(&self.core.bucket) - .set_native_capability(Capability { - stat: true, - stat_with_if_match: true, - stat_with_if_none_match: true, - - read: true, - read_can_next: true, - read_with_range: true, - - write: true, - create_dir: true, - delete: true, - copy: true, - rename: true, - - list: true, - list_without_delimiter: true, - list_with_delimiter_slash: true, - - presign: true, - presign_stat: true, - presign_read: true, - presign_write: true, - - batch: true, - - ..Default::default() - }); - - am - } - - async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { - let mut req = - self.core - .put_object_request(path, Some(0), &OpWrite::default(), AsyncBody::Empty)?; - - self.core.sign(&mut req).await?; - - let resp = self.core.send(req).await?; - - let status = resp.status(); - - match status { - StatusCode::CREATED | StatusCode::OK => { - resp.into_body().consume().await?; - Ok(RpCreateDir::default()) - } - _ => Err(parse_error(resp).await?), - } - } - - async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.core.get_object(path, &args).await?; - - let status = resp.status(); - - match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), - _ => Err(parse_error(resp).await?), - } - } - - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok(( - RpWrite::default(), - oio::OneShotWriter::new(WasabiWriter::new(self.core.clone(), args, path.to_string())), - )) - } - - async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { - let resp = self.core.copy_object(from, to).await?; - - let status = resp.status(); - - match status { - StatusCode::OK => { - // According to the documentation, when using copy_object, a 200 error may occur and we need to detect it. - // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html#API_CopyObject_RequestSyntax - resp.into_body().consume().await?; - - Ok(RpCopy::default()) - } - _ => Err(parse_error(resp).await?), - } - } - - async fn stat(&self, path: &str, args: OpStat) -> Result { - // Stat root always returns a DIR. - if path == "/" { - return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); - } - - let resp = self.core.head_object(path, &args).await?; - - let status = resp.status(); - - match status { - StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new), - StatusCode::NOT_FOUND if path.ends_with('/') => { - Ok(RpStat::new(Metadata::new(EntryMode::DIR))) - } - _ => Err(parse_error(resp).await?), - } - } - - async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.core.delete_object(path).await?; - - let status = resp.status(); - - match status { - StatusCode::NO_CONTENT => Ok(RpDelete::default()), - _ => Err(parse_error(resp).await?), - } - } - - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - Ok(( - RpList::default(), - WasabiPager::new(self.core.clone(), path, args.delimiter(), args.limit()), - )) - } - - async fn presign(&self, path: &str, args: OpPresign) -> Result { - // We will not send this request out, just for signing. - let mut req = match args.operation() { - PresignOperation::Stat(v) => self.core.head_object_request(path, v)?, - PresignOperation::Read(v) => self.core.get_object_request(path, v)?, - PresignOperation::Write(_) => { - self.core - .put_object_request(path, None, &OpWrite::default(), AsyncBody::Empty)? - } - }; - - self.core.sign_query(&mut req, args.expire()).await?; - - // We don't need this request anymore, consume it directly. - let (parts, _) = req.into_parts(); - - Ok(RpPresign::new(PresignedRequest::new( - parts.method, - parts.uri, - parts.headers, - ))) - } - - async fn batch(&self, args: OpBatch) -> Result { - let ops = args.into_operation(); - if ops.len() > 1000 { - return Err(Error::new( - ErrorKind::Unsupported, - "wasabi services only allow delete up to 1000 keys at once", - ) - .with_context("length", ops.len().to_string())); - } - - let paths = ops.into_iter().map(|(p, _)| p).collect(); - - let resp = self.core.delete_objects(paths).await?; - - let status = resp.status(); - - if let StatusCode::OK = status { - let bs = resp.into_body().bytes().await?; - - let result: DeleteObjectsResult = - quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; - - let mut batched_result = Vec::with_capacity(result.deleted.len() + result.error.len()); - for i in result.deleted { - let path = build_rel_path(&self.core.root, &i.key); - batched_result.push((path, Ok(RpDelete::default().into()))); - } - // TODO: we should handle those errors with code. - for i in result.error { - let path = build_rel_path(&self.core.root, &i.key); - - // set the error kind and mark temporary if retryable - let (kind, retryable) = - parse_wasabi_error_code(&i.code).unwrap_or((ErrorKind::Unexpected, false)); - let mut err: Error = Error::new(kind, &format!("{i:?}")); - if retryable { - err = err.set_temporary(); - } - - batched_result.push((path, Err(err))); - } - - Ok(RpBatch::new(batched_result)) - } else { - Err(parse_error(resp).await?) - } - } - - /// Execute rename API call - /// Wasabi will auto-create missing path for destination `to` if any - async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { - let resp = self.core.rename_object(from, to).await?; - - let status = resp.status(); - - match status { - StatusCode::OK => Ok(RpRename::default()), - _ => Err(parse_error(resp).await?), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_is_valid_bucket() { - let bucket_cases = vec![ - ("", false, false), - ("test", false, true), - ("test.xyz", false, true), - ("", true, false), - ("test", true, true), - ("test.xyz", true, false), - ]; - - for (bucket, enable_virtual_host_style, expected) in bucket_cases { - let mut b = WasabiBuilder::default(); - b.bucket(bucket); - if enable_virtual_host_style { - b.enable_virtual_host_style(); - } - assert_eq!(b.is_bucket_valid(), expected) - } - } - - #[test] - fn test_build_endpoint() { - let _ = tracing_subscriber::fmt().with_test_writer().try_init(); - - let endpoint_cases = vec![ - Some("s3.wasabisys.com"), - Some("https://s3.wasabisys.com"), - Some("https://s3.us-east-2.wasabisys.com"), - None, - ]; - - for endpoint in &endpoint_cases { - let mut b = WasabiBuilder::default(); - b.bucket("test"); - if let Some(endpoint) = endpoint { - b.endpoint(endpoint); - } - - let endpoint = b.build_endpoint("us-east-2"); - assert_eq!(endpoint, "https://s3.us-east-2.wasabisys.com/test"); - } - - for endpoint in &endpoint_cases { - let mut b = WasabiBuilder::default(); - b.bucket("test"); - b.enable_virtual_host_style(); - if let Some(endpoint) = endpoint { - b.endpoint(endpoint); - } - - let endpoint = b.build_endpoint("us-east-2"); - assert_eq!(endpoint, "https://test.s3.us-east-2.wasabisys.com"); - } - } -} diff --git a/core/src/services/wasabi/core.rs b/core/src/services/wasabi/core.rs deleted file mode 100644 index 2a44bb56447d..000000000000 --- a/core/src/services/wasabi/core.rs +++ /dev/null @@ -1,911 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::fmt; -use std::fmt::Debug; -use std::fmt::Formatter; -use std::fmt::Write; -use std::time::Duration; - -use bytes::Bytes; -use http::header::HeaderName; -use http::header::CACHE_CONTROL; -use http::header::CONTENT_DISPOSITION; -use http::header::CONTENT_LENGTH; -use http::header::CONTENT_TYPE; -use http::HeaderValue; -use http::Request; -use http::Response; -use reqsign::AwsCredential; -use reqsign::AwsDefaultLoader; -use reqsign::AwsV4Signer; -use serde::Deserialize; -use serde::Serialize; - -use crate::raw::*; -use crate::*; - -mod constants { - pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source"; - - pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption"; - pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str = - "x-amz-server-side-encryption-customer-algorithm"; - pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str = - "x-amz-server-side-encryption-customer-key"; - pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = - "x-amz-server-side-encryption-customer-key-md5"; - pub const X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID: &str = - "x-amz-server-side-encryption-aws-kms-key-id"; - pub const X_AMZ_STORAGE_CLASS: &str = "x-amz-storage-class"; - - #[allow(dead_code)] - pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str = - "x-amz-copy-source-server-side-encryption-customer-algorithm"; - #[allow(dead_code)] - pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str = - "x-amz-copy-source-server-side-encryption-customer-key"; - #[allow(dead_code)] - pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = - "x-amz-copy-source-server-side-encryption-customer-key-md5"; - - pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition"; - pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control"; - - pub const DESTINATION: &str = "Destination"; - pub const OVERWRITE: &str = "Overwrite"; -} - -pub struct WasabiCore { - pub bucket: String, - pub endpoint: String, - pub root: String, - pub server_side_encryption: Option, - pub server_side_encryption_aws_kms_key_id: Option, - pub server_side_encryption_customer_algorithm: Option, - pub server_side_encryption_customer_key: Option, - pub server_side_encryption_customer_key_md5: Option, - pub default_storage_class: Option, - - pub signer: AwsV4Signer, - pub loader: AwsDefaultLoader, - pub client: HttpClient, -} - -impl Debug for WasabiCore { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("WasabiCore") - .field("bucket", &self.bucket) - .field("endpoint", &self.endpoint) - .field("root", &self.root) - .finish_non_exhaustive() - } -} - -impl WasabiCore { - /// If credential is not found, we will not sign the request. - async fn load_credential(&self) -> Result> { - let cred = self - .loader - .load() - .await - .map_err(new_request_credential_error)?; - - if let Some(cred) = cred { - Ok(Some(cred)) - } else { - // Mark this error as temporary since it could be caused by AWS STS. - Err(Error::new( - ErrorKind::PermissionDenied, - "no valid credential found, please check configuration or try again", - ) - .set_temporary()) - } - } - - pub async fn sign(&self, req: &mut Request) -> Result<()> { - let cred = if let Some(cred) = self.load_credential().await? { - cred - } else { - return Ok(()); - }; - - self.signer.sign(req, &cred).map_err(new_request_sign_error) - } - - pub async fn sign_query(&self, req: &mut Request, duration: Duration) -> Result<()> { - let cred = if let Some(cred) = self.load_credential().await? { - cred - } else { - return Ok(()); - }; - - self.signer - .sign_query(req, duration, &cred) - .map_err(new_request_sign_error) - } - - #[inline] - pub async fn send(&self, req: Request) -> Result> { - self.client.send(req).await - } - - /// # Note - /// - /// header like X_AMZ_SERVER_SIDE_ENCRYPTION doesn't need to set while - /// get or stat. - pub fn insert_sse_headers( - &self, - mut req: http::request::Builder, - is_write: bool, - ) -> http::request::Builder { - if is_write { - if let Some(v) = &self.server_side_encryption { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION), - v, - ) - } - if let Some(v) = &self.server_side_encryption_aws_kms_key_id { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID), - v, - ) - } - } - - if let Some(v) = &self.server_side_encryption_customer_algorithm { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM), - v, - ) - } - if let Some(v) = &self.server_side_encryption_customer_key { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY), - v, - ) - } - if let Some(v) = &self.server_side_encryption_customer_key_md5 { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5), - v, - ) - } - - req - } -} - -impl WasabiCore { - pub fn head_object_request(&self, path: &str, args: &OpStat) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::head(&url); - - req = self.insert_sse_headers(req, false); - - if let Some(v) = args.if_match() { - req = req.header(http::header::IF_MATCH, v); - } - - if let Some(v) = args.if_none_match() { - req = req.header(http::header::IF_NONE_MATCH, v); - } - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - Ok(req) - } - - pub fn get_object_request(&self, path: &str, args: &OpRead) -> Result> { - let p = build_abs_path(&self.root, path); - - // Construct headers to add to the request - let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - // Add query arguments to the URL based on response overrides - let mut query_args = Vec::new(); - if let Some(override_content_disposition) = args.override_content_disposition() { - query_args.push(format!( - "{}={}", - constants::RESPONSE_CONTENT_DISPOSITION, - percent_encode_path(override_content_disposition) - )) - } - if let Some(override_cache_control) = args.override_cache_control() { - query_args.push(format!( - "{}={}", - constants::RESPONSE_CACHE_CONTROL, - percent_encode_path(override_cache_control) - )) - } - if !query_args.is_empty() { - url.push_str(&format!("?{}", query_args.join("&"))); - } - - let mut req = Request::get(&url); - - let range = args.range(); - if !range.is_full() { - req = req.header(http::header::RANGE, range.to_header()); - } - - if let Some(if_none_match) = args.if_none_match() { - req = req.header(http::header::IF_NONE_MATCH, if_none_match); - } - - // Set SSE headers. - // TODO: how will this work with presign? - req = self.insert_sse_headers(req, false); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - Ok(req) - } - - pub async fn get_object( - &self, - path: &str, - args: &OpRead, - ) -> Result> { - let mut req = self.get_object_request(path, args)?; - - self.sign(&mut req).await?; - - self.send(req).await - } - - pub fn put_object_request( - &self, - path: &str, - size: Option, - args: &OpWrite, - body: AsyncBody, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::put(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size) - } - - if let Some(mime) = args.content_type() { - req = req.header(CONTENT_TYPE, mime) - } - - if let Some(pos) = args.content_disposition() { - req = req.header(CONTENT_DISPOSITION, pos) - } - - if let Some(cache_control) = args.cache_control() { - req = req.header(CACHE_CONTROL, cache_control) - } - - // Set storage class header - if let Some(v) = &self.default_storage_class { - req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v); - } - - // Set SSE headers. - req = self.insert_sse_headers(req, true); - - // Set body - let req = req.body(body).map_err(new_request_build_error)?; - - Ok(req) - } - - pub async fn head_object( - &self, - path: &str, - args: &OpStat, - ) -> Result> { - let mut req = self.head_object_request(path, args)?; - - self.sign(&mut req).await?; - - self.send(req).await - } - - pub async fn delete_object(&self, path: &str) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::delete(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - self.send(req).await - } - - pub async fn copy_object(&self, from: &str, to: &str) -> Result> { - let from = build_abs_path(&self.root, from); - let to = build_abs_path(&self.root, to); - - let source = format!("{}/{}", self.bucket, percent_encode_path(&from)); - let target = format!("{}/{}", self.endpoint, percent_encode_path(&to)); - - let mut req = Request::put(&target); - - // Set SSE headers. - req = self.insert_sse_headers(req, true); - - if let Some(v) = &self.server_side_encryption_customer_algorithm { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static( - constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, - ), - v, - ) - } - - if let Some(v) = &self.server_side_encryption_customer_key { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static( - constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY, - ), - v, - ) - } - - if let Some(v) = &self.server_side_encryption_customer_key_md5 { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static( - constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, - ), - v, - ) - } - - let mut req = req - .header(constants::X_AMZ_COPY_SOURCE, &source) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - self.send(req).await - } - - /// Make this functions as `pub(suber)` because `DirStream` depends - /// on this. - pub async fn list_objects( - &self, - path: &str, - continuation_token: &str, - delimiter: &str, - limit: Option, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let mut url = format!( - "{}?list-type=2&delimiter={delimiter}&prefix={}", - self.endpoint, - percent_encode_path(&p) - ); - if let Some(limit) = limit { - write!(url, "&max-keys={limit}").expect("write into string must succeed"); - } - if !continuation_token.is_empty() { - // AWS S3 could return continuation-token that contains `=` - // which could lead `reqsign` parse query wrongly. - // URL encode continuation-token before starting signing so that - // our signer will not be confused. - write!( - url, - "&continuation-token={}", - percent_encode_path(continuation_token) - ) - .expect("write into string must succeed"); - } - - let mut req = Request::get(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - self.send(req).await - } - - pub async fn initiate_multipart_upload( - &self, - path: &str, - args: &OpWrite, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::post(&url); - - if let Some(mime) = args.content_type() { - req = req.header(CONTENT_TYPE, mime) - } - - if let Some(content_disposition) = args.content_disposition() { - req = req.header(CONTENT_DISPOSITION, content_disposition) - } - - if let Some(cache_control) = args.cache_control() { - req = req.header(CACHE_CONTROL, cache_control) - } - - // Set storage class header - if let Some(v) = &self.default_storage_class { - req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v); - } - - // Set SSE headers. - let req = self.insert_sse_headers(req, true); - - let mut req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - self.send(req).await - } - - pub fn upload_part_request( - &self, - path: &str, - upload_id: &str, - part_number: usize, - size: Option, - body: AsyncBody, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/{}?partNumber={}&uploadId={}", - self.endpoint, - percent_encode_path(&p), - part_number, - percent_encode_path(upload_id) - ); - - let mut req = Request::put(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size); - } - - // Set SSE headers. - req = self.insert_sse_headers(req, true); - - // Set body - let req = req.body(body).map_err(new_request_build_error)?; - - Ok(req) - } - - pub async fn complete_multipart_upload( - &self, - path: &str, - upload_id: &str, - parts: &[CompleteMultipartUploadRequestPart], - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/{}?uploadId={}", - self.endpoint, - percent_encode_path(&p), - percent_encode_path(upload_id) - ); - - let req = Request::post(&url); - - // Set SSE headers. - let req = self.insert_sse_headers(req, true); - - let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { - part: parts.to_vec(), - }) - .map_err(new_xml_deserialize_error)?; - // Make sure content length has been set to avoid post with chunked encoding. - let req = req.header(CONTENT_LENGTH, content.len()); - // Set content-type to `application/xml` to avoid mixed with form post. - let req = req.header(CONTENT_TYPE, "application/xml"); - - let mut req = req - .body(AsyncBody::Bytes(Bytes::from(content))) - .map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - self.send(req).await - } - - /// Abort an on-going multipart upload. - pub async fn abort_multipart_upload( - &self, - path: &str, - upload_id: &str, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/{}?uploadId={}", - self.endpoint, - percent_encode_path(&p), - percent_encode_path(upload_id) - ); - - let mut req = Request::delete(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - self.sign(&mut req).await?; - self.send(req).await - } - - pub async fn delete_objects(&self, paths: Vec) -> Result> { - let url = format!("{}/?delete", self.endpoint); - - let req = Request::post(&url); - - let content = quick_xml::se::to_string(&DeleteObjectsRequest { - object: paths - .into_iter() - .map(|path| DeleteObjectsRequestObject { - key: build_abs_path(&self.root, &path), - }) - .collect(), - }) - .map_err(new_xml_deserialize_error)?; - - // Make sure content length has been set to avoid post with chunked encoding. - let req = req.header(CONTENT_LENGTH, content.len()); - // Set content-type to `application/xml` to avoid mixed with form post. - let req = req.header(CONTENT_TYPE, "application/xml"); - // Set content-md5 as required by API. - let req = req.header("CONTENT-MD5", format_content_md5(content.as_bytes())); - - let mut req = req - .body(AsyncBody::Bytes(Bytes::from(content))) - .map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - self.send(req).await - } - - pub async fn put_object( - &self, - path: &str, - size: Option, - args: &OpWrite, - body: AsyncBody, - ) -> Result> { - let mut req = self.put_object_request(path, size, args, body)?; - - self.sign(&mut req).await?; - - self.send(req).await - } - - pub async fn rename_object(&self, from: &str, to: &str) -> Result> { - let from = percent_encode_path(build_abs_path(&self.root, from).as_str()); - let to = percent_encode_path(build_abs_path(&self.root, to).as_str()); - - let url = format!("{}/{}", self.endpoint, from); - - let mut req = Request::builder().method("MOVE").uri(url); - - // Set SSE headers. - req = self.insert_sse_headers(req, true); - - let mut req = req - .header(constants::DESTINATION, to) - .header(constants::OVERWRITE, "true") - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - self.send(req).await - } - - pub async fn append_object( - &self, - path: &str, - size: Option, - body: AsyncBody, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}?append", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::put(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size) - } - - // Set storage class header - if let Some(v) = &self.default_storage_class { - req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v); - } - - // Set SSE headers. - req = self.insert_sse_headers(req, true); - - let mut req = req.body(body).map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - self.send(req).await - } -} - -/// Result of CreateMultipartUpload -#[derive(Default, Debug, Deserialize)] -#[serde(default, rename_all = "PascalCase")] -pub struct InitiateMultipartUploadResult { - pub upload_id: String, -} - -/// Request of CompleteMultipartUploadRequest -#[derive(Default, Debug, Serialize)] -#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")] -pub struct CompleteMultipartUploadRequest { - pub part: Vec, -} - -#[derive(Clone, Default, Debug, Serialize)] -#[serde(default, rename_all = "PascalCase")] -pub struct CompleteMultipartUploadRequestPart { - #[serde(rename = "PartNumber")] - pub part_number: usize, - /// # TODO - /// - /// quick-xml will do escape on `"` which leads to our serialized output is - /// not the same as aws s3's example. - /// - /// Ideally, we could use `serialize_with` to address this (buf failed) - /// - /// ```ignore - /// #[derive(Default, Debug, Serialize)] - /// #[serde(default, rename_all = "PascalCase")] - /// struct CompleteMultipartUploadRequestPart { - /// #[serde(rename = "PartNumber")] - /// part_number: usize, - /// #[serde(rename = "ETag", serialize_with = "partial_escape")] - /// etag: String, - /// } - /// - /// fn partial_escape(s: &str, ser: S) -> std::result::Result - /// where - /// S: serde::Serializer, - /// { - /// ser.serialize_str(&String::from_utf8_lossy( - /// &quick_xml::escape::partial_escape(s.as_bytes()), - /// )) - /// } - /// ``` - /// - /// ref: - #[serde(rename = "ETag")] - pub etag: String, -} - -/// Request of DeleteObjects. -#[derive(Default, Debug, Serialize)] -#[serde(default, rename = "Delete", rename_all = "PascalCase")] -pub struct DeleteObjectsRequest { - pub object: Vec, -} - -#[derive(Default, Debug, Serialize)] -#[serde(rename_all = "PascalCase")] -pub struct DeleteObjectsRequestObject { - pub key: String, -} - -/// Result of DeleteObjects. -#[derive(Default, Debug, Deserialize)] -#[serde(default, rename = "DeleteResult", rename_all = "PascalCase")] -pub struct DeleteObjectsResult { - pub deleted: Vec, - pub error: Vec, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct DeleteObjectsResultDeleted { - pub key: String, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default, rename_all = "PascalCase")] -pub struct DeleteObjectsResultError { - pub code: String, - pub key: String, - pub message: String, -} - -#[cfg(test)] -mod tests { - use bytes::Buf; - use bytes::Bytes; - - use super::*; - - /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html#API_CreateMultipartUpload_Examples - #[test] - fn test_deserialize_initiate_multipart_upload_result() { - let bs = Bytes::from( - r#" - - example-bucket - example-object - VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA - "#, - ); - - let out: InitiateMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()).expect("must success"); - - assert_eq!( - out.upload_id, - "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA" - ) - } - - /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Examples - #[test] - fn test_serialize_complete_multipart_upload_request() { - let req = CompleteMultipartUploadRequest { - part: vec![ - CompleteMultipartUploadRequestPart { - part_number: 1, - etag: "\"a54357aff0632cce46d942af68356b38\"".to_string(), - }, - CompleteMultipartUploadRequestPart { - part_number: 2, - etag: "\"0c78aef83f66abc1fa1e8477f296d394\"".to_string(), - }, - CompleteMultipartUploadRequestPart { - part_number: 3, - etag: "\"acbd18db4cc2f85cedef654fccc4a4d8\"".to_string(), - }, - ], - }; - - let actual = quick_xml::se::to_string(&req).expect("must succeed"); - - pretty_assertions::assert_eq!( - actual, - r#" - - 1 - "a54357aff0632cce46d942af68356b38" - - - 2 - "0c78aef83f66abc1fa1e8477f296d394" - - - 3 - "acbd18db4cc2f85cedef654fccc4a4d8" - - "# - // Cleanup space and new line - .replace([' ', '\n'], "") - // Escape `"` by hand to address - .replace('"', """) - ) - } - - /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples - #[test] - fn test_serialize_delete_objects_request() { - let req = DeleteObjectsRequest { - object: vec![ - DeleteObjectsRequestObject { - key: "sample1.txt".to_string(), - }, - DeleteObjectsRequestObject { - key: "sample2.txt".to_string(), - }, - ], - }; - - let actual = quick_xml::se::to_string(&req).expect("must succeed"); - - pretty_assertions::assert_eq!( - actual, - r#" - - sample1.txt - - - sample2.txt - - "# - // Cleanup space and new line - .replace([' ', '\n'], "") - ) - } - - /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples - #[test] - fn test_deserialize_delete_objects_result() { - let bs = Bytes::from( - r#" - - - sample1.txt - - - sample2.txt - AccessDenied - Access Denied - - "#, - ); - - let out: DeleteObjectsResult = - quick_xml::de::from_reader(bs.reader()).expect("must success"); - - assert_eq!(out.deleted.len(), 1); - assert_eq!(out.deleted[0].key, "sample1.txt"); - assert_eq!(out.error.len(), 1); - assert_eq!(out.error[0].key, "sample2.txt"); - assert_eq!(out.error[0].code, "AccessDenied"); - assert_eq!(out.error[0].message, "Access Denied"); - } -} diff --git a/core/src/services/wasabi/docs.md b/core/src/services/wasabi/docs.md deleted file mode 100644 index 4b2cb2d425ec..000000000000 --- a/core/src/services/wasabi/docs.md +++ /dev/null @@ -1,198 +0,0 @@ -## Capabilities - -This service can be used to: - -- [x] stat -- [x] read -- [x] write -- [x] create_dir -- [x] delete -- [x] copy -- [x] rename -- [x] list -- [x] scan -- [x] presign -- [ ] blocking - -## Configuration - -- `root`: Set the work dir for backend. -- `bucket`: Set the container name for backend. -- `endpoint`: Set the endpoint for backend. -- `region`: Set the region for backend. -- `access_key_id`: Set the access_key_id for backend. -- `secret_access_key`: Set the secret_access_key for backend. -- `security_token`: Set the security_token for backend. -- `default_storage_class`: Set the default storage_class for backend. -- `server_side_encryption`: Set the server_side_encryption for backend. -- `server_side_encryption_aws_kms_key_id`: Set the server_side_encryption_aws_kms_key_id for backend. -- `server_side_encryption_customer_algorithm`: Set the server_side_encryption_customer_algorithm for kend. -- `server_side_encryption_customer_key`: Set the server_side_encryption_customer_key for backend. -- `server_side_encryption_customer_key_md5`: Set the server_side_encryption_customer_key_md5 for kend. -- `disable_config_load`: Disable aws config load from env -- `enable_virtual_host_style`: Enable virtual host style. - -Refer to [`WasabiBuilder`]'s public API docs for more information. - -### Temporary security credentials - -OpenDAL now provides support for S3 temporary security credentials in IAM. - -The way to take advantage of this feature is to build your S3 backend with `Builder::security_token`. - -But OpenDAL will not refresh the temporary security credentials, please keep in mind to refresh those entials in time. - -### Server Side Encryption - -OpenDAL provides full support of S3 Server Side Encryption(SSE) features. - -The easiest way to configure them is to use helper functions like - -- SSE-KMS: `server_side_encryption_with_aws_managed_kms_key` -- SSE-KMS: `server_side_encryption_with_customer_managed_kms_key` -- SSE-S3: `server_side_encryption_with_s3_key` -- SSE-C: `server_side_encryption_with_customer_key` - -If those functions don't fulfill need, low-level options are also provided: - -- Use service managed kms key - - `server_side_encryption="aws:kms"` -- Use customer provided kms key - - `server_side_encryption="aws:kms"` - - `server_side_encryption_aws_kms_key_id="your-kms-key"` -- Use S3 managed key - - `server_side_encryption="AES256"` -- Use customer key - - `server_side_encryption_customer_algorithm="AES256"` - - `server_side_encryption_customer_key="base64-of-your-aes256-key"` - - `server_side_encryption_customer_key_md5="base64-of-your-aes256-key-md5"` - -After SSE have been configured, all requests send by this backed will attach those headers. - -Reference: [Protecting data using server-side encryption](https://docs.aws.amazon.com/AmazonS3/latest/rguide/serv-side-encryption.html) - -## Example - -### Via Builder - -#### Basic Setup - -```rust -use anyhow::Result; -use opendal::services::Wasabi; -use opendal::Operator; - -#[tokio::main] -async fn main() -> Result<()> { - // Create s3 backend builder. - let mut builder = Wasabi::default(); - // Set the root for s3, all operations will happen under this root. - // - // NOTE: the root must be absolute path. - builder.root("/path/to/dir"); - // Set the bucket name, this is required. - builder.bucket("test"); - // Set the endpoint. - // - // For examples: - // - "https://s3.wasabisys.com" - // - "http://127.0.0.1:9000" - // - "https://oss-ap-northeast-1.aliyuncs.com" - // - "https://cos.ap-seoul.myqcloud.com" - // - // Default to "https://s3.wasabisys.com" - builder.endpoint("https://s3.wasabisys.com"); - // Set the access_key_id and secret_access_key. - // - // OpenDAL will try load credential from the env. - // If credential not set and no valid credential in env, OpenDAL will - // send request without signing like anonymous user. - builder.access_key_id("access_key_id"); - builder.secret_access_key("secret_access_key"); - - let op: Operator = Operator::new(builder)?.finish(); - - Ok(()) -} -``` - -#### Wasabi with SSE-C - -```rust -use anyhow::Result; -use opendal::services::Wasabi; -use opendal::Operator; - -#[tokio::main] -async fn main() -> Result<()> { - let mut builder = Wasabi::default(); - - // Enable SSE-C - builder.server_side_encryption_with_customer_key("AES256", "customer_key".as_bytes()); - - let op = Operator::new(builder)?.finish(); - - Ok(()) -} -``` - -#### Wasabi with SSE-KMS and aws managed kms key - -```rust -use anyhow::Result; -use opendal::services::Wasabi; -use opendal::Operator; - -#[tokio::main] -async fn main() -> Result<()> { - let mut builder = Wasabi::default(); - - // Enable SSE-KMS with aws managed kms key - builder.server_side_encryption_with_aws_managed_kms_key(); - - let op = Operator::new(builder)?.finish(); - - Ok(()) -} -``` - -#### Wasabi with SSE-KMS and customer managed kms key - -```rust -use anyhow::Result; -use opendal::services::Wasabi; -use opendal::Operator; - -#[tokio::main] -async fn main() -> Result<()> { - let mut builder = Wasabi::default(); - - // Enable SSE-KMS with customer managed kms key - builder.server_side_encryption_with_customer_managed_kms_key("aws_kms_key_id"); - - let op = Operator::new(builder)?.finish(); - - Ok(()) -} -``` - -#### Wasabi with SSE-S3 - -```rust -use anyhow::Result; -use log::info; -use opendal::services::Wasabi; -use opendal::Operator; - -#[tokio::main] -async fn main() -> Result<()> { - let mut builder = Wasabi::default(); - - // Enable SSE-S3 - builder.server_side_encryption_with_s3_key(); - - let op = Operator::new(builder)?.finish(); - - Ok(()) -} -``` \ No newline at end of file diff --git a/core/src/services/wasabi/error.rs b/core/src/services/wasabi/error.rs deleted file mode 100644 index 74bb4aa2e596..000000000000 --- a/core/src/services/wasabi/error.rs +++ /dev/null @@ -1,131 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use bytes::Buf; -use http::Response; -use http::StatusCode; -use quick_xml::de; -use serde::Deserialize; - -use crate::raw::*; -use crate::Error; -use crate::ErrorKind; -use crate::Result; - -/// WasabiError is the error returned by wasabi service. -#[derive(Default, Debug, Deserialize)] -#[serde(default, rename_all = "PascalCase")] -struct WasabiError { - code: String, - message: String, - resource: String, - request_id: String, -} - -/// Parse error response into Error. -pub async fn parse_error(resp: Response) -> Result { - let (parts, body) = resp.into_parts(); - let bs = body.bytes().await?; - - let (mut kind, mut retryable) = match parts.status { - StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), - StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), - StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => { - (ErrorKind::ConditionNotMatch, false) - } - StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), - _ => (ErrorKind::Unexpected, false), - }; - - let (message, wasabi_err) = de::from_reader::<_, WasabiError>(bs.clone().reader()) - .map(|err| (format!("{err:?}"), Some(err))) - .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); - - if let Some(wasabi_err) = wasabi_err { - (kind, retryable) = parse_wasabi_error_code(&wasabi_err.code).unwrap_or((kind, retryable)); - } - - let mut err = Error::new(kind, &message); - - err = with_error_response_context(err, parts); - - if retryable { - err = err.set_temporary(); - } - - Ok(err) -} - -/// Returns the `Error kind` of this code and whether the error is retryable. -/// All possible error code: -pub fn parse_wasabi_error_code(code: &str) -> Option<(ErrorKind, bool)> { - match code { - // > Your socket connection to the server was not read from - // > or written to within the timeout period." - // - // It's Ok for us to retry it again. - "RequestTimeout" => Some((ErrorKind::Unexpected, true)), - // > An internal error occurred. Try again. - "InternalError" => Some((ErrorKind::Unexpected, true)), - // > A conflicting conditional operation is currently in progress - // > against this resource. Try again. - "OperationAborted" => Some((ErrorKind::Unexpected, true)), - // > Please reduce your request rate. - // - // It's Ok to retry since later on the request rate may get reduced. - "SlowDown" => Some((ErrorKind::RateLimited, true)), - // > Service is unable to handle request. - // - // ServiceUnavailable is considered a retryable error because it typically - // indicates a temporary issue with the service or server, such as high load, - // maintenance, or an internal problem. - "ServiceUnavailable" => Some((ErrorKind::Unexpected, true)), - _ => None, - } -} - -#[cfg(test)] -mod tests { - use super::*; - - /// Error response example is from https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html - #[test] - fn test_parse_error() { - let bs = bytes::Bytes::from( - r#" - - - NoSuchKey - The resource you requested does not exist - /mybucket/myfoto.jpg - 4442587FB7D0A2F9 - -"#, - ); - - let out: WasabiError = de::from_reader(bs.reader()).expect("must success"); - println!("{out:?}"); - - assert_eq!(out.code, "NoSuchKey"); - assert_eq!(out.message, "The resource you requested does not exist"); - assert_eq!(out.resource, "/mybucket/myfoto.jpg"); - assert_eq!(out.request_id, "4442587FB7D0A2F9"); - } -} diff --git a/core/src/services/wasabi/mod.rs b/core/src/services/wasabi/mod.rs deleted file mode 100644 index 7356abc054a3..000000000000 --- a/core/src/services/wasabi/mod.rs +++ /dev/null @@ -1,24 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -mod backend; -pub use backend::WasabiBuilder as Wasabi; - -mod core; -mod error; -mod pager; -mod writer; diff --git a/core/src/services/wasabi/pager.rs b/core/src/services/wasabi/pager.rs deleted file mode 100644 index 038d69a14c4c..000000000000 --- a/core/src/services/wasabi/pager.rs +++ /dev/null @@ -1,231 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use bytes::Buf; -use quick_xml::de; -use serde::Deserialize; - -use super::core::WasabiCore; -use super::error::parse_error; -use crate::raw::*; -use crate::EntryMode; -use crate::Metadata; -use crate::Result; - -pub struct WasabiPager { - core: Arc, - - path: String, - delimiter: String, - limit: Option, - - token: String, - done: bool, -} - -impl WasabiPager { - pub fn new(core: Arc, path: &str, delimiter: &str, limit: Option) -> Self { - Self { - core, - - path: path.to_string(), - delimiter: delimiter.to_string(), - limit, - - token: "".to_string(), - done: false, - } - } -} - -#[async_trait] -impl oio::Page for WasabiPager { - async fn next(&mut self) -> Result>> { - if self.done { - return Ok(None); - } - - let resp = self - .core - .list_objects(&self.path, &self.token, &self.delimiter, self.limit) - .await?; - - if resp.status() != http::StatusCode::OK { - return Err(parse_error(resp).await?); - } - - let bs = resp.into_body().bytes().await?; - - let output: Output = de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; - - // Try our best to check whether this list is done. - // - // - Check `is_truncated` - // - Check `next_continuation_token` - // - Check the length of `common_prefixes` and `contents` (very rarely case) - self.done = if let Some(is_truncated) = output.is_truncated { - !is_truncated - } else if let Some(next_continuation_token) = output.next_continuation_token.as_ref() { - next_continuation_token.is_empty() - } else { - output.common_prefixes.is_empty() && output.contents.is_empty() - }; - self.token = output.next_continuation_token.clone().unwrap_or_default(); - - let mut entries = Vec::with_capacity(output.common_prefixes.len() + output.contents.len()); - - for prefix in output.common_prefixes { - let de = oio::Entry::new( - &build_rel_path(&self.core.root, &prefix.prefix), - Metadata::new(EntryMode::DIR), - ); - - entries.push(de); - } - - for object in output.contents { - // s3 could return the dir itself in contents - // which endswith `/`. - // We should ignore them. - if object.key.ends_with('/') { - continue; - } - - let mut meta = Metadata::new(EntryMode::FILE); - - meta.set_etag(&object.etag); - meta.set_content_md5(object.etag.trim_matches('"')); - meta.set_content_length(object.size); - - // object.last_modified provides more precious time that contains - // nanosecond, let's trim them. - meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?); - - let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.key), meta); - - entries.push(de); - } - - Ok(Some(entries)) - } -} - -/// Output of ListBucket/ListObjects. -/// -/// ## Note -/// -/// Use `Option` in `is_truncated` and `next_continuation_token` to make -/// the behavior more clear so that we can be compatible to more s3 services. -/// -/// And enable `serde(default)` so that we can keep going even when some field -/// is not exist. -#[derive(Default, Debug, Deserialize)] -#[serde(default, rename_all = "PascalCase")] -struct Output { - is_truncated: Option, - next_continuation_token: Option, - common_prefixes: Vec, - contents: Vec, -} - -#[derive(Default, Debug, Eq, PartialEq, Deserialize)] -#[serde(rename_all = "PascalCase")] -struct OutputContent { - key: String, - size: u64, - last_modified: String, - #[serde(rename = "ETag")] - etag: String, -} - -#[derive(Default, Debug, Eq, PartialEq, Deserialize)] -#[serde(rename_all = "PascalCase")] -struct OutputCommonPrefix { - prefix: String, -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_parse_list_output() { - let bs = bytes::Bytes::from( - r#" - example-bucket - photos/2006/ - 3 - 1000 - / - false - - photos/2006 - 2016-04-30T23:51:29.000Z - "d41d8cd98f00b204e9800998ecf8427e" - 56 - STANDARD - - - photos/2007 - 2016-04-30T23:51:29.000Z - "d41d8cd98f00b204e9800998ecf8427e" - 100 - STANDARD - - - - photos/2006/February/ - - - photos/2006/January/ - -"#, - ); - - let out: Output = de::from_reader(bs.reader()).expect("must success"); - - assert!(!out.is_truncated.unwrap()); - assert!(out.next_continuation_token.is_none()); - assert_eq!( - out.common_prefixes - .iter() - .map(|v| v.prefix.clone()) - .collect::>(), - vec!["photos/2006/February/", "photos/2006/January/"] - ); - assert_eq!( - out.contents, - vec![ - OutputContent { - key: "photos/2006".to_string(), - size: 56, - etag: "\"d41d8cd98f00b204e9800998ecf8427e\"".to_string(), - last_modified: "2016-04-30T23:51:29.000Z".to_string(), - }, - OutputContent { - key: "photos/2007".to_string(), - size: 100, - last_modified: "2016-04-30T23:51:29.000Z".to_string(), - etag: "\"d41d8cd98f00b204e9800998ecf8427e\"".to_string(), - } - ] - ) - } -} diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs deleted file mode 100644 index 50fc8ff4fa6e..000000000000 --- a/core/src/services/wasabi/writer.rs +++ /dev/null @@ -1,65 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use http::StatusCode; - -use super::core::*; -use super::error::parse_error; -use crate::raw::oio::WriteBuf; -use crate::raw::*; -use crate::*; - -pub struct WasabiWriter { - core: Arc, - - op: OpWrite, - path: String, -} - -impl WasabiWriter { - pub fn new(core: Arc, op: OpWrite, path: String) -> Self { - WasabiWriter { core, op, path } - } -} - -#[async_trait] -impl oio::OneShotWrite for WasabiWriter { - async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { - let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); - - let resp = self - .core - .put_object( - &self.path, - Some(bs.len()), - &self.op, - AsyncBody::ChunkedBytes(bs), - ) - .await?; - - match resp.status() { - StatusCode::CREATED | StatusCode::OK => { - resp.into_body().consume().await?; - Ok(()) - } - _ => Err(parse_error(resp).await?), - } - } -} diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 211444064581..9a70ab0be842 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -233,8 +233,6 @@ impl Operator { Scheme::Tikv => Self::from_map::(map)?.finish(), #[cfg(feature = "services-vercel-artifacts")] Scheme::VercelArtifacts => Self::from_map::(map)?.finish(), - #[cfg(feature = "services-wasabi")] - Scheme::Wasabi => Self::from_map::(map)?.finish(), #[cfg(feature = "services-webdav")] Scheme::Webdav => Self::from_map::(map)?.finish(), #[cfg(feature = "services-webhdfs")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 32b84c04c5a9..63f331dc537e 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -113,8 +113,6 @@ pub enum Scheme { Supabase, /// [Vercel Artifacts][crate::services::VercelArtifacts]: Vercel Artifacts service, as known as Vercel Remote Caching. VercelArtifacts, - /// [wasabi][crate::services::Wasabi]: Wasabi service - Wasabi, /// [webdav][crate::services::Webdav]: WebDAV support. Webdav, /// [webhdfs][crate::services::Webhdfs]: WebHDFS RESTful API Services @@ -237,8 +235,6 @@ impl Scheme { Scheme::Tikv, #[cfg(feature = "services-vercel-artifacts")] Scheme::VercelArtifacts, - #[cfg(feature = "services-wasabi")] - Scheme::Wasabi, #[cfg(feature = "services-webdav")] Scheme::Webdav, #[cfg(feature = "services-webhdfs")] @@ -313,7 +309,6 @@ impl FromStr for Scheme { "supabase" => Ok(Scheme::Supabase), "oss" => Ok(Scheme::Oss), "vercel_artifacts" => Ok(Scheme::VercelArtifacts), - "wasabi" => Ok(Scheme::Wasabi), "webdav" => Ok(Scheme::Webdav), "webhdfs" => Ok(Scheme::Webhdfs), "tikv" => Ok(Scheme::Tikv), @@ -366,7 +361,6 @@ impl From for &'static str { Scheme::Supabase => "supabase", Scheme::VercelArtifacts => "vercel_artifacts", Scheme::Oss => "oss", - Scheme::Wasabi => "wasabi", Scheme::Webdav => "webdav", Scheme::Webhdfs => "webhdfs", Scheme::Redb => "redb", diff --git a/website/docs/services/wasabi.mdx b/website/docs/services/wasabi.mdx deleted file mode 100644 index aa7ec677b209..000000000000 --- a/website/docs/services/wasabi.mdx +++ /dev/null @@ -1,70 +0,0 @@ ---- -title: Wasabi ---- - -Wasabi (an aws S3 compatible service) support - -import Docs from '../../../core/src/services/wasabi/docs.md' - - - -### Via Config - -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; - - - - -```rust -use anyhow::Result; -use opendal::Operator; -use opendal::Scheme; -use std::collections::HashMap; - -#[tokio::main] -async fn main() -> Result<()> { - let mut map = HashMap::new(); - map.insert("root".to_string(), "/path/to/dir".to_string()); - map.insert("bucket".to_string(), "test".to_string()); - map.insert("endpoint".to_string(), "https://s3.wasabisys.com".to_string()); - map.insert("access_key_id".to_string(), "access_key_id".to_string()); - map.insert("secret_access_key".to_string(), "secret_access_key".to_string()); - - let op: Operator = Operator::via_map(Scheme::Wasabi, map)?; - Ok(()) -} -``` - - - - -```javascript -import { Operator } from "opendal"; -async function main() { - const op = new Operator("wasabi", { - root: "/path/to/dir", - bucket: "test", - endpoint: "https://s3.wasabisys.com", - access_key_id: "access_key_id", - secret_access_key: "secret_access_key", - }); -} -``` - - - - -```python -import opendal -op = opendal.Operator("wasabi", - root="/path/to/dir", - bucket="test", - endpoint="https://s3.wasabisys.com", - access_key_id="access_key_id", - secret_access_key="secret_access_key", -) -``` - - - \ No newline at end of file