Skip to content

Commit

Permalink
feat(core): Implement write and write_from for Writer (#4482)
Browse files Browse the repository at this point in the history
* polish writer write & write_from api

* modify writer & writer_from

* public buffer

* modify vec!
  • Loading branch information
zjregee authored Apr 15, 2024
1 parent 2931d7b commit 1a99896
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 10 deletions.
3 changes: 2 additions & 1 deletion core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,8 @@ mod tests {
#[tokio::test]
async fn test_writer() {
let op = new_test_operator(Capability::default());
let res = op.write("path", vec![]).await;
let bs: Vec<u8> = vec![];
let res = op.write("path", bs).await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);

Expand Down
5 changes: 2 additions & 3 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use std::future::Future;
use std::time::Duration;

use bytes::Bytes;
use futures::stream;
use futures::Stream;
use futures::StreamExt;
Expand Down Expand Up @@ -622,7 +621,7 @@ impl Operator {
/// # Ok(())
/// # }
/// ```
pub async fn write(&self, path: &str, bs: impl Into<Bytes>) -> Result<()> {
pub async fn write(&self, path: &str, bs: impl Into<Buffer>) -> Result<()> {
let bs = bs.into();
self.write_with(path, bs).await
}
Expand Down Expand Up @@ -1111,7 +1110,7 @@ impl Operator {
pub fn write_with(
&self,
path: &str,
bs: impl Into<Bytes>,
bs: impl Into<Buffer>,
) -> FutureWrite<impl Future<Output = Result<()>>> {
let path = normalize_path(path);
let bs = bs.into();
Expand Down
3 changes: 1 addition & 2 deletions core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::future::IntoFuture;
use std::ops::RangeBounds;
use std::time::Duration;

use bytes::Bytes;
use flagset::FlagSet;
use futures::Future;

Expand Down Expand Up @@ -243,7 +242,7 @@ impl<F: Future<Output = Result<Reader>>> FutureReader<F> {
/// Future that generated by [`Operator::write_with`].
///
/// Users can add more options by public functions provided by this struct.
pub type FutureWrite<F> = OperatorFuture<(OpWrite, Bytes), (), F>;
pub type FutureWrite<F> = OperatorFuture<(OpWrite, Buffer), (), F>;

impl<F: Future<Output = Result<()>>> FutureWrite<F> {
/// Set the append mode of op.
Expand Down
72 changes: 69 additions & 3 deletions core/src/types/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,24 @@ impl Writer {
Ok(Writer { inner: w })
}

/// Write into inner writer.
pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
/// Write Buffer into inner writer.
pub async fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> {
let mut bs = bs.into();
while !bs.is_empty() {
let n = self.inner.write(bs.clone().into()).await?;
let n = self.inner.write_dyn(bs.clone()).await?;
bs.advance(n);
}
Ok(())
}

/// Write bytes::Buf into inner writer.
pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> {
let mut bs = bs;
let mut bs = Buffer::from(bs.copy_to_bytes(bs.remaining()));
while !bs.is_empty() {
let n = self.inner.write_dyn(bs.clone()).await?;
bs.advance(n);
}
Ok(())
}

Expand Down Expand Up @@ -385,3 +395,59 @@ impl io::Write for BlockingWriter {
Ok(())
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use rand::rngs::ThreadRng;
use rand::Rng;
use rand::RngCore;

use crate::services;
use crate::Operator;

fn gen_random_bytes() -> Vec<u8> {
let mut rng = ThreadRng::default();
// Generate size between 1B..16MB.
let size = rng.gen_range(1..16 * 1024 * 1024);
let mut content = vec![0; size];
rng.fill_bytes(&mut content);
content
}

#[tokio::test]
async fn test_writer_write() {
let op = Operator::new(services::Memory::default()).unwrap().finish();
let path = "test_file";

let content = gen_random_bytes();
let mut writer = op.writer(path).await.unwrap();
writer
.write(content.clone())
.await
.expect("write must succeed");
writer.close().await.expect("close must succeed");

let buf = op.read(path).await.expect("read to end mut succeed");

assert_eq!(buf.to_bytes(), content);
}

#[tokio::test]
async fn test_writer_write_from() {
let op = Operator::new(services::Memory::default()).unwrap().finish();
let path = "test_file";

let content = gen_random_bytes();
let mut writer = op.writer(path).await.unwrap();
writer
.write_from(Bytes::from(content.clone()))
.await
.expect("write must succeed");
writer.close().await.expect("close must succeed");

let buf = op.read(path).await.expect("read to end mut succeed");

assert_eq!(buf.to_bytes(), content);
}
}
3 changes: 2 additions & 1 deletion core/tests/behavior/async_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ pub async fn test_write_with_empty_content(op: Operator) -> Result<()> {

let path = TEST_FIXTURE.new_file_path();

op.write(&path, vec![]).await?;
let bs: Vec<u8> = vec![];
op.write(&path, bs).await?;

let meta = op.stat(&path).await.expect("stat must succeed");
assert_eq!(meta.content_length(), 0);
Expand Down

0 comments on commit 1a99896

Please sign in to comment.