Skip to content

Commit

Permalink
Make sure that the datoms of history are distinct (#706)
Browse files Browse the repository at this point in the history
* Make sure that the datoms of history are distinct

* FIXUP

* FIXUP merge sorted sequences of distinct datoms into a new sorted sequence of distinct datoms

---------

Co-authored-by: Jonas Östlund <[email protected]>
  • Loading branch information
jonasseglare and Jonas Östlund authored Sep 27, 2024
1 parent c88c3af commit 6fc6034
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 58 deletions.
12 changes: 12 additions & 0 deletions src/datahike/datom.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,15 @@
(fn [[e a v t]]
(datom e a v t))
coll))

(defn index-type->cmp-quick
  "Returns the quick datom comparator for `index-type`.

  `:aevt` and `:avet` select their dedicated comparators; any other value
  falls back to the eavt comparator. When `current?` is truthy (the default
  of the one-argument arity) the `cmp-datoms-*-quick` comparators are used,
  otherwise the `cmp-temporal-datoms-*-quick` ones."
  ([index-type]
   (index-type->cmp-quick index-type true))
  ([index-type current?]
   (let [[aevt-cmp avet-cmp eavt-cmp]
         (if current?
           [cmp-datoms-aevt-quick
            cmp-datoms-avet-quick
            cmp-datoms-eavt-quick]
           [cmp-temporal-datoms-aevt-quick
            cmp-temporal-datoms-avet-quick
            cmp-temporal-datoms-eavt-quick])]
     (condp = index-type
       :aevt aevt-cmp
       :avet avet-cmp
       ;; every other index type (including nil) uses the eavt ordering
       eavt-cmp))))
88 changes: 54 additions & 34 deletions src/datahike/db/search.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -166,38 +166,40 @@
(boolean tx)
(boolean indexed?)))))

(defn current-search-strategy [db pattern]
(defn- resolve-db-index
  "Adds a `:db-index` entry to `strategy`, looked up in `index-map` under the
  strategy's `:index-key`. Returns nil when `strategy` is falsy."
  [strategy index-map]
  (when-let [{:keys [index-key] :as found} strategy]
    (assoc found :db-index (get index-map index-key))))

(defn- indexing-for-pattern? [db pattern]
(let [[_ a _ _] pattern]
(when-let [strategy (get-search-strategy pattern
(dbu/indexing? db a)
false)]
(update strategy :index-key #(get db %)))))

(defn temporal-search-strategy [db pattern]
(let [[_ a _ _ _] pattern]
(when-let [strategy (get-search-strategy
pattern
(dbu/indexing? db a)
true)]
(update strategy :index-key #(case %
:eavt (:temporal-eavt db)
:aevt (:temporal-aevt db)
:avet (:temporal-avet db)
nil)))))
(dbu/indexing? db a)))

(defn search-strategy-with-index
  "Computes the search strategy for `pattern` via `get-search-strategy` and
  attaches, under `:db-index`, the index named by the strategy's `:index-key`.

  When `temporal?` is truthy the index is taken from the `:temporal-eavt`,
  `:temporal-aevt` and `:temporal-avet` entries of `db`; otherwise it is looked
  up in `db` itself. Returns nil when no strategy is found."
  [db pattern temporal?]
  (let [indexed? (indexing-for-pattern? db pattern)
        strategy (get-search-strategy pattern indexed? temporal?)
        index-map (if temporal?
                    {:eavt (:temporal-eavt db)
                     :aevt (:temporal-aevt db)
                     :avet (:temporal-avet db)}
                    db)]
    (resolve-db-index strategy index-map)))

(defn search-current-indices
([db pattern]
(memoize-for
db [:search pattern]
#(if-let [{:keys [index-key lookup-fn]} (current-search-strategy db pattern)]
(lookup-fn index-key pattern)
#(if-let [{:keys [db-index lookup-fn]}
(search-strategy-with-index db pattern false)]
(lookup-fn db-index pattern)
[])))

;; For batches
([db pattern batch-fn]
(if-let [{:keys [index-key strategy-vec backend-fn]}
(current-search-strategy db pattern)]
(batch-fn strategy-vec (backend-fn index-key) identity)
(if-let [{:keys [db-index strategy-vec backend-fn]}
(search-strategy-with-index db pattern false)]
(batch-fn strategy-vec (backend-fn db-index) identity)
[])))

(defn added? [[_ _ _ _ added]]
Expand All @@ -219,37 +221,53 @@
([db pattern]
(validate-pattern pattern false)
(memoize-for db [:temporal-search pattern]
#(if-let [{:keys [index-key lookup-fn]}
(temporal-search-strategy db pattern)]
(let [result (lookup-fn index-key pattern)]
#(if-let [{:keys [db-index lookup-fn]}
(search-strategy-with-index
db pattern true)]
(let [result (lookup-fn db-index pattern)]
(filter-by-added pattern result))
[])))
([db pattern batch-fn]
(validate-pattern pattern true)
(if-let [{:keys [index-key strategy-vec backend-fn]}
(temporal-search-strategy db pattern)]
(batch-fn strategy-vec (backend-fn index-key)
(if-let [{:keys [db-index strategy-vec backend-fn]}
(search-strategy-with-index
db pattern true)]
(batch-fn strategy-vec (backend-fn db-index)
(filter-by-added pattern))
[])))

(defn index-type
  "Returns the `:index-key` that `get-search-strategy` selects for `pattern`
  when the non-temporal and the temporal strategy agree on it, otherwise nil.
  Each key is asserted to be nil or a keyword."
  [db pattern]
  (let [idx (indexing-for-pattern? db pattern)
        index-key-for (fn [temporal?]
                        (-> pattern
                            (get-search-strategy idx temporal?)
                            :index-key))
        a (index-key-for false)
        b (index-key-for true)]
    ;; The locals stay named `a` and `b` so the assertion messages are unchanged.
    (assert (or (nil? a) (keyword? a)))
    (assert (or (nil? b) (keyword? b)))
    (if (= a b)
      a
      nil)))

(defn temporal-search
([db pattern]
(validate-pattern pattern false)
(dbu/distinct-datoms db
(search-current-indices db pattern)
(search-temporal-indices db pattern)))
(dbu/distinct-datoms
db
(index-type db pattern)
(search-current-indices db pattern)
(search-temporal-indices db pattern)))
([db pattern batch-fn]
(validate-pattern pattern true)
(dbu/distinct-datoms db
(search-current-indices db pattern batch-fn)
(search-temporal-indices db pattern batch-fn))))
(dbu/distinct-datoms
db
(index-type db pattern)
(search-current-indices db pattern batch-fn)
(search-temporal-indices db pattern batch-fn))))

