diff --git a/src/onyx/peer/read_batch.clj b/src/onyx/peer/read_batch.clj index e00d23105..bc0649b15 100644 --- a/src/onyx/peer/read_batch.clj +++ b/src/onyx/peer/read_batch.clj @@ -10,7 +10,6 @@ [onyx.types :as types] [primitive-math :as pm] [onyx.static.uuid :refer [random-uuid]] - [onyx.static.util :refer [ns->ms ms->ns]] [taoensso.timbre :as timbre :refer [debug info]]) (:import [java.util.concurrent.locks LockSupport] [org.agrona.concurrent IdleStrategy] @@ -38,6 +37,9 @@ (set-event! (assoc (get-event state) :onyx.core/batch batch)) (advance)))) +(defn ns->ms [^long ns] + (pm// ns 1000000)) + (defn read-input-batch [state batch-size batch-timeout-ns max-segments-per-barrier since-barrier-count] (let [pipeline (get-input-pipeline state) event (get-event state) @@ -46,16 +48,17 @@ (pm/- ^long max-segments-per-barrier (.get ^AtomicLong since-barrier-count))) end-time (pm/+ (System/nanoTime) ^long batch-timeout-ns) - _ (loop [remaining batch-timeout-ns - n 0] - (when (and (pm/< n batch-size-rest) - (pos? remaining)) - (let [remaining-ms (pm// ^long remaining 1000000) - segment (p/poll! pipeline event remaining-ms)] - (when-not (nil? segment) - (conj! outgoing segment) - (recur (pm/- end-time (System/nanoTime)) - (pm/inc n)))))) + _ (if (p/completed? pipeline) + outgoing + (loop [remaining batch-timeout-ns + n 0] + (when (and (pm/< n batch-size-rest) + (pos? remaining)) + (let [segment (p/poll! pipeline event (ns->ms remaining))] + (when-not (nil? segment) + (conj! outgoing segment) + (recur (pm/- end-time (System/nanoTime)) + (pm/inc n))))))) batch (persistent! outgoing)] (.addAndGet ^AtomicLong since-barrier-count (count batch)) (-> state