Skip to content

Commit

Permalink
Add support for Amazon S3 bucket storage backend.
Browse files Browse the repository at this point in the history
This adds support for using Amazon's S3 bucket as a storage backend via
the `AmazonS3Store` module. It is only implemented for the `Lwt` concurrency
library at this time since the underlying AWS-S3 library does not yet
have support for `Eio`.
  • Loading branch information
zoj613 committed Nov 19, 2024
1 parent c9fa270 commit ee2d821
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 2 deletions.
30 changes: 29 additions & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,41 @@ jobs:
local-packages:
- zarr.opam

env:
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin

services:
minio:
image: fclairamb/minio-github-actions
ports:
- 9000:9000

name: Ocaml version - ${{ matrix.ocaml-compiler }} - ${{ matrix.os }}
steps:
- name: checkout
uses: actions/checkout@v4
with:
fetch-depth: 2

- name: Setup Minio
run: |
mkdir ~/.aws
echo '[default]' > ~/.aws/credentials
echo 'aws_access_key_id = minioadmin' >> ~/.aws/credentials
echo 'aws_secret_access_key = minioadmin' >> ~/.aws/credentials
pip3 install minio
python3 - <<'EOF'
from minio import Minio
minio = Minio(
'localhost:9000',
access_key='minioadmin',
secret_key='minioadmin',
secure=False
)
minio.make_bucket('test-bucket-lwt', location='us-east-1')
EOF
- name: setup-ocaml
uses: ocaml/setup-ocaml@v3
with:
Expand All @@ -45,7 +73,7 @@ jobs:
run: |
opam install --deps-only --with-test --with-doc --yes zarr
opam install bytesrw conf-zlib conf-zstd --yes
opam install lwt --yes
opam install lwt aws-s3-lwt --yes
opam exec -- dune build zarr zarr-sync zarr-lwt
- name: setup ocaml-5-specific
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ docs:
.PHONY: view-docs
view-docs: docs
chromium _build/default/_doc/_html/index.html

