Skip to content

Commit

Permalink
Add support for Amazon S3 bucket storage backend.
Browse files Browse the repository at this point in the history
This adds support for using Amazon's S3 bucket as a storage backend via
the `AmazonS3Store` module. It is only implemented for the `Lwt` concurrency
library at this time since the underlying AWS-S3 library does not yet
have support for `Eio`.
  • Loading branch information
zoj613 committed Nov 20, 2024
1 parent 722ab04 commit 6adbbaa
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 1 deletion.
30 changes: 29 additions & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,41 @@ jobs:
local-packages:
- zarr.opam

env:
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin

services:
minio:
image: fclairamb/minio-github-actions
ports:
- 9000:9000

name: Ocaml version - ${{ matrix.ocaml-compiler }} - ${{ matrix.os }}
steps:
- name: checkout
uses: actions/checkout@v4
with:
fetch-depth: 2

- name: Setup Minio
run: |
mkdir ~/.aws
echo '[default]' > ~/.aws/credentials
echo 'aws_access_key_id = minioadmin' >> ~/.aws/credentials
echo 'aws_secret_access_key = minioadmin' >> ~/.aws/credentials
pip3 install minio
python3 - <<'EOF'
from minio import Minio
minio = Minio(
'localhost:9000',
access_key='minioadmin',
secret_key='minioadmin',
secure=False
)
minio.make_bucket('test-bucket-lwt', location='us-east-1')
EOF
- name: setup-ocaml
uses: ocaml/setup-ocaml@v3
with:
Expand All @@ -45,7 +73,7 @@ jobs:
run: |
opam install --deps-only --with-test --with-doc --yes zarr
opam install bytesrw conf-zlib conf-zstd --yes
opam install lwt --yes
opam install lwt aws-s3-lwt --yes
opam exec -- dune build zarr zarr-sync zarr-lwt
- name: setup ocaml-5-specific
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ docs:
.PHONY: view-docs
view-docs: docs
chromium _build/default/_doc/_html/index.html