(defn temporal-seek-datoms
  "Slices both the `index-type` index of `db` and its `temporal-<index-type>`
  counterpart, from the pattern built out of the components `cs` (with `e0`
  and `tx0` as defaults) up to the datom `(emax nil nil txmax)`, and combines
  the two slices with `dbu/distinct-datoms`."
  [db index-type cs]
  (let [current-index (get db index-type)
        history-index (get db (keyword (str "temporal-" (name index-type))))
        lower (dbu/components->pattern db index-type cs e0 tx0)
        upper (datom emax nil nil txmax)
        slice (fn [idx] (di/-slice idx lower upper index-type))]
    (dbu/distinct-datoms db
                         index-type
                         (slice current-index)
                         (slice history-index))))

Expand All @@ -260,6 +278,7 @@
to (datom emax nil nil txmax)]
(concat
(-> (dbu/distinct-datoms db
index-type
(di/-slice index from to index-type)
(di/-slice temporal-index from to index-type))
vec
Expand All @@ -272,6 +291,7 @@
(let [from (dbu/resolve-datom current-db nil attr start nil e0 tx0)
to (dbu/resolve-datom current-db nil attr end nil emax txmax)]
(dbu/distinct-datoms db
:avet
(di/-slice (:avet db) from to :avet)
(di/-slice (:temporal-avet db) from to :avet))))

Expand Down
38 changes: 29 additions & 9 deletions src/datahike/db/utils.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
[clojure.data]
[clojure.walk]
[datahike.constants :refer [e0 tx0 emax txmax]]
[datahike.datom :refer [datom datom-tx]]
[datahike.datom :refer [datom datom-tx index-type->cmp-quick]]
[datahike.db.interface :as dbi]
[datahike.index :as di]
[datahike.schema :as ds]
[datahike.tools :refer [raise]])
[datahike.tools :refer [raise merge-distinct-sorted-seqs
distinct-sorted-seq?]])
#?(:cljs (:require-macros [datahike.datom :refer [datom]]
[datahike.tools :refer [raise]]))
#?(:clj (:import [datahike.datom Datom])))
Expand Down Expand Up @@ -202,20 +203,39 @@
:aevt (resolve-datom db c1 c0 c2 c3 default-e default-tx)
:avet (resolve-datom db c2 c0 c1 c3 default-e default-tx)))

