Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/azblob): support multi write for azblob #4181

Merged
merged 25 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,9 @@ impl Accessor for AzblobBackend {
read_with_override_content_disposition: true,

write: true,
write_can_empty: true,
write_can_append: true,
write_can_empty: true,
write_can_multi: true,
write_with_cache_control: true,
write_with_content_type: true,

Expand Down Expand Up @@ -631,7 +632,7 @@ impl Accessor for AzblobBackend {
let w = if args.append() {
AzblobWriters::Two(oio::AppendWriter::new(w))
} else {
AzblobWriters::One(oio::OneShotWriter::new(w))
AzblobWriters::One(oio::BlockWriter::new(w, args.concurrent()))
};

Ok((RpWrite::default(), w))
Expand Down
174 changes: 167 additions & 7 deletions core/src/services/azblob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
use std::time::Duration;

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use http::header::HeaderName;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
Expand All @@ -32,7 +29,13 @@ use http::Response;
use reqsign::AzureStorageCredential;
use reqsign::AzureStorageLoader;
use reqsign::AzureStorageSigner;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
use std::time::Duration;
use uuid::Uuid;

use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -370,6 +373,118 @@ impl AzblobCore {
Ok(req)
}

pub fn azblob_put_block_request(
&self,
path: &str,
block_id: Uuid,
size: Option<u64>,
args: &OpWrite,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
// To be written as part of a blob, a block must have been successfully written to the server in an earlier Put Block operation.
// refer to https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id
let p = build_abs_path(&self.root, path);

let encoded_block_id: String =
percent_encode_path(&BASE64_STANDARD.encode(block_id.as_bytes()));
let url = format!(
"{}/{}/{}?comp=block&blockid={}",
self.endpoint,
self.container,
percent_encode_path(&p),
encoded_block_id,
);
let mut req = Request::put(&url);
// Set SSE headers.
req = self.insert_sse_headers(req);

if let Some(cache_control) = args.cache_control() {
req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control);
}
if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size)
}

if let Some(ty) = args.content_type() {
req = req.header(CONTENT_TYPE, ty)
}
// Set body
let req = req.body(body).map_err(new_request_build_error)?;

Ok(req)
}

pub async fn azblob_put_block(
&self,
path: &str,
block_id: Uuid,
size: Option<u64>,
args: &OpWrite,
body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
let mut req = self.azblob_put_block_request(path, block_id, size, args, body)?;

self.sign(&mut req).await?;
self.send(req).await
}

pub async fn azblob_complete_put_block_list_request(
&self,
path: &str,
block_ids: Vec<Uuid>,
args: &OpWrite,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let url = format!(
"{}/{}/{}?comp=blocklist",
self.endpoint,
self.container,
percent_encode_path(&p),
);

let req = Request::put(&url);

// Set SSE headers.
let mut req = self.insert_sse_headers(req);
if let Some(cache_control) = args.cache_control() {
req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control);
}

let content = quick_xml::se::to_string(&PutBlockListRequest {
latest: block_ids
.into_iter()
.map(|block_id| {
let encoded_block_id: String = BASE64_STANDARD.encode(block_id.as_bytes());
encoded_block_id
})
.collect(),
})
.map_err(new_xml_deserialize_error)?;

req = req.header(CONTENT_LENGTH, content.len());

let req = req
.body(AsyncBody::Bytes(Bytes::from(content)))
.map_err(new_request_build_error)?;

Ok(req)
}

pub async fn azblob_complete_put_block_list(
&self,
path: &str,
block_ids: Vec<Uuid>,
args: &OpWrite,
) -> Result<Response<IncomingAsyncBody>> {
let mut req = self
.azblob_complete_put_block_list_request(path, block_ids, args)
.await?;

self.sign(&mut req).await?;

self.send(req).await
}

pub fn azblob_head_blob_request(
&self,
path: &str,
Expand Down Expand Up @@ -533,6 +648,13 @@ impl AzblobCore {
}
}

/// Request of PutBlockListRequest
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(default, rename = "BlockList", rename_all = "PascalCase")]
pub struct PutBlockListRequest {
pub latest: Vec<String>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListBlobsOutput {
Expand Down Expand Up @@ -761,4 +883,42 @@ mod tests {

de::from_reader(Bytes::from(bs).reader()).expect("must success")
}

/// This example is from https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id
#[test]
fn test_serialize_put_block_list_request() {
let req = PutBlockListRequest {
latest: vec!["1".to_string(), "2".to_string(), "3".to_string()],
};

let actual = quick_xml::se::to_string(&req).expect("must succeed");

pretty_assertions::assert_eq!(
actual,
r#"
<BlockList>
<Latest>1</Latest>
<Latest>2</Latest>
<Latest>3</Latest>
</BlockList>"#
// Cleanup space and new line
.replace([' ', '\n'], "")
// Escape `"` by hand to address <https://github.com/tafia/quick-xml/issues/362>
.replace('"', "&quot;")
);

let bs = "<?xml version=\"1.0\" encoding=\"utf-8\"?>
<BlockList>
<Latest>1</Latest>
<Latest>2</Latest>
<Latest>3</Latest>
</BlockList>";

let out: PutBlockListRequest =
de::from_reader(Bytes::from(bs).reader()).expect("must success");
assert_eq!(
out.latest,
vec!["1".to_string(), "2".to_string(), "3".to_string()]
);
}
}
94 changes: 65 additions & 29 deletions core/src/services/azblob/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use http::StatusCode;
use uuid::Uuid;

use super::core::AzblobCore;
use super::error::parse_error;
Expand All @@ -27,7 +28,7 @@ use crate::*;

const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";

pub type AzblobWriters = TwoWays<oio::OneShotWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;
pub type AzblobWriters = TwoWays<oio::BlockWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;

pub struct AzblobWriter {
core: Arc<AzblobCore>,
Expand All @@ -42,34 +43,6 @@ impl AzblobWriter {
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl oio::OneShotWrite for AzblobWriter {
async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
let mut req = self.core.azblob_put_blob_request(
&self.path,
Some(bs.len() as u64),
&self.op,
AsyncBody::ChunkedBytes(bs),
)?;

self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();

match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl oio::AppendWrite for AzblobWriter {
Expand Down Expand Up @@ -137,3 +110,66 @@ impl oio::AppendWrite for AzblobWriter {
}
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[async_trait]
impl oio::BlockWrite for AzblobWriter {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req: http::Request<AsyncBody> =
self.core
.azblob_put_blob_request(&self.path, Some(size), &self.op, body)?;
self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();

match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}

async fn write_block(&self, block_id: Uuid, size: u64, body: AsyncBody) -> Result<()> {
let resp = self
.core
.azblob_put_block(&self.path, block_id, Some(size), &self.op, body)
.await?;

let status = resp.status();
match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}

async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<()> {
let resp = self
.core
.azblob_complete_put_block_list(&self.path, block_ids, &self.op)
.await?;

let status = resp.status();
match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}

async fn abort_block(&self, _block_ids: Vec<Uuid>) -> Result<()> {
// refer to https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id
// Any uncommitted blocks are garbage collected if there are no successful calls to Put Block or Put Block List on the blob within a week.
// If Put Blob is called on the blob, any uncommitted blocks are garbage collected.
Ok(())
}
}
Loading