Skip to content

Commit

Permalink
feat: implement DBFS reader
Browse files Browse the repository at this point in the history
  • Loading branch information
morristai committed Oct 25, 2023
1 parent da1d8e0 commit a06ad4c
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 320 deletions.
75 changes: 28 additions & 47 deletions core/src/services/dbfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,56 +197,37 @@ impl Accessor for DbfsBackend {
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.core.dbfs_read(path, args.range()).await?;

let status = resp.status();

match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let meta = parse_into_metadata(path, resp.headers())?;
Ok((RpRead::with_metadata(meta), resp.into_body()))
let mut meta = Metadata::new(EntryMode::FILE);

if let Some(length) = args.range().size() {
meta.set_content_length(length);
} else {
let stat_resp = self.core.dbfs_get_status(path).await?;
meta = parse_into_metadata(path, stat_resp.headers())?;
let decoded_response =
serde_json::from_slice::<DbfsStatus>(&stat_resp.into_body().bytes().await?)
.map_err(new_json_deserialize_error)?;
meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
decoded_response.modification_time,
)?);
meta.set_mode(if decoded_response.is_dir {
EntryMode::DIR
} else {
EntryMode::FILE
});
if !decoded_response.is_dir {
meta.set_content_length(decoded_response.file_size as u64);
}
_ => Err(parse_error(resp).await?),
}

// let resp = self.core.dbfs_read(path, args.range()).await?;
//
// let status = resp.status();
//
// match status {
// StatusCode::OK => {
// // NOTE: If range is not specified, we need to get content length from stat API.
// if let Some(size) = args.range().size() {
// let mut meta = parse_into_metadata(path, resp.headers())?;
// meta.set_content_length(size);
// Ok((RpRead::with_metadata(meta), resp.into_body()))
// } else {
// let stat_resp = self.core.dbfs_get_status(path).await?;
// let meta = match stat_resp.status() {
// StatusCode::OK => {
// let mut meta = parse_into_metadata(path, stat_resp.headers())?;
// let bs = stat_resp.into_body().bytes().await?;
// let decoded_response = serde_json::from_slice::<DbfsStatus>(&bs)
// .map_err(new_json_deserialize_error)?;
// meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
// decoded_response.modification_time,
// )?);
// match decoded_response.is_dir {
// true => meta.set_mode(EntryMode::DIR),
// false => {
// meta.set_mode(EntryMode::FILE);
// meta.set_content_length(decoded_response.file_size as u64)
// }
// };
// meta
// }
// _ => return Err(parse_error(stat_resp).await?),
// };
// Ok((RpRead::with_metadata(meta), resp.into_body()))
// }
// }
// _ => Err(parse_dbfs_read_error(resp).await?),
// }
let op = DbfsReader::new(
self.core.clone(),
args,
path.to_string(),
meta.content_length(),
);

Ok((RpRead::with_metadata(meta), op))
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Expand Down
25 changes: 13 additions & 12 deletions core/src/services/dbfs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,10 @@ use http::Response;
use http::StatusCode;
use serde_json::json;

use crate::raw::build_rooted_abs_path;
use crate::raw::new_request_build_error;
use crate::raw::percent_encode_path;
use crate::raw::AsyncBody;
use crate::raw::BytesRange;
use crate::raw::HttpClient;
use crate::raw::IncomingAsyncBody;
use crate::raw::*;
use crate::*;

use super::error::parse_error;
use super::reader::IncomingDbfsAsyncBody;

pub struct DbfsCore {
pub root: String,
Expand Down Expand Up @@ -166,7 +159,8 @@ impl DbfsCore {
pub async fn dbfs_read(
&self,
path: &str,
range: BytesRange,
offset: u64,
length: u64,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_rooted_abs_path(&self.root, path)
.trim_end_matches('/')
Expand All @@ -178,11 +172,11 @@ impl DbfsCore {
percent_encode_path(&p)
);

if let Some(offset) = range.offset() {
if offset > 0 {
url.push_str(&format!("&offset={}", offset));
}

if let Some(length) = range.size() {
if length > 0 {
url.push_str(&format!("&length={}", length));
}

Expand All @@ -195,7 +189,14 @@ impl DbfsCore {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

self.client.send(req).await
let resp = self.client.send(req).await?;

let status = resp.status();

match status {
StatusCode::OK => Ok(resp),
_ => Err(parse_error(resp).await?),
}
}

pub async fn dbfs_get_status(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
Expand Down
32 changes: 0 additions & 32 deletions core/src/services/dbfs/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use http::Response;
use http::StatusCode;
use serde::Deserialize;

use super::reader::IncomingDbfsAsyncBody;
use crate::raw::*;
use crate::Error;
use crate::ErrorKind;
Expand Down Expand Up @@ -75,34 +74,3 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {

Ok(err)
}

pub async fn parse_dbfs_read_error(resp: Response<IncomingDbfsAsyncBody>) -> Result<Error> {
let (parts, body) = resp.into_parts();
let bs = body.bytes().await?;

let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
StatusCode::PRECONDITION_FAILED => (ErrorKind::ConditionNotMatch, false),
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};

let message = match serde_json::from_slice::<DbfsError>(&bs) {
Ok(dbfs_error) => format!("{:?}", dbfs_error.message),
Err(_) => String::from_utf8_lossy(&bs).into_owned(),
};

let mut err = Error::new(kind, &message);

err = with_error_response_context(err, parts);

if retryable {
err = err.set_temporary();
}

Ok(err)
}
Loading

0 comments on commit a06ad4c

Please sign in to comment.