Skip to content

Commit

Permalink
Add support for Amazon S3 bucket storage backend.
Browse files Browse the repository at this point in the history
This adds support for using Amazon's S3 bucket as a storage backend via
the `AwsStore` module. It is only implemented for the `Lwt` concurrency
library at this time since the underlying AWS-S3 library does not yet
have support for `Eio`.
  • Loading branch information
zoj613 committed Nov 17, 2024
1 parent c9fa270 commit 41796a5
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
run: |
opam install --deps-only --with-test --with-doc --yes zarr
opam install bytesrw conf-zlib conf-zstd --yes
opam install lwt --yes
opam install lwt aws-s3-lwt --yes
opam exec -- dune build zarr zarr-sync zarr-lwt
- name: setup ocaml-5-specific
Expand Down
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
(and (>= 4.14.0)))
(zarr (= :version))
(lwt (>= 2.5.1))
(aws-s3-lwt (>= 4.8.1))
(odoc :with-doc)
(ounit2 :with-test)
(ppx_deriving :with-test)
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ depends: [
"ocaml" {>= "4.14.0"}
"zarr" {= version}
"lwt" {>= "2.5.1"}
"aws-s3-lwt" {>= "4.8.1"}
"odoc" {with-doc}
"ounit2" {with-test}
"ppx_deriving" {with-test}
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt/src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name zarr-lwt)
(libraries
zarr
aws-s3-lwt
lwt
lwt.unix)
(ocamlopt_flags
Expand Down
157 changes: 157 additions & 0 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,160 @@ module FilesystemStore = struct

include Zarr.Storage.Make(IO)
end

module AwsStore = struct
open Deferred.Syntax

(** [fold_result ~ok ~not_found res] collapses the outcome of an S3 request.

    - [Ok v] yields [ok v];
    - [Error Not_found] yields [not_found];
    - [Error (Failed e)] re-raises [e];
    - every other error raises [Failure] with a short description.

    [not_found] is an ordinary value, not a thunk: whatever expression the
    caller writes for it is evaluated before [fold_result] is entered. *)
let fold_result :
  ok:('a -> 'b) -> not_found:'b -> ('a, Aws_s3_lwt.S3.error) result -> 'b
  = fun ~ok ~not_found res ->
  match res with
  | Error Not_found -> not_found
  | Error (Failed exn) -> raise exn
  | Error (Redirect _) -> failwith "Redirect error"
  | Error (Unknown (_, msg)) -> failwith ("Unknown: " ^ msg)
  | Error Forbidden -> failwith "Forbidden"
  | Error Throttled -> failwith "Throttled"
  | Ok value -> ok value

Check warning on line 160 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L154-L160

Added lines #L154 - L160 were not covered by tests

module IO = struct
module Deferred = Deferred

(* Handle on an S3-backed store. The three fields are forwarded unchanged
   to every [Aws_s3_lwt.S3] request issued by the functions of this module. *)
type t =
(* Name of the bucket; passed as [~bucket]. *)
{bucket : string
(* Credentials used to sign requests; passed as [~credentials]. *)
;cred : Aws_s3_lwt.Credentials.t
(* Endpoint the requests are sent to; passed as [~endpoint]. *)
;endpoint : Aws_s3.Region.endpoint }

(** [size t key] is the size reported by a HEAD request for [key] in the
    bucket of [t], or [0] when S3 answers that the key does not exist.
    Other S3 errors are raised as exceptions by [fold_result]. *)
let size t key =
  let* response =
    Aws_s3_lwt.S3.head
      ~key ~endpoint:t.endpoint ~credentials:t.cred ~bucket:t.bucket ()
  in
  let byte_count =
    fold_result
      ~ok:(fun (obj : Aws_s3_lwt.S3.content) -> obj.size)
      ~not_found:0
      response
  in
  Deferred.return byte_count

Check warning on line 175 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L175

Added line #L175 was not covered by tests

(** [get t key] fetches the whole object stored under [key].

    @raise Zarr.Storage.Key_not_found if S3 reports that [key] is missing.
    Other S3 errors are raised as exceptions by [fold_result]. *)