(defn distinct-datoms [db current-datoms history-datoms]
(if (dbi/-keep-history? db)
(concat (filter #(or (no-history? db (:a %))
(multival? db (:a %)))
current-datoms)
history-datoms)
current-datoms))
(defn merge-datoms
  "Combines the datom sequences `a` and `b`.

  With a truthy `index-type` the two sequences are merged by
  `merge-distinct-sorted-seqs` using the temporal quick comparator of that
  index. Without one, the result is `a` followed lazily by those elements of
  `b` that are not contained in `(set a)`."
  [index-type a b]
  (if-not index-type
    (concat a (lazy-seq
               (let [seen (set a)]
                 (remove seen b))))
    (let [cmp (index-type->cmp-quick index-type false)]
      (merge-distinct-sorted-seqs cmp a b))))

(defn distinct-sorted-datoms?
  "Checks `datoms` with `distinct-sorted-seq?` under the temporal quick
  comparator of `index-type`. Returns nil when `index-type` is falsy."
  [index-type datoms]
  (when-let [kind index-type]
    (let [cmp (index-type->cmp-quick kind false)]
      (distinct-sorted-seq? cmp datoms))))

(defn distinct-datoms
  "Combines current and history datoms of `db`.

  When `db` does not keep history, `current-datoms` is returned unchanged.
  Otherwise only those current datoms whose attribute satisfies `no-history?`
  or `multival?` are kept, and they are merged with `history-datoms` through
  `merge-datoms` for the given `index-type`."
  ([db index-type current-datoms history-datoms]
   (if-not (dbi/-keep-history? db)
     current-datoms
     (let [keep-current? (fn [d]
                           (let [attr (:a d)]
                             (or (no-history? db attr)
                                 (multival? db attr))))]
       (merge-datoms index-type
                     (filter keep-current? current-datoms)
                     history-datoms)))))

(defn temporal-datoms
  "Slices both the `index-type` index of `db` and its `temporal-<index-type>`
  counterpart between the patterns built from the components `cs` (lower bound
  defaulting to `e0`/`tx0`, upper bound to `emax`/`txmax`) and combines the two
  slices with `distinct-datoms`."
  [db index-type cs]
  (let [current-index (get db index-type)
        history-index (get db (keyword (str "temporal-" (name index-type))))
        lower (components->pattern db index-type cs e0 tx0)
        upper (components->pattern db index-type cs emax txmax)
        slice (fn [idx] (di/-slice idx lower upper index-type))]
    (distinct-datoms db
                     index-type
                     (slice current-index)
                     (slice history-index))))

Expand Down
14 changes: 1 addition & 13 deletions src/datahike/index/persistent_set.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[me.tonsky.persistent-sorted-set.arrays :as arrays]
[clojure.core.cache :as cache]
[clojure.core.cache.wrapped :as wrapped]
[datahike.datom :as dd]
[datahike.datom :as dd :refer [index-type->cmp-quick]]
[datahike.constants :refer [tx0 txmax]]
[datahike.index.interface :as di :refer [IIndex]]
[datahike.tools :as dt]
Expand All @@ -16,18 +16,6 @@
[me.tonsky.persistent_sorted_set PersistentSortedSet IStorage Leaf Branch ANode Settings]
[java.util List])))

(defn index-type->cmp-quick
([index-type] (index-type->cmp-quick index-type true))
([index-type current?] (if current?
(case index-type
:aevt dd/cmp-datoms-aevt-quick
:avet dd/cmp-datoms-avet-quick
dd/cmp-datoms-eavt-quick)
(case index-type
:aevt dd/cmp-temporal-datoms-aevt-quick
:avet dd/cmp-temporal-datoms-avet-quick
dd/cmp-temporal-datoms-eavt-quick))))

(def index-type->kwseq
{:eavt [:e :a :v :tx :added]
:aevt [:a :e :v :tx :added]
Expand Down
32 changes: 32 additions & 0 deletions src/datahike/tools.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,35 @@
acc-inds
mask))))))

(defn distinct-sorted-seq?
  "Returns true when every element of `s` compares strictly less than its
  successor under the three-way comparator `cmp`, i.e. `s` is sorted and free
  of duplicates. Empty and single-element sequences yield true."
  [cmp s]
  (if-let [items (seq s)]
    (loop [prev (first items)
           remaining (next items)]
      (cond
        (nil? remaining) true
        (neg? (cmp prev (first remaining))) (recur (first remaining)
                                                   (next remaining))
        :else false))
    true))

