Skip to content

Commit

Permalink
Support prefixing the store inside a bucket. Delete in batches of 1000.
Browse files Browse the repository at this point in the history
  • Loading branch information
whilo committed Mar 30, 2023
1 parent b0a10ff commit d98bdf5
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ For asynchronous execution take a look at the [konserve example](https://github.

(def s3-spec
{:region "us-west-1"
:bucket "konserve-demo"})
:bucket "konserve-demo"
:store-id "test-store" ;; allows multiple stores per bucket
})

(def store (connect-s3-store s3-spec :opts {:sync? true}))

Expand All @@ -40,7 +42,7 @@ For asynchronous execution take a look at the [konserve example](https://github.

(k/bassoc store :binbar (byte-array (range 10)) {:sync? true})
(k/bget store :binbar (fn [{:keys [input-stream]}]
(map byte (slurp input-stream)))
(map byte (slurp input-stream)))
{:sync? true})

```
Expand Down
49 changes: 30 additions & 19 deletions src/konserve_s3/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
ListObjectsRequest ListObjectsResponse
GetObjectRequest GetObjectResponse
PutObjectRequest PutObjectRequest
CopyObjectRequest DeleteObjectRequest HeadObjectRequest
CopyObjectRequest Delete DeleteObjectRequest DeleteObjectsRequest HeadObjectRequest
NoSuchBucketException NoSuchKeyException]
[software.amazon.awssdk.core.sync RequestBody]))

Expand Down Expand Up @@ -124,13 +124,22 @@
(.key key)
(.build))))

(def ^:const default-bucket "konserve")
(defn delete-keys [client bucket keys]
(.deleteObject client (-> (DeleteObjectsRequest/builder)
(.bucket bucket)
(.delete (-> (Delete/builder)
(.objects keys)
(.build)))
(.build))))

(extend-protocol PBackingLock
Boolean
(-release [_ env]
(if (:sync? env) nil (go-try- nil))))

(defn ->key [store-id key]
(str store-id "_" key))

(defrecord S3Blob [bucket key data fetched-object]
PBackingBlob
(-sync [_ env]
Expand Down Expand Up @@ -192,25 +201,25 @@
(async+sync (:sync? env) *default-sync-translation*
(go-try- (swap! data assoc :value blob)))))

(defrecord S3Bucket [client bucket]
(defrecord S3Bucket [client bucket store-id]
PBackingStore
(-create-blob [this store-key env]
(async+sync (:sync? env) *default-sync-translation*
(go-try- (S3Blob. this store-key (atom {}) (atom nil)))))
(go-try- (S3Blob. this (->key store-id store-key) (atom {}) (atom nil)))))
(-delete-blob [_ store-key env]
(async+sync (:sync? env) *default-sync-translation*
(go-try- (delete client bucket store-key))))
(go-try- (delete client bucket (->key store-id store-key)))))
(-blob-exists? [_ store-key env]
(async+sync (:sync? env) *default-sync-translation*
(go-try- (exists? client bucket store-key))))
(go-try- (exists? client bucket (->key store-id store-key)))))
(-copy [_ from to env]
(async+sync (:sync? env) *default-sync-translation*
(go-try- (copy client bucket from to))))
(go-try- (copy client bucket (->key store-id from) (->key store-id to)))))
(-atomic-move [_ from to env]
(async+sync (:sync? env) *default-sync-translation*
(go-try-
(copy client bucket from to)
(delete client bucket from))))
(copy client bucket (->key store-id from) (->key store-id to))
(delete client bucket (->key store-id from)))))
(-migratable [_ _key _store-key env]
(if (:sync? env) nil (go-try- nil)))
(-migrate [_ _migration-key _key-vec _serializer _read-handlers _write-handlers env]
Expand All @@ -226,21 +235,22 @@
(async+sync (:sync? env) *default-sync-translation*
(go-try- (when (bucket-exists? client bucket)
(info "This will delete all konserve files, but won't delete the bucket. You can use konserve-s3.core/delete-bucket if you intend to delete the bucket as well.")
(doseq [key (filter (fn [^String key]
(or (.endsWith key ".ksv")
(.endsWith key ".ksv.new")
(.endsWith key ".ksv.backup")))
(list-objects client bucket))]
(delete client bucket key))
(doseq [keys (partition 1000 (list-objects client bucket))]
(delete-keys client bucket keys))
(.close client)))))
(-keys [_ env]
(async+sync (:sync? env) *default-sync-translation*
(go-try- (list-objects client bucket)))))
(go-try- (filter (fn [^String key]
(and (.startsWith key store-id)
(or (.endsWith key ".ksv")
(.endsWith key ".ksv.new")
(.endsWith key ".ksv.backup"))))
(list-objects client bucket))))))

(defn connect-store [s3-spec & {:keys [opts]
:as params}]
(let [complete-opts (merge {:sync? true} opts)
backing (S3Bucket. (s3-client s3-spec) (:bucket s3-spec))
backing (S3Bucket. (s3-client s3-spec) (:bucket s3-spec) (:store-id s3-spec))
config (merge {:opts complete-opts
:config {:sync-blob? true
:in-place? false
Expand All @@ -258,15 +268,16 @@

(defn delete-store [s3-spec & {:keys [opts]}]
(let [complete-opts (merge {:sync? true} opts)
backing (S3Bucket. (s3-client s3-spec) (:bucket s3-spec))]
backing (S3Bucket. (s3-client s3-spec) (:bucket s3-spec) (:store-id s3-spec))]
(-delete-store backing complete-opts)))

(comment

(require '[konserve.core :as k])

(def s3-spec {:region "us-west-1"
:bucket "konserve-s3-test2"
:bucket "konserve-s3"
:store-id "test2"
:x-ray? true
:access-key "ACCESS_KEY"
:password "SECRET"})
Expand Down
3 changes: 2 additions & 1 deletion test/konserve_s3/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
[konserve.compliance-test :refer [compliance-test]]
[konserve-s3.core :refer [connect-store release delete-store]]))

(def s3-spec {:region "us-west-1"})
(def s3-spec {:region "us-west-1"
:store-id "test-store"})

(deftest s3-compliance-sync-test
(let [s3-spec (assoc s3-spec :bucket "konserve-s3-sync-test")
Expand Down

0 comments on commit d98bdf5

Please sign in to comment.