diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs new file mode 100644 index 000000000000..60e01571b47a --- /dev/null +++ b/core/src/raw/oio/write/block_write.rs @@ -0,0 +1,250 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::pin::Pin; +use std::sync::Arc; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +use async_trait::async_trait; +use futures::Future; +use futures::FutureExt; +use futures::StreamExt; + +use crate::raw::*; +use crate::*; + +/// BlockWrite is used to implement [`Write`] based on block +/// uploads. By implementing BlockWrite, services don't need to +/// care about the details of uploading blocks. +/// +/// # Architecture +/// +/// The architecture after adopting [`BlockWrite`]: +/// +/// - Services impl `BlockWrite` +/// - `BlockWriter` impl `Write` +/// - Expose `BlockWriter` as `Accessor::Writer` +/// +/// # Notes +/// +/// `BlockWrite` has an oneshot optimization when `write` has been called only once: +/// +/// ```no_build +/// w.write(bs).await?; +/// w.close().await?; +/// ``` +/// +/// We will use `write_once` instead of starting a new block upload. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait BlockWrite: Send + Sync + Unpin + 'static { + /// write_once is used to write the data to underlying storage at once. + /// + /// BlockWriter will call this API when: + /// + /// - All the data has been written to the buffer and we can perform the upload at once. + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>; + + /// write_block will write a block of the data and returns the result + /// [`Block`]. + /// + /// BlockWriter will call this API and stores the result in + /// order. + /// + /// - block_id is the id of the block. + async fn write_block(&self, size: u64, block_id: String, body: AsyncBody) -> Result<()>; + + /// complete_block will complete the block upload to build the final + /// file. + async fn complete_block(&self, block_ids: Vec) -> Result<()>; + + /// abort_block will cancel the block upload and purge all data. + async fn abort_block(&self, block_ids: Vec) -> Result<()>; +} + +struct WriteBlockFuture(BoxedFuture>); + +/// # Safety +/// +/// wasm32 is a special target that we only have one event-loop for this WriteBlockFuture. +unsafe impl Send for WriteBlockFuture {} + +/// # Safety +/// +/// We will only take `&mut Self` reference for WriteBlockFuture. +unsafe impl Sync for WriteBlockFuture {} + +impl Future for WriteBlockFuture { + type Output = Result<()>; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.get_mut().0.poll_unpin(cx) + } +} + +/// BlockWriter will implements [`Write`] based on block +/// uploads. +pub struct BlockWriter { + state: State, + w: Arc, + + block_ids: Vec, + cache: Option, + futures: ConcurrentFutures, +} + +enum State { + Idle, + Close(BoxedFuture>), + Abort(BoxedFuture>), +} + +/// # Safety +/// +/// wasm32 is a special target that we only have one event-loop for this state. +unsafe impl Send for State {} +/// # Safety +/// +/// We will only take `&mut Self` reference for State. +unsafe impl Sync for State {} + +impl BlockWriter { + /// Create a new BlockWriter. + pub fn new(inner: W, concurrent: usize) -> Self { + Self { + state: State::Idle, + + w: Arc::new(inner), + block_ids: Vec::new(), + cache: None, + futures: ConcurrentFutures::new(1.max(concurrent)), + } + } + + fn fill_cache(&mut self, bs: &dyn oio::WriteBuf) -> usize { + let size = bs.remaining(); + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); + assert!(self.cache.is_none()); + self.cache = Some(bs); + size + } +} + +impl oio::Write for BlockWriter +where + W: BlockWrite, +{ + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { + loop { + match &mut self.state { + State::Idle => { + if self.futures.has_remaining() { + let cache = self.cache.take().expect("pending write must exist"); + let block_id = uuid::Uuid::new_v4().to_string(); + self.block_ids.push(block_id.clone()); + let w = self.w.clone(); + let size = cache.len(); + self.futures.push(WriteBlockFuture(Box::pin(async move { + w.write_block(size as u64, block_id, AsyncBody::ChunkedBytes(cache)) + .await + }))); + let size = self.fill_cache(bs); + return Poll::Ready(Ok(size)); + } else { + ready!(self.futures.poll_next_unpin(cx)); + } + } + State::Close(_) => { + unreachable!("BlockWriter must not go into State::Close during poll_write") + } + State::Abort(_) => { + unreachable!("BlockWriter must not go into State::Abort during poll_write") + } + } + } + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.state { + State::Idle => { + let w = self.w.clone(); + let block_ids = self.block_ids.clone(); + if self.futures.is_empty() && self.cache.is_none() { + self.state = + State::Close(Box::pin( + async move { w.complete_block(block_ids).await }, + )); + } else { + if self.futures.has_remaining() { + if let Some(cache) = self.cache.take() { + let block_id = uuid::Uuid::new_v4().to_string(); + self.block_ids.push(block_id.clone()); + let size = cache.len(); + let w = self.w.clone(); + self.futures.push(WriteBlockFuture(Box::pin(async move { + w.write_block( + size as u64, + block_id, + AsyncBody::ChunkedBytes(cache), + ) + .await + }))); + } + } + while ready!(self.futures.poll_next_unpin(cx)).is_some() {} + } + } + State::Close(fut) => { + let res = futures::ready!(fut.as_mut().poll(cx)); + self.state = State::Idle; + // We should check res first before clean up cache. + res?; + self.cache = None; + + return Poll::Ready(Ok(())); + } + State::Abort(_) => { + unreachable!("BlockWriter must not go into State::Abort during poll_close") + } + } + } + } + + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.state { + State::Idle => { + let w = self.w.clone(); + let block_ids = self.block_ids.clone(); + self.futures.clear(); + self.state = + State::Abort(Box::pin(async move { w.abort_block(block_ids).await })); + } + State::Abort(fut) => { + let res = futures::ready!(fut.as_mut().poll(cx)); + self.state = State::Idle; + return Poll::Ready(res); + } + State::Close(_) => { + unreachable!("BlockWriter must not go into State::Close during poll_abort") + } + } + } + } +} diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs index 540d8d009783..e2fdf70e47d2 100644 --- a/core/src/raw/oio/write/mod.rs +++ b/core/src/raw/oio/write/mod.rs @@ -42,3 +42,7 @@ pub use exact_buf_write::ExactBufWriter; mod range_write; pub use range_write::RangeWrite; pub use range_write::RangeWriter; + +mod block_write; +pub use block_write::BlockWrite; +pub use block_write::BlockWriter;