Skip to content

Commit

Permalink
add :commit-strategy to consumer-options (#31)
Browse files Browse the repository at this point in the history
* add `:commit-strategy` option
  • Loading branch information
robertluo authored Sep 4, 2024
1 parent 3730187 commit 22a6c6c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/robertluo/waterfall.clj
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
- optional `::consumer-config` can specify additional kafka consumer configuration. With additions:
- `:position` either `:beginning` `:end`, none for commited position (default)
- `:poll-duration` for how long the consumer poll returns, is a Duration value, default 10 seconds
- `:commit-strategy` one of `:sync`, `:async`, `:auto`, default `:sync`
The returned map has different level of key-values let you use:
- Highest level, no additional knowledge:
Expand Down
20 changes: 13 additions & 7 deletions src/robertluo/waterfall/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
Args:
- consumer: Kafka Consumer instance.
- committer: Function to commit the offset.
- out-sink: Manifold stream where consumed messages are put.
Returns `cmd-self`, a function that accepts following commands:
Expand All @@ -69,7 +70,7 @@
- [::poll duration]: Polls the consumer for messages.
Unknown commands will raise an exception."
[^Consumer consumer out-sink]
[^Consumer consumer committer out-sink]
(let [mailbox (ms/stream)
cmd-self (fn [cmd] (ms/put! mailbox cmd)) ; function to post commands to self
closing? (atom false) ; flag to indicate if the actor is closing
Expand All @@ -96,7 +97,7 @@
(do
(when (.paused consumer)
(.resume consumer (.assignment consumer)))
(.commitSync consumer)
(committer consumer)
(cmd-self [::poll duration])) ; resume and poll
[::poll duration]
(let [putting-all (fn [events] ; function to handle events and resume
Expand All @@ -114,17 +115,22 @@

(defn consumer
[nodes group-id topics
{:keys [poll-duration position]
{:keys [poll-duration position commit-strategy]
:as conf
:or {poll-duration (Duration/ofSeconds 10)}}]
(let [conr (-> (dissoc conf :poll-duration :position) ;avoid kafka complaints
:or {poll-duration (Duration/ofSeconds 10)
commit-strategy :sync}}]
(let [committer (case commit-strategy
:sync (fn [consumer] (.commitSync consumer))
:async (fn [consumer] (.commitAsync consumer))
:auto (constantly nil))
conr (-> (dissoc conf :poll-duration :position) ;avoid kafka complaints
(merge {:bootstrap-servers nodes
:group-id group-id
:enable-auto-commit false})
:enable-auto-commit (= commit-strategy :auto)})
util/->config-map
(KafkaConsumer. (ByteArrayDeserializer.) (ByteArrayDeserializer.)))
out-sink (ms/stream)
actor (consumer-actor conr out-sink)]
actor (consumer-actor conr committer out-sink)]
(actor [::subscribe topics])
(when position (actor [::seek position]))
(ms/on-closed out-sink (fn [] @(actor [::close])))
Expand Down

0 comments on commit 22a6c6c

Please sign in to comment.