From d27f617b31c0c22fda09690137620f85905e1786 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Wed, 11 Oct 2023 12:31:20 +0200 Subject: [PATCH] Add kafka sink healthcheck --- .../sinks/KafkaSink.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala index 91be35a0c..8c20858f1 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala @@ -32,9 +32,9 @@ class KafkaSink[F[_]: Sync]( topicName: String ) extends Sink[F] { - private lazy val log = LoggerFactory.getLogger(getClass()) - - override def isHealthy: F[Boolean] = Sync[F].pure(true) + private lazy val log = LoggerFactory.getLogger(getClass()) + @volatile private var kafkaHealthy: Boolean = false + override def isHealthy: F[Boolean] = Sync[F].pure(kafkaHealthy) /** * Store raw events to the topic @@ -49,7 +49,12 @@ class KafkaSink[F[_]: Sync]( new ProducerRecord(topicName, key, event), new Callback { override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = - if (e != null) log.error(s"Sending event failed: ${e.getMessage}") + if (e != null) { + kafkaHealthy = false + log.error(s"Sending event failed: ${e.getMessage}") + } else { + kafkaHealthy = true + } } ) }