diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala index 91be35a0c..8c20858f1 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala @@ -32,9 +32,9 @@ class KafkaSink[F[_]: Sync]( topicName: String ) extends Sink[F] { - private lazy val log = LoggerFactory.getLogger(getClass()) - - override def isHealthy: F[Boolean] = Sync[F].pure(true) + private lazy val log = LoggerFactory.getLogger(getClass()) + @volatile private var kafkaHealthy: Boolean = false + override def isHealthy: F[Boolean] = Sync[F].pure(kafkaHealthy) /** * Store raw events to the topic @@ -49,7 +49,12 @@ class KafkaSink[F[_]: Sync]( new ProducerRecord(topicName, key, event), new Callback { override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = - if (e != null) log.error(s"Sending event failed: ${e.getMessage}") + if (e != null) { + kafkaHealthy = false + log.error(s"Sending event failed: ${e.getMessage}") + } else { + kafkaHealthy = true + } } ) }