diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 2608ab6b..5f01af23 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -29,6 +29,16 @@ jobs: local-packages: - zarr.opam + env: + AWS_ACCESS_KEY_ID: minioadmin + AWS_SECRET_ACCESS_KEY: minioadmin + + services: + minio: + image: fclairamb/minio-github-actions + ports: + - 9000:9000 + name: Ocaml version - ${{ matrix.ocaml-compiler }} - ${{ matrix.os }} steps: - name: checkout @@ -36,6 +46,24 @@ jobs: with: fetch-depth: 2 + - name: Setup Minio + run: | + mkdir ~/.aws + echo '[default]' > ~/.aws/credentials + echo 'aws_access_key_id = minioadmin' >> ~/.aws/credentials + echo 'aws_secret_access_key = minioadmin' >> ~/.aws/credentials + pip3 install minio + python3 - <<'EOF' + from minio import Minio + minio = Minio( + 'localhost:9000', + access_key='minioadmin', + secret_key='minioadmin', + secure=False + ) + minio.make_bucket('test-bucket-lwt', location='us-east-1') + EOF + - name: setup-ocaml uses: ocaml/setup-ocaml@v3 with: @@ -45,7 +73,7 @@ jobs: run: | opam install --deps-only --with-test --with-doc --yes zarr opam install bytesrw conf-zlib conf-zstd --yes - opam install lwt --yes + opam install lwt aws-s3-lwt --yes opam exec -- dune build zarr zarr-sync zarr-lwt - name: setup ocaml-5-specific diff --git a/Makefile b/Makefile index 1ca1b2a2..5319f402 100644 --- a/Makefile +++ b/Makefile @@ -26,3 +26,8 @@ docs: .PHONY: view-docs view-docs: docs chromium _build/default/_doc/_html/index.html + +.PHONY: minio +minio: + mkdir -p /tmp/minio/test-bucket-lwt + docker run --rm -it -p 9000:9000 -v /tmp/minio:/minio minio/minio:latest server /minio diff --git a/dune-project b/dune-project index aaa4c9e0..7232da60 100644 --- a/dune-project +++ b/dune-project @@ -63,6 +63,7 @@ (and (>= 4.14.0))) (zarr (= :version)) (lwt (>= 2.5.1)) + (aws-s3-lwt (>= 4.8.1)) (odoc :with-doc) (ounit2 :with-test) (ppx_deriving :with-test) diff --git a/zarr-lwt.opam b/zarr-lwt.opam index 4e14ce44..f5e43a41 100644 --- a/zarr-lwt.opam +++ b/zarr-lwt.opam @@ -13,6 +13,7 @@ depends: [ "ocaml" {>= "4.14.0"} "zarr" {= version} "lwt" {>= "2.5.1"} + "aws-s3-lwt" {>= "4.8.1"} "odoc" {with-doc} "ounit2" {with-test} "ppx_deriving" {with-test} diff --git a/zarr-lwt/src/dune b/zarr-lwt/src/dune index 459c8244..9df95697 100644 --- a/zarr-lwt/src/dune +++ b/zarr-lwt/src/dune @@ -3,6 +3,7 @@ (public_name zarr-lwt) (libraries zarr + aws-s3-lwt lwt lwt.unix) (ocamlopt_flags diff --git a/zarr-lwt/src/storage.ml b/zarr-lwt/src/storage.ml index 8d8bf563..56a352b7 100644 --- a/zarr-lwt/src/storage.ml +++ b/zarr-lwt/src/storage.ml @@ -163,3 +163,177 @@ module FilesystemStore = struct include Zarr.Storage.Make(IO) end + +module AmazonS3Store = struct + module Credentials = Aws_s3_lwt.Credentials + module S3 = Aws_s3_lwt.S3 + + open Deferred.Infix + open Deferred.Syntax + + exception Request_failed of S3.error + + let empty_content () = S3.{ + storage_class = Standard; + meta_headers = None; + etag = String.empty; + key = String.empty; + last_modified = 0.; + size = 0 + } + + let fold_or_catch ~not_found res = + let return_or_raise r () = match r with + | Ok v -> Deferred.return v + | Error e -> raise (Request_failed e) + and on_exception ~not_found = function + | Request_failed S3.Not_found -> Lwt.return (not_found ()) + | exn -> raise exn + in + Lwt.catch (return_or_raise res) (on_exception ~not_found) + + let raise_not_found k () = raise (Zarr.Storage.Key_not_found k) + + let empty_Ls = Fun.const ([], S3.Ls.Done) + + let fold_continuation ~return ~more = function + | S3.Ls.Done -> Deferred.return return + | S3.Ls.More continuation -> + continuation () >>= fold_or_catch ~not_found:empty_Ls >>= fun (xs, cont) -> + more xs cont + + module IO = struct + module Deferred = Deferred + + type t = + {retries : int + ;bucket : string + ;cred : Credentials.t + ;endpoint : Aws_s3.Region.endpoint} + + let size t key = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let f ~endpoint () = S3.head ~bucket ~credentials ~key ~endpoint () in + let* res = S3.retry ~retries:t.retries ~endpoint ~f () in + let+ c = fold_or_catch ~not_found:empty_content res in + c.size + + let is_member t key = + let+ size = size t key in + if size = 0 then false else true + + let get t key = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let f ~endpoint () = S3.get ~bucket ~credentials ~endpoint ~key () in + let* res = S3.retry ~retries:t.retries ~endpoint ~f () in + fold_or_catch ~not_found:(raise_not_found key) res + + let get_partial_values t key ranges = + let read_range t key (ofs, len) = + let range = match len with + | None -> S3.{first = Some ofs; last = None} + | Some l -> S3.{first = Some ofs; last = Some (ofs + l - 1)} + in + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let f ~endpoint () = S3.get ~bucket ~credentials ~endpoint ~range ~key () in + let* res = S3.retry ~retries:t.retries ~endpoint ~f () in + let+ data = fold_or_catch ~not_found:(raise_not_found key) res in + [data] + in + Deferred.concat_map (read_range t key) ranges + + let set t key data = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let f ~endpoint () = S3.put ~bucket ~credentials ~endpoint ~data ~key () in + let* res = S3.retry ~retries:t.retries ~endpoint ~f () in + let* _ = fold_or_catch ~not_found:(Fun.const String.empty) res in + Deferred.return_unit + + let set_partial_values t key ?(append=false) rsv = + let* size = size t key in + let* ov = match size with + | 0 -> Deferred.return String.empty + | _ -> get t key + in + let f = if append || ov = String.empty then + fun acc (_, v) -> acc ^ v else + fun acc (rs, v) -> + let s = Bytes.unsafe_of_string acc in + Bytes.blit_string v 0 s rs String.(length v); + Bytes.unsafe_to_string s + in + set t key (List.fold_left f ov rsv) + + let erase t key = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let f ~endpoint () = S3.delete ~bucket ~credentials ~endpoint ~key () in + let* res = S3.retry ~retries:t.retries ~endpoint ~f () in + fold_or_catch ~not_found:(Fun.const ()) res + + let rec delete_keys t cont () = + let del t xs c = Deferred.iter (delete_content t) xs >>= delete_keys t c in + fold_continuation ~return:() ~more:(del t) cont + + and delete_content t S3.{key; _} = erase t key + + and erase_prefix t prefix = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in + let* res = S3.retry ~retries:t.retries ~endpoint ~f () in + let* xs, rest = fold_or_catch ~not_found:empty_Ls res in + Deferred.iter (delete_content t) xs >>= delete_keys t rest + + let rec list t = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint () in + let* res = S3.retry ~retries:t.retries ~endpoint ~f () in + let* xs, rest = fold_or_catch ~not_found:empty_Ls res in + accumulate_keys (List.map content_key xs) rest + + and content_key S3.{key; _} = key + + and accumulate_keys acc cont = + let append acc xs c = accumulate_keys (acc @ List.map content_key xs) c in + fold_continuation ~return:acc ~more:(append acc) cont + + module S = Set.Make(String) + + let rec partition_keys prefix ((l, r) as acc) cont = + let split ~acc ~prefix xs c = partition_keys prefix (List.fold_left (add prefix) acc xs) c in + fold_continuation ~return:(l, S.elements r) ~more:(split ~acc ~prefix) cont + + and add prefix (l, r) (c : S3.content) = + let size = String.length prefix in + if not (String.contains_from c.key size '/') then c.key :: l, r else + l, S.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r + + and list_dir t prefix = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in + let* res = S3.retry ~retries:t.retries ~endpoint ~f () in + let* xs, rest = fold_or_catch ~not_found:empty_Ls res in + let init = List.fold_left (add prefix) ([], S.empty) xs in + partition_keys prefix init rest + + let rec rename t prefix new_prefix = + let upload t (k, v) = set t k v in + let* xs = list t in + let to_delete = List.filter (String.starts_with ~prefix) xs in + let* data = Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in + let* () = Deferred.iter (upload t) data in + Deferred.iter (erase t) to_delete + + and rename_and_add ~t ~prefix ~new_prefix acc k = + let l = String.length prefix in + let k' = new_prefix ^ String.sub k l (String.length k - l) in + let+ a = get t k in (k', a) :: acc + end + + let with_open ?(scheme=`Http) ?(inet=`V4) ?(retries=3) ~region ~bucket ~profile f = + let* res = Credentials.Helper.get_credentials ~profile () in + let cred = Result.fold ~ok:Fun.id ~error:raise res in + let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in + f IO.{bucket; cred; endpoint; retries} + + include Zarr.Storage.Make(IO) +end diff --git a/zarr-lwt/src/storage.mli b/zarr-lwt/src/storage.mli index fc8eed67..4326e6de 100644 --- a/zarr-lwt/src/storage.mli +++ b/zarr-lwt/src/storage.mli @@ -20,3 +20,35 @@ module FilesystemStore : sig @raise Failure if [dir] is not a Zarr store path. *) end + +(** An Lwt-aware Amazon S3 bucket storage backend for a Zarr V3 hierarchy. *) +module AmazonS3Store : sig + exception Request_failed of Aws_s3_lwt.S3.error + + include Zarr.Storage.STORE with module Deferred = Deferred + + val with_open : + ?scheme:[ `Http | `Https ] -> + ?inet:[ `V4 | `V6 ] -> + ?retries:int -> + region:Aws_s3.Region.t -> + bucket:string -> + profile:string -> + (t -> 'a Lwt.t) -> + 'a Lwt.t + (** [with_open ~region ~bucket ~profile f] opens an S3 bucket store with + bucket name [bucket] at region [region] using credentials specified by + profile [profile]. The credentials are read locally from a [~/.aws/credentials] + file or from an IAM service if the profile or file is not available. + Function [f] is applied to the store's open handle and its output is + returned to the caller. + + {ul + {- [scheme] is the HTTP scheme to use when connecting to S3, and must be + one of [`Http | `Https]. Defaults to [`Http].} + {- [inet] is the IP version and must be one of [`V4 | `V6]. Defaults to [`V4].} + {- [retries] is the number of times to retry a request should it return an error.} + } + + @raise Request_failed if an error occurs while sending a request to the S3 service. *) +end diff --git a/zarr-lwt/test/test_lwt.ml b/zarr-lwt/test/test_lwt.ml index e3bf3f98..cf0b8932 100644 --- a/zarr-lwt/test/test_lwt.ml +++ b/zarr-lwt/test/test_lwt.ml @@ -141,11 +141,18 @@ let _ = (Zarr.Storage.Not_a_filesystem_store fn) (fun () -> FilesystemStore.open_store fn); + (* ZipStore configuration *) let zpath = tmp_dir ^ ".zip" in + (* AmazonS3Store configuration *) + let region = Aws_s3.Region.minio ~port:9000 ~host:"localhost" () + and bucket = "test-bucket-lwt" + and profile = "default" in + Lwt_main.run @@ Lwt.join [ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z) (* test just opening the now exisitant archive created by the previous test. *) ;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit) + ;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store)) ;test_storage (module MemoryStore) @@ MemoryStore.create () ;test_storage (module FilesystemStore) s]) ])