diff --git a/core/src/services/dbfs/backend.rs b/core/src/services/dbfs/backend.rs index e8e9654ee623..df035027ea68 100644 --- a/core/src/services/dbfs/backend.rs +++ b/core/src/services/dbfs/backend.rs @@ -197,56 +197,37 @@ impl Accessor for DbfsBackend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.core.dbfs_read(path, args.range()).await?; - - let status = resp.status(); - - match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) + let mut meta = Metadata::new(EntryMode::FILE); + + if let Some(length) = args.range().size() { + meta.set_content_length(length); + } else { + let stat_resp = self.core.dbfs_get_status(path).await?; + meta = parse_into_metadata(path, stat_resp.headers())?; + let decoded_response = + serde_json::from_slice::(&stat_resp.into_body().bytes().await?) + .map_err(new_json_deserialize_error)?; + meta.set_last_modified(parse_datetime_from_from_timestamp_millis( + decoded_response.modification_time, + )?); + meta.set_mode(if decoded_response.is_dir { + EntryMode::DIR + } else { + EntryMode::FILE + }); + if !decoded_response.is_dir { + meta.set_content_length(decoded_response.file_size as u64); } - _ => Err(parse_error(resp).await?), } - // let resp = self.core.dbfs_read(path, args.range()).await?; - // - // let status = resp.status(); - // - // match status { - // StatusCode::OK => { - // // NOTE: If range is not specified, we need to get content length from stat API. - // if let Some(size) = args.range().size() { - // let mut meta = parse_into_metadata(path, resp.headers())?; - // meta.set_content_length(size); - // Ok((RpRead::with_metadata(meta), resp.into_body())) - // } else { - // let stat_resp = self.core.dbfs_get_status(path).await?; - // let meta = match stat_resp.status() { - // StatusCode::OK => { - // let mut meta = parse_into_metadata(path, stat_resp.headers())?; - // let bs = stat_resp.into_body().bytes().await?; - // let decoded_response = serde_json::from_slice::(&bs) - // .map_err(new_json_deserialize_error)?; - // meta.set_last_modified(parse_datetime_from_from_timestamp_millis( - // decoded_response.modification_time, - // )?); - // match decoded_response.is_dir { - // true => meta.set_mode(EntryMode::DIR), - // false => { - // meta.set_mode(EntryMode::FILE); - // meta.set_content_length(decoded_response.file_size as u64) - // } - // }; - // meta - // } - // _ => return Err(parse_error(stat_resp).await?), - // }; - // Ok((RpRead::with_metadata(meta), resp.into_body())) - // } - // } - // _ => Err(parse_dbfs_read_error(resp).await?), - // } + let op = DbfsReader::new( + self.core.clone(), + args, + path.to_string(), + meta.content_length(), + ); + + Ok((RpRead::with_metadata(meta), op)) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/core/src/services/dbfs/core.rs b/core/src/services/dbfs/core.rs index e03e14a1f015..34aff70066c8 100644 --- a/core/src/services/dbfs/core.rs +++ b/core/src/services/dbfs/core.rs @@ -26,17 +26,10 @@ use http::Response; use http::StatusCode; use serde_json::json; -use crate::raw::build_rooted_abs_path; -use crate::raw::new_request_build_error; -use crate::raw::percent_encode_path; -use crate::raw::AsyncBody; -use crate::raw::BytesRange; -use crate::raw::HttpClient; -use crate::raw::IncomingAsyncBody; +use crate::raw::*; use crate::*; use super::error::parse_error; -use super::reader::IncomingDbfsAsyncBody; pub struct DbfsCore { pub root: String, @@ -166,7 +159,8 @@ impl DbfsCore { pub async fn dbfs_read( &self, path: &str, - range: BytesRange, + offset: u64, + length: u64, ) -> Result> { let p = build_rooted_abs_path(&self.root, path) .trim_end_matches('/') @@ -178,11 +172,11 @@ impl DbfsCore { percent_encode_path(&p) ); - if let Some(offset) = range.offset() { + if offset > 0 { url.push_str(&format!("&offset={}", offset)); } - if let Some(length) = range.size() { + if length > 0 { url.push_str(&format!("&length={}", length)); } @@ -195,7 +189,14 @@ impl DbfsCore { .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - self.client.send(req).await + let resp = self.client.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(resp), + _ => Err(parse_error(resp).await?), + } } pub async fn dbfs_get_status(&self, path: &str) -> Result> { diff --git a/core/src/services/dbfs/error.rs b/core/src/services/dbfs/error.rs index 8e0cef99d254..41069d45fb0d 100644 --- a/core/src/services/dbfs/error.rs +++ b/core/src/services/dbfs/error.rs @@ -21,7 +21,6 @@ use http::Response; use http::StatusCode; use serde::Deserialize; -use super::reader::IncomingDbfsAsyncBody; use crate::raw::*; use crate::Error; use crate::ErrorKind; @@ -75,34 +74,3 @@ pub async fn parse_error(resp: Response) -> Result { Ok(err) } - -pub async fn parse_dbfs_read_error(resp: Response) -> Result { - let (parts, body) = resp.into_parts(); - let bs = body.bytes().await?; - - let (kind, retryable) = match parts.status { - StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), - StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), - StatusCode::PRECONDITION_FAILED => (ErrorKind::ConditionNotMatch, false), - StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), - _ => (ErrorKind::Unexpected, false), - }; - - let message = match serde_json::from_slice::(&bs) { - Ok(dbfs_error) => format!("{:?}", dbfs_error.message), - Err(_) => String::from_utf8_lossy(&bs).into_owned(), - }; - - let mut err = Error::new(kind, &message); - - err = with_error_response_context(err, parts); - - if retryable { - err = err.set_temporary(); - } - - Ok(err) -} diff --git a/core/src/services/dbfs/reader.rs b/core/src/services/dbfs/reader.rs index 58fb440aba18..2e71a688d3ee 100644 --- a/core/src/services/dbfs/reader.rs +++ b/core/src/services/dbfs/reader.rs @@ -15,228 +15,62 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::Ordering; -use std::io; -use std::mem; -use std::str::FromStr; +use std::cmp; +use std::io::SeekFrom; +use std::sync::Arc; use std::task::ready; use std::task::Context; use std::task::Poll; +use async_trait::async_trait; use base64::engine::general_purpose; use base64::Engine; -use bytes::Buf; use bytes::BufMut; use bytes::Bytes; -use futures::StreamExt; -use futures::TryStreamExt; -use http::Request; +use bytes::BytesMut; +use futures::future::BoxFuture; use http::Response; use serde::Deserialize; +use super::core::DbfsCore; + +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; -impl HttpClient { - /// Send a request in async way. - pub async fn send_dbfs( - &self, - req: Request, - ) -> Result> { - // Uri stores all string alike data in `Bytes` which means - // the clone here is cheap. - let uri = req.uri().clone(); - let is_head = req.method() == http::Method::HEAD; - - let (parts, body) = req.into_parts(); - - let mut req_builder = self - .client - .request( - parts.method, - reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"), - ) - .version(parts.version) - .headers(parts.headers); - - req_builder = match body { - AsyncBody::Empty => req_builder.body(reqwest::Body::from("")), - AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)), - AsyncBody::ChunkedBytes(bs) => req_builder.body(reqwest::Body::wrap_stream(bs)), - AsyncBody::Stream(s) => req_builder.body(reqwest::Body::wrap_stream(s)), - }; - - let mut resp = req_builder.send().await.map_err(|err| { - let is_temporary = !( - // Builder related error should not be retried. - err.is_builder() || - // Error returned by RedirectPolicy. - // - // We don't set this by hand, just don't allow retry. - err.is_redirect() || - // We never use `Response::error_for_status`, just don't allow retry. - // - // Status should be checked by our services. - err.is_status() - ); - - let mut oerr = Error::new(ErrorKind::Unexpected, "send async request") - .with_operation("http_util::Client::send_async") - .with_context("url", uri.to_string()) - .set_source(err); - if is_temporary { - oerr = oerr.set_temporary(); - } - - oerr - })?; - - // Get content length from header so that we can check it. - // If the request method is HEAD, we will ignore this. - let content_length = if is_head { - None - } else { - parse_content_length(resp.headers()).expect("response content length must be valid") - }; - - let mut hr = Response::builder() - .version(resp.version()) - .status(resp.status()) - // Insert uri into response extension so that we can fetch - // it later. - .extension(uri.clone()); - // Swap headers directly instead of copy the entire map. - mem::swap(hr.headers_mut().unwrap(), resp.headers_mut()); - - let stream = resp.bytes_stream().map_err(move |err| { - // If stream returns a body related error, we can convert - // it to interrupt so we can retry it. - Error::new(ErrorKind::Unexpected, "read data from http stream") - .map(|v| if err.is_body() { v.set_temporary() } else { v }) - .with_context("url", uri.to_string()) - .set_source(err) - }); - - let body = IncomingDbfsAsyncBody::new(Box::new(oio::into_stream(stream)), content_length); - - let resp = hr.body(body).expect("response must build succeed"); - - Ok(resp) - } -} +const DBFS_READ_LIMIT: usize = 1048576; -/// IncomingDbfsAsyncBody carries the content returned by remote servers. -/// -/// # Notes -/// -/// Client SHOULD NEVER construct this body. -pub struct IncomingDbfsAsyncBody { - inner: oio::Streamer, - size: Option, - consumed: u64, - chunk: Option, +pub struct DbfsReader { + state: State, + path: String, + offset: u64, + buffer: BytesMut, } -impl IncomingDbfsAsyncBody { - /// Construct a new incoming async body - pub fn new(s: oio::Streamer, size: Option) -> Self { - Self { - inner: s, - size, - consumed: 0, - chunk: None, - } - } - - /// Consume the entire body. - pub async fn consume(mut self) -> Result<()> { - use oio::ReadExt; - - while let Some(bs) = self.next().await { - bs.map_err(|err| { - Error::new(ErrorKind::Unexpected, "fetch bytes from stream") - .with_operation("http_util::IncomingDbfsAsyncBody::consume") - .set_source(err) - })?; - } - - Ok(()) - } - - /// Consume the response to bytes. - pub async fn bytes(mut self) -> Result { - use oio::ReadExt; - - // If there's only 1 chunk, we can just return Buf::to _bytes() - let mut first = if let Some(buf) = self.next().await { - buf? - } else { - return Ok(Bytes::new()); - }; - - let second = if let Some(buf) = self.next().await { - buf? - } else { - return Ok(first.copy_to_bytes(first.remaining())); - }; - - // With more than 1 buf, we gotta flatten into a Vec first. - let cap = first.remaining() + second.remaining() + self.size.unwrap_or_default() as usize; - let mut vec = Vec::with_capacity(cap); - vec.put(first); - vec.put(second); - - while let Some(buf) = self.next().await { - vec.put(buf?); +impl DbfsReader { + pub fn new(core: Arc, op: OpRead, path: String, content_length: u64) -> Self { + DbfsReader { + state: State::Idle(Some(core)), + path, + offset: op.range().offset().unwrap_or(0), + buffer: BytesMut::with_capacity(content_length as usize), } - - Ok(vec.into()) } #[inline] - fn check(expect: u64, actual: u64) -> Result<()> { - match actual.cmp(&expect) { - Ordering::Equal => Ok(()), - Ordering::Less => Err(Error::new( - ErrorKind::ContentIncomplete, - &format!("reader got too less data, expect: {expect}, actual: {actual}"), - ) - .set_temporary()), - Ordering::Greater => Err(Error::new( - ErrorKind::ContentTruncated, - &format!("reader got too much data, expect: {expect}, actual: {actual}"), - ) - .set_temporary()), - } + fn set_offset(&mut self, offset: u64) { + self.offset = offset; } -} -impl oio::Read for IncomingDbfsAsyncBody { - fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - // We must get a valid bytes from underlying stream - let bs = loop { - match ready!(self.poll_next(cx)) { - Some(Ok(bs)) if bs.is_empty() => continue, - Some(Ok(bs)) => break bs, - Some(Err(err)) => return Poll::Ready(Err(err)), - None => return Poll::Ready(Ok(0)), - } - }; - - // TODO: reqwest::Body::bytes_stream() in poll_next() will not return whole response at once if the file is too big, which causes serde failed - let mut response_body = match serde_json::from_slice::(&bs) { + fn serde_json_decode(&self, bs: &Bytes) -> Result { + let mut response_body = match serde_json::from_slice::(bs) { Ok(v) => v, Err(err) => { - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "parse response content failed", - ) - .with_operation("http_util::IncomingDbfsAsyncBody::poll_read") - .set_source(err))); + return Err( + Error::new(ErrorKind::Unexpected, "parse response content failed") + .with_operation("http_util::IncomingDbfsAsyncBody::poll_read") + .set_source(err), + ); } }; @@ -258,48 +92,86 @@ impl oio::Read for IncomingDbfsAsyncBody { }) })?; - let amt = response_body.bytes_read as usize; - - buf.put_slice(response_body.data.as_ref()); - - // TODO: will handle chunk here till we find a way to get whole bytes at once - - Poll::Ready(Ok(amt)) + Ok(response_body.data.into()) } +} - fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll> { - let (_, _) = (cx, pos); +enum State { + Idle(Option>), + Read(BoxFuture<'static, (Arc, Result>)>), + Decode(BoxFuture<'static, (Arc, Result)>), +} - Poll::Ready(Err(Error::new( - ErrorKind::Unsupported, - "output reader doesn't support seeking", - ))) - } +/// # Safety +/// +/// We will only take `&mut Self` reference for DbfsReader. +unsafe impl Sync for DbfsReader {} - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - if let Some(bs) = self.chunk.take() { - return Poll::Ready(Some(Ok(bs))); - } +#[async_trait] +impl oio::Read for DbfsReader { + fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll> { + while self.buffer.remaining() != buf.len() { + match &mut self.state { + State::Idle(core) => { + let core = core.take().expect("DbfsReader must be initialized"); + + let path = self.path.clone(); + let offset = self.offset; + let len = cmp::min(buf.len(), DBFS_READ_LIMIT); + + let fut = async move { + let resp = async { core.dbfs_read(&path, offset, len as u64).await }.await; + (core, resp) + }; + self.state = State::Read(Box::pin(fut)); + } + State::Read(fut) => { + let (core, resp) = ready!(fut.as_mut().poll(cx)); + let body = resp?.into_body(); + + let fut = async move { + let bs = async { body.bytes().await }.await; + (core, bs) + }; + self.state = State::Decode(Box::pin(fut)); + } + State::Decode(fut) => { + let (core, bs) = ready!(fut.as_mut().poll(cx)); + let data = self.serde_json_decode(&bs?)?; - // NOTE: inner: Stream> - // Reader::poll_next() -> Stream::poll_next() - // control by reqwest::Body::bytes_stream() - let res = match ready!(self.inner.poll_next_unpin(cx)) { - Some(Ok(bs)) => { - self.consumed += bs.len() as u64; - Some(Ok(bs)) - } - Some(Err(err)) => Some(Err(err)), - None => { - if let Some(size) = self.size { - Self::check(size, self.consumed)?; + self.buffer.put_slice(&data[..]); + self.set_offset(self.offset + data.len() as u64); + self.state = State::Idle(Some(core)); } + } + } + buf.put_slice(&self.buffer[..]); + Poll::Ready(Ok(self.buffer.remaining())) + } - None + fn poll_seek(&mut self, _cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { + // TODO: drop existing buf and change the offset? + match pos { + SeekFrom::Start(n) => { + self.set_offset(n); + } + SeekFrom::End(n) => { + self.set_offset((self.buffer.remaining() as i64 + n) as u64); + } + SeekFrom::Current(n) => { + self.set_offset((self.offset as i64 + n) as u64); } }; + Poll::Ready(Ok(0)) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + let _ = cx; - Poll::Ready(res) + Poll::Ready(Some(Err(Error::new( + ErrorKind::Unsupported, + "output reader doesn't support iterating", + )))) } }