Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch lazyseq to iteration #1443

Merged
merged 4 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions src/ctia/task/migration/migrate_es_stores.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
(:import java.lang.AssertionError))

(def default-batch-size 100)
(def default-buffer-size 3)

;; TODO def => defn
(def all-types
Expand Down Expand Up @@ -124,12 +123,12 @@
:documents data
:search_after next-search-after)))))

(s/defn read-source ;; WARNING: defining schema output breaks lazyness
(s/defn read-source
"returns a lazy-seq of batch from source store"
[read-params :- (s/maybe BatchParams)]
(lazy-seq
(when-let [batch (read-source-batch read-params)]
(cons batch (read-source (dissoc batch :documents))))))
(iteration read-source-batch
:initk read-params
:kf #(dissoc % :documents)))

(s/defn write-target :- s/Int
"This function writes a batch of documents which are (1) modified with `migrations` functions,
Expand Down Expand Up @@ -175,20 +174,18 @@
(s/defn migrate-query :- BatchParams
"migrate documents that match given `query`"
[{:keys [entity-type
migrated-count
buffer-size]
migrated-count]
:as migration-params} :- BatchParams
query :- ESQuery
services :- mst/MigrationStoreServices]
(log/infof "%s - handling sliced query %s"
(name entity-type)
(pr-str query))
(let [read-params (assoc migration-params :query query)
data-queue (seque buffer-size
(read-source read-params))
data (read-source read-params)
new-migrated-count (reduce #(write-target %1 %2 services)
migrated-count
data-queue)]
data)]
(assoc migration-params
:migrated-count
new-migrated-count)))
Expand All @@ -199,7 +196,6 @@
entity-type
migrations
batch-size
buffer-size
confirm?
services :- mst/MigrationStoreServices]
(log/infof "migrating store: %s" entity-type)
Expand All @@ -216,7 +212,6 @@
base-params {:source-store source-store
:target-store target-store
:migrated-count migrated-count-state
:buffer-size buffer-size
:search_after search_after
:migrations migrations
:entity-type entity-type
Expand Down Expand Up @@ -253,7 +248,6 @@
migrations
store-keys
batch-size
buffer-size
confirm?
restart?]
:as migration-params} :- mst/MigrationParams
Expand All @@ -270,7 +264,6 @@
entity-type
migrations
batch-size
buffer-size
confirm?
services))
(handle-deletes migration-state store-keys batch-size confirm? services)))
Expand Down
4 changes: 2 additions & 2 deletions src/ctia/task/migration/store.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
:migrations [(apply s/enum (keys available-migrations))]
:store-keys [s/Keyword]
:batch-size s/Int
:buffer-size s/Int
:confirm? (s/maybe s/Bool)
:restart? (s/maybe s/Bool)
(s/optional-key :store) {:es {s/Keyword ESStoreProperties}}})
Expand Down Expand Up @@ -430,7 +429,8 @@ Rollover requires refresh so we cannot just call ES with condition since refresh
{"id" sort-order})
params
(merge
{:offset (or offset 0)
{:track_total_hits true
:offset (or offset 0)
:limit batch-size}
(when sort-order
{:sort sort-by})
Expand Down
13 changes: 1 addition & 12 deletions test/ctia/task/migration/migrate_es_stores_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@
:migrations [:identity]
:store-keys [:incident :investigation :malware]
:batch-size 100
:buffer-size 3
:confirm? true
:restart? false}]
(testing "misconfigured migration"
Expand Down Expand Up @@ -204,8 +203,7 @@
"configured default target store shall be considered")))))))

(deftest prepare-params-test
(let [migration-props {:buffer-size 3,
:batch-size 100,
(let [migration-props {:batch-size 100,
:migration-id "migration-test",
:restart? false,
:store-keys "malware, tool,sighting ",
Expand Down Expand Up @@ -249,7 +247,6 @@
:migrations [:__test]
:store-keys store-types
:batch-size 10
:buffer-size 3
:confirm? true
:restart? false}
services)
Expand Down Expand Up @@ -403,7 +400,6 @@
:confirm? true
:migrations [:__test]
:batch-size 1000
:buffer-size 3
:restart? false}
services)
(let [test-docs (take total docs)
Expand Down Expand Up @@ -485,7 +481,6 @@
:migrations [:__test]
:store-keys [:relationship]
:batch-size 100
:buffer-size 3
:confirm? true
:restart? false}
services))
Expand All @@ -505,7 +500,6 @@
:migrations [:__test]
:store-keys [:relationship]
:batch-size 100
:buffer-size 3
:confirm? true
:restart? true}
services))
Expand Down Expand Up @@ -566,7 +560,6 @@
:migrations [:__test]
:store-keys store-types
:batch-size 10
:buffer-size 3
:confirm? true
:restart? false}
services))
Expand Down Expand Up @@ -619,7 +612,6 @@
:migrations [:0.4.16]
:store-keys migrated-store-keys
:batch-size 10
:buffer-size 3
:confirm? false
:restart? false}
services)
Expand All @@ -643,7 +635,6 @@
:migrations [:__test]
:store-keys migrated-store-keys
:batch-size 10
:buffer-size 3
:confirm? true
:restart? false}
services))
Expand Down Expand Up @@ -804,7 +795,6 @@
:store-keys migrated-store-keys
;; small batch to check proper delete paging
:batch-size 2
:buffer-size 1
:confirm? true
:restart? true}
services)
Expand Down Expand Up @@ -855,7 +845,6 @@
:migrations [:__test]
:store-keys (into [] example-types)
:batch-size batch-size
:buffer-size 3
:confirm? true
:restart? false}
services)
Expand Down
2 changes: 0 additions & 2 deletions test/ctia/task/migration/store_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@
:confirm? false
:migrations [:identity]
:batch-size 1000
:buffer-size 3
:restart? false}
services)
wo-stores (sut/wo-storemaps fake-migration)]
Expand Down Expand Up @@ -915,7 +914,6 @@
:migrations [:identity]
:store-keys entity-types
:batch-size 1000
:buffer-size 3
:restart? false}
fake-migration (sut/init-migration (assoc base-migration-params
:migration-id migration-id-1
Expand Down
Loading