From 6fc6034e314a1dabc5e743e0b94624daa81d770a Mon Sep 17 00:00:00 2001
From: jonasseglare <5850088+jonasseglare@users.noreply.github.com>
Date: Fri, 27 Sep 2024 19:55:48 +0200
Subject: [PATCH] Make sure that the datoms of history are distinct (#706)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Make sure that the datoms of history are distinct

* FIXUP

* FIXUP

merge sorted sequences of distinct datoms into a new sorted sequence of distinct datoms

---------

Co-authored-by: Jonas Östlund
---
 Reviewer notes (free-form text in this position is not part of the commit):

 NOTE(review): this patch was re-flowed from a copy whose newlines and
   indentation had been collapsed. Line breaks were cross-checked against the
   hunk line counts and the diffstat (227 insertions, 58 deletions), but the
   indentation of every line is presumed conventional Clojure formatting and
   must be confirmed against the repository before applying. Blank context
   lines are invisible in the collapsed copy: the position of one blank line
   in the first search.cljc hunk, and the line split of the `:require-macros`
   form in the first utils.cljc hunk, are guesses. The paragraph breaks of
   the commit message above are likewise reconstructed.

 What the patch changes, as visible in the hunks below:
 * `index-type->cmp-quick` moves from index/persistent_set.cljc to datom.cljc;
   persistent_set.cljc and db/utils.cljc now `:refer` it.
 * db/search.cljc drops the public `current-search-strategy` and
   `temporal-search-strategy` in favour of `search-strategy-with-index`, which
   keeps `:index-key` as the keyword and stores the resolved index under
   `:db-index`. Any caller outside this patch that used the removed functions
   needs checking.
 * New `index-type` (db/search.cljc) returns the strategy's index key only
   when the current and temporal strategies agree, otherwise nil.
 * `dbu/distinct-datoms` now takes `index-type` as its second argument (four
   arguments instead of three). With a non-nil `index-type` the two inputs are
   combined by `merge-distinct-sorted-seqs`; with nil it falls back to
   `(concat a (lazy-seq (remove (set a) b)))`, which builds a set of all of
   `a` once the tail is reached.
 * In db/search.cljc the parameter name `index-type` of `temporal-seek-datoms`
   shadows the new `index-type` function inside that body; the lines shown
   here do not call the function there.
 * The TODO added in migrate_test.clj (ordering of `:db/txInstant` datoms for
   load-entities) is left open by this patch.

 src/datahike/datom.cljc                       | 12 +++
 src/datahike/db/search.cljc                   | 88 ++++++++++++-------
 src/datahike/db/utils.cljc                    | 38 ++++++--
 src/datahike/index/persistent_set.cljc        | 14 +--
 src/datahike/tools.cljc                       | 32 +++++++
 test/datahike/test/api_test.cljc              | 15 ++++
 .../test/attribute_refs/datoms_test.cljc      | 30 +++++++
 test/datahike/test/migrate_test.clj           | 20 ++++-
 test/datahike/test/tools_test.clj             | 36 ++++++++
 9 files changed, 227 insertions(+), 58 deletions(-)

diff --git a/src/datahike/datom.cljc b/src/datahike/datom.cljc
index 856d569e3..8d6690864 100644
--- a/src/datahike/datom.cljc
+++ b/src/datahike/datom.cljc
@@ -347,3 +347,15 @@
    (fn [[e a v t]]
      (datom e a v t))
    coll))
+
+(defn index-type->cmp-quick
+  ([index-type] (index-type->cmp-quick index-type true))
+  ([index-type current?] (if current?
+                           (case index-type
+                             :aevt cmp-datoms-aevt-quick
+                             :avet cmp-datoms-avet-quick
+                             cmp-datoms-eavt-quick)
+                           (case index-type
+                             :aevt cmp-temporal-datoms-aevt-quick
+                             :avet cmp-temporal-datoms-avet-quick
+                             cmp-temporal-datoms-eavt-quick))))
diff --git a/src/datahike/db/search.cljc b/src/datahike/db/search.cljc
index 870c6da11..4789d374b 100644
--- a/src/datahike/db/search.cljc
+++ b/src/datahike/db/search.cljc
@@ -166,38 +166,40 @@
                      (boolean tx)
                      (boolean indexed?)))))
 
