diff --git a/core/Cargo.toml b/core/Cargo.toml index 1dd4e7935201..3679f3955a68 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -116,6 +116,7 @@ services-azdls = [ "reqsign?/reqwest_request", ] services-cacache = ["dep:cacache"] +services-cloudflare-kv = [] services-cos = [ "dep:reqsign", "reqsign?/services-tencent", @@ -231,9 +232,9 @@ foundationdb = { version = "0.8.0", features = [ futures = { version = "0.3", default-features = false, features = ["std"] } governor = { version = "0.5", optional = true, features = ["std"] } hdrs = { version = "0.3.0", optional = true, features = ["async_file"] } +hrana-client-proto = { version = "0.2.1", optional = true } http = "0.2.9" hyper = "0.14" -hrana-client-proto = { version = "0.2.1", optional = true } lazy-regex = { version = "3.0.1", optional = true } log = "0.4" madsim = { version = "0.2.21", optional = true } diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs new file mode 100644 index 000000000000..e05bb3931e22 --- /dev/null +++ b/core/src/services/cloudflare_kv/backend.rs @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; + +use async_trait::async_trait; +use http::header; +use http::Request; +use http::StatusCode; +use serde::Deserialize; + +use super::error::parse_error; +use crate::raw::adapters::kv; +use crate::raw::*; +use crate::ErrorKind; +use crate::*; + +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct CloudflareKvBuilder { + /// The token used to authenticate with CloudFlare. + token: Option, + /// The account ID used to authenticate with CloudFlare. Used as URI path parameter. + account_id: Option, + /// The namespace ID. Used as URI path parameter. + namespace_id: Option, + + /// The HTTP client used to communicate with CloudFlare. + http_client: Option, + /// Root within this backend. + root: Option, +} + +impl Debug for CloudflareKvBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CloudFlareKvBuilder") + .field("account_id", &self.account_id) + .field("namespace_id", &self.namespace_id) + .field("root", &self.root) + .finish() + } +} + +impl CloudflareKvBuilder { + /// Set the token used to authenticate with CloudFlare. + pub fn token(&mut self, token: &str) -> &mut Self { + if !token.is_empty() { + self.token = Some(token.to_string()) + } + self + } + + /// Set the account ID used to authenticate with CloudFlare. + pub fn account_id(&mut self, account_id: &str) -> &mut Self { + if !account_id.is_empty() { + self.account_id = Some(account_id.to_string()) + } + self + } + + /// Set the namespace ID. + pub fn namespace_id(&mut self, namespace_id: &str) -> &mut Self { + if !namespace_id.is_empty() { + self.namespace_id = Some(namespace_id.to_string()) + } + self + } + + /// Set the root within this backend. + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_string()) + } + self + } +} + +impl Builder for CloudflareKvBuilder { + const SCHEME: Scheme = Scheme::CloudflareKv; + + type Accessor = CloudflareKvBackend; + + fn from_map(map: HashMap) -> Self { + let mut builder = Self::default(); + map.get("token").map(|v| builder.token(v)); + map.get("account_id").map(|v| builder.account_id(v)); + map.get("namespace_id").map(|v| builder.namespace_id(v)); + map.get("root").map(|v| builder.root(v)); + builder + } + + fn build(&mut self) -> Result { + let authorization = match &self.token { + Some(token) => format_authorization_by_bearer(token)?, + None => return Err(Error::new(ErrorKind::ConfigInvalid, "token is required")), + }; + + let Some(account_id) = self.account_id.clone() else { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "account_id is required", + )); + }; + + let Some(namespace_id) = self.namespace_id.clone() else { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "namespace_id is required", + )); + }; + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::CloudflareKv) + })? + }; + + let root = normalize_root( + self.root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(), + ); + + let url_prefix = format!( + r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}", + account_id, namespace_id + ); + + Ok(kv::Backend::new(Adapter { + authorization, + account_id, + namespace_id, + client, + url_prefix, + }) + .with_root(&root)) + } +} + +pub type CloudflareKvBackend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + authorization: String, + account_id: String, + namespace_id: String, + client: HttpClient, + url_prefix: String, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Adapter") + .field("account_id", &self.account_id) + .field("namespace_id", &self.namespace_id) + .finish() + } +} + +impl Adapter { + fn sign(&self, mut req: Request) -> Result> { + req.headers_mut() + .insert(header::AUTHORIZATION, self.authorization.parse().unwrap()); + Ok(req) + } +} + +#[async_trait] +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::CloudflareKv, + &self.namespace_id, + Capability { + read: true, + write: true, + list: true, + + ..Default::default() + }, + ) + } + + async fn get(&self, path: &str) -> Result>> { + let url = format!("{}/values/{}", self.url_prefix, path); + let mut req = Request::get(&url); + req = req.header(header::CONTENT_TYPE, "application/json"); + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + req = self.sign(req)?; + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + let body = resp.into_body().bytes().await?; + Ok(Some(body.into())) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + let url = format!("{}/values/{}", self.url_prefix, path); + let req = Request::put(&url); + let multipart = Multipart::new(); + let multipart = multipart + .part(FormDataPart::new("metadata").content(serde_json::Value::Null.to_string())) + .part(FormDataPart::new("value").content(value.to_vec())); + let mut req = multipart.apply(req)?; + req = self.sign(req)?; + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK => Ok(()), + _ => Err(parse_error(resp).await?), + } + } + + async fn delete(&self, path: &str) -> Result<()> { + let url = format!("{}/values/{}", self.url_prefix, path); + let mut req = Request::delete(&url); + req = req.header(header::CONTENT_TYPE, "application/json"); + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + req = self.sign(req)?; + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK => Ok(()), + _ => Err(parse_error(resp).await?), + } + } + + async fn scan(&self, path: &str) -> Result> { + let mut url = format!("{}/keys", self.url_prefix); + if !path.is_empty() { + url = format!("{}?prefix={}", url, path); + } + let mut req = Request::get(&url); + req = req.header(header::CONTENT_TYPE, "application/json"); + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + req = self.sign(req)?; + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + let body = resp.into_body().bytes().await?; + let response: CfKvScanResponse = serde_json::from_slice(&body).map_err(|e| { + Error::new( + crate::ErrorKind::Unexpected, + &format!("failed to parse error response: {}", e), + ) + })?; + Ok(response.result.into_iter().map(|r| r.name).collect()) + } + _ => Err(parse_error(resp).await?), + } + } +} + +#[derive(Debug, Deserialize)] +pub(crate) struct CfKvResponse { + pub(crate) errors: Vec, +} + +#[derive(Debug, Deserialize)] +pub(crate) struct CfKvScanResponse { + result: Vec, + // According to https://developers.cloudflare.com/api/operations/workers-kv-namespace-list-a-namespace'-s-keys, result_info is used to determine if there are more keys to be listed + // result_info: Option, +} + +#[derive(Debug, Deserialize)] +struct CfKvScanResult { + name: String, +} + +// #[derive(Debug, Deserialize)] +// struct CfKvResultInfo { +// count: i64, +// cursor: String, +// } + +#[derive(Debug, Deserialize)] +pub(crate) struct CfKvError { + pub(crate) code: i32, +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_deserialize_scan_json_response() { + let json_str = r#"{ + "errors": [], + "messages": [], + "result": [ + { + "expiration": 1577836800, + "metadata": { + "someMetadataKey": "someMetadataValue" + }, + "name": "My-Key" + } + ], + "success": true, + "result_info": { + "count": 1, + "cursor": "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw" + } + }"#; + + let response: CfKvScanResponse = serde_json::from_slice(json_str.as_bytes()).unwrap(); + + assert_eq!(response.result.len(), 1); + assert_eq!(response.result[0].name, "My-Key"); + // assert!(response.result_info.is_some()); + // if let Some(result_info) = response.result_info { + // assert_eq!(result_info.count, 1); + // assert_eq!(result_info.cursor, "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw"); + // } + } + + #[test] + fn test_deserialize_json_response() { + let json_str = r#"{ + "errors": [], + "messages": [], + "result": {}, + "success": true + }"#; + + let response: CfKvResponse = serde_json::from_slice(json_str.as_bytes()).unwrap(); + + assert_eq!(response.errors.len(), 0); + } +} diff --git a/core/src/services/cloudflare_kv/docs.md b/core/src/services/cloudflare_kv/docs.md new file mode 100644 index 000000000000..06c6006359ee --- /dev/null +++ b/core/src/services/cloudflare_kv/docs.md @@ -0,0 +1,22 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [ ] copy +- [ ] rename +- [ ] ~~list~~ +- [x] scan +- [ ] ~~presign~~ +- [ ] blocking + +## Configuration + +- `root`: Set the working directory of `OpenDAL` +- `token`: Set the token of cloudflare api +- `account_id`: Set the account identifier of cloudflare +- `namespace_id`: Set the namespace identifier of d1 diff --git a/core/src/services/cloudflare_kv/error.rs b/core/src/services/cloudflare_kv/error.rs new file mode 100644 index 000000000000..10f4c4aaabf7 --- /dev/null +++ b/core/src/services/cloudflare_kv/error.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Buf; +use http::Response; +use http::StatusCode; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +use serde_json::de; + +use super::backend::CfKvError; +use super::backend::CfKvResponse; + +/// Parse error response into Error. +pub(crate) async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (mut kind, mut retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + // Some services (like owncloud) return 403 while file locked. + StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, true), + // Allowing retry for resource locked. + StatusCode::LOCKED => (ErrorKind::Unexpected, true), + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let (message, err) = de::from_reader::<_, CfKvResponse>(bs.clone().reader()) + .map(|err| (format!("{err:?}"), Some(err))) + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); + + if let Some(err) = err { + (kind, retryable) = parse_cfkv_error_code(err.errors).unwrap_or((kind, retryable)); + } + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} + +pub(crate) fn parse_cfkv_error_code(errors: Vec) -> Option<(ErrorKind, bool)> { + if errors.is_empty() { + return None; + } + + match errors[0].code { + // The request is malformed: failed to decode id. + 7400 => Some((ErrorKind::Unexpected, false)), + // no such column: Xxxx. + 7500 => Some((ErrorKind::NotFound, false)), + // Authentication error. + 10000 => Some((ErrorKind::PermissionDenied, false)), + _ => None, + } +} diff --git a/core/src/services/cloudflare_kv/mod.rs b/core/src/services/cloudflare_kv/mod.rs new file mode 100644 index 000000000000..fa09aa3d0024 --- /dev/null +++ b/core/src/services/cloudflare_kv/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +mod error; + +pub use backend::CloudflareKvBuilder as CloudflareKv; diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 7215de5905a4..d6d1b80806f5 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -29,6 +29,11 @@ mod azdls; #[cfg(feature = "services-azdls")] pub use azdls::Azdls; +#[cfg(feature = "services-cloudflare-kv")] +mod cloudflare_kv; +#[cfg(feature = "services-cloudflare-kv")] +pub use self::cloudflare_kv::CloudflareKv; + #[cfg(feature = "services-cos")] mod cos; #[cfg(feature = "services-cos")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 5a6bd30886d6..231d617295c1 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -40,6 +40,8 @@ pub enum Scheme { Azdls, /// [cacache][crate::services::Cacache]: cacache backend support. Cacache, + /// [cloudflare-kv][crate::services::CloudflareKv]: Cloudflare KV services. + CloudflareKv, /// [cos][crate::services::Cos]: Tencent Cloud Object Storage services. Cos, /// [d1][crate::services::D1]: D1 services @@ -271,6 +273,7 @@ impl FromStr for Scheme { // And abfs is widely used in hadoop ecosystem, keep it for easy to use. "azdls" | "azdfs" | "abfs" => Ok(Scheme::Azdls), "cacache" => Ok(Scheme::Cacache), + "cloudflare_kv" => Ok(Scheme::CloudflareKv), "cos" => Ok(Scheme::Cos), "d1" => Ok(Scheme::D1), "dashmap" => Ok(Scheme::Dashmap), @@ -323,6 +326,7 @@ impl From for &'static str { Scheme::Azblob => "azblob", Scheme::Azdls => "azdls", Scheme::Cacache => "cacache", + Scheme::CloudflareKv => "cloudflare_kv", Scheme::Cos => "cos", Scheme::D1 => "d1", Scheme::Dashmap => "dashmap",