From af82c7a37b6ad1bc1c0e782a542e1562399ad4ce Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Sun, 27 May 2018 21:06:27 -0700 Subject: [PATCH] Inputs should not call poll! after completed?. This fix prevents poll! from being called once the plugin is marked with completed?. The downside will be when completed? is an expensive call, and thus this fix should be looked at critically before merging. --- src/onyx/peer/read_batch.clj | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/onyx/peer/read_batch.clj b/src/onyx/peer/read_batch.clj index e00d23105..bc0649b15 100644 --- a/src/onyx/peer/read_batch.clj +++ b/src/onyx/peer/read_batch.clj @@ -10,7 +10,6 @@ [onyx.types :as types] [primitive-math :as pm] [onyx.static.uuid :refer [random-uuid]] - [onyx.static.util :refer [ns->ms ms->ns]] [taoensso.timbre :as timbre :refer [debug info]]) (:import [java.util.concurrent.locks LockSupport] [org.agrona.concurrent IdleStrategy] @@ -38,6 +37,9 @@ (set-event! (assoc (get-event state) :onyx.core/batch batch)) (advance)))) +(defn ns->ms [^long ns] + (pm// ns 1000000)) + (defn read-input-batch [state batch-size batch-timeout-ns max-segments-per-barrier since-barrier-count] (let [pipeline (get-input-pipeline state) event (get-event state) @@ -46,16 +48,17 @@ (pm/- ^long max-segments-per-barrier (.get ^AtomicLong since-barrier-count))) end-time (pm/+ (System/nanoTime) ^long batch-timeout-ns) - _ (loop [remaining batch-timeout-ns - n 0] - (when (and (pm/< n batch-size-rest) - (pos? remaining)) - (let [remaining-ms (pm// ^long remaining 1000000) - segment (p/poll! pipeline event remaining-ms)] - (when-not (nil? segment) - (conj! outgoing segment) - (recur (pm/- end-time (System/nanoTime)) - (pm/inc n)))))) + _ (if (p/completed? pipeline) + outgoing + (loop [remaining batch-timeout-ns + n 0] + (when (and (pm/< n batch-size-rest) + (pos? remaining)) + (let [segment (p/poll! pipeline event (ns->ms remaining))] + (when-not (nil? segment) + (conj! outgoing segment) + (recur (pm/- end-time (System/nanoTime)) + (pm/inc n))))))) batch (persistent! outgoing)] (.addAndGet ^AtomicLong since-barrier-count (count batch)) (-> state