-(defn current-search-strategy [db pattern]
+(defn- resolve-db-index [strategy index-map]
+  (when strategy
+    (assoc strategy
+           :db-index
+           (get index-map (:index-key strategy)))))
+
+(defn- indexing-for-pattern? [db pattern]
   (let [[_ a _ _] pattern]
-    (when-let [strategy (get-search-strategy pattern
-                                             (dbu/indexing? db a)
-                                             false)]
-      (update strategy :index-key #(get db %)))))
-
-(defn temporal-search-strategy [db pattern]
-  (let [[_ a _ _ _] pattern]
-    (when-let [strategy (get-search-strategy
-                         pattern
-                         (dbu/indexing? db a)
-                         true)]
-      (update strategy :index-key #(case %
-                                     :eavt (:temporal-eavt db)
-                                     :aevt (:temporal-aevt db)
-                                     :avet (:temporal-avet db)
-                                     nil)))))
+    (dbu/indexing? db a)))
+
+(defn search-strategy-with-index [db pattern temporal?]
+  (-> pattern
+      (get-search-strategy (indexing-for-pattern? db pattern)
+                           temporal?)
+      (resolve-db-index (if temporal?
+                          {:eavt (:temporal-eavt db)
+                           :aevt (:temporal-aevt db)
+                           :avet (:temporal-avet db)}
+                          db))))
 
 
 (defn search-current-indices
   ([db pattern]
    (memoize-for db [:search pattern]
-                #(if-let [{:keys [index-key lookup-fn]} (current-search-strategy db pattern)]
-                   (lookup-fn index-key pattern)
+                #(if-let [{:keys [db-index lookup-fn]}
+                          (search-strategy-with-index db pattern false)]
+                   (lookup-fn db-index pattern)
                    [])))
 
   ;; For batches
   ([db pattern batch-fn]
-   (if-let [{:keys [index-key strategy-vec backend-fn]}
-            (current-search-strategy db pattern)]
-     (batch-fn strategy-vec (backend-fn index-key) identity)
+   (if-let [{:keys [db-index strategy-vec backend-fn]}
+            (search-strategy-with-index db pattern false)]
+     (batch-fn strategy-vec (backend-fn db-index) identity)
      [])))
 
 (defn added? [[_ _ _ _ added]]
@@ -219,30 +221,45 @@
   ([db pattern]
    (validate-pattern pattern false)
    (memoize-for db [:temporal-search pattern]
-                #(if-let [{:keys [index-key lookup-fn]}
-                          (temporal-search-strategy db pattern)]
-                   (let [result (lookup-fn index-key pattern)]
+                #(if-let [{:keys [db-index lookup-fn]}
+                          (search-strategy-with-index
+                           db pattern true)]
+                   (let [result (lookup-fn db-index pattern)]
                      (filter-by-added pattern result))
                    [])))
   ([db pattern batch-fn]
    (validate-pattern pattern true)
-   (if-let [{:keys [index-key strategy-vec backend-fn]}
-            (temporal-search-strategy db pattern)]
-     (batch-fn strategy-vec (backend-fn index-key)
+   (if-let [{:keys [db-index strategy-vec backend-fn]}
+            (search-strategy-with-index
+             db pattern true)]
+     (batch-fn strategy-vec (backend-fn db-index)
                (filter-by-added pattern))
      [])))
 
+(defn index-type [db pattern]
+  (let [idx (indexing-for-pattern? db pattern)
+        a (:index-key (get-search-strategy pattern idx false))
+        b (:index-key (get-search-strategy pattern idx true))]
+    (assert (or (nil? a) (keyword? a)))
+    (assert (or (nil? b) (keyword? b)))
+    (when (= a b)
+      a)))
+
 (defn temporal-search
   ([db pattern]
    (validate-pattern pattern false)
-   (dbu/distinct-datoms db
-                        (search-current-indices db pattern)
-                        (search-temporal-indices db pattern)))
+   (dbu/distinct-datoms
+    db
+    (index-type db pattern)
+    (search-current-indices db pattern)
+    (search-temporal-indices db pattern)))
   ([db pattern batch-fn]
    (validate-pattern pattern true)
-   (dbu/distinct-datoms db
-                        (search-current-indices db pattern batch-fn)
-                        (search-temporal-indices db pattern batch-fn))))
+   (dbu/distinct-datoms
+    db
+    (index-type db pattern)
+    (search-current-indices db pattern batch-fn)
+    (search-temporal-indices db pattern batch-fn))))
 
 (defn temporal-seek-datoms [db index-type cs]
   (let [index (get db index-type)
@@ -250,6 +267,7 @@
         from (dbu/components->pattern db index-type cs e0 tx0)
         to (datom emax nil nil txmax)]
     (dbu/distinct-datoms db
+                         index-type
                          (di/-slice index from to index-type)
                          (di/-slice temporal-index from to index-type))))
 
