From 78b6a9f26dbeb5118990d3140f1c636d8e54e719 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 15 Jan 2025 17:28:30 +0800 Subject: [PATCH 1/6] feat(gcs): Convert TOO_MANY_REQUESTS to retryable Ratelimited (#5551) --- core/src/services/gcs/error.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/services/gcs/error.rs b/core/src/services/gcs/error.rs index 1410c4fa0d3c..187f0de5be1d 100644 --- a/core/src/services/gcs/error.rs +++ b/core/src/services/gcs/error.rs @@ -58,6 +58,7 @@ pub(super) fn parse_error(resp: Response) -> Error { StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => { (ErrorKind::ConditionNotMatch, false) } + StatusCode::TOO_MANY_REQUESTS => (ErrorKind::RateLimited, true), StatusCode::INTERNAL_SERVER_ERROR | StatusCode::BAD_GATEWAY | StatusCode::SERVICE_UNAVAILABLE From abfbf1587bfe6dbeea1659c0392b61190c154287 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 15 Jan 2025 17:53:57 +0800 Subject: [PATCH 2/6] docs: Add docs on how to pronounce opendal (#5552) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index db39c5e230eb..ac820226dbfe 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![](https://img.shields.io/github/discussions/apache/opendal)](https://github.com/apache/opendal/discussions) [![](https://img.shields.io/discord/1081052318650339399?logo=discord&label=discord)](https://opendal.apache.org/discord) -OpenDAL is an Open Data Access Layer that enables seamless interaction with diverse storage services. +OpenDAL (`/ˈoʊ.pən.dæl/`, pronounced "OH-puhn-dal") is an Open Data Access Layer that enables seamless interaction with diverse storage services. OpenDAL's development is guided by its vision of **One Layer, All Storage** and its core principles: **Open Community**, **Solid Foundation**, **Fast Access**, **Object Storage First**, and **Extensible Architecture**. Read the explained vision at [OpenDAL Vision](https://opendal.apache.org/vision). From ef94a688408d00ce2bda99f2beab669e57063c9c Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Thu, 16 Jan 2025 00:59:36 +0800 Subject: [PATCH 3/6] chore(layer/otelmetrics): take meter by reference (#5553) --- core/src/layers/otelmetrics.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/layers/otelmetrics.rs b/core/src/layers/otelmetrics.rs index 55702928e95c..2cfea4e87a5c 100644 --- a/core/src/layers/otelmetrics.rs +++ b/core/src/layers/otelmetrics.rs @@ -40,7 +40,7 @@ use crate::*; /// # fn main() -> Result<()> { /// let meter = opentelemetry::global::meter("opendal"); /// let _ = Operator::new(services::Memory::default())? -/// .layer(OtelMetricsLayer::builder().register(meter)) +/// .layer(OtelMetricsLayer::builder().register(&meter)) /// .finish(); /// Ok(()) /// # } @@ -69,7 +69,7 @@ impl OtelMetricsLayer { /// # async fn main() -> Result<()> { /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? - /// .layer(OtelMetricsLayer::builder().path_label(1).register(meter)) + /// .layer(OtelMetricsLayer::builder().path_label(1).register(&meter)) /// .finish(); /// /// Ok(()) @@ -115,7 +115,7 @@ impl OtelMetricsLayerBuilder { /// # async fn main() -> Result<()> { /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? - /// .layer(OtelMetricsLayer::builder().path_label(1).register(meter)) + /// .layer(OtelMetricsLayer::builder().path_label(1).register(&meter)) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -145,7 +145,7 @@ impl OtelMetricsLayerBuilder { /// .layer( /// OtelMetricsLayer::builder() /// .operation_duration_seconds_boundaries(vec![0.01, 0.02, 0.05, 0.1, 0.2, 0.5]) - /// .register(meter) + /// .register(&meter) /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -178,7 +178,7 @@ impl OtelMetricsLayerBuilder { /// .layer( /// OtelMetricsLayer::builder() /// .operation_bytes_boundaries(vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0]) - /// .register(meter) + /// .register(&meter) /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -207,13 +207,13 @@ impl OtelMetricsLayerBuilder { /// # async fn main() -> Result<()> { /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? - /// .layer(OtelMetricsLayer::builder().register(meter)) + /// .layer(OtelMetricsLayer::builder().register(&meter)) /// .finish(); /// /// Ok(()) /// # } /// ``` - pub fn register(self, meter: Meter) -> OtelMetricsLayer { + pub fn register(self, meter: &Meter) -> OtelMetricsLayer { let duration_seconds = meter .f64_histogram("opendal.operation.duration") .with_description("Duration of operations") From 8aac21fbc4f6aec3623edf2a1846fc9ee53f5cdb Mon Sep 17 00:00:00 2001 From: meteorgan Date: Thu, 16 Jan 2025 20:28:32 +0800 Subject: [PATCH 4/6] ci: skip running behavior tests when adding or modifying documentation (#5558) --- .github/scripts/test_behavior/plan.py | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/scripts/test_behavior/plan.py b/.github/scripts/test_behavior/plan.py index 0f32ec6e4f53..013f7d89882e 100755 --- a/.github/scripts/test_behavior/plan.py +++ b/.github/scripts/test_behavior/plan.py @@ -139,6 +139,7 @@ def calculate_hint(changed_files: list[str]) -> Hint: and not p.startswith("core/edge/") and not p.startswith("core/fuzz/") and not p.startswith("core/src/services/") + and not p.startswith("core/src/docs/") ): hint.core = True hint.binding_java = True From da155eeedfb59f74b37e1c4eebfa441dcf25ddc7 Mon Sep 17 00:00:00 2001 From: meteorgan Date: Thu, 16 Jan 2025 23:09:30 +0800 Subject: [PATCH 5/6] RFC-5556: Write Returns Metadata (#5556) --- .../docs/rfcs/5556_write_returns_metadata.md | 121 ++++++++++++++++++ core/src/docs/rfcs/mod.rs | 4 + 2 files changed, 125 insertions(+) create mode 100644 core/src/docs/rfcs/5556_write_returns_metadata.md diff --git a/core/src/docs/rfcs/5556_write_returns_metadata.md b/core/src/docs/rfcs/5556_write_returns_metadata.md new file mode 100644 index 000000000000..a381580e516a --- /dev/null +++ b/core/src/docs/rfcs/5556_write_returns_metadata.md @@ -0,0 +1,121 @@ +- Proposal Name: `write_returns_metadata` +- Start Date: 2025-01-16 +- RFC PR: [apache/opendal#5556](https://github.com/apache/opendal/pull/5556) +- Tracking Issue: [apache/opendal#5557](https://github.com/apache/opendal/issues/5557) + +# Summary + +Enhance write operations by returning metadata after successful writes. + +# Motivation + +Currently, write operations (`write`, `write_with`, `writer`, `writer_with`) only return `Result<()>` or `Result`. +Users who need metadata after writing (like `ETag` or `version_id`) must make an additional `stat()` call. This is inefficient +and can lead to race conditions if the file is modified between the write and stat operations. + +Many storage services (like S3, GCS, Azure Blob) return metadata in their write responses. We should expose this information +to users directly after write operations. + +# Guide-level explanation + +The write operations will be enhanced to return metadata: + +```rust +// Before +op.write("path/to/file", data).await?; +let meta = op.stat("path/to/file").await?; +if Some(etag) = meta.etag() { + println!("File ETag: {}", etag); +} + +// After +let meta = op.write("path/to/file", data).await?; +if Some(etag) = meta.etag() { + println!("File ETag: {}", etag); +} +``` + +For writer operations: + +```rust +// Before +let mut writer = op.writer("path/to/file").await?; +writer.write(data).await?; +writer.close().await?; +let meta = op.stat("path/to/file").await?; +if Some(etag) = meta.etag() { + println!("File ETag: {}", etag); +} + +// After +let mut writer = op.writer("path/to/file").await?; +writer.write(data).await?; +let meta = writer.close().await?; +if Some(etag) = meta.etag() { + println!("File ETag: {}", etag); +} +``` + +The behavior remains unchanged if users don't need the metadata - they can simply ignore the return value. + +# Reference-level explanation + +## Changes to `Operator` API + +The following functions will be modified to return `Result` instead of `Result<()>`: + +- `write()` +- `write_with()` + +The `writer()` and `writer_with()` return types remain unchanged as they return `Result`. + +## Changes to struct `Writer` + +The `Writer` struct will be modified to return `Result` instead of `Result<()>` for the `close()` function. + +## Changes to trait `oio::Write` and trait `oio::MultipartWrite` + +The `Write` trait will be modified to return `Result` instead of `Result<()>` for the `close()` function. + +The `MultipartWrite` trait will be modified to return `Result` instead of `Result<()>` for the `complete_part()` +and `write_once` functions. + +## Implementation Details + +For services that return metadata in their write responses: +- The metadata will be captured from the service response +- All available fields (etag, version_id, etc.) will be populated + +For services that don't return metadata in write responses: +- for `fs`: we can use `stat` to retrieve the metadata before returning. since the metadata is cached by the kernel, +this won't cause a performance issue. +- for other services: A default metadata object will be returned. + + +# Drawbacks + +- Minor breaking change for users who explicitly type the return value of write operations +- Additional complexity in the Writer implementation + +# Rationale and alternatives + +- Provides a clean, consistent API +- Maintains backward compatibility for users who ignore the return value +- Improves performance by avoiding additional stat calls when possible + +# Prior art + +Similar patterns exist in other storage SDKs: + +- `object_store` crate returns metadata in `PutResult` after calling `put_opts` +- AWS SDK returns metadata in `PutObjectOutput` +- Azure SDK returns `UploadFileResponse` after uploads + +# Unresolved questions + +- None + + +# Future possibilities + +- None \ No newline at end of file diff --git a/core/src/docs/rfcs/mod.rs b/core/src/docs/rfcs/mod.rs index 7dcb2bf8a4b0..4d4980e81782 100644 --- a/core/src/docs/rfcs/mod.rs +++ b/core/src/docs/rfcs/mod.rs @@ -256,3 +256,7 @@ pub mod rfc_5485_conditional_reader {} /// List With Deleted #[doc = include_str!("5495_list_with_deleted.md")] pub mod rfc_5495_list_with_deleted {} + +/// Write Returns Metadata +#[doc = include_str!("5556_write_returns_metadata.md")] +pub mod rfc_5556_write_returns_metadata {} From 66bf9db1a41b58afc03d877509e45b555067cfd6 Mon Sep 17 00:00:00 2001 From: yihong Date: Fri, 17 Jan 2025 09:58:17 +0800 Subject: [PATCH 6/6] refactor: refactor some unnecessary clone and use next_back to make clippy happy (#5554) --- core/benches/vs_s3/src/main.rs | 6 +++--- core/src/raw/http_util/header.rs | 2 +- core/src/raw/path.rs | 2 +- core/src/services/azblob/error.rs | 7 ++++--- core/src/services/s3/error.rs | 6 ++++-- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/core/benches/vs_s3/src/main.rs b/core/benches/vs_s3/src/main.rs index 91b1be90495f..324b2bbad98c 100644 --- a/core/benches/vs_s3/src/main.rs +++ b/core/benches/vs_s3/src/main.rs @@ -69,7 +69,7 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu let mut group = c.benchmark_group("read"); group.throughput(criterion::Throughput::Bytes(16 * 1024 * 1024)); - TEST_RUNTIME.block_on(prepare(op.clone())); + TEST_RUNTIME.block_on(prepare(&op)); group.bench_function("opendal_s3_reader", |b| { b.to_async(&*TEST_RUNTIME).iter(|| async { @@ -118,10 +118,10 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu group.finish() } -async fn prepare(op: Operator) { +async fn prepare(op: &Operator) { let mut rng = thread_rng(); let mut content = vec![0; 16 * 1024 * 1024]; rng.fill_bytes(&mut content); - op.write("file", content.clone()).await.unwrap(); + op.write("file", content).await.unwrap(); } diff --git a/core/src/raw/http_util/header.rs b/core/src/raw/http_util/header.rs index 2da77a4b08dc..2db92e9efcad 100644 --- a/core/src/raw/http_util/header.rs +++ b/core/src/raw/http_util/header.rs @@ -129,7 +129,7 @@ where .with_operation("http_util::parse_header_to_str") })?; - let value = if let Some(v) = headers.get(name.clone()) { + let value = if let Some(v) = headers.get(&name) { v } else { return Ok(None); diff --git a/core/src/raw/path.rs b/core/src/raw/path.rs index fb3c4ad07abc..79b44951495b 100644 --- a/core/src/raw/path.rs +++ b/core/src/raw/path.rs @@ -157,7 +157,7 @@ pub fn get_basename(path: &str) -> &str { if !path.ends_with('/') { return path .split('/') - .last() + .next_back() .expect("file path without name is invalid"); } diff --git a/core/src/services/azblob/error.rs b/core/src/services/azblob/error.rs index 1ea38ad8755e..c67d805c035e 100644 --- a/core/src/services/azblob/error.rs +++ b/core/src/services/azblob/error.rs @@ -60,8 +60,8 @@ impl Debug for AzblobError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), @@ -76,7 +76,8 @@ pub(super) fn parse_error(resp: Response) -> Error { _ => (ErrorKind::Unexpected, false), }; - let mut message = match de::from_reader::<_, AzblobError>(bs.clone().reader()) { + let bs_content = bs.chunk(); + let mut message = match de::from_reader::<_, AzblobError>(bs_content.reader()) { Ok(azblob_err) => format!("{azblob_err:?}"), Err(_) => String::from_utf8_lossy(&bs).into_owned(), }; diff --git a/core/src/services/s3/error.rs b/core/src/services/s3/error.rs index 503ca4ecbd77..385b75fac109 100644 --- a/core/src/services/s3/error.rs +++ b/core/src/services/s3/error.rs @@ -37,6 +37,7 @@ pub(crate) struct S3Error { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (mut kind, mut retryable) = match parts.status.as_u16() { 403 => (ErrorKind::PermissionDenied, false), @@ -49,9 +50,10 @@ pub(super) fn parse_error(resp: Response) -> Error { _ => (ErrorKind::Unexpected, false), }; - let (message, s3_err) = de::from_reader::<_, S3Error>(body.clone().reader()) + let body_content = bs.chunk(); + let (message, s3_err) = de::from_reader::<_, S3Error>(body_content.reader()) .map(|s3_err| (format!("{s3_err:?}"), Some(s3_err))) - .unwrap_or_else(|_| (String::from_utf8_lossy(body.chunk()).into_owned(), None)); + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); if let Some(s3_err) = s3_err { (kind, retryable) = parse_s3_error_code(s3_err.code.as_str()).unwrap_or((kind, retryable));