From 31e863f317a65076bfb3c9ba788833cb49caa5cd Mon Sep 17 00:00:00 2001 From: Joe Littlejohn Date: Sat, 5 Aug 2023 18:23:33 +0100 Subject: [PATCH] Lazy scan --- src/taoensso/faraday.clj | 20 ++++++ test/taoensso/faraday/tests/main.clj | 93 ++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) diff --git a/src/taoensso/faraday.clj b/src/taoensso/faraday.clj index e5639e9..fe4fb37 100644 --- a/src/taoensso/faraday.clj +++ b/src/taoensso/faraday.clj @@ -1532,6 +1532,26 @@ result (merge-more run1 span-reqs (run1 last-prim-kvs))] (with-meta (get result table) (meta result)))) +(defn scan-lazy-seq + "Returns a lazy sequence of items from table. Requests are stitched together to + produce a continuous sequence, with additional requests occurring only when + needed. + + See scan for supported options. :span-reqs is ignored." + [client-opts table {:keys [limit] :as opts}] + (lazy-seq + (let [result (scan client-opts table (assoc opts :span-reqs {:max 0})) + {:keys [last-prim-kvs]} (meta result) + result-seq (if last-prim-kvs + (lazy-cat result + (scan-lazy-seq client-opts table (-> opts + (assoc :last-prim-kvs last-prim-kvs) + (dissoc :limit)))) + result)] + (if limit + (take limit result-seq) + result-seq)))) + (defn scan-parallel "Like `scan` but starts a number of worker threads and automatically handles parallel scan options (:total-segments and :segment). Returns a vector of diff --git a/test/taoensso/faraday/tests/main.clj b/test/taoensso/faraday/tests/main.clj index 6e8d645..56d22d7 100644 --- a/test/taoensso/faraday/tests/main.clj +++ b/test/taoensso/faraday/tests/main.clj @@ -201,6 +201,99 @@ :return #{:id}}) set count)]))))) +(deftest scan + (let [num-items 50] + (doseq [batch (partition 25 (range num-items))] + (far/batch-write-item *client-opts* + {bulk-table {:delete (map (fn [i] {:group "group" :id i}) batch)}})) + + (let [long-text (apply str (repeatedly 300000 (constantly "n")))] + (doseq [i (range num-items)] + (far/put-item *client-opts* bulk-table {:group "group" + :id i + :text long-text}))) + + (testing "scan returns the first page" + (is (= 4 (-> (far/scan *client-opts* + bulk-table + {:consistent? true + :return #{:id}}) + set count)))) + + (testing "scan returns the first and second page" + (is (= 8 (-> (far/scan *client-opts* + bulk-table + {:consistent? true + :span-reqs {:max 2} + :return #{:id}}) + set count)))))) + +(deftest scan-lazy-seq + (let [num-items 50] + + (doseq [batch (partition 25 (range num-items))] + (far/batch-write-item *client-opts* + {bulk-table {:delete (map (fn [i] {:group "group" :id i}) batch)}})) + + (let [long-text (apply str (repeatedly 300000 (constantly "n")))] + (doseq [i (range num-items)] + (far/put-item *client-opts* bulk-table {:group "group" + :id i + :text long-text}))) + + (testing "scan-lazy-seq returns all items" + (is (= (set (for [i (range num-items)] {:id i})) + (set (far/scan-lazy-seq *client-opts* + bulk-table + {:consistent? true + :return #{:id}}))))) + + (testing "scan-lazy-seq returns at most :limit items" + (is (= 35 + (count (far/scan-lazy-seq *client-opts* + bulk-table + {:consistent? true + :limit 35 + :return #{:id}}))))) + + (testing "scan-lazy-seq can skip items" + (is (= (set (for [i (range 10 num-items)] {:id i})) + (set (far/scan-lazy-seq *client-opts* + bulk-table + {:consistent? true + :last-prim-kvs {:id 9 :group "group"} + :return #{:id}}))))) + + (testing "scan-lazy-seq is not eager" + (let [calls (atom 0) + client (proxy [AmazonDynamoDBClient] [(BasicAWSCredentials. (:access-key *client-opts*) + (:secret-key *client-opts*))] + (scan [scan-request] + (swap! calls inc) + (proxy-super scan scan-request)))] + (.setEndpoint client (:endpoint *client-opts*)) + + ;; unconsumed seq + (far/scan-lazy-seq (assoc *client-opts* :client client) + bulk-table + {:consistent? true + :return #{:id}}) + (is (= 0 @calls)) + + ;; one 'page' of data, since pages contain 4 items + (doall (take 3 (far/scan-lazy-seq (assoc *client-opts* :client client) + bulk-table + {:consistent? true + :return #{:id}}))) + (is (= 1 @calls)) + + ;; two 'pages' of data, since pages contain 4 items + (doall (take 6 (far/scan-lazy-seq (assoc *client-opts* :client client) + bulk-table + {:consistent? true + :return #{:id}}))) + (is (= 3 @calls)))))) + (deftest updating-items (let [i {:id 10 :name "update me"}] (far/delete-item *client-opts* ttable {:id 10})