Skip to content

Commit

Permalink
feat(core): add Alluxio e2e test (#3573)
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo authored Nov 13, 2023
1 parent bec1e9d commit 47c47b7
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 53 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,6 @@ OPENDAL_GRIDFS_CONNECTION_STRING=mongodb://localhost:27017
OPENDAL_GRIDFS_DATABASE=<database>
OPENDAL_GRIDFS_BUCKET=<fs>
OPENDAL_GRIDFS_CHUNK_SIZE=<chunk_size>
# alluxio
OPENDAL_ALLUXIO_ENDPOINT=<endpoint>
OPENDAL_ALLUXIO_ROOT=/path/to/dor
35 changes: 35 additions & 0 deletions .github/services/alluxio/alluxio/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: alluxio
description: "Behavior test for Alluxio"

runs:
using: "composite"
steps:
- name: Setup Alluxio service
shell: bash
working-directory: fixtures/alluxio
run: |
docker compose -f docker-compose-alluxio.yml up -d --wait
- name: Set environment variables
shell: bash
run: |
cat << EOF >> $GITHUB_ENV
OPENDAL_ALLUXIO_ENDPOINT=http://127.0.0.1:39999
OPENDAL_ALLUXIO_ROOT=/
EOF
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ jobs:
shell: bash
run: |
FEATURES=(
services-alluxio
services-azblob
services-azdls
services-cacache
Expand Down
2 changes: 2 additions & 0 deletions bindings/java/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ services-all = [
"services-azfile",
"services-libsql",
"services-swift",
"services-alluxio",
]

# Default services provided by opendal.
Expand Down Expand Up @@ -135,6 +136,7 @@ services-swift = ["opendal/services-swift"]
services-tikv = ["opendal/services-tikv"]
services-vercel-artifacts = ["opendal/services-vercel-artifacts"]
services-wasabi = ["opendal/services-wasabi"]
services-alluxio = ["opendal/services-alluxio"]

[dependencies]
anyhow = "1.0.71"
Expand Down
2 changes: 2 additions & 0 deletions bindings/nodejs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ services-all = [
"services-gridfs",
"services-sqlite",
"services-libsql",
"services-alluxio",
]

# Default services provided by opendal.
Expand Down Expand Up @@ -131,6 +132,7 @@ services-swift = ["opendal/services-swift"]
services-tikv = ["opendal/services-tikv"]
services-vercel-artifacts = ["opendal/services-vercel-artifacts"]
services-wasabi = ["opendal/services-wasabi"]
services-alluxio = ["opendal/services-alluxio"]

[lib]
crate-type = ["cdylib"]
Expand Down
2 changes: 2 additions & 0 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ services-all = [
"services-gridfs",
"services-sqlite",
"services-libsql",
"services-alluxio",
]

# Default services provided by opendal.
Expand Down Expand Up @@ -131,6 +132,7 @@ services-swift = ["opendal/services-swift"]
services-tikv = ["opendal/services-tikv"]
services-vercel-artifacts = ["opendal/services-vercel-artifacts"]
services-wasabi = ["opendal/services-wasabi"]
services-alluxio = ["opendal/services-alluxio"]

[lib]
crate-type = ["cdylib"]
Expand Down
7 changes: 3 additions & 4 deletions core/src/services/alluxio/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl Builder for AlluxioBuilder {
Some(endpoint) => Ok(endpoint.clone()),
None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
.with_operation("Builder::build")
.with_context("service", Scheme::Azfile)),
.with_context("service", Scheme::Alluxio)),
}?;
debug!("backend use endpoint {}", &endpoint);

Expand All @@ -161,7 +161,7 @@ impl Builder for AlluxioBuilder {
} else {
HttpClient::new().map_err(|err| {
err.with_operation("Builder::build")
.with_context("service", Scheme::S3)
.with_context("service", Scheme::Alluxio)
})?
};

