Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy scan #166

Merged
merged 1 commit into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/taoensso/faraday.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1533,6 +1533,26 @@
result (merge-more run1 span-reqs (run1 last-prim-kvs))]
(with-meta (get result table) (meta result))))

(defn scan-lazy-seq
"Returns a lazy sequence of items from table. Requests are stitched together to
produce a continuous sequence, with additional requests occurring only when
needed.

See scan for supported options. :span-reqs is ignored."
[client-opts table {:keys [limit] :as opts}]
(lazy-seq
(let [result (scan client-opts table (assoc opts :span-reqs {:max 0}))
{:keys [last-prim-kvs]} (meta result)
result-seq (if last-prim-kvs
(lazy-cat result
(scan-lazy-seq client-opts table (-> opts
(assoc :last-prim-kvs last-prim-kvs)
(dissoc :limit))))
result)]
(if limit
(take limit result-seq)
result-seq))))

(defn scan-parallel
"Like `scan` but starts a number of worker threads and automatically handles
parallel scan options (:total-segments and :segment). Returns a vector of
Expand Down
96 changes: 96 additions & 0 deletions test/taoensso/faraday/tests/main.clj
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,102 @@
:return #{:id}})
set count)])))))

(deftest scan
(let [num-items 50]
(doseq [batch (partition 25 (range num-items))]
(far/batch-write-item *client-opts*
{bulk-table {:delete (map (fn [i] {:group "group" :id i}) batch)}}))

(let [long-text (apply str (repeatedly 300000 (constantly "n")))]
(doseq [i (range num-items)]
(far/put-item *client-opts* bulk-table {:group "group"
:id i
:text long-text})))

(testing "scan returns the first page"
(is (= 4 (-> (far/scan *client-opts*
bulk-table
{:consistent? true
:return #{:id}})
set count))))

(testing "scan returns the first and second page"
(is (= 8 (-> (far/scan *client-opts*
bulk-table
{:consistent? true
:span-reqs {:max 2}
:return #{:id}})
set count))))))

(deftest scan-lazy-seq
(let [num-items 50]

(doseq [batch (partition 25 (range num-items))]
(far/batch-write-item *client-opts*
{bulk-table {:delete (map (fn [i] {:group "group" :id i}) batch)}}))

(let [long-text (apply str (repeatedly 300000 (constantly "n")))]
(doseq [i (range num-items)]
(far/put-item *client-opts* bulk-table {:group "group"
:id i
:text long-text})))

(testing "scan-lazy-seq returns all items"
(is (= (set (for [i (range num-items)] {:id i}))
(set (far/scan-lazy-seq *client-opts*
bulk-table
{:consistent? true
:return #{:id}})))))

(testing "scan-lazy-seq returns at most :limit items"
(is (= 35
(count (far/scan-lazy-seq *client-opts*
bulk-table
{:consistent? true
:limit 35
:return #{:id}})))))

(testing "scan-lazy-seq can skip items"
(is (= (set (for [i (range 10 num-items)] {:id i}))
(set (far/scan-lazy-seq *client-opts*
bulk-table
{:consistent? true
:last-prim-kvs {:id 9 :group "group"}
:return #{:id}})))))

(testing "scan-lazy-seq is not eager"
(let [clientfn (fn [counter]
(doto (proxy [AmazonDynamoDBClient] [(BasicAWSCredentials. (:access-key *client-opts*)
(:secret-key *client-opts*))]
(scan [scan-request]
(swap! counter inc)
(proxy-super scan scan-request)))
(.setEndpoint (:endpoint *client-opts*))))]

;; unconsumed seq
(let [calls (atom 0)]
(far/scan-lazy-seq (assoc *client-opts* :client (clientfn calls))
bulk-table
{:consistent? true
:return #{:id}})
(is (= 0 @calls)))

;; one 'page' of data, since pages contain 4 items
(let [calls (atom 0)]
(doall (take 3 (far/scan-lazy-seq (assoc *client-opts* :client (clientfn calls))
bulk-table
{:consistent? true
:return #{:id}})))
(is (= 1 @calls)))

;; two 'pages' of data, since pages contain 4 items
(let [calls (atom 0)]
(doall (take 6 (far/scan-lazy-seq (assoc *client-opts* :client (clientfn calls))
bulk-table
{:consistent? true
:return #{:id}})))
(is (= 2 @calls)))))))

(deftest updating-items
(let [i {:id 10 :name "update me"}]
(far/delete-item *client-opts* ttable {:id 10})
Expand Down