From ba49ea27fea7b91ccd3b558c6fcd5c1fa025e264 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Tue, 19 Sep 2023 17:05:12 +0200 Subject: [PATCH] Fix effects messiness caused by unnecessarily nesting fuctions --- .../sinks/NsqSink.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala index ce5e3216d..70ffc3b4d 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala @@ -27,6 +27,7 @@ import scala.concurrent.ExecutionContextExecutorService import scala.concurrent.duration.MILLISECONDS import cats.effect.{Resource, Sync} +import cats.implicits._ import com.snowplowanalytics.client.nsq.NSQProducer import com.snowplowanalytics.snowplow.collector.core.{Sink} @@ -49,23 +50,20 @@ class NsqSink[F[_]: Sync] private ( var healthStatus = true - override def isHealthy: F[Boolean] = Sync[F].pure(healthStatus) + override def isHealthy: F[Boolean] = Sync[F].pure(healthStatus) private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start() -def produceData(topicName: String, events: List[Array[Byte]]): F[Unit] = - Sync[F].blocking(producer.produceMulti(topicName, events.asJava)) - .onError { case _: NSQException | _: TimeoutException => - Sync[F].delay(healthStatus = false) - } *> Sync[F].delay(healthStatus = true) - /** * Store raw events to the topic * @param events The list of events to send * @param key The partition key (unused) */ override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = - Sync[F].delay(produceData(topicName, events)) + Sync[F].blocking(producer.produceMulti(topicName, events.asJava)).onError { + case _: NSQException | _: TimeoutException => + Sync[F].delay(healthStatus = false) + } *> Sync[F].delay(healthStatus = true) def shutdown(): Unit = producer.shutdown()