Expand Down Expand Up @@ -204,7 +204,6 @@ impl Accessor for AlluxioBackend {

create_dir: true,
delete: true,
rename: true,

list: true,
list_without_recursive: true,
Expand All @@ -230,7 +229,7 @@ impl Accessor for AlluxioBackend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let w = AlluxioWriter::new(self.core.clone(), args.clone(), path.to_string());
let w = AlluxioWriter::new(self.core.clone(), args, path.to_string());
let w = OneShotWriter::new(w);

Ok((RpWrite::default(), w))
Expand Down
66 changes: 45 additions & 21 deletions core/src/services/alluxio/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ struct CreateFileRequest {
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CreateDirRequest {
#[serde(skip_serializing_if = "Option::is_none")]
recursive: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
allow_exists: Option<bool>,
}

/// Metadata of alluxio object
Expand Down Expand Up @@ -97,17 +100,20 @@ impl Debug for AlluxioCore {

impl AlluxioCore {
pub async fn create_dir(&self, path: &str) -> Result<()> {
let path = build_abs_path(&self.root, path);
let path = build_rooted_abs_path(&self.root, path);

let r = CreateDirRequest {
recursive: Some(true),
allow_exists: Some(true),
};

let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
let body = bytes::Bytes::from(body);

let mut req = Request::post(format!(
"{}/api/v1/paths//{}/create-directory",
self.endpoint, path
"{}/api/v1/paths/{}/create-directory",
self.endpoint,
percent_encode_path(&path)
));

req = req.header("Content-Type", "application/json");
Expand All @@ -126,7 +132,7 @@ impl AlluxioCore {
}

pub async fn create_file(&self, path: &str) -> Result<u64> {
let path = build_abs_path(&self.root, path);
let path = build_rooted_abs_path(&self.root, path);

let r = CreateFileRequest {
recursive: Some(true),
Expand All @@ -135,8 +141,9 @@ impl AlluxioCore {
let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
let body = bytes::Bytes::from(body);
let mut req = Request::post(format!(
"{}/api/v1/paths//{}/create-file",
self.endpoint, path
"{}/api/v1/paths/{}/create-file",
self.endpoint,
percent_encode_path(&path)
));

req = req.header("Content-Type", "application/json");
Expand All @@ -160,11 +167,12 @@ impl AlluxioCore {
}

pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
let path = build_abs_path(&self.root, path);
let path = build_rooted_abs_path(&self.root, path);

let req = Request::post(format!(
"{}/api/v1/paths//{}/open-file",
self.endpoint, path
"{}/api/v1/paths/{}/open-file",
self.endpoint,
percent_encode_path(&path)
));
let req = req
.body(AsyncBody::Empty)
Expand All @@ -185,9 +193,13 @@ impl AlluxioCore {
}

pub(super) async fn delete(&self, path: &str) -> Result<()> {
let path = build_abs_path(&self.root, path);
let path = build_rooted_abs_path(&self.root, path);

let req = Request::post(format!("{}/api/v1/paths//{}/delete", self.endpoint, path));
let req = Request::post(format!(
"{}/api/v1/paths/{}/delete",
self.endpoint,
percent_encode_path(&path)
));
let req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
Expand All @@ -197,16 +209,25 @@ impl AlluxioCore {

match status {
StatusCode::OK => Ok(()),
_ => Err(parse_error(resp).await?),
_ => {
let err = parse_error(resp).await?;
if err.kind() == ErrorKind::NotFound {
return Ok(());
}
Err(err)
}
}
}

pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
let path = build_abs_path(&self.root, path);
let path = build_rooted_abs_path(&self.root, path);
let dst = build_rooted_abs_path(&self.root, dst);

let req = Request::post(format!(
"{}/api/v1/paths//{}/rename?dst=/{}",
self.endpoint, path, dst
"{}/api/v1/paths/{}/rename?dst={}",
self.endpoint,
percent_encode_path(&path),
percent_encode_path(&dst)
));

let req = req
Expand All @@ -224,16 +245,18 @@ impl AlluxioCore {
}

pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
let path = build_abs_path(&self.root, path);
let path = build_rooted_abs_path(&self.root, path);

let req = Request::post(format!(
"{}/api/v1/paths//{}/get-status",
self.endpoint, path
"{}/api/v1/paths/{}/get-status",
self.endpoint,
percent_encode_path(&path)
));

let req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

let resp = self.client.send(req).await?;

let status = resp.status();
Expand All @@ -250,11 +273,12 @@ impl AlluxioCore {
}

pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
let path = build_abs_path(&self.root, path);
let path = build_rooted_abs_path(&self.root, path);

let req = Request::post(format!(
"{}/api/v1/paths//{}/list-status",
self.endpoint, path
"{}/api/v1/paths/{}/list-status",
self.endpoint,
percent_encode_path(&path)
));

let req = req
Expand Down
57 changes: 40 additions & 17 deletions core/src/services/alluxio/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {

if let Some(alluxio_err) = alluxio_err {
kind = match alluxio_err.status_code.as_str() {
"AlreadyExists" => ErrorKind::AlreadyExists,
"NotFound" => ErrorKind::NotFound,
"InvalidArgument" => ErrorKind::InvalidInput,
"ALREADY_EXISTS" => ErrorKind::AlreadyExists,
"NOT_FOUND" => ErrorKind::NotFound,
"INVALID_ARGUMENT" => ErrorKind::InvalidInput,
_ => ErrorKind::Unexpected,
}
}
Expand All @@ -63,23 +63,46 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
#[cfg(test)]
mod tests {
use super::*;
use futures::stream;
use http::StatusCode;

/// Error response example is from https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
#[test]
fn test_parse_error() {
let bs = bytes::Bytes::from(
r#"
{
"statusCode": "AlreadyExists",
"message": "The resource you requested already exist"
}
"#,
);
#[tokio::test]
async fn test_parse_error() {
let err_res = vec![
(
r#"{"statusCode":"ALREADY_EXISTS","message":"The resource you requested already exist"}"#,
ErrorKind::AlreadyExists,
),
(
r#"{"statusCode":"NOT_FOUND","message":"The resource you requested does not exist"}"#,
ErrorKind::NotFound,
),
(
r#"{"statusCode":"INVALID_ARGUMENT","message":"The argument you provided is invalid"}"#,
ErrorKind::InvalidInput,
),
(
r#"{"statusCode":"INTERNAL_SERVER_ERROR","message":"Internal server error"}"#,
ErrorKind::Unexpected,
),
];

let out: AlluxioError = serde_json::from_reader(bs.reader()).expect("must success");
println!("{out:?}");
for res in err_res {
let bs = bytes::Bytes::from(res.0);
let body = IncomingAsyncBody::new(
Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))),
None,
);
let resp = Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(body)
.unwrap();

assert_eq!(out.status_code, "AlreadyExists");
assert_eq!(out.message, "The resource you requested already exist");
let err = parse_error(resp).await;

assert!(err.is_ok());
assert_eq!(err.unwrap().kind(), res.1);
}
}
}
Loading

0 comments on commit 47c47b7

Please sign in to comment.