(defn merge-distinct-sorted-seqs
  "Merges `seq-a` and `seq-b`, both sorted by the three-way comparator `cmp`
  and free of duplicates, into one sorted sequence without duplicates.

  `distinct-sorted-seq?` must hold for both inputs; it then also holds for the
  result. When heads compare as equal the element from `seq-a` is kept. If one
  input is empty the other is returned as is; otherwise the tail of the result
  is produced lazily."
  [cmp seq-a seq-b]
  (cond
    (empty? seq-a) seq-b
    (empty? seq-b) seq-a
    :else
    (let [head-a (first seq-a)
          head-b (first seq-b)
          order (cmp head-a head-b)
          ;; pick the next element and the two remainders still to be merged
          [head more-a more-b] (cond
                                 (< order 0) [head-a (rest seq-a) seq-b]
                                 (= order 0) [head-a (rest seq-a) (rest seq-b)]
                                 :else [head-b seq-a (rest seq-b)])]
      (cons head
            (lazy-seq
             (merge-distinct-sorted-seqs cmp more-a more-b))))))
15 changes: 15 additions & 0 deletions test/datahike/test/api_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -874,3 +874,18 @@

;; Runs the shared metrics test with attribute references switched on.
(deftest test-metrics-attr-refs
  (-> metrics-base-cfg
      (assoc :attribute-refs? true)
      test-metrics))

;; A freshly created history-keeping database configured without
;; attribute references must expose no datoms through its history view.
(deftest test-no-history-datoms-in-empty-db
  (testing "When attribute-refs? is false, there are no initial datoms"
    (utils/with-connect [conn (utils/recreate-database
                               (utils/provide-unique-id
                                {:store {:backend :mem
                                         :id "hashing"}
                                 :keep-history? true
                                 :schema-flexibility :write}))]
      (let [history-datoms (d/datoms (d/history @conn)
                                     {:index :aevt
                                      :components []
                                      :limit -1})]
        (is (empty? history-datoms))))))
30 changes: 30 additions & 0 deletions test/datahike/test/attribute_refs/datoms_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,33 @@
(:e datom)
(:v datom)
(:tx datom)]})))))))

;; The history view of a freshly created database must report exactly one
;; datom for the :db/txInstant attribute in the :aevt index.
(deftest test-distinct-datoms-history-db
  (with-connect [conn (recreate-database (provide-unique-id cfg))]
    (let [tx-instant-datoms (d/datoms (d/history @conn)
                                      {:index :aevt
                                       :components [:db/txInstant]
                                       :limit -1})]
      (is (= 1 (count tx-instant-datoms))))))

;; The history view of a freshly created database (using `cfg`) must contain
;; datoms, all of them distinct, and at least one whose value is :db/ident.
(deftest test-history-datoms-in-empty-db
  (testing "Datoms in history of empty database"
    (with-connect [conn (recreate-database (provide-unique-id cfg))]
      (let [datoms (d/datoms (d/history @conn)
                             {:index :aevt
                              :components []
                              :limit -1})
            ident-datom? (fn [d]
                           (and (:a d)
                                (:e d)
                                (= :db/ident (:v d))))
            ident-datom (first (filter ident-datom? datoms))]
        (is (seq datoms))
        (is (apply distinct? datoms))
        (is ident-datom)))))
20 changes: 18 additions & 2 deletions test/datahike/test/migrate_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[datahike.api :as d]
[datahike.datom :as datom]
[datahike.migrate :as m]
[datahike.db.utils :as dbu]
[datahike.test.utils :as utils]))

(def tx-data [[:db/add 1 :db/cardinality :db.cardinality/one 536870913 true]
Expand Down Expand Up @@ -231,9 +232,23 @@
[[:db/retractEntity [:name "Alice"]]]]
_ (doseq [tx-data txs]
(d/transact source-conn {:tx-data tx-data}))
export-data (->> (d/datoms (d/history @source-conn) :eavt)
datoms-to-export (d/datoms (d/history @source-conn) :eavt)

;; The datoms to export must primarily
;; be sorted by transaction entity id
;; and secondarily so that datoms with attribute `:db/txInstant`
;; come before other datoms
export-data (->> datoms-to-export
(map (comp vec seq))
(sort-by #(nth % 3))
(sort-by (fn [[e a v tx]]
[tx

;; TODO: It seems as if :db/txInstant
;; datoms must come first. Is this a bug
;; in load-entities?
(case a
:db/txInstant 0
1)]))
(into []))
target-cfg (-> source-cfg
(assoc-in [:store :id] "load-entities-history-test-target")
Expand All @@ -249,6 +264,7 @@
:where
[?e :name ?n ?t ?op]]
(d/history @conn)))]
(is (dbu/distinct-sorted-datoms? :eavt datoms-to-export))
(is (= (current-q source-conn)
(current-q target-conn)))
(is (= (history-q source-conn)
Expand Down
Loading

0 comments on commit 6fc6034

Please sign in to comment.