Skip to content

Commit

Permalink
Use future to put-all! events in out-sink (#30)
Browse files Browse the repository at this point in the history
Co-authored-by: Jie <[email protected]>
Co-authored-by: flybot-nam-nguyenhoai <[email protected]>
  • Loading branch information
3 people authored Jan 16, 2024
1 parent 2956b88 commit 3730187
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions src/robertluo/waterfall/core.clj
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
(ns ^:no-doc robertluo.waterfall.core
"Core data structure"
(:require [manifold.deferred :as d]
[manifold.stream :as ms]
[manifold.stream :as ms]
[robertluo.waterfall.util :as util]
[clojure.core.match :refer [match]])
(:import (java.time Duration)
(java.util Map)
(org.apache.kafka.clients.consumer Consumer ConsumerRecord KafkaConsumer)
(org.apache.kafka.clients.producer
KafkaProducer
Producer
Producer
ProducerRecord
RecordMetadata)
(org.apache.kafka.common.serialization ByteArrayDeserializer ByteArraySerializer)))
Expand All @@ -31,7 +31,7 @@
sending (fn [x]
(let [{:keys [key value topic partition timestamp]} x]
(rmd->map @(.send prod (ProducerRecord. topic partition timestamp key value)))))]
(ms/on-closed strm #(.close prod))
(ms/on-closed strm #(.close prod))
(ms/splice strm (ms/map sending strm))))

;--------------------
Expand Down Expand Up @@ -100,7 +100,7 @@
(cmd-self [::poll duration])) ; resume and poll
[::poll duration]
(let [putting-all (fn [events] ; function to handle events and resume
(d/chain (ms/put-all! out-sink events)
(d/chain (d/future (ms/put-all! out-sink events))
#(when % (cmd-self [::resume duration]))))]
(ensure-sink
(if-let [events (->> (.poll consumer duration) (.iterator) (iterator-seq) (map cr->map) seq)]
Expand All @@ -113,20 +113,20 @@


(defn consumer
[nodes group-id topics
[nodes group-id topics
{:keys [poll-duration position]
:as conf
:as conf
:or {poll-duration (Duration/ofSeconds 10)}}]
(let [conr (-> (dissoc conf :poll-duration :position) ;avoid kafka complaints
(merge {:bootstrap-servers nodes
:group-id group-id
:enable-auto-commit false})
util/->config-map
(KafkaConsumer. (ByteArrayDeserializer.) (ByteArrayDeserializer.)))
(KafkaConsumer. (ByteArrayDeserializer.) (ByteArrayDeserializer.)))
out-sink (ms/stream)
actor (consumer-actor conr out-sink)]
(actor [::subscribe topics])
(when position (actor [::seek position]))
(ms/on-closed out-sink (fn [] @(actor [::close])))
(ms/on-closed out-sink (fn [] @(actor [::close])))
(actor [::poll poll-duration])
(ms/source-only out-sink)))

0 comments on commit 3730187

Please sign in to comment.