diff --git a/core/Cargo.toml b/core/Cargo.toml
index 3679f3955a68..acb7595d41d3 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -124,6 +124,7 @@ services-cos = [
 ]
 services-d1 = []
 services-dashmap = ["dep:dashmap"]
+services-dbfs = []
 services-dropbox = []
 services-etcd = ["dep:etcd-client", "dep:bb8"]
 services-foundationdb = ["dep:foundationdb"]
diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index 783eaf8fbd1f..9ac5ac4e26b3 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -113,7 +113,7 @@ impl HttpClient {
             //
             // We don't set this by hand, just don't allow retry.
             err.is_redirect() ||
-            // We never use `Response::error_for_status`, just don't allow retry. 
+            // We never use `Response::error_for_status`, just don't allow retry.
             //
             // Status should be checked by our services.
             err.is_status()
diff --git a/core/src/services/dbfs/backend.rs b/core/src/services/dbfs/backend.rs
new file mode 100644
index 000000000000..c01acb043ae6
--- /dev/null
+++ b/core/src/services/dbfs/backend.rs
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+use log::debug;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::DbfsCore;
+use super::error::parse_error;
+use super::pager::DbfsPager;
+use super::reader::DbfsReader;
+use super::writer::DbfsWriter;
+
+/// [Dbfs](https://docs.databricks.com/api/azure/workspace/dbfs)'s REST API support.
+#[doc = include_str!("docs.md")]
+#[derive(Default, Clone)]
+pub struct DbfsBuilder {
+    root: Option<String>,
+    endpoint: Option<String>,
+    token: Option<String>,
+}
+
+impl Debug for DbfsBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Builder");
+
+        ds.field("root", &self.root);
+        ds.field("endpoint", &self.endpoint);
+
+        if self.token.is_some() {
+            ds.field("token", &"<redacted>");
+        }
+
+        ds.finish()
+    }
+}
+
+impl DbfsBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        if !root.is_empty() {
+            self.root = Some(root.to_string())
+        }
+
+        self
+    }
+
+    /// Set endpoint of this backend.
+    ///
+    /// Endpoint must be a full URI, e.g.
+    ///
+    /// - Azure: `https://adb-1234567890123456.78.azuredatabricks.net`
+    /// - AWS: `https://dbc-123a5678-90bc.cloud.databricks.com`
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        self.endpoint = if endpoint.is_empty() {
+            None
+        } else {
+            Some(endpoint.trim_end_matches('/').to_string())
+        };
+        self
+    }
+
+    /// Set the token of this backend.
+    pub fn token(&mut self, token: &str) -> &mut Self {
+        if !token.is_empty() {
+            self.token = Some(token.to_string());
+        }
+        self
+    }
+}
+
+impl Builder for DbfsBuilder {
+    const SCHEME: Scheme = Scheme::Dbfs;
+    type Accessor = DbfsBackend;
+
+    fn from_map(map: HashMap<String, String>) -> Self {
+        let mut builder = DbfsBuilder::default();
+
+        map.get("root").map(|v| builder.root(v));
+        map.get("endpoint").map(|v| builder.endpoint(v));
+        map.get("token").map(|v| builder.token(v));
+
+        builder
+    }
+
+    /// Build a DbfsBackend.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = normalize_root(&self.root.take().unwrap_or_default());
+        debug!("backend use root {}", root);
+
+        let endpoint = match &self.endpoint {
+            Some(endpoint) => Ok(endpoint.clone()),
+            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Dbfs)),
+        }?;
+        debug!("backend use endpoint: {}", &endpoint);
+
+        let token = match self.token.take() {
+            Some(token) => token,
+            None => {
+                return Err(Error::new(
+                    ErrorKind::ConfigInvalid,
+                    "missing token for Dbfs",
+                ));
+            }
+        };
+
+        let client = HttpClient::new()?;
+
+        debug!("backend build finished: {:?}", &self);
+        Ok(DbfsBackend {
+            core: Arc::new(DbfsCore {
+                root,
+                endpoint: endpoint.to_string(),
+                token,
+                client,
+            }),
+        })
+    }
+}
+
+/// Backend for DBFS service
+#[derive(Debug, Clone)]
+pub struct DbfsBackend {
+    core: Arc<DbfsCore>,
+}
+
+#[async_trait]
+impl Accessor for DbfsBackend {
+    type Reader = DbfsReader;
+    type BlockingReader = ();
+    type Writer = oio::OneShotWriter<DbfsWriter>;
+    type BlockingWriter = ();
+    type Pager = DbfsPager;
+    type BlockingPager = ();
+
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Dbfs)
+            .set_root(&self.core.root)
+            .set_native_capability(Capability {
+                stat: true,
+
+                read: true,
+                read_can_next: true,
+                read_with_range: true,
+
+                write: true,
+                create_dir: true,
+                delete: true,
+                rename: true,
+
+                list: true,
+                list_with_delimiter_slash: true,
+
+                ..Default::default()
+            });
+        am
+    }
+
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
+        let resp = self.core.dbfs_create_dir(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(RpCreateDir::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let mut meta = Metadata::new(EntryMode::FILE);
+
+        if let Some(length) = args.range().size() {
+            meta.set_content_length(length);
+        } else {
+            let stat_resp = self.core.dbfs_get_status(path).await?;
+            meta = parse_into_metadata(path, stat_resp.headers())?;
+            let decoded_response =
+                serde_json::from_slice::<DbfsStatus>(&stat_resp.into_body().bytes().await?)
+                    .map_err(new_json_deserialize_error)?;
+            meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
+                decoded_response.modification_time,
+            )?);
+            meta.set_mode(if decoded_response.is_dir {
+                EntryMode::DIR
+            } else {
+                EntryMode::FILE
+            });
+            if !decoded_response.is_dir {
+                meta.set_content_length(decoded_response.file_size as u64);
+            }
+        }
+
+        let op = DbfsReader::new(self.core.clone(), args, path.to_string());
+
+        Ok((RpRead::with_metadata(meta), op))
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        Ok((
+            RpWrite::default(),
+            oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())),
+        ))
+    }
+
+    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
+        // Ensure the parent of the destination exists, not the destination itself.
+        self.core.dbfs_ensure_parent_path(get_parent(to)).await?;
+
+        let resp = self.core.dbfs_rename(from, to).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(RpRename::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        // Stat root always returns a DIR.
+        if path == "/" {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        let resp = self.core.dbfs_get_status(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let mut meta = parse_into_metadata(path, resp.headers())?;
+                let bs = resp.into_body().bytes().await?;
+                let decoded_response = serde_json::from_slice::<DbfsStatus>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+                meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
+                    decoded_response.modification_time,
+                )?);
+                match decoded_response.is_dir {
+                    true => meta.set_mode(EntryMode::DIR),
+                    false => {
+                        meta.set_mode(EntryMode::FILE);
+                        meta.set_content_length(decoded_response.file_size as u64)
+                    }
+                };
+                Ok(RpStat::new(meta))
+            }
+            StatusCode::NOT_FOUND if path.ends_with('/') => {
+                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// NOTE: Server will return 200 even if the path doesn't exist.
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let resp = self.core.dbfs_delete(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(RpDelete::default()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Pager)> {
+        let op = DbfsPager::new(self.core.clone(), path.to_string());
+
+        Ok((RpList::default(), op))
+    }
+}
+
+#[derive(Deserialize)]
+struct DbfsStatus {
+    // Unused fields:
+    // path: String,
+    is_dir: bool,
+    file_size: i64,
+    modification_time: i64,
+}
diff --git a/core/src/services/dbfs/core.rs b/core/src/services/dbfs/core.rs
new file mode 100644
index 000000000000..44e20e571099
--- /dev/null
+++ b/core/src/services/dbfs/core.rs
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use bytes::Bytes;
+use http::header;
+use http::Request;
+use http::Response;
+use http::StatusCode;
+use serde_json::json;
+
+use crate::raw::*;
+use crate::*;
+
+use super::error::parse_error;
+
+pub struct DbfsCore {
+    pub root: String,
+    pub endpoint: String,
+    pub token: String,
+    pub client: HttpClient,
+}
+
+impl Debug for DbfsCore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DbfsCore")
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .field("token", &self.token)
+            .finish_non_exhaustive()
+    }
+}
+
+impl DbfsCore {
+    pub async fn dbfs_create_dir(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let url = format!("{}/api/2.0/dbfs/mkdirs", self.endpoint);
+        let mut req = Request::post(&url);
+
+        let auth_header_content = format!("Bearer {}", self.token);
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+
+        let p = build_rooted_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let req_body = &json!({
+            "path": percent_encode_path(&p),
+        });
+        let body = AsyncBody::Bytes(Bytes::from(req_body.to_string()));
+
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    pub async fn dbfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let url = format!("{}/api/2.0/dbfs/delete", self.endpoint);
+        let mut req = Request::post(&url);
+
+        let auth_header_content = format!("Bearer {}", self.token);
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+
+        let p = build_rooted_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let request_body = &json!({
+            "path": percent_encode_path(&p),
+            // TODO: support recursive toggle, should we add a new field in OpDelete?
+ "recursive": true, + }); + + let body = AsyncBody::Bytes(Bytes::from(request_body.to_string())); + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_rename(&self, from: &str, to: &str) -> Result> { + let source = build_rooted_abs_path(&self.root, from); + let target = build_rooted_abs_path(&self.root, to); + + let url = format!("{}/api/2.0/dbfs/move", self.endpoint); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req_body = &json!({ + "source_path": percent_encode_path(&source), + "destination_path": percent_encode_path(&target), + }); + + let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_list(&self, path: &str) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let url = format!( + "{}/api/2.0/dbfs/list?path={}", + self.endpoint, + percent_encode_path(&p) + ); + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub fn dbfs_create_file_request(&self, path: &str, body: Bytes) -> Result> { + let url = format!("{}/api/2.0/dbfs/put", self.endpoint); + + let contents = BASE64_STANDARD.encode(body); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req_body = &json!({ + "path": path, + "contents": contents, + "overwrite": true, + }); + + let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); + + req.body(body).map_err(new_request_build_error) + } + + pub async fn dbfs_read( + &self, + path: &str, + offset: u64, + length: u64, + ) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let mut url = format!( + "{}/api/2.0/dbfs/read?path={}", + self.endpoint, + percent_encode_path(&p) + ); + + if offset > 0 { + url.push_str(&format!("&offset={}", offset)); + } + + if length > 0 { + url.push_str(&format!("&length={}", length)); + } + + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(resp), + _ => Err(parse_error(resp).await?), + } + } + + pub async fn dbfs_get_status(&self, path: &str) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let url = format!( + "{}/api/2.0/dbfs/get-status?path={}", + &self.endpoint, + percent_encode_path(&p) + ); + + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_ensure_parent_path(&self, path: &str) -> Result<()> { + let resp = 
+
+        match resp.status() {
+            StatusCode::OK => return Ok(()),
+            StatusCode::NOT_FOUND => {
+                self.dbfs_create_dir(path).await?;
+            }
+            _ => return Err(parse_error(resp).await?),
+        }
+        Ok(())
+    }
+}
diff --git a/core/src/services/dbfs/docs.md b/core/src/services/dbfs/docs.md
new file mode 100644
index 000000000000..0e9cfa2ec4d8
--- /dev/null
+++ b/core/src/services/dbfs/docs.md
@@ -0,0 +1,82 @@
+This service visits the [DBFS API](https://docs.databricks.com/api/azure/workspace/dbfs) provided by [Databricks File System](https://docs.databricks.com/en/dbfs/index.html).
+
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [ ] copy
+- [x] rename
+- [x] list
+- [ ] ~~scan~~
+- [ ] ~~presign~~
+- [ ] blocking
+
+## Configurations
+
+- `root`: Set the work directory for this backend.
+- `endpoint`: Set the endpoint for this backend.
+- `token`: Set the Databricks personal access token.
+
+Refer to [`Builder`]'s public API docs for more information.
+
+## Examples
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::Dbfs;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut builder = Dbfs::default();
+    // Set the root for Dbfs; all operations will happen under this root.
+    //
+    // Note:
+    // - if the root does not exist, the builder will create it for you
+    // - if the root exists and is a directory, the builder will keep working
+    // - if the root exists and is a file, building the backend will fail
+    builder.root("/path/to/dir");
+    // Set the endpoint of the Dbfs workspace.
+    builder.endpoint("https://adb-1234567890123456.78.azuredatabricks.net");
+    // Set the personal access token.
+    builder.token("access_token");
+
+    let op: Operator = Operator::new(builder)?.finish();
+
+    Ok(())
+}
+```
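+
+### Via Map
+
+The same configuration can also be passed as a string map via `Operator::via_map`.
+A minimal sketch, assuming the same endpoint and token placeholders as above:
+
+```rust
+use std::collections::HashMap;
+
+use anyhow::Result;
+use opendal::Operator;
+use opendal::Scheme;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let map = HashMap::from([
+        // The same keys accepted by the builder above; values are placeholders.
+        ("root".to_string(), "/path/to/dir".to_string()),
+        ("endpoint".to_string(), "https://adb-1234567890123456.78.azuredatabricks.net".to_string()),
+        ("token".to_string(), "access_token".to_string()),
+    ]);
+
+    let op: Operator = Operator::via_map(Scheme::Dbfs, map)?;
+
+    Ok(())
+}
+```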
+ de.field("message", &self.message.replace('\n', " ")); + + de.finish() + } +} + +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), + StatusCode::PRECONDITION_FAILED => (ErrorKind::ConditionNotMatch, false), + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let message = match serde_json::from_slice::(&bs) { + Ok(dbfs_error) => format!("{:?}", dbfs_error.message), + Err(_) => String::from_utf8_lossy(&bs).into_owned(), + }; + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} diff --git a/core/src/services/dbfs/mod.rs b/core/src/services/dbfs/mod.rs new file mode 100644 index 000000000000..bb72050318ff --- /dev/null +++ b/core/src/services/dbfs/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::DbfsBuilder as Dbfs; + +mod core; +mod error; +mod pager; +mod reader; +mod writer; diff --git a/core/src/services/dbfs/pager.rs b/core/src/services/dbfs/pager.rs new file mode 100644 index 000000000000..58aef5d757dc --- /dev/null +++ b/core/src/services/dbfs/pager.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::services::dbfs::core::DbfsCore;
+use crate::*;
+
+use super::error::parse_error;
+
+pub struct DbfsPager {
+    core: Arc<DbfsCore>,
+    path: String,
+    done: bool,
+}
+
+impl DbfsPager {
+    pub fn new(core: Arc<DbfsCore>, path: String) -> Self {
+        Self {
+            core,
+            path,
+            done: false,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for DbfsPager {
+    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        if self.done {
+            return Ok(None);
+        }
+
+        let response = self.core.dbfs_list(&self.path).await?;
+
+        let status_code = response.status();
+        if !status_code.is_success() {
+            if status_code == StatusCode::NOT_FOUND {
+                return Ok(None);
+            }
+            let error = parse_error(response).await?;
+            return Err(error);
+        }
+
+        let bytes = response.into_body().bytes().await?;
+        let decoded_response =
+            serde_json::from_slice::<DbfsOutputList>(&bytes).map_err(new_json_deserialize_error)?;
+
+        self.done = true;
+
+        let mut entries = Vec::with_capacity(decoded_response.files.len());
+
+        // Iterate in order instead of popping from the back, so entries keep
+        // the order returned by the server.
+        for status in decoded_response.files {
+            let entry: oio::Entry = match status.is_dir {
+                true => {
+                    let normalized_path = format!("{}/", &status.path);
+                    let mut meta = Metadata::new(EntryMode::DIR);
+                    meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
+                        status.modification_time,
+                    )?);
+                    oio::Entry::new(&normalized_path, meta)
+                }
+                false => {
+                    let mut meta = Metadata::new(EntryMode::FILE);
+                    meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
+                        status.modification_time,
+                    )?);
+                    meta.set_content_length(status.file_size as u64);
+                    oio::Entry::new(&status.path, meta)
+                }
+            };
+            entries.push(entry);
+        }
+        Ok(Some(entries))
+    }
+}
+
+#[derive(Debug, Deserialize)]
+struct DbfsOutputList {
+    files: Vec<DbfsStatus>,
+}
+
+#[derive(Debug, Deserialize)]
+struct DbfsStatus {
+    path: String,
+    is_dir: bool,
+    file_size: i64,
+    modification_time: i64,
+}
diff --git a/core/src/services/dbfs/reader.rs b/core/src/services/dbfs/reader.rs
new file mode 100644
index 000000000000..a74ddf71987e
--- /dev/null
+++ b/core/src/services/dbfs/reader.rs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp;
+use std::io::SeekFrom;
+use std::sync::Arc;
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
+
+use async_trait::async_trait;
+use base64::engine::general_purpose;
+use base64::Engine;
+use bytes::BufMut;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use serde::Deserialize;
+
+use super::core::DbfsCore;
+
+use crate::raw::*;
+use crate::*;
+
+// The number of bytes to read starting from the offset. This has a limit of 1MB.
+// Reference: https://docs.databricks.com/api/azure/workspace/dbfs/read
+const DBFS_READ_LIMIT: usize = 1024 * 1024;
+
+pub struct DbfsReader {
+    state: State,
+    path: String,
+    offset: u64,
+    has_filled: u64,
+}
+
+impl DbfsReader {
+    pub fn new(core: Arc<DbfsCore>, op: OpRead, path: String) -> Self {
+        DbfsReader {
+            state: State::Reading(Some(core)),
+            path,
+            offset: op.range().offset().unwrap_or(0),
+            has_filled: 0,
+        }
+    }
+
+    #[inline]
+    fn set_offset(&mut self, offset: u64) {
+        self.offset = offset;
+    }
+
+    fn serde_json_decode(&self, bs: &Bytes) -> Result<Bytes> {
+        let response_body = match serde_json::from_slice::<ReadContentJsonResponse>(bs) {
+            Ok(v) => v,
+            Err(err) => {
+                return Err(
+                    Error::new(ErrorKind::Unexpected, "parse response content failed")
+                        .with_operation("http_util::IncomingDbfsAsyncBody::poll_read")
+                        .set_source(err),
+                );
+            }
+        };
+
+        let decoded_data = general_purpose::STANDARD
+            .decode(response_body.data)
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "decode response content failed")
+                    .with_operation("http_util::IncomingDbfsAsyncBody::poll_read")
+                    .set_source(err)
+            })?;
+
+        Ok(decoded_data.into())
+    }
+}
+
+enum State {
+    Reading(Option<Arc<DbfsCore>>),
+    Finalize(BoxFuture<'static, (Arc<DbfsCore>, Result<Bytes>)>),
+}
+
+/// # Safety
+///
+/// We will only take `&mut Self` references to DbfsReader.
+unsafe impl Sync for DbfsReader {}
+
+#[async_trait]
+impl oio::Read for DbfsReader {
+    fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> {
+        while self.has_filled as usize != buf.len() {
+            match &mut self.state {
+                State::Reading(core) => {
+                    let core = core.take().expect("DbfsReader must be initialized");
+
+                    let path = self.path.clone();
+                    let offset = self.offset;
+                    let len = cmp::min(buf.len(), DBFS_READ_LIMIT);
+
+                    let fut = async move {
+                        let resp = core.dbfs_read(&path, offset, len as u64).await;
+                        let body = match resp {
+                            Ok(resp) => resp.into_body(),
+                            Err(err) => {
+                                return (core, Err(err));
+                            }
+                        };
+                        let bs = body.bytes().await;
+                        (core, bs)
+                    };
+                    self.state = State::Finalize(Box::pin(fut));
+                }
+                State::Finalize(fut) => {
+                    let (core, bs) = ready!(fut.as_mut().poll(cx));
+                    let data = self.serde_json_decode(&bs?)?;
+
+                    buf.put_slice(&data[..]);
+                    self.set_offset(self.offset + data.len() as u64);
+                    self.has_filled += data.len() as u64;
+                    self.state = State::Reading(Some(core));
+                }
+            }
+        }
+        Poll::Ready(Ok(self.has_filled as usize))
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
+        let (_, _) = (cx, pos);
+
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "output reader doesn't support seeking",
+        )))
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        let _ = cx;
+
+        Poll::Ready(Some(Err(Error::new(
+            ErrorKind::Unsupported,
+            "output reader doesn't support iterating",
+        ))))
+    }
+}
+
+#[derive(Debug, Default, Deserialize)]
+#[serde(default)]
+struct ReadContentJsonResponse {
+    bytes_read: u64,
+    data: String,
+}
diff --git a/core/src/services/dbfs/writer.rs b/core/src/services/dbfs/writer.rs
new file mode 100644
index 000000000000..04dbe4611812
--- /dev/null
+++ b/core/src/services/dbfs/writer.rs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use crate::raw::oio::WriteBuf;
+use crate::raw::*;
+use crate::services::dbfs::core::DbfsCore;
+use crate::*;
+
+use super::error::parse_error;
+
+pub struct DbfsWriter {
+    core: Arc<DbfsCore>,
+    path: String,
+}
+
+impl DbfsWriter {
+    const MAX_SIMPLE_SIZE: usize = 1024 * 1024;
+
+    pub fn new(core: Arc<DbfsCore>, _op: OpWrite, path: String) -> Self {
+        DbfsWriter { core, path }
+    }
+}
+
+#[async_trait]
+impl oio::OneShotWrite for DbfsWriter {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = bs.bytes(bs.remaining());
+        let size = bs.len();
+
+        // The DBFS put API returns MAX_BLOCK_SIZE_EXCEEDED if its 1MB limit is exceeded.
+        if size >= Self::MAX_SIMPLE_SIZE {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write of more than 1MB is not supported for Dbfs yet",
+            ));
+        }
+
+        let req = self.core.dbfs_create_file_request(&self.path, bs)?;
+
+        let resp = self.core.client.send(req).await?;
+
+        let status = resp.status();
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index d6d1b80806f5..06e12933e2e5 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -248,3 +248,8 @@ pub use self::azfile::Azfile;
 mod mongodb;
 #[cfg(feature = "services-mongodb")]
 pub use self::mongodb::Mongodb;
+
+#[cfg(feature = "services-dbfs")]
+mod dbfs;
+#[cfg(feature = "services-dbfs")]
+pub use self::dbfs::Dbfs;
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 231d617295c1..879dc6b9e19a 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -52,6 +52,8 @@ pub enum Scheme {
     Etcd,
     /// [foundationdb][crate::services::Foundationdb]: Foundationdb services.
     Foundationdb,
+    /// [dbfs][crate::services::Dbfs]: DBFS backend support.
+    Dbfs,
     /// [fs][crate::services::Fs]: POSIX alike file system.
     Fs,
     /// [ftp][crate::services::Ftp]: FTP backend.
@@ -279,6 +281,7 @@ impl FromStr for Scheme {
             "dashmap" => Ok(Scheme::Dashmap),
             "dropbox" => Ok(Scheme::Dropbox),
             "etcd" => Ok(Scheme::Etcd),
+            "dbfs" => Ok(Scheme::Dbfs),
             "fs" => Ok(Scheme::Fs),
             "gcs" => Ok(Scheme::Gcs),
             "gdrive" => Ok(Scheme::Gdrive),
@@ -331,6 +334,7 @@ impl From<Scheme> for &'static str {
             Scheme::D1 => "d1",
             Scheme::Dashmap => "dashmap",
             Scheme::Etcd => "etcd",
+            Scheme::Dbfs => "dbfs",
             Scheme::Fs => "fs",
             Scheme::Gcs => "gcs",
             Scheme::Ghac => "ghac",