@@ -260,6 +278,7 @@
         to (datom emax nil nil txmax)]
     (concat
      (-> (dbu/distinct-datoms db
+                              index-type
                               (di/-slice index from to index-type)
                               (di/-slice temporal-index from to index-type))
          vec
@@ -272,6 +291,7 @@
   (let [from (dbu/resolve-datom current-db nil attr start nil e0 tx0)
         to (dbu/resolve-datom current-db nil attr end nil emax txmax)]
     (dbu/distinct-datoms db
+                         :avet
                          (di/-slice (:avet db) from to :avet)
                          (di/-slice (:temporal-avet db) from to :avet))))
 
diff --git a/src/datahike/db/utils.cljc b/src/datahike/db/utils.cljc
index 59baebb7e..fb589403d 100644
--- a/src/datahike/db/utils.cljc
+++ b/src/datahike/db/utils.cljc
@@ -3,11 +3,12 @@
    [clojure.data]
    [clojure.walk]
    [datahike.constants :refer [e0 tx0 emax txmax]]
-   [datahike.datom :refer [datom datom-tx]]
+   [datahike.datom :refer [datom datom-tx index-type->cmp-quick]]
    [datahike.db.interface :as dbi]
    [datahike.index :as di]
    [datahike.schema :as ds]
-   [datahike.tools :refer [raise]])
+   [datahike.tools :refer [raise merge-distinct-sorted-seqs
+                           distinct-sorted-seq?]])
   #?(:cljs (:require-macros [datahike.datom :refer [datom]]
                             [datahike.tools :refer [raise]]))
   #?(:clj (:import [datahike.datom Datom])))
@@ -202,13 +203,31 @@
     :aevt (resolve-datom db c1 c0 c2 c3 default-e default-tx)
     :avet (resolve-datom db c2 c0 c1 c3 default-e default-tx)))
 
-(defn distinct-datoms [db current-datoms history-datoms]
-  (if (dbi/-keep-history? db)
-    (concat (filter #(or (no-history? db (:a %))
-                         (multival? db (:a %)))
-                    current-datoms)
-            history-datoms)
-    current-datoms))
+(defn merge-datoms [index-type a b]
+  (if index-type
+    (merge-distinct-sorted-seqs
+     (index-type->cmp-quick index-type false)
+     a b)
+    (concat a (lazy-seq (remove (set a) b)))))
+
+(defn distinct-sorted-datoms? [index-type datoms]
+  (when index-type
+    (distinct-sorted-seq?
+     (index-type->cmp-quick index-type false)
+     datoms)))
+
+(defn distinct-datoms
+  ([db index-type current-datoms history-datoms]
+   (if (dbi/-keep-history? db)
+     (merge-datoms
+      index-type
+      (filter (fn [datom]
+                (let [a (:a datom)]
+                  (or (no-history? db a)
+                      (multival? db a))))
+              current-datoms)
+      history-datoms)
+     current-datoms)))
 
 (defn temporal-datoms [db index-type cs]
   (let [index (get db index-type)
@@ -216,6 +235,7 @@
         from (components->pattern db index-type cs e0 tx0)
         to (components->pattern db index-type cs emax txmax)]
     (distinct-datoms db
+                     index-type
                      (di/-slice index from to index-type)
                      (di/-slice temporal-index from to index-type))))
 
diff --git a/src/datahike/index/persistent_set.cljc b/src/datahike/index/persistent_set.cljc
index 76e9e4184..234985283 100644
--- a/src/datahike/index/persistent_set.cljc
+++ b/src/datahike/index/persistent_set.cljc
@@ -3,7 +3,7 @@
             [me.tonsky.persistent-sorted-set.arrays :as arrays]
             [clojure.core.cache :as cache]
             [clojure.core.cache.wrapped :as wrapped]
-            [datahike.datom :as dd]
+            [datahike.datom :as dd :refer [index-type->cmp-quick]]
             [datahike.constants :refer [tx0 txmax]]
             [datahike.index.interface :as di :refer [IIndex]]
             [datahike.tools :as dt]
@@ -16,18 +16,6 @@
                    [me.tonsky.persistent_sorted_set PersistentSortedSet IStorage Leaf Branch ANode Settings]
                    [java.util List])))
 