.PHONY: minio
minio:
mkdir -p /tmp/minio/test-bucket-lwt
docker run --rm -it -p 9000:9000 -v /tmp/minio:/minio minio/minio:latest server /minio
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
(and (>= 4.14.0)))
(zarr (= :version))
(lwt (>= 2.5.1))
(aws-s3-lwt (>= 4.8.1))
(odoc :with-doc)
(ounit2 :with-test)
(ppx_deriving :with-test)
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ depends: [
"ocaml" {>= "4.14.0"}
"zarr" {= version}
"lwt" {>= "2.5.1"}
"aws-s3-lwt" {>= "4.8.1"}
"odoc" {with-doc}
"ounit2" {with-test}
"ppx_deriving" {with-test}
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt/src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name zarr-lwt)
(libraries
zarr
aws-s3-lwt
lwt
lwt.unix)
(ocamlopt_flags
Expand Down
168 changes: 168 additions & 0 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,171 @@ module FilesystemStore = struct

include Zarr.Storage.Make(IO)
end

module AmazonS3Store = struct
module Credentials = Aws_s3_lwt.Credentials
module S3 = Aws_s3_lwt.S3

open Deferred.Syntax

exception S3_error of S3.error

let empty_content () = S3.{
storage_class = Standard;
meta_headers = None;
etag = String.empty;
key = String.empty;
last_modified = 0.;
size = 0
}

let empty_Ls = Fun.const ([], S3.Ls.Done)

let raise_not_found k () = raise (Zarr.Storage.Key_not_found k)

Check warning on line 167 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L167

Added line #L167 was not covered by tests

let fold_or_catch ~not_found res =
let return_or_raise r () = match r with
| Ok v -> Deferred.return v
| Error e -> raise (S3_error e)
and on_exception ~not_found = function
| S3_error S3.Not_found -> Lwt.return (not_found ())
| exn -> raise exn

Check warning on line 175 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L175

Added line #L175 was not covered by tests
in
Lwt.catch (return_or_raise res) (on_exception ~not_found)

module IO = struct
module Deferred = Deferred
open Deferred.Infix

type t =
{bucket : string
;cred : Credentials.t
;endpoint : Aws_s3.Region.endpoint}

let size t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.head ~bucket ~credentials ~endpoint ~key () in
let+ c = fold_or_catch ~not_found:empty_content res in
c.size

let is_member t key =
let+ size = size t key in
if size = 0 then false else true

let get t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.get ~bucket ~credentials ~endpoint ~key () in
fold_or_catch ~not_found:(raise_not_found key) res

let get_partial_values t key ranges =
let read_range t key (ofs, len) =
let range = match len with
| None -> S3.{first = Some ofs; last = None}
| Some l -> S3.{first = Some ofs; last = Some (ofs + l - 1)}
in
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.get ~bucket ~credentials ~endpoint ~range ~key () in
let+ data = fold_or_catch ~not_found:(raise_not_found key) res in
[data]
in
Deferred.concat_map (read_range t key) ranges

let set t key data =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.put ~bucket ~credentials ~endpoint ~data ~key () in
let* _ = fold_or_catch ~not_found:(Fun.const String.empty) res in
Deferred.return_unit

let set_partial_values t key ?(append=false) rsv =
let* size = size t key in
let* ov = match size with
| 0 -> Deferred.return String.empty
| _ -> get t key
in
let f = if append || ov = String.empty then
fun acc (_, v) -> acc ^ v else
fun acc (rs, v) ->
let s = Bytes.unsafe_of_string acc in
Bytes.blit_string v 0 s rs String.(length v);
Bytes.unsafe_to_string s
in
set t key (List.fold_left f ov rsv)

let erase t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.delete ~bucket ~credentials ~endpoint ~key () in
fold_or_catch ~not_found:(Fun.const ()) res

let rec delete_keys t cont () = match cont with
| S3.Ls.Done -> Deferred.return_unit
| S3.Ls.More continuation ->
let* res = continuation () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
Deferred.iter (delete_content t) xs >>= delete_keys t rest

Check warning on line 247 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L244-L247

Added lines #L244 - L247 were not covered by tests

and delete_content t S3.{key; _} = erase t key

and erase_prefix t prefix =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
Deferred.iter (delete_content t) xs >>= delete_keys t rest

let rec list t =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.ls ~bucket ~credentials ~endpoint () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
accumulate_keys (List.map content_key xs) rest

and content_key S3.{key; _} = key

and accumulate_keys acc = function
| S3.Ls.Done -> Deferred.return acc
| S3.Ls.More continuation ->
let* res = continuation () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
accumulate_keys (acc @ List.map content_key xs) rest

Check warning on line 270 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L267-L270

Added lines #L267 - L270 were not covered by tests

module S = Set.Make(String)

let rec partition_keys prefix ((l, r) as acc) = function
| S3.Ls.Done -> Deferred.return (l, S.elements r)
| S3.Ls.More continuation ->
let* res = continuation () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
partition_keys prefix (List.fold_left (add prefix) acc xs) rest

Check warning on line 279 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L276-L279

Added lines #L276 - L279 were not covered by tests

and add prefix (l, r) (c : S3.content) =
let size = String.length prefix in
if not (String.contains_from c.key size '/') then c.key :: l, r else
l, S.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r

and list_dir t prefix =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
let init = List.fold_left (add prefix) ([], S.empty) xs in
partition_keys prefix init rest

let rec rename t prefix new_prefix =
let upload t (k, v) = set t k v in
let* xs = list t in
let to_delete = List.filter (String.starts_with ~prefix) xs in
let* data = Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in
let* () = Deferred.iter (upload t) data in
Deferred.iter (erase t) to_delete

and rename_and_add ~t ~prefix ~new_prefix acc k =
let l = String.length prefix in
let k' = new_prefix ^ String.sub k l (String.length k - l) in
let+ a = get t k in (k', a) :: acc
end

let with_open ?(scheme=`Http) ?(inet=`V4) ~region ~bucket ~profile f =
let* res = Credentials.Helper.get_credentials ~profile () in
let cred = Result.fold ~ok:Fun.id ~error:raise res in
let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in
f IO.{bucket; cred; endpoint}

include Zarr.Storage.Make(IO)
end
32 changes: 31 additions & 1 deletion zarr-lwt/src/storage.mli
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(** An Lwt-aware in-memory storage backend for Zarr v3 hierarchy. *)
module MemoryStore : sig include Zarr.Memory.S with type 'a Deferred.t = 'a Lwt.t end
module MemoryStore : Zarr.Memory.S with type 'a Deferred.t = 'a Lwt.t

(** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *)
module ZipStore : Zarr.Zip.S with type 'a Deferred.t = 'a Lwt.t
Expand All @@ -18,3 +18,33 @@ module FilesystemStore : sig
@raise Failure if [dir] is not a Zarr store path. *)
end

(** An Lwt-aware Amazon S3 bucket storage backend for a Zarr V3 hierarchy. *)
module AmazonS3Store : sig
exception S3_error of Aws_s3_lwt.S3.error

include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t

val with_open :
?scheme:[ `Http | `Https ] ->
?inet:[ `V4 | `V6 ] ->
region:Aws_s3.Region.t ->
bucket:string ->
profile:string ->
(t -> 'a Lwt.t) ->
'a Lwt.t
(** [with_open ~region ~bucket ~profile f] opens an S3 bucket store with
bucket name [bucket] at region [region] using credentials specified by
profile [profile]. The credentials are read locally from a [~/.aws/credentials]
file or from an IAM service if the profile or file is not available.
Function [f] is applied to the store's open handle and its output is
returned to the caller.
{ul
{- [scheme] is the HTTP scheme to use when connecting to S3, and must be
one of [`Http | `Https]. Defaults to [`Http].}
{- [inet] is the IP version and must be one of [`V4 | `V6]. Defaults to [`V4].}
}
@raise S3_error if an error occurs while sending a request to the S3 service. *)
end
7 changes: 7 additions & 0 deletions zarr-lwt/test/test_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,18 @@ let _ =
(Zarr.Storage.Not_a_filesystem_store fn)
(fun () -> FilesystemStore.open_store fn);

(* ZipStore configuration *)
let zpath = tmp_dir ^ ".zip" in
(* AmazonS3Store configuration *)
let region = Aws_s3.Region.minio ~port:9000 ~host:"localhost" ()
and bucket = "test-bucket-lwt"
and profile = "default" in

Lwt_main.run @@ Lwt.join
[ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z)
(* test just opening the now exisitant archive created by the previous test. *)
;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit)
;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store))
;test_storage (module MemoryStore) @@ MemoryStore.create ()
;test_storage (module FilesystemStore) s])
])

0 comments on commit ee2d821

Please sign in to comment.