let get t key =
  let* res = Aws_s3_lwt.S3.get
      ~bucket:t.bucket ~credentials:t.cred ~endpoint:t.endpoint ~key () in
  (* The missing-key case is matched here instead of being passed as
     [~not_found:(raise ...)]: OCaml evaluates arguments eagerly, so that
     form raised [Key_not_found] even when the request succeeded. *)
  match res with
  | Error Aws_s3_lwt.S3.Not_found -> raise (Zarr.Storage.Key_not_found key)
  | Ok _ | Error _ ->
    (* [not_found] is unreachable here; it only fixes the result type. *)
    Deferred.return (fold_result ~ok:Fun.id ~not_found:String.empty res)

Check warning on line 181 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L180-L181

Added lines #L180 - L181 were not covered by tests

(** [get_partial_values t key ranges] reads one slice of the object stored
    under [key] for each [(ofs, len)] in [ranges] and returns the slices in
    the same order as [ranges]. [len = None] reads from [ofs] to the end of
    the object.

    @raise Zarr.Storage.Key_not_found if S3 reports that [key] is missing.
    Other S3 errors are raised as exceptions by [fold_result]. *)
let get_partial_values t key ranges =
  let read_range ~t ~key acc (ofs, len) =
    (* S3 ranges are expressed as an inclusive [first]-[last] byte interval,
       so a length has to be converted into the offset of the last byte. *)
    let range : Aws_s3_lwt.S3.range = match len with
      | None -> {first = Some ofs; last = None}
      | Some n -> {first = Some ofs; last = Some (ofs + n - 1)}
    in
    let+ res = Aws_s3_lwt.S3.get
        ~bucket:t.bucket ~credentials:t.cred ~endpoint:t.endpoint ~range ~key () in
    (* Matched explicitly: [~not_found:(raise ...)] would be evaluated
       eagerly and raise even on success. *)
    match res with
    | Error Aws_s3_lwt.S3.Not_found -> raise (Zarr.Storage.Key_not_found key)
    | Ok _ | Error _ ->
      (fold_result ~ok:Fun.id ~not_found:String.empty res) :: acc
  in
  (* Slices are consed onto the accumulator, hence the final reversal to
     restore the order of [ranges]. *)
  let+ rev_slices = Deferred.fold_left (read_range ~t ~key) [] ranges in
  List.rev rev_slices

(** [set t key data] uploads [data] as the object stored under [key],
    discarding the value returned by a successful PUT. S3 errors other than
    [Not_found] are raised as exceptions by [fold_result]. *)
let set t key data =
  let* outcome =
    Aws_s3_lwt.S3.put
      ~key ~data ~endpoint:t.endpoint ~credentials:t.cred ~bucket:t.bucket ()
  in
  ignore (fold_result ~ok:Fun.id ~not_found:String.empty outcome);
  Deferred.return_unit

Check warning on line 200 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L199-L200

Added lines #L199 - L200 were not covered by tests

(** [set_partial_values t key ?append rsv] rewrites parts of the object
    stored under [key] and uploads the result with [set].

    The current value is fetched first (an object of size [0], which is also
    what [size] reports for a missing key, is treated as empty). Then:
    - if [append] is [true] or the current value is empty, the payloads of
      [rsv] are concatenated after it, in order, ignoring their offsets;
    - otherwise each [(ofs, v)] overwrites the bytes starting at [ofs].

    @raise Invalid_argument if an overwrite does not fit inside the current
    value. *)
let set_partial_values t key ?(append=false) rsv =
  let* size = size t key in
  let* ov = match size with
    | 0 -> Deferred.return String.empty
    | _ -> get t key
  in
  let ov' =
    if append || String.equal ov String.empty then
      (* single concatenation instead of repeated [^] in a fold *)
      String.concat String.empty (ov :: List.map snd rsv)
    else begin
      (* Work on a private copy: strings must be treated as shared, so the
         fetched value is never mutated in place. *)
      let buf = Bytes.of_string ov in
      List.iter
        (fun (rs, v) -> Bytes.blit_string v 0 buf rs (String.length v)) rsv;
      (* [buf] is uniquely owned and not used after this point. *)
      Bytes.unsafe_to_string buf
    end
  in
  set t key ov'

Check warning on line 216 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L216

Added line #L216 was not covered by tests

(** [erase t key] deletes the object stored under [key]. A [Not_found]
    answer from S3 is treated as success; other S3 errors are raised as
    exceptions by [fold_result]. *)
let erase t key =
  let+ outcome =
    Aws_s3_lwt.S3.delete
      ~key ~endpoint:t.endpoint ~credentials:t.cred ~bucket:t.bucket ()
  in
  fold_result ~ok:(fun () -> ()) ~not_found:() outcome

Check warning on line 221 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L221