.PHONY: minio
minio:
mkdir -p /tmp/minio/test-bucket-lwt
docker run --rm -it -p 9000:9000 -v /tmp/minio:/minio minio/minio:latest server /minio
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
(and (>= 4.14.0)))
(zarr (= :version))
(lwt (>= 2.5.1))
(aws-s3-lwt (>= 4.8.1))
(odoc :with-doc)
(ounit2 :with-test)
(ppx_deriving :with-test)
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ depends: [
"ocaml" {>= "4.14.0"}
"zarr" {= version}
"lwt" {>= "2.5.1"}
"aws-s3-lwt" {>= "4.8.1"}
"odoc" {with-doc}
"ounit2" {with-test}
"ppx_deriving" {with-test}
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt/src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name zarr-lwt)
(libraries
zarr
aws-s3-lwt
lwt
lwt.unix)
(ocamlopt_flags
Expand Down
174 changes: 174 additions & 0 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,177 @@ module FilesystemStore = struct

include Zarr.Storage.Make(IO)
end

module AmazonS3Store = struct
module Credentials = Aws_s3_lwt.Credentials
module S3 = Aws_s3_lwt.S3

open Deferred.Infix
open Deferred.Syntax

exception Request_failed of S3.error

let empty_content () = S3.{
storage_class = Standard;
meta_headers = None;
etag = String.empty;
key = String.empty;
last_modified = 0.;
size = 0
}

let fold_or_catch ~not_found res =
let return_or_raise r () = match r with
| Ok v -> Deferred.return v
| Error e -> raise (Request_failed e)
and on_exception ~not_found = function
| Request_failed S3.Not_found -> Lwt.return (not_found ())
| exn -> raise exn

Check warning on line 191 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L191

Added line #L191 was not covered by tests
in
Lwt.catch (return_or_raise res) (on_exception ~not_found)

let raise_not_found k () = raise (Zarr.Storage.Key_not_found k)

Check warning on line 195 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L195

Added line #L195 was not covered by tests

let empty_Ls = Fun.const ([], S3.Ls.Done)

let fold_continuation ~return ~more = function
| S3.Ls.Done -> Deferred.return return
| S3.Ls.More continuation ->
continuation () >>= fold_or_catch ~not_found:empty_Ls >>= fun (xs, cont) ->
more xs cont

Check warning on line 203 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L201-L203

Added lines #L201 - L203 were not covered by tests

module IO = struct
module Deferred = Deferred

type t =
{retries : int
;bucket : string
;cred : Credentials.t
;endpoint : Aws_s3.Region.endpoint}

let size t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.head ~bucket ~credentials ~key ~endpoint () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let+ c = fold_or_catch ~not_found:empty_content res in
c.size

let is_member t key =
let+ size = size t key in
if size = 0 then false else true

let get t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.get ~bucket ~credentials ~endpoint ~key () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
fold_or_catch ~not_found:(raise_not_found key) res

let get_partial_values t key ranges =
let read_range t key (ofs, len) =
let range = match len with
| None -> S3.{first = Some ofs; last = None}
| Some l -> S3.{first = Some ofs; last = Some (ofs + l - 1)}
in
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.get ~bucket ~credentials ~endpoint ~range ~key () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let+ data = fold_or_catch ~not_found:(raise_not_found key) res in
[data]
in
Deferred.concat_map (read_range t key) ranges

let set t key data =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.put ~bucket ~credentials ~endpoint ~data ~key () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let* _ = fold_or_catch ~not_found:(Fun.const String.empty) res in
Deferred.return_unit

let set_partial_values t key ?(append=false) rsv =
let* size = size t key in
let* ov = match size with
| 0 -> Deferred.return String.empty
| _ -> get t key
in
let f = if append || ov = String.empty then
fun acc (_, v) -> acc ^ v else
fun acc (rs, v) ->
let s = Bytes.unsafe_of_string acc in
Bytes.blit_string v 0 s rs String.(length v);
Bytes.unsafe_to_string s
in
set t key (List.fold_left f ov rsv)

let erase t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.delete ~bucket ~credentials ~endpoint ~key () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
fold_or_catch ~not_found:(Fun.const ()) res

let rec delete_keys t cont () =
let del t xs c = Deferred.iter (delete_content t) xs >>= delete_keys t c in

Check warning on line 274 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L274

Added line #L274 was not covered by tests
fold_continuation ~return:() ~more:(del t) cont

and delete_content t S3.{key; _} = erase t key

and erase_prefix t prefix =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
Deferred.iter (delete_content t) xs >>= delete_keys t rest

let rec list t =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
accumulate_keys (List.map content_key xs) rest

and content_key S3.{key; _} = key

and accumulate_keys acc cont =
let append acc xs c = accumulate_keys (acc @ List.map content_key xs) c in

Check warning on line 296 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L296

Added line #L296 was not covered by tests
fold_continuation ~return:acc ~more:(append acc) cont

module S = Set.Make(String)

let rec partition_keys prefix ((l, r) as acc) cont =
let split ~acc ~prefix xs c = partition_keys prefix (List.fold_left (add prefix) acc xs) c in

Check warning on line 302 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L302

Added line #L302 was not covered by tests
fold_continuation ~return:(l, S.elements r) ~more:(split ~acc ~prefix) cont

and add prefix (l, r) (c : S3.content) =
let size = String.length prefix in
if not (String.contains_from c.key size '/') then c.key :: l, r else
l, S.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r

and list_dir t prefix =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
let init = List.fold_left (add prefix) ([], S.empty) xs in
partition_keys prefix init rest

let rec rename t prefix new_prefix =
let upload t (k, v) = set t k v in
let* xs = list t in
let to_delete = List.filter (String.starts_with ~prefix) xs in
let* data = Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in
let* () = Deferred.iter (upload t) data in
Deferred.iter (erase t) to_delete

and rename_and_add ~t ~prefix ~new_prefix acc k =
let l = String.length prefix in
let k' = new_prefix ^ String.sub k l (String.length k - l) in
let+ a = get t k in (k', a) :: acc
end

let with_open ?(scheme=`Http) ?(inet=`V4) ?(retries=3) ~region ~bucket ~profile f =
let* res = Credentials.Helper.get_credentials ~profile () in
let cred = Result.fold ~ok:Fun.id ~error:raise res in
let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in
f IO.{bucket; cred; endpoint; retries}

include Zarr.Storage.Make(IO)
end
32 changes: 32 additions & 0 deletions zarr-lwt/src/storage.mli
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,35 @@ module FilesystemStore : sig
@raise Failure if [dir] is not a Zarr store path. *)
end

(** An Lwt-aware Amazon S3 bucket storage backend for a Zarr V3 hierarchy. *)
module AmazonS3Store : sig
exception Request_failed of Aws_s3_lwt.S3.error

include Zarr.Storage.STORE with module Deferred = Deferred

val with_open :
?scheme:[ `Http | `Https ] ->
?inet:[ `V4 | `V6 ] ->
?retries:int ->
region:Aws_s3.Region.t ->
bucket:string ->
profile:string ->
(t -> 'a Lwt.t) ->
'a Lwt.t
(** [with_open ~region ~bucket ~profile f] opens an S3 bucket store with
bucket name [bucket] at region [region] using credentials specified by
profile [profile]. The credentials are read locally from a [~/.aws/credentials]
file or from an IAM service if the profile or file is not available.
Function [f] is applied to the store's open handle and its output is
returned to the caller.
{ul
{- [scheme] is the HTTP scheme to use when connecting to S3, and must be
one of [`Http | `Https]. Defaults to [`Http].}
{- [inet] is the IP version and must be one of [`V4 | `V6]. Defaults to [`V4].}
{- [retries] is the number of times to retry a request should it return an error.}
}
@raise Request_failed if an error occurs while sending a request to the S3 service. *)
end
7 changes: 7 additions & 0 deletions zarr-lwt/test/test_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,18 @@ let _ =
(Zarr.Storage.Not_a_filesystem_store fn)
(fun () -> FilesystemStore.open_store fn);

(* ZipStore configuration *)
let zpath = tmp_dir ^ ".zip" in
(* AmazonS3Store configuration *)
let region = Aws_s3.Region.minio ~port:9000 ~host:"localhost" ()
and bucket = "test-bucket-lwt"
and profile = "default" in

Lwt_main.run @@ Lwt.join
[ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z)
(* test just opening the now exisitant archive created by the previous test. *)
;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit)
;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store))
;test_storage (module MemoryStore) @@ MemoryStore.create ()
;test_storage (module FilesystemStore) s])
])

0 comments on commit 6adbbaa

Please sign in to comment.