Skip to content

Commit

Permalink
Lazy scan
Browse files Browse the repository at this point in the history
  • Loading branch information
joelittlejohn committed Apr 2, 2024
1 parent ffdc943 commit 31e863f
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/taoensso/faraday.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,26 @@
result (merge-more run1 span-reqs (run1 last-prim-kvs))]
(with-meta (get result table) (meta result))))

(defn scan-lazy-seq
"Returns a lazy sequence of items from table. Requests are stitched together to
produce a continuous sequence, with additional requests occurring only when
needed.
See scan for supported options. :span-reqs is ignored."
[client-opts table {:keys [limit] :as opts}]
(lazy-seq
(let [result (scan client-opts table (assoc opts :span-reqs {:max 0}))
{:keys [last-prim-kvs]} (meta result)
result-seq (if last-prim-kvs
(lazy-cat result
(scan-lazy-seq client-opts table (-> opts
(assoc :last-prim-kvs last-prim-kvs)
(dissoc :limit))))
result)]
(if limit
(take limit result-seq)
result-seq))))

(defn scan-parallel
"Like `scan` but starts a number of worker threads and automatically handles
parallel scan options (:total-segments and :segment). Returns a vector of
Expand Down
93 changes: 93 additions & 0 deletions test/taoensso/faraday/tests/main.clj
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,99 @@
:return #{:id}})
set count)])))))

(deftest scan
(let [num-items 50]
(doseq [batch (partition 25 (range num-items))]
(far/batch-write-item *client-opts*
{bulk-table {:delete (map (fn [i] {:group "group" :id i}) batch)}}))

(let [long-text (apply str (repeatedly 300000 (constantly "n")))]
(doseq [i (range num-items)]
(far/put-item *client-opts* bulk-table {:group "group"
:id i
:text long-text})))

(testing "scan returns the first page"
(is (= 4 (-> (far/scan *client-opts*
bulk-table
{:consistent? true
:return #{:id}})
set count))))

(testing "scan returns the first and second page"
(is (= 8 (-> (far/scan *client-opts*
bulk-table
{:consistent? true
:span-reqs {:max 2}
:return #{:id}})
set count))))))

(deftest scan-lazy-seq
(let [num-items 50]

(doseq [batch (partition 25 (range num-items))]
(far/batch-write-item *client-opts*
{bulk-table {:delete (map (fn [i] {:group "group" :id i}) batch)}}))

(let [long-text (apply str (repeatedly 300000 (constantly "n")))]
(doseq [i (range num-items)]
(far/put-item *client-opts* bulk-table {:group "group"
:id i
:text long-text})))

(testing "scan-lazy-seq returns all items"
(is (= (set (for [i (range num-items)] {:id i}))
(set (far/scan-lazy-seq *client-opts*
bulk-table
{:consistent? true
:return #{:id}})))))

(testing "scan-lazy-seq returns at most :limit items"
(is (= 35
(count (far/scan-lazy-seq *client-opts*
bulk-table
{:consistent? true
:limit 35
:return #{:id}})))))

(testing "scan-lazy-seq can skip items"
(is (= (set (for [i (range 10 num-items)] {:id i}))
(set (far/scan-lazy-seq *client-opts*
bulk-table
{:consistent? true
:last-prim-kvs {:id 9 :group "group"}
:return #{:id}})))))

(testing "scan-lazy-seq is not eager"
(let [calls (atom 0)
client (proxy [AmazonDynamoDBClient] [(BasicAWSCredentials. (:access-key *client-opts*)
(:secret-key *client-opts*))]
(scan [scan-request]
(swap! calls inc)
(proxy-super scan scan-request)))]
(.setEndpoint client (:endpoint *client-opts*))

;; unconsumed seq
(far/scan-lazy-seq (assoc *client-opts* :client client)
bulk-table
{:consistent? true
:return #{:id}})
(is (= 0 @calls))

;; one 'page' of data, since pages contain 4 items
(doall (take 3 (far/scan-lazy-seq (assoc *client-opts* :client client)
bulk-table
{:consistent? true
:return #{:id}})))
(is (= 1 @calls))

;; two 'pages' of data, since pages contain 4 items
(doall (take 6 (far/scan-lazy-seq (assoc *client-opts* :client client)
bulk-table
{:consistent? true
:return #{:id}})))
(is (= 3 @calls))))))

(deftest updating-items
(let [i {:id 10 :name "update me"}]
(far/delete-item *client-opts* ttable {:id 10})
Expand Down

0 comments on commit 31e863f

Please sign in to comment.