Added line #L221 was not covered by tests

(*TODO: Use delete_multi function instead. *)
(** [erase_prefix t prefix] deletes, one by one, every object whose key is
    returned by listing the bucket with [~prefix], following the listing
    continuation until it is exhausted.

    @raise Zarr.Storage.Key_not_found if a listing request answers
    [Not_found]. Other S3 errors are raised as exceptions by [fold_result]. *)
let erase_prefix t prefix =
  let content_key : Aws_s3_lwt.S3.content -> string = fun c -> c.key in
  (* Unwraps one listing page. The [Not_found] case is matched here rather
     than passed as [~not_found:(raise ...)], which OCaml would evaluate
     eagerly and therefore raise on every call. *)
  let page_or_raise = function
    | Error Aws_s3_lwt.S3.Not_found -> raise (Zarr.Storage.Key_not_found prefix)
    | (Ok _ | Error _) as res ->
      (* [not_found] is unreachable here; it only fixes the result type. *)
      fold_result ~ok:Fun.id ~not_found:([], Aws_s3_lwt.S3.Ls.Done) res
  in
  let rec delete_keys : Aws_s3_lwt.S3.content list * Aws_s3_lwt.S3.Ls.cont -> unit Deferred.t = function
    | xs, Done -> Deferred.iter (erase t) (List.map content_key xs)
    | xs, More k ->
      (* fetch the next page first, then delete the current one *)
      let* res = k () in
      let cont = page_or_raise res in
      let* () = Deferred.iter (erase t) (List.map content_key xs) in
      delete_keys cont
  in
  let* res = Aws_s3_lwt.S3.ls
      ~bucket:t.bucket ~credentials:t.cred ~endpoint:t.endpoint ~prefix () in
  delete_keys (page_or_raise res)

Check warning on line 237 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L236-L237

Added lines #L236 - L237 were not covered by tests

(** [list t] returns the keys of all objects in the bucket of [t], in the
    order the listing pages return them. A [Not_found] answer to a listing
    request is treated as an empty, final page; other S3 errors are raised
    as exceptions by [fold_result]. *)
let list t =
  (* Keys are accumulated in reverse and flipped once at the end, so each
     page costs time proportional to its own length instead of re-copying
     everything gathered so far as [acc @ page] would. *)
  let rec accumulate_keys :
    string list -> Aws_s3_lwt.S3.content list * Aws_s3_lwt.S3.Ls.cont -> string list Deferred.t
    = fun rev_acc (xs, cont) ->
    let rev_acc =
      List.fold_left
        (fun acc (c : Aws_s3_lwt.S3.content) -> c.key :: acc) rev_acc xs
    in
    match cont with
    | Aws_s3_lwt.S3.Ls.Done -> Deferred.return (List.rev rev_acc)
    | Aws_s3_lwt.S3.Ls.More k ->
      let* res = k () in
      let next = fold_result ~ok:Fun.id ~not_found:([], Aws_s3_lwt.S3.Ls.Done) res in
      accumulate_keys rev_acc next
  in
  let* res = Aws_s3_lwt.S3.ls
      ~bucket:t.bucket ~credentials:t.cred ~endpoint:t.endpoint () in
  let first = fold_result ~ok:Fun.id ~not_found:([], Aws_s3_lwt.S3.Ls.Done) res in
  accumulate_keys [] first

Check warning on line 254 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L253-L254

Added lines #L253 - L254 were not covered by tests

(** [list_dir t prefix] lists the bucket with [~prefix] and splits the
    returned keys in two groups:
    - keys with no ['/'] at or after position [String.length prefix], returned
      as they are;
    - for the others, the key truncated just after the first such ['/'];
      these are de-duplicated and returned in increasing order.

    A [Not_found] answer to a listing request is treated as an empty, final
    page; other S3 errors are raised as exceptions by [fold_result]. *)
