From 37301870f1aa61f3facebe1f07b6c39deb783721 Mon Sep 17 00:00:00 2001 From: loicb Date: Tue, 16 Jan 2024 10:13:38 +0800 Subject: [PATCH] Use future to put-all! events in out-sink (#30) Co-authored-by: Jie Co-authored-by: flybot-nam-nguyenhoai <137136212+flybot-nam-nguyenhoai@users.noreply.github.com> --- src/robertluo/waterfall/core.clj | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/robertluo/waterfall/core.clj b/src/robertluo/waterfall/core.clj index 5201cd6..fceaf1e 100644 --- a/src/robertluo/waterfall/core.clj +++ b/src/robertluo/waterfall/core.clj @@ -1,7 +1,7 @@ (ns ^:no-doc robertluo.waterfall.core "Core data structure" (:require [manifold.deferred :as d] - [manifold.stream :as ms] + [manifold.stream :as ms] [robertluo.waterfall.util :as util] [clojure.core.match :refer [match]]) (:import (java.time Duration) @@ -9,7 +9,7 @@ (org.apache.kafka.clients.consumer Consumer ConsumerRecord KafkaConsumer) (org.apache.kafka.clients.producer KafkaProducer - Producer + Producer ProducerRecord RecordMetadata) (org.apache.kafka.common.serialization ByteArrayDeserializer ByteArraySerializer))) @@ -31,7 +31,7 @@ sending (fn [x] (let [{:keys [key value topic partition timestamp]} x] (rmd->map @(.send prod (ProducerRecord. topic partition timestamp key value)))))] - (ms/on-closed strm #(.close prod)) + (ms/on-closed strm #(.close prod)) (ms/splice strm (ms/map sending strm)))) ;-------------------- @@ -100,7 +100,7 @@ (cmd-self [::poll duration])) ; resume and poll [::poll duration] (let [putting-all (fn [events] ; function to handle events and resume - (d/chain (ms/put-all! out-sink events) + (d/chain (d/future (ms/put-all! out-sink events)) #(when % (cmd-self [::resume duration]))))] (ensure-sink (if-let [events (->> (.poll consumer duration) (.iterator) (iterator-seq) (map cr->map) seq)] @@ -113,20 +113,20 @@ (defn consumer - [nodes group-id topics + [nodes group-id topics {:keys [poll-duration position] - :as conf + :as conf :or {poll-duration (Duration/ofSeconds 10)}}] (let [conr (-> (dissoc conf :poll-duration :position) ;avoid kafka complaints (merge {:bootstrap-servers nodes :group-id group-id :enable-auto-commit false}) util/->config-map - (KafkaConsumer. (ByteArrayDeserializer.) (ByteArrayDeserializer.))) + (KafkaConsumer. (ByteArrayDeserializer.) (ByteArrayDeserializer.))) out-sink (ms/stream) actor (consumer-actor conr out-sink)] (actor [::subscribe topics]) (when position (actor [::seek position])) - (ms/on-closed out-sink (fn [] @(actor [::close]))) + (ms/on-closed out-sink (fn [] @(actor [::close]))) (actor [::poll poll-duration]) (ms/source-only out-sink)))