-(defn index-type->cmp-quick
-  ([index-type] (index-type->cmp-quick index-type true))
-  ([index-type current?] (if current?
-                           (case index-type
-                             :aevt dd/cmp-datoms-aevt-quick
-                             :avet dd/cmp-datoms-avet-quick
-                             dd/cmp-datoms-eavt-quick)
-                           (case index-type
-                             :aevt dd/cmp-temporal-datoms-aevt-quick
-                             :avet dd/cmp-temporal-datoms-avet-quick
-                             dd/cmp-temporal-datoms-eavt-quick))))
-
 (def index-type->kwseq
   {:eavt [:e :a :v :tx :added]
    :aevt [:a :e :v :tx :added]
diff --git a/src/datahike/tools.cljc b/src/datahike/tools.cljc
index ef1783770..20256056e 100644
--- a/src/datahike/tools.cljc
+++ b/src/datahike/tools.cljc
@@ -268,3 +268,35 @@
                  acc-inds
                  mask))))))
 
+(defn distinct-sorted-seq? [cmp s]
+  (if (empty? s)
+    true
+    (loop [previous (first s)
+           s (rest s)]
+      (if (empty? s)
+        true
+        (let [x (first s)]
+          (if (neg? (cmp previous x))
+            (recur x (rest s))
+            false))))))
+
+(defn merge-distinct-sorted-seqs
+  "Takes a comparator function `cmp` and two sequences `seq-a` and `seq-b` that are both distinct and sorted by `cmp`. Then combines the elements from both sequences to form a new sorted sequence that is distinct. The function distinct-sorted-seq? must return true for all input sequences and the result will also be a sequence for which this function returns true."
+  [cmp seq-a seq-b]
+  (cond
+    (empty? seq-a) seq-b
+    (empty? seq-b) seq-a
+    :else
+    (let [a (first seq-a)
+          b (first seq-b)
+          i (cmp a b)]
+      (cond
+        (< i 0) (cons
+                 a (lazy-seq
+                    (merge-distinct-sorted-seqs cmp (rest seq-a) seq-b)))
+        (= i 0) (cons
+                 a (lazy-seq
+                    (merge-distinct-sorted-seqs cmp (rest seq-a) (rest seq-b))))
+        :else (cons
+               b (lazy-seq
+                  (merge-distinct-sorted-seqs cmp seq-a (rest seq-b))))))))
diff --git a/test/datahike/test/api_test.cljc b/test/datahike/test/api_test.cljc
index 2cf99029e..d0570131e 100644
--- a/test/datahike/test/api_test.cljc
+++ b/test/datahike/test/api_test.cljc
@@ -874,3 +874,18 @@
 
 (deftest test-metrics-attr-refs
   (test-metrics (assoc metrics-base-cfg :attribute-refs? true)))
+
+(deftest test-no-history-datoms-in-empty-db
+  (testing "When attribute-refs? is false, there are no initial datoms"
+    (utils/with-connect [conn (-> {:store {:backend :mem
+                                           :id "hashing"}
+                                   :keep-history? true
+                                   :schema-flexibility :write}
+                                  utils/provide-unique-id
+                                  utils/recreate-database)]
+      (is (-> @conn
+              d/history
+              (d/datoms {:index :aevt
+                         :components []
+                         :limit -1})
+              empty?)))))
diff --git a/test/datahike/test/attribute_refs/datoms_test.cljc b/test/datahike/test/attribute_refs/datoms_test.cljc
index edcf370a6..d20ce280c 100644
--- a/test/datahike/test/attribute_refs/datoms_test.cljc
+++ b/test/datahike/test/attribute_refs/datoms_test.cljc
@@ -108,3 +108,33 @@
                                        (:e datom)
                                        (:v datom)
                                        (:tx datom)]})))))))
+
+(deftest test-distinct-datoms-history-db
+  (with-connect [conn (-> cfg
+                          provide-unique-id
+                          recreate-database)]
+    (is (= 1 (-> @conn
+                 d/history
+                 (d/datoms {:index :aevt
+                            :components [:db/txInstant]
+                            :limit -1})
+                 count)))))
+
+(deftest test-history-datoms-in-empty-db
+  (testing "Datoms in history of empty database"
+    (with-connect [conn (-> cfg
+                            provide-unique-id
+                            recreate-database)]
+      (let [datoms (-> @conn
+                       d/history
+                       (d/datoms {:index :aevt
+                                  :components []
+                                  :limit -1}))
+            ident-datom (some (fn [datom]
+                                (when (and (:a datom) (:e datom)
+                                           (= :db/ident (:v datom)))
+                                  datom))
+                              datoms)]
+        (is (seq datoms))
+        (is (apply distinct? datoms))
+        (is ident-datom)))))
diff --git a/test/datahike/test/migrate_test.clj b/test/datahike/test/migrate_test.clj
index 35b5c6475..e96495624 100644
--- a/test/datahike/test/migrate_test.clj
+++ b/test/datahike/test/migrate_test.clj
@@ -3,6 +3,7 @@
             [datahike.api :as d]
             [datahike.datom :as datom]
             [datahike.migrate :as m]
