Skip to content

Commit

Permalink
refactor(core/raw): Migrate oio::Write from WriteBuf to Bytes (#4356)
Browse files Browse the repository at this point in the history
* Build passed

Signed-off-by: Xuanwo <[email protected]>

* Fix check

Signed-off-by: Xuanwo <[email protected]>

* cargo format

Signed-off-by: Xuanwo <[email protected]>

* Fix clippy

Signed-off-by: Xuanwo <[email protected]>

* Fix tests

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Mar 14, 2024
1 parent a23883d commit b8ff1af
Show file tree
Hide file tree
Showing 56 changed files with 209 additions and 237 deletions.
8 changes: 2 additions & 6 deletions core/benches/oio/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,8 @@ use rand::RngCore;
pub struct BlackHoleWriter;

impl oio::Write for BlackHoleWriter {
fn poll_write(
&mut self,
_: &mut Context<'_>,
bs: &dyn oio::WriteBuf,
) -> Poll<opendal::Result<usize>> {
Poll::Ready(Ok(bs.remaining()))
fn poll_write(&mut self, _: &mut Context<'_>, bs: Bytes) -> Poll<opendal::Result<usize>> {
Poll::Ready(Ok(bs.len()))
}

fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<opendal::Result<()>> {
Expand Down
2 changes: 1 addition & 1 deletion core/benches/oio/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {

let mut bs = content.clone();
while !bs.is_empty() {
let n = w.write(&bs).await.unwrap();
let n = w.write(bs.clone()).await.unwrap();
bs.advance(n);
}
w.close().await.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
}

impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
self.handle
.block_on(poll_fn(|cx| self.inner.poll_write(cx, bs)))
.block_on(poll_fn(|cx| self.inner.poll_write(cx, bs.clone())))
}

fn close(&mut self) -> Result<()> {
Expand Down
5 changes: 3 additions & 2 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;

use crate::raw::oio::BufferReader;
use crate::raw::oio::FileReader;
Expand Down Expand Up @@ -711,7 +712,7 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
Expand Down Expand Up @@ -747,7 +748,7 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
where
W: oio::BlockingWrite,
{
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
}

impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.inner.poll_write(cx, bs)
}

Expand All @@ -292,7 +292,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
self.inner.write(bs)
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
}

impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
self.inner
Expand Down Expand Up @@ -453,7 +453,7 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr());
self.inner
Expand Down
12 changes: 6 additions & 6 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,12 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {

#[async_trait::async_trait]
impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
self.inner.poll_write(cx, bs).map_err(|err| {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.inner.poll_write(cx, bs.clone()).map_err(|err| {
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("write_buf", bs.remaining().to_string())
.with_context("write_buf", bs.len().to_string())
})
}

Expand All @@ -416,12 +416,12 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
}

impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner.write(bs).map_err(|err| {
fn write(&mut self, bs: Bytes) -> Result<usize> {
self.inner.write(bs.clone()).map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("write_buf", bs.remaining().to_string())
.with_context("write_buf", bs.len().to_string())
})
}

Expand Down
12 changes: 6 additions & 6 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,8 +1147,8 @@ impl<W> LoggingWriter<W> {
}

impl<W: oio::Write> oio::Write for LoggingWriter<W> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
match ready!(self.inner.poll_write(cx, bs)) {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
match ready!(self.inner.poll_write(cx, bs.clone())) {
Ok(n) => {
self.written += n as u64;
trace!(
Expand All @@ -1158,7 +1158,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
WriteOperation::Write,
self.path,
self.written,
bs.remaining(),
bs.len(),
n,
);
Poll::Ready(Ok(n))
Expand Down Expand Up @@ -1245,8 +1245,8 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}

impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
match self.inner.write(bs) {
fn write(&mut self, bs: Bytes) -> Result<usize> {
match self.inner.write(bs.clone()) {
Ok(n) => {
self.written += n as u64;
trace!(
Expand All @@ -1256,7 +1256,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
WriteOperation::BlockingWrite,
self.path,
self.written,
bs.remaining(),
bs.len(),
n
);
Ok(n)
Expand Down
6 changes: 1 addition & 5 deletions core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,7 @@ pub struct MadsimWriter {
}

impl oio::Write for MadsimWriter {
fn poll_write(
&mut self,
cx: &mut Context<'_>,
bs: &dyn oio::WriteBuf,
) -> Poll<crate::Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<crate::Result<usize>> {
#[cfg(madsim)]
{
let req = Request::Write(self.path.to_string(), bs);
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for MetricWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.inner
.poll_write(cx, bs)
.map_ok(|n| {
Expand Down Expand Up @@ -849,7 +849,7 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
self.inner
.write(bs)
.map(|n| {
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
}

impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
self.inner.poll_write(cx, bs)
Expand All @@ -344,7 +344,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
self.inner.write(bs)
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
}

impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.inner.poll_write(cx, bs)
}

Expand All @@ -312,7 +312,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
self.inner.write(bs)
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::Write.into_static(),
Expand Down Expand Up @@ -786,7 +786,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::BlockingWrite.into_static(),
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/prometheus_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.inner
.poll_write(cx, bs)
.map_ok(|n| {
Expand Down Expand Up @@ -622,7 +622,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
self.inner
.write(bs)
.map(|n| {
Expand Down
10 changes: 5 additions & 5 deletions core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,13 +776,13 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapp
}

impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
if let Some(sleep) = self.sleep.as_mut() {
ready!(sleep.poll_unpin(cx));
self.sleep = None;
}

match ready!(self.inner.as_mut().unwrap().poll_write(cx, bs)) {
match ready!(self.inner.as_mut().unwrap().poll_write(cx, bs.clone())) {
Ok(v) => {
self.current_backoff = None;
Poll::Ready(Ok(v))
Expand Down Expand Up @@ -815,7 +815,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
],
);
self.sleep = Some(Box::pin(tokio::time::sleep(dur)));
self.poll_write(cx, bs)
self.poll_write(cx, bs.clone())
}
}
}
Expand Down Expand Up @@ -916,8 +916,8 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
}

impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWrapper<R, I> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
{ || self.inner.as_mut().unwrap().write(bs) }
fn write(&mut self, bs: Bytes) -> Result<usize> {
{ || self.inner.as_mut().unwrap().write(bs.clone()) }
.retry(&self.builder)
.when(|e| e.is_temporary())
.notify(|err, dur| {
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> {
}

impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap();
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();

loop {
match self.limiter.check_n(buf_length) {
Expand Down Expand Up @@ -242,8 +242,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap();
fn write(&mut self, bs: Bytes) -> Result<usize> {
let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();

loop {
match self.limiter.check_n(buf_length) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
}

impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.poll_timeout(cx, WriteOperation::Write.into_static())?;

let v = ready!(self.inner.poll_write(cx, bs));
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.inner.poll_write(cx, bs)
}

Expand All @@ -335,7 +335,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
self.inner.write(bs)
}

Expand Down
12 changes: 6 additions & 6 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ enum Buffer {
unsafe impl<S: Adapter> Sync for KvWriter<S> {}

impl<S: Adapter> oio::Write for KvWriter<S> {
fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
fn poll_write(&mut self, _: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
if self.future.is_some() {
self.future = None;
return Poll::Ready(Err(Error::new(
Expand All @@ -301,8 +301,8 @@ impl<S: Adapter> oio::Write for KvWriter<S> {

match &mut self.buffer {
Buffer::Active(buf) => {
buf.extend_from_slice(bs.chunk());
Poll::Ready(Ok(bs.chunk().len()))
buf.extend_from_slice(&bs);
Poll::Ready(Ok(bs.len()))
}
Buffer::Frozen(_) => unreachable!("KvWriter should not be frozen during poll_write"),
}
Expand Down Expand Up @@ -350,11 +350,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}

impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
match &mut self.buffer {
Buffer::Active(buf) => {
buf.extend_from_slice(bs.chunk());
Ok(bs.chunk().len())
buf.extend_from_slice(&bs);
Ok(bs.len())
}
Buffer::Frozen(_) => unreachable!("KvWriter should not be frozen during poll_write"),
}
Expand Down
Loading

0 comments on commit b8ff1af

Please sign in to comment.