Skip to content

Commit

Permalink
feat: fix PR review #3334
Browse files Browse the repository at this point in the history
  • Loading branch information
morristai committed Oct 25, 2023
1 parent 2816380 commit a752033
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 50 deletions.
7 changes: 1 addition & 6 deletions core/src/services/dbfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,7 @@ impl Accessor for DbfsBackend {
}
}

let op = DbfsReader::new(
self.core.clone(),
args,
path.to_string(),
meta.content_length(),
);
let op = DbfsReader::new(self.core.clone(), args, path.to_string());

Ok((RpRead::with_metadata(meta), op))
}
Expand Down
74 changes: 30 additions & 44 deletions core/src/services/dbfs/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@ use base64::engine::general_purpose;
use base64::Engine;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use futures::future::BoxFuture;
use http::Response;
use serde::Deserialize;

use super::core::DbfsCore;

use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;

Expand All @@ -44,16 +41,16 @@ pub struct DbfsReader {
state: State,
path: String,
offset: u64,
buffer: BytesMut,
has_filled: u64,
}

impl DbfsReader {
pub fn new(core: Arc<DbfsCore>, op: OpRead, path: String, content_length: u64) -> Self {
pub fn new(core: Arc<DbfsCore>, op: OpRead, path: String) -> Self {
DbfsReader {
state: State::Idle(Some(core)),
state: State::Reading(Some(core)),
path,
offset: op.range().offset().unwrap_or(0),
buffer: BytesMut::with_capacity(content_length as usize),
has_filled: 0,
}
}

Expand All @@ -63,7 +60,7 @@ impl DbfsReader {
}

fn serde_json_decode(&self, bs: &Bytes) -> Result<Bytes> {
let mut response_body = match serde_json::from_slice::<ReadContentJsonResponse>(bs) {
let response_body = match serde_json::from_slice::<ReadContentJsonResponse>(bs) {
Ok(v) => v,
Err(err) => {
return Err(
Expand All @@ -74,7 +71,7 @@ impl DbfsReader {
}
};

response_body.data = general_purpose::STANDARD
let decoded_data = general_purpose::STANDARD
.decode(response_body.data)
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "decode response content failed")
Expand All @@ -92,14 +89,13 @@ impl DbfsReader {
})
})?;

Ok(response_body.data.into())
Ok(decoded_data.into())
}
}

enum State {
Idle(Option<Arc<DbfsCore>>),
Read(BoxFuture<'static, (Arc<DbfsCore>, Result<Response<IncomingAsyncBody>>)>),
Decode(BoxFuture<'static, (Arc<DbfsCore>, Result<Bytes>)>),
Reading(Option<Arc<DbfsCore>>),
Finalize(BoxFuture<'static, (Arc<DbfsCore>, Result<Bytes>)>),
}

/// # Safety
Expand All @@ -110,9 +106,9 @@ unsafe impl Sync for DbfsReader {}
#[async_trait]
impl oio::Read for DbfsReader {
fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> {
while self.buffer.remaining() != buf.len() {
while self.has_filled as usize != buf.len() {
match &mut self.state {
State::Idle(core) => {
State::Reading(core) => {
let core = core.take().expect("DbfsReader must be initialized");

let path = self.path.clone();
Expand All @@ -121,48 +117,38 @@ impl oio::Read for DbfsReader {

let fut = async move {
let resp = async { core.dbfs_read(&path, offset, len as u64).await }.await;
(core, resp)
};
self.state = State::Read(Box::pin(fut));
}
State::Read(fut) => {
let (core, resp) = ready!(fut.as_mut().poll(cx));
let body = resp?.into_body();

let fut = async move {
let body = match resp {
Ok(resp) => resp.into_body(),
Err(err) => {
return (core, Err(err));
}
};
let bs = async { body.bytes().await }.await;
(core, bs)
};
self.state = State::Decode(Box::pin(fut));
self.state = State::Finalize(Box::pin(fut));
}
State::Decode(fut) => {
State::Finalize(fut) => {
let (core, bs) = ready!(fut.as_mut().poll(cx));
let data = self.serde_json_decode(&bs?)?;

self.buffer.put_slice(&data[..]);
buf.put_slice(&data[..]);
self.set_offset(self.offset + data.len() as u64);
self.state = State::Idle(Some(core));
self.has_filled += data.len() as u64;
self.state = State::Reading(Some(core));
}
}
}
buf.put_slice(&self.buffer[..]);
Poll::Ready(Ok(self.buffer.remaining()))
Poll::Ready(Ok(self.has_filled as usize))
}

fn poll_seek(&mut self, _cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
// TODO: drop existing buf and change the offset?
match pos {
SeekFrom::Start(n) => {
self.set_offset(n);
}
SeekFrom::End(n) => {
self.set_offset((self.buffer.remaining() as i64 + n) as u64);
}
SeekFrom::Current(n) => {
self.set_offset((self.offset as i64 + n) as u64);
}
};
Poll::Ready(Ok(0))
fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
let (_, _) = (cx, pos);

Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"output reader doesn't support seeking",
)))
}

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
Expand Down

0 comments on commit a752033

Please sign in to comment.