+            [datahike.db.utils :as dbu]
             [datahike.test.utils :as utils]))
 
 (def tx-data [[:db/add 1 :db/cardinality :db.cardinality/one 536870913 true]
@@ -231,9 +232,23 @@
              [[:db/retractEntity [:name "Alice"]]]]
         _ (doseq [tx-data txs]
             (d/transact source-conn {:tx-data tx-data}))
-        export-data (->> (d/datoms (d/history @source-conn) :eavt)
+        datoms-to-export (d/datoms (d/history @source-conn) :eavt)
+
+        ;; The datoms to export must primarily
+        ;; be sorted by transaction entity id
+        ;; and secondarily so that datoms with attribute `:db/txInstant`
+        ;; come before other datoms
+        export-data (->> datoms-to-export
                          (map (comp vec seq))
-                         (sort-by #(nth % 3))
+                         (sort-by (fn [[e a v tx]]
+                                    [tx
+
+                                     ;; TODO: It seems as if :db/txInstant
+                                     ;; datoms must come first. Is this a bug
+                                     ;; in load-entities?
+                                     (case a
+                                       :db/txInstant 0
+                                       1)]))
                          (into []))
         target-cfg (-> source-cfg
                        (assoc-in [:store :id] "load-entities-history-test-target")
@@ -249,6 +264,7 @@
                              :where [?e :name ?n ?t ?op]]
                            (d/history @conn)))]
 
+    (is (dbu/distinct-sorted-datoms? :eavt datoms-to-export))
     (is (= (current-q source-conn)
            (current-q target-conn)))
     (is (= (history-q source-conn)
diff --git a/test/datahike/test/tools_test.clj b/test/datahike/test/tools_test.clj
index 8f4ceccec..b208a9201 100644
--- a/test/datahike/test/tools_test.clj
+++ b/test/datahike/test/tools_test.clj
@@ -96,3 +96,39 @@
            (wrap-range-tree [1])))
     (is (= [:inds [0 2] :mask [0 nil 1]]
            (wrap-range-tree [0 2]))))
+
+(deftest merge-distinct-sorted-seqs-test
+  (testing "Custom comparator"
+    (let [m {:one 1
+             :two 2
+             :three 3
+             :four 4
+             :five 5
+             :six 6
+             :seven 7
+             :eight 8
+             :nine 9
+             :ten 10}
+          cmp (fn [a b] (compare (m a) (m b)))]
+      (is (= [] (dt/merge-distinct-sorted-seqs cmp [] [])))
+      (is (= [:one] (dt/merge-distinct-sorted-seqs cmp [:one] [])))
+      (is (= [:one] (dt/merge-distinct-sorted-seqs cmp [:one] [:one])))
+      (is (= [:one] (dt/merge-distinct-sorted-seqs cmp [] [:one])))
+      (is (= [:one :two] (dt/merge-distinct-sorted-seqs cmp [:two] [:one])))
+      (is (= [:one :two :three :four :five :nine :ten]
+             (dt/merge-distinct-sorted-seqs cmp
+                                            [:one :two :three :nine :ten]
+                                            [:two :three :four :five])))
+      (is (dt/distinct-sorted-seq? cmp []))
+      (is (dt/distinct-sorted-seq? cmp [:one]))
+      (is (dt/distinct-sorted-seq? cmp [:one :two]))
+      (is (dt/distinct-sorted-seq? cmp [:one :two :three]))
+      (is (not (dt/distinct-sorted-seq? cmp [:one :two :three :three])))
+      (is (not (dt/distinct-sorted-seq? cmp [:one :one :two :three])))
+      (is (not (dt/distinct-sorted-seq? cmp [:one :two :two :three])))
+      (is (not (dt/distinct-sorted-seq? cmp [:one :two :three :five :four])))))
+  (testing "Infinite length sequences"
+    (let [evens (iterate #(+ 2 %) 0)
+          odds (iterate #(+ 2 %) 1)
+          result (dt/merge-distinct-sorted-seqs compare odds evens)]
+      (is (= (range 1000) (take 1000 result))))))