diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs
index 1a24335da846..65a25bcf1950 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -567,8 +567,9 @@ impl Accessor for AzblobBackend {
                 read_with_override_content_disposition: true,
 
                 write: true,
-                write_can_empty: true,
                 write_can_append: true,
+                write_can_empty: true,
+                write_can_multi: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
 
@@ -631,7 +632,7 @@ impl Accessor for AzblobBackend {
         let w = if args.append() {
             AzblobWriters::Two(oio::AppendWriter::new(w))
         } else {
-            AzblobWriters::One(oio::OneShotWriter::new(w))
+            AzblobWriters::One(oio::BlockWriter::new(w, args.concurrent()))
         };
 
         Ok((RpWrite::default(), w))
diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs
index 64420683dc34..c0c6790b17e2 100644
--- a/core/src/services/azblob/core.rs
+++ b/core/src/services/azblob/core.rs
@@ -15,12 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt;
-use std::fmt::Debug;
-use std::fmt::Formatter;
-use std::fmt::Write;
-use std::time::Duration;
-
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use bytes::Bytes;
 use http::header::HeaderName;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
@@ -32,7 +29,13 @@ use http::Response;
 use reqsign::AzureStorageCredential;
 use reqsign::AzureStorageLoader;
 use reqsign::AzureStorageSigner;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+use std::time::Duration;
+use uuid::Uuid;
 
 use crate::raw::*;
 use crate::*;
@@ -370,6 +373,130 @@ impl AzblobCore {
         Ok(req)
     }
 
+    /// Build a `Put Block` request that stages one block for the blob at `path`.
+    ///
+    /// The `blockid` query parameter carries the base64 encoding of `block_id`'s bytes,
+    /// percent-encoded. When `size` is given it is sent as `Content-Length`.
+    pub fn azblob_put_block_request(
+        &self,
+        path: &str,
+        block_id: Uuid,
+        size: Option<u64>,
+        args: &OpWrite,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        // To be written as part of a blob, a block must have been successfully written to the server in an earlier Put Block operation.
+        // refer to https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id
+        let p = build_abs_path(&self.root, path);
+
+        let encoded_block_id: String =
+            percent_encode_path(&BASE64_STANDARD.encode(block_id.as_bytes()));
+        let url = format!(
+            "{}/{}/{}?comp=block&blockid={}",
+            self.endpoint,
+            self.container,
+            percent_encode_path(&p),
+            encoded_block_id,
+        );
+        let mut req = Request::put(&url);
+        // Set SSE headers.
+        req = self.insert_sse_headers(req);
+
+        if let Some(cache_control) = args.cache_control() {
+            req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control);
+        }
+        if let Some(size) = size {
+            req = req.header(CONTENT_LENGTH, size)
+        }
+
+        if let Some(ty) = args.content_type() {
+            req = req.header(CONTENT_TYPE, ty)
+        }
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    /// Sign and send the `Put Block` request built by `azblob_put_block_request`.
+    pub async fn azblob_put_block(
+        &self,
+        path: &str,
+        block_id: Uuid,
+        size: Option<u64>,
+        args: &OpWrite,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = self.azblob_put_block_request(path, block_id, size, args, body)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    /// Build a `Put Block List` request that commits `block_ids` as the blob at `path`.
+    ///
+    /// Blob properties taken from `args` (cache control, content type) are attached here,
+    /// since this is the request that commits the blob.
+    pub async fn azblob_complete_put_block_list_request(
+        &self,
+        path: &str,
+        block_ids: Vec<Uuid>,
+        args: &OpWrite,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        let url = format!(
+            "{}/{}/{}?comp=blocklist",
+            self.endpoint,
+            self.container,
+            percent_encode_path(&p),
+        );
+
+        let req = Request::put(&url);
+
+        // Set SSE headers.
+        let mut req = self.insert_sse_headers(req);
+        if let Some(cache_control) = args.cache_control() {
+            req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control);
+        }
+        // Blob properties are applied by `Put Block List`, not `Put Block`, so the content
+        // type has to be sent here; otherwise the caller's value is never stored on the blob.
+        if let Some(content_type) = args.content_type() {
+            req = req.header("x-ms-blob-content-type", content_type);
+        }
+
+        let content = quick_xml::se::to_string(&PutBlockListRequest {
+            latest: block_ids
+                .into_iter()
+                .map(|block_id| BASE64_STANDARD.encode(block_id.as_bytes()))
+                .collect(),
+        })
+        .map_err(new_xml_deserialize_error)?;
+
+        req = req.header(CONTENT_LENGTH, content.len());
+
+        let req = req
+            .body(AsyncBody::Bytes(Bytes::from(content)))
+            .map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    /// Sign and send the `Put Block List` request that commits `block_ids` for `path`.
+    pub async fn azblob_complete_put_block_list(
+        &self,
+        path: &str,
+        block_ids: Vec<Uuid>,
+        args: &OpWrite,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = self
+            .azblob_complete_put_block_list_request(path, block_ids, args)
+            .await?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
     pub fn azblob_head_blob_request(
         &self,
         path: &str,
@@ -533,6 +660,15 @@ impl AzblobCore {
     }
 }
 
+/// XML body of a `Put Block List` request.
+///
+/// Serializes to a `BlockList` element holding one `Latest` child per block id.
+#[derive(Default, Debug, Serialize, Deserialize)]
+#[serde(default, rename = "BlockList", rename_all = "PascalCase")]
+pub struct PutBlockListRequest {
+    pub latest: Vec<String>,
+}
+
 #[derive(Default, Debug, Deserialize)]
 #[serde(default, rename_all = "PascalCase")]
 pub struct ListBlobsOutput {
@@ -761,4 +897,42 @@ mod tests {
 
         de::from_reader(Bytes::from(bs).reader()).expect("must success")
     }
+
+    /// This example is from https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id
+    #[test]
+    fn test_serialize_put_block_list_request() {
+        let req = PutBlockListRequest {
+            latest: vec!["1".to_string(), "2".to_string(), "3".to_string()],
+        };
+
+        let actual = quick_xml::se::to_string(&req).expect("must succeed");
+
+        pretty_assertions::assert_eq!(
+            actual,
+            r#"
+            <BlockList>
+               <Latest>1</Latest>
+               <Latest>2</Latest>
+               <Latest>3</Latest>
+            </BlockList>"#
+                // Cleanup space and new line
+                .replace([' ', '\n'], "")
+                // Escape `"` by hand to address <https://github.com/tafia/quick-xml/issues/362>
+                .replace('"', "&quot;")
+        );
+
+        let bs = "<?xml version=\"1.0\" encoding=\"utf-8\"?>
+            <BlockList>
+               <Latest>1</Latest>
+               <Latest>2</Latest>
+               <Latest>3</Latest>
+            </BlockList>";
+
+        let out: PutBlockListRequest =
+            de::from_reader(Bytes::from(bs).reader()).expect("must success");
+        assert_eq!(
+            out.latest,
+            vec!["1".to_string(), "2".to_string(), "3".to_string()]
+        );
+    }
 }
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index 6e8b415dd987..e0cbc72f9aed 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use http::StatusCode;
+use uuid::Uuid;
 
 use super::core::AzblobCore;
 use super::error::parse_error;
@@ -27,7 +28,7 @@ use crate::*;
 
 const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
 
-pub type AzblobWriters = TwoWays<oio::OneShotWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;
+pub type AzblobWriters = TwoWays<oio::BlockWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;
 
 pub struct AzblobWriter {
     core: Arc<AzblobCore>,
@@ -42,34 +43,6 @@ impl AzblobWriter {
     }
 }
 
-#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
-#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
-impl oio::OneShotWrite for AzblobWriter {
-    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
-        let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
-        let mut req = self.core.azblob_put_blob_request(
-            &self.path,
-            Some(bs.len() as u64),
-            &self.op,
-            AsyncBody::ChunkedBytes(bs),
-        )?;
-
-        self.core.sign(&mut req).await?;
-
-        let resp = self.core.send(req).await?;
-
-        let status = resp.status();
-
-        match status {
-            StatusCode::CREATED | StatusCode::OK => {
-                resp.into_body().consume().await?;
-                Ok(())
-            }
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-}
-
 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl oio::AppendWrite for AzblobWriter {
@@ -137,3 +110,69 @@ impl oio::AppendWrite for AzblobWriter {
         }
     }
 }
+
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
+impl oio::BlockWrite for AzblobWriter {
+    /// Upload the whole body with a single `Put Blob` request.
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let mut req: http::Request<AsyncBody> =
+            self.core
+                .azblob_put_blob_request(&self.path, Some(size), &self.op, body)?;
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Stage one block identified by `block_id` with `Put Block`.
+    async fn write_block(&self, block_id: Uuid, size: u64, body: AsyncBody) -> Result<()> {
+        let resp = self
+            .core
+            .azblob_put_block(&self.path, block_id, Some(size), &self.op, body)
+            .await?;
+
+        let status = resp.status();
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Commit the staged blocks listed in `block_ids` with `Put Block List`.
+    async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<()> {
+        let resp = self
+            .core
+            .azblob_complete_put_block_list(&self.path, block_ids, &self.op)
+            .await?;
+
+        let status = resp.status();
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Nothing to clean up: the service garbage-collects uncommitted blocks itself.
+    async fn abort_block(&self, _block_ids: Vec<Uuid>) -> Result<()> {
+        // refer to https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id
+        // Any uncommitted blocks are garbage collected if there are no successful calls to Put Block or Put Block List on the blob within a week.
+        // If Put Blob is called on the blob, any uncommitted blocks are garbage collected.
+        Ok(())
+    }
+}