let list_dir t prefix =
  let module Prefix_set = Set.Make(String) in
  let prefix_len = String.length prefix in
  let classify (leaves, dirs) (obj : Aws_s3_lwt.S3.content) =
    match String.contains_from obj.key prefix_len '/' with
    | false -> (obj.key :: leaves, dirs)
    | true ->
      let slash = String.index_from obj.key prefix_len '/' in
      (leaves, Prefix_set.add (String.sub obj.key 0 (slash + 1)) dirs)
  in
  let rec walk acc ((page, next) : Aws_s3_lwt.S3.content list * Aws_s3_lwt.S3.Ls.cont) =
    match next with
    | Aws_s3_lwt.S3.Ls.Done ->
      Deferred.return (List.fold_left classify acc page)
    | Aws_s3_lwt.S3.Ls.More fetch ->
      let* res = fetch () in
      let following =
        fold_result ~ok:Fun.id ~not_found:([], Aws_s3_lwt.S3.Ls.Done) res in
      walk (List.fold_left classify acc page) following
  in
  let* first =
    Aws_s3_lwt.S3.ls
      ~prefix ~endpoint:t.endpoint ~credentials:t.cred ~bucket:t.bucket ()
  in
  let start = fold_result ~ok:Fun.id ~not_found:([], Aws_s3_lwt.S3.Ls.Done) first in
  let+ leaves, dirs = walk ([], Prefix_set.empty) start in
  (leaves, Prefix_set.elements dirs)

Check warning on line 277 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L275-L277

Added lines #L275 - L277 were not covered by tests

(** [is_member t key] is [true] when a HEAD request for [key] succeeds and
    [false] when S3 answers [Not_found]. Existence is decided by the request
    outcome rather than by the reported size, so an object of zero bytes is
    still a member. Other S3 errors are raised as exceptions by
    [fold_result]. *)
let is_member t key =
  let+ res =
    Aws_s3_lwt.S3.head
      ~bucket:t.bucket ~credentials:t.cred ~endpoint:t.endpoint ~key () in
  fold_result ~ok:(Fun.const true) ~not_found:false res

Check warning on line 281 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L280-L281

Added lines #L280 - L281 were not covered by tests

(** [rename t prefix new_prefix] moves every object whose key starts with
    [prefix] to the key obtained by replacing that leading [prefix] with
    [new_prefix]. All matching objects are downloaded first, then written
    under their new keys, and finally the old keys are erased. *)
let rename t prefix new_prefix =
  let prefix_len = String.length prefix in
  let fetch_renamed moved old_key =
    let suffix =
      String.sub old_key prefix_len (String.length old_key - prefix_len) in
    let new_key = new_prefix ^ suffix in
    let+ payload = get t old_key in
    (new_key, payload) :: moved
  in
  let* all_keys = list t in
  let old_keys = List.filter (fun k -> String.starts_with ~prefix k) all_keys in
  let* moved = Deferred.fold_left fetch_renamed [] old_keys in
  let* () =
    Deferred.iter (fun (new_key, payload) -> set t new_key payload) moved in
  Deferred.iter (erase t) old_keys

Check warning on line 293 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L289-L293

Added lines #L289 - L293 were not covered by tests
end

(** [with_open ?scheme ?inet ~region ~bucket ~profile f] looks up the
    credentials of [profile], derives the endpoint of [region] for the given
    [scheme] (default [`Https]) and [inet] (default [`V4]), and applies [f]
    to the resulting store handle. If the credential lookup returns an
    error, the exception it carries is raised. *)
let with_open ?(scheme=`Https) ?(inet=`V4) ~region ~bucket ~profile f =
  let* lookup = Aws_s3_lwt.Credentials.Helper.get_credentials ~profile () in
  match lookup with
  | Error exn -> raise exn
  | Ok cred ->
    let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in
    f {IO.bucket; cred; endpoint}

Check warning on line 300 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L296-L300

Added lines #L296 - L300 were not covered by tests

include Zarr.Storage.Make(IO)
end
14 changes: 14 additions & 0 deletions zarr-lwt/src/storage.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,17 @@ module FilesystemStore : sig
@raise Failure if [dir] is not a Zarr store path. *)
end

(** An Lwt-aware Amazon S3 bucket storage backend for a Zarr V3 hierarchy. *)
module AwsStore : sig
include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t

(** [with_open ?scheme ?inet ~region ~bucket ~profile f] applies [f] to a
    store handle for [bucket] and returns its result.

    The handle is built from the credentials looked up for [profile] and
    from the endpoint derived from [region], [scheme] and [inet].
    [scheme] defaults to [`Https] and [inet] to [`V4].

    If the credential lookup returns an error, the exception it carries
    is raised. *)
val with_open :
?scheme:[ `Http | `Https ] ->
?inet:[ `V4 | `V6 ] ->
region:Aws_s3.Region.t ->
bucket:string ->
profile:string ->
(t -> 'a Lwt.t) ->
'a Lwt.t
end

0 comments on commit 41796a5

Please sign in to comment.