Skip to content

Commit

Permalink
Use CompletableFuture in promise implementation. (#700)
Browse files Browse the repository at this point in the history
* Use CompletableFuture in promise implementation.

* Wrap exception correctly, unify transact and transact!.
  • Loading branch information
whilo authored Sep 26, 2024
1 parent e1cb9b0 commit 8a1d1f1
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 40 deletions.
7 changes: 5 additions & 2 deletions src/datahike/api/impl.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
[datahike.db HistoricalDB AsOfDB SinceDB FilteredDB]
[datahike.impl.entity Entity])))

(defn transact [connection arg-map]
(defn transact! [connection arg-map]
(let [arg (cond
(map? arg-map) (if (contains? arg-map :tx-data)
arg-map
Expand All @@ -35,7 +35,10 @@
:else (dt/raise "Bad argument to transact, expected map, vector or sequence."
{:error :transact/syntax
:argument-type (type arg-map)}))]
(deref (dw/transact! connection arg))))
(dw/transact! connection arg)))

(defn transact [connection arg-map]
@(transact! connection arg-map))

;; necessary to support initial-tx shorthand, which really should have been avoided
(defn create-database [& args]
Expand Down
2 changes: 1 addition & 1 deletion src/datahike/api/specification.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Exists for Datomic API compatibility. Prefer using `@conn` directly if possible.
:doc "Same as transact, but asynchronously returns a future."
:supports-remote? false
:referentially-transparent? false
:impl datahike.writer/transact!}
:impl datahike.api.impl/transact!}

transact
{:args (s/cat :conn spec/SConnection :txs spec/STransactions)
Expand Down
69 changes: 32 additions & 37 deletions src/datahike/tools.cljc
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
(ns ^:no-doc datahike.tools
(:require
[superv.async :refer [throw-if-exception-]]
[clojure.core.async.impl.protocols :as async-impl]
[clojure.core.async :as async]
#?(:clj [clojure.java.io :as io])
[taoensso.timbre :as log])
#?(:clj (:import [java.util Properties UUID Date]
[java.util.concurrent CompletableFuture]
[java.net InetAddress])))

(defn combine-hashes [x y]
Expand Down Expand Up @@ -68,43 +71,35 @@
`(throw #?(:clj (ex-info (str ~@(map (fn [m#] (if (string? m#) m# (list 'pr-str m#))) msgs)) ~data)
:cljs (error (str ~@(map (fn [m#] (if (string? m#) m# (list 'pr-str m#))) msgs)) ~data))))))

; (throwable-promise) derived from (promise) in clojure/core.clj.
; * Clojure
; * Copyright (c) Rich Hickey. All rights reserved.
; * The use and distribution terms for this software are covered by the
; * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
; * which can be found in the file epl-v10.html at the root of this distribution.
; * By using this software in any fashion, you are agreeing to be bound by
; * the terms of this license.
; * You must not remove this notice, or any other, from this software.
(defn throwable-promise
"Returns a promise object that can be read with deref/@, and set,
once only, with deliver. Calls to deref/@ prior to delivery will
block, unless the variant of deref with timeout is used. All
subsequent derefs will return the same delivered value without
blocking. Exceptions delivered to the promise will throw on deref."
[]
(let [d (java.util.concurrent.CountDownLatch. 1)
v (atom d)]
(reify
clojure.lang.IDeref
(deref [_] (.await d) (throw-if-exception- @v))
clojure.lang.IBlockingDeref
(deref
[_ timeout-ms timeout-val]
(if (.await d timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS)
(throw-if-exception- @v)
timeout-val))
clojure.lang.IPending
(isRealized [this]
(zero? (.getCount d)))
clojure.lang.IFn
(invoke
[this x]
(when (and (pos? (.getCount d))
(compare-and-set! v d x))
(.countDown d)
this)))))
;; adapted from https://clojure.atlassian.net/browse/CLJ-2766
#?(:clj
(defn throwable-promise
"Returns a promise object that can be read with deref/@, and set, once only, with deliver. Calls to deref/@ prior to delivery will block, unless the variant of deref with timeout is used. All subsequent derefs will return the same delivered value without blocking. Exceptions delivered to the promise will throw on deref.
Also supports core.async take! to optionally consume values without blocking the reader thread."
[]
(let [cf (CompletableFuture.)
p (async/promise-chan)]
(reify
clojure.lang.IDeref
(deref [_] (throw-if-exception- (try (.get cf) (catch Throwable t t))))
clojure.lang.IBlockingDeref
(deref [_ timeout-ms timeout-val]
(if-let [v (try (.get cf timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS) (catch Throwable t t))]
(throw-if-exception- v)
timeout-val))
clojure.lang.IPending
(isRealized [_] (.isDone cf))
clojure.lang.IFn
(invoke [this x]
(if (instance? Throwable x)
(.completeExceptionally cf x)
(.complete cf x))
(if-not (nil? x) (async/put! p x) (async/close! p))
this)
async-impl/ReadPort
(take! [_this handler] (async-impl/take! p handler)))))
:cljs (def throwable-promise async/promise-chan))

(defn get-version
"Retrieves the current version of a dependency. Thanks to https://stackoverflow.com/a/33070806/10978897"
Expand Down

0 comments on commit 8a1d1f1

Please sign in to comment.