Skip to content

Commit

Permalink
Merge remote-tracking branch 'refs/remotes/origin/fix-mongo' into fix…
Browse files Browse the repository at this point in the history
…-mongo
  • Loading branch information
Xuanwo committed Jan 17, 2025
2 parents 1982448 + 6fde936 commit 459b781
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 18 deletions.
1 change: 1 addition & 0 deletions .github/scripts/test_behavior/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def calculate_hint(changed_files: list[str]) -> Hint:
and not p.startswith("core/edge/")
and not p.startswith("core/fuzz/")
and not p.startswith("core/src/services/")
and not p.startswith("core/src/docs/")
):
hint.core = True
hint.binding_java = True
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[![](https://img.shields.io/github/discussions/apache/opendal)](https://github.com/apache/opendal/discussions)
[![](https://img.shields.io/discord/1081052318650339399?logo=discord&label=discord)](https://opendal.apache.org/discord)

OpenDAL is an Open Data Access Layer that enables seamless interaction with diverse storage services.
OpenDAL (`/ˈoʊ.pən.dæl/`, pronounced "OH-puhn-dal") is an Open Data Access Layer that enables seamless interaction with diverse storage services.

OpenDAL's development is guided by its vision of **One Layer, All Storage** and its core principles: **Open Community**, **Solid Foundation**, **Fast Access**, **Object Storage First**, and **Extensible Architecture**. Read the explained vision at [OpenDAL Vision](https://opendal.apache.org/vision).

Expand Down
6 changes: 3 additions & 3 deletions core/benches/vs_s3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu
let mut group = c.benchmark_group("read");
group.throughput(criterion::Throughput::Bytes(16 * 1024 * 1024));

TEST_RUNTIME.block_on(prepare(op.clone()));
TEST_RUNTIME.block_on(prepare(&op));

group.bench_function("opendal_s3_reader", |b| {
b.to_async(&*TEST_RUNTIME).iter(|| async {
Expand Down Expand Up @@ -118,10 +118,10 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu
group.finish()
}

async fn prepare(op: Operator) {
async fn prepare(op: &Operator) {
let mut rng = thread_rng();
let mut content = vec![0; 16 * 1024 * 1024];
rng.fill_bytes(&mut content);

op.write("file", content.clone()).await.unwrap();
op.write("file", content).await.unwrap();
}
121 changes: 121 additions & 0 deletions core/src/docs/rfcs/5556_write_returns_metadata.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
- Proposal Name: `write_returns_metadata`
- Start Date: 2025-01-16
- RFC PR: [apache/opendal#5556](https://github.com/apache/opendal/pull/5556)
- Tracking Issue: [apache/opendal#5557](https://github.com/apache/opendal/issues/5557)

# Summary

Enhance write operations by returning metadata after successful writes.

# Motivation

Currently, write operations (`write`, `write_with`, `writer`, `writer_with`) only return `Result<()>` or `Result<Writer>`.
Users who need metadata after writing (like `ETag` or `version_id`) must make an additional `stat()` call. This is inefficient
and can lead to race conditions if the file is modified between the write and stat operations.

Many storage services (like S3, GCS, Azure Blob) return metadata in their write responses. We should expose this information
to users directly after write operations.

# Guide-level explanation

The write operations will be enhanced to return metadata:

```rust
// Before
op.write("path/to/file", data).await?;
let meta = op.stat("path/to/file").await?;
if Some(etag) = meta.etag() {
println!("File ETag: {}", etag);
}

// After
let meta = op.write("path/to/file", data).await?;
if Some(etag) = meta.etag() {
println!("File ETag: {}", etag);
}
```

For writer operations:

```rust
// Before
let mut writer = op.writer("path/to/file").await?;
writer.write(data).await?;
writer.close().await?;
let meta = op.stat("path/to/file").await?;
if Some(etag) = meta.etag() {
println!("File ETag: {}", etag);
}

// After
let mut writer = op.writer("path/to/file").await?;
writer.write(data).await?;
let meta = writer.close().await?;
if Some(etag) = meta.etag() {
println!("File ETag: {}", etag);
}
```

The behavior remains unchanged if users don't need the metadata - they can simply ignore the return value.

# Reference-level explanation

## Changes to `Operator` API

The following functions will be modified to return `Result<Metadata>` instead of `Result<()>`:

- `write()`
- `write_with()`

The `writer()` and `writer_with()` return types remain unchanged as they return `Result<Writer>`.

## Changes to struct `Writer`

The `Writer` struct will be modified to return `Result<Metadata>` instead of `Result<()>` for the `close()` function.

## Changes to trait `oio::Write` and trait `oio::MultipartWrite`

The `Write` trait will be modified to return `Result<Metadata>` instead of `Result<()>` for the `close()` function.

The `MultipartWrite` trait will be modified to return `Result<Metadata>` instead of `Result<()>` for the `complete_part()`
and `write_once` functions.

## Implementation Details

For services that return metadata in their write responses:
- The metadata will be captured from the service response
- All available fields (etag, version_id, etc.) will be populated

For services that don't return metadata in write responses:
- for `fs`: we can use `stat` to retrieve the metadata before returning. since the metadata is cached by the kernel,
this won't cause a performance issue.
- for other services: A default metadata object will be returned.


# Drawbacks

- Minor breaking change for users who explicitly type the return value of write operations
- Additional complexity in the Writer implementation

# Rationale and alternatives

- Provides a clean, consistent API
- Maintains backward compatibility for users who ignore the return value
- Improves performance by avoiding additional stat calls when possible

# Prior art

Similar patterns exist in other storage SDKs:

- `object_store` crate returns metadata in `PutResult` after calling `put_opts`
- AWS SDK returns metadata in `PutObjectOutput`
- Azure SDK returns `UploadFileResponse` after uploads

# Unresolved questions

- None


# Future possibilities

- None
4 changes: 4 additions & 0 deletions core/src/docs/rfcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,7 @@ pub mod rfc_5485_conditional_reader {}
/// List With Deleted
#[doc = include_str!("5495_list_with_deleted.md")]
pub mod rfc_5495_list_with_deleted {}

/// Write Returns Metadata
#[doc = include_str!("5556_write_returns_metadata.md")]
pub mod rfc_5556_write_returns_metadata {}
14 changes: 7 additions & 7 deletions core/src/layers/otelmetrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::*;
/// # fn main() -> Result<()> {
/// let meter = opentelemetry::global::meter("opendal");
/// let _ = Operator::new(services::Memory::default())?
/// .layer(OtelMetricsLayer::builder().register(meter))
/// .layer(OtelMetricsLayer::builder().register(&meter))
/// .finish();
/// Ok(())
/// # }
Expand Down Expand Up @@ -69,7 +69,7 @@ impl OtelMetricsLayer {
/// # async fn main() -> Result<()> {
/// let meter = opentelemetry::global::meter("opendal");
/// let op = Operator::new(services::Memory::default())?
/// .layer(OtelMetricsLayer::builder().path_label(1).register(meter))
/// .layer(OtelMetricsLayer::builder().path_label(1).register(&meter))
/// .finish();
///
/// Ok(())
Expand Down Expand Up @@ -115,7 +115,7 @@ impl OtelMetricsLayerBuilder {
/// # async fn main() -> Result<()> {
/// let meter = opentelemetry::global::meter("opendal");
/// let op = Operator::new(services::Memory::default())?
/// .layer(OtelMetricsLayer::builder().path_label(1).register(meter))
/// .layer(OtelMetricsLayer::builder().path_label(1).register(&meter))
/// .finish();
/// debug!("operator: {op:?}");
///
Expand Down Expand Up @@ -145,7 +145,7 @@ impl OtelMetricsLayerBuilder {
/// .layer(
/// OtelMetricsLayer::builder()
/// .operation_duration_seconds_boundaries(vec![0.01, 0.02, 0.05, 0.1, 0.2, 0.5])
/// .register(meter)
/// .register(&meter)
/// )
/// .finish();
/// debug!("operator: {op:?}");
Expand Down Expand Up @@ -178,7 +178,7 @@ impl OtelMetricsLayerBuilder {
/// .layer(
/// OtelMetricsLayer::builder()
/// .operation_bytes_boundaries(vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0])
/// .register(meter)
/// .register(&meter)
/// )
/// .finish();
/// debug!("operator: {op:?}");
Expand Down Expand Up @@ -207,13 +207,13 @@ impl OtelMetricsLayerBuilder {
/// # async fn main() -> Result<()> {
/// let meter = opentelemetry::global::meter("opendal");
/// let op = Operator::new(services::Memory::default())?
/// .layer(OtelMetricsLayer::builder().register(meter))
/// .layer(OtelMetricsLayer::builder().register(&meter))
/// .finish();
///
/// Ok(())
/// # }
/// ```
pub fn register(self, meter: Meter) -> OtelMetricsLayer {
pub fn register(self, meter: &Meter) -> OtelMetricsLayer {
let duration_seconds = meter
.f64_histogram("opendal.operation.duration")
.with_description("Duration of operations")
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/http_util/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ where
.with_operation("http_util::parse_header_to_str")
})?;

let value = if let Some(v) = headers.get(name.clone()) {
let value = if let Some(v) = headers.get(&name) {
v
} else {
return Ok(None);
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ pub fn get_basename(path: &str) -> &str {
if !path.ends_with('/') {
return path
.split('/')
.last()
.next_back()
.expect("file path without name is invalid");
}

Expand Down
7 changes: 4 additions & 3 deletions core/src/services/azblob/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ impl Debug for AzblobError {

/// Parse error response into Error.
pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
let (parts, mut body) = resp.into_parts();
let bs = body.copy_to_bytes(body.remaining());
let (parts, body) = resp.into_parts();
let bs = body.to_bytes();

let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
Expand All @@ -76,7 +76,8 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
_ => (ErrorKind::Unexpected, false),
};

let mut message = match de::from_reader::<_, AzblobError>(bs.clone().reader()) {
let bs_content = bs.chunk();
let mut message = match de::from_reader::<_, AzblobError>(bs_content.reader()) {
Ok(azblob_err) => format!("{azblob_err:?}"),
Err(_) => String::from_utf8_lossy(&bs).into_owned(),
};
Expand Down
1 change: 1 addition & 0 deletions core/src/services/gcs/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
(ErrorKind::ConditionNotMatch, false)
}
StatusCode::TOO_MANY_REQUESTS => (ErrorKind::RateLimited, true),
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
Expand Down
6 changes: 4 additions & 2 deletions core/src/services/s3/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub(crate) struct S3Error {
/// Parse error response into Error.
pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
let (parts, body) = resp.into_parts();
let bs = body.to_bytes();

let (mut kind, mut retryable) = match parts.status.as_u16() {
403 => (ErrorKind::PermissionDenied, false),
Expand All @@ -49,9 +50,10 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
_ => (ErrorKind::Unexpected, false),
};

let (message, s3_err) = de::from_reader::<_, S3Error>(body.clone().reader())
let body_content = bs.chunk();
let (message, s3_err) = de::from_reader::<_, S3Error>(body_content.reader())
.map(|s3_err| (format!("{s3_err:?}"), Some(s3_err)))
.unwrap_or_else(|_| (String::from_utf8_lossy(body.chunk()).into_owned(), None));
.unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));

if let Some(s3_err) = s3_err {
(kind, retryable) = parse_s3_error_code(s3_err.code.as_str()).unwrap_or((kind, retryable));
Expand Down

0 comments on commit 459b781

Please sign in to comment.