Skip to content

Commit

Permalink
metrics #8: kafka consumer summary
Browse files Browse the repository at this point in the history
  • Loading branch information
prepor committed May 31, 2018
1 parent f1580cc commit e0b1e9f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
24 changes: 16 additions & 8 deletions re/src/re/kafka_consumer.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
(ns re.kafka-consumer
(:require [integrant.core :as ig]
[clojure.tools.logging :refer [error info]]
(:require [clojure.tools.logging :refer [error]]
[integrant.core :as ig]
[re.metrics :as metrics]
[re.utils :as utils])
(:import [java.util Collection Map]
[org.apache.kafka.clients.consumer ConsumerRecord KafkaConsumer]
org.apache.kafka.common.errors.WakeupException
[org.apache.kafka.common.serialization ByteArrayDeserializer StringDeserializer]))

(defonce metric-consume-summary
(metrics/make-summary
{:name "kafka_consume_seconds"
:help "Kafka events consuming time"
:labels ["topic" "partition"]}))

(defn consumer-loop [running? ^KafkaConsumer consumer ^Collection topics handler]
(.subscribe consumer topics)
(try
Expand All @@ -15,12 +22,13 @@
(doseq [^ConsumerRecord record res]
(utils/with-retry
(when @running?
(handler {:value (.value record)

:key (.key record)
:offset (.offset record)
:partition (.partition record)
:topic (.topic record)}))))))
(metrics/with-timer metric-consume-summary [(.topic record)
(str (.partition record))]
(handler {:value (.value record)
:key (.key record)
:offset (.offset record)
:partition (.partition record)
:topic (.topic record)})))))))
(catch WakeupException e
(when @running?
(throw e)))
Expand Down
19 changes: 18 additions & 1 deletion re/src/re/metrics.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns re.metrics
(:require [integrant.core :as ig])
(:import io.prometheus.client.hotspot.DefaultExports
(:import [io.prometheus.client Summary Summary$Timer]
io.prometheus.client.hotspot.DefaultExports
io.prometheus.client.jetty.JettyStatisticsCollector
org.eclipse.jetty.server.handler.StatisticsHandler))

Expand All @@ -14,3 +15,19 @@
(.setHandler server stats)
(-> (JettyStatisticsCollector. stats)
(.register)))))

(defn make-summary [{:keys [name help labels]}]
(-> (Summary/build)
(.name name)
(.labelNames (into-array String labels))
(.help help)
(.register)))

(defn with-timer* [^Summary metric labels f]
(let [timer (-> (.labels metric (into-array String labels))
(.startTimer))]
(f)
(.observeDuration ^Summary$Timer timer)))

(defmacro with-timer [metric labels & body]
`(with-timer* ~metric ~labels (fn [] ~@body)))

0 comments on commit e0b1e9f

Please sign in to comment.