From 720b824cc0399e054b4f1532ee59571c6225a336 Mon Sep 17 00:00:00 2001 From: Piotr Limanowski Date: Mon, 15 Jul 2024 19:38:42 +0200 Subject: [PATCH] Attempt to trigger sinking events in separate context boundary --- .../Routes.scala | 3 ++- .../Service.scala | 10 +++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala index e80f239ba..31a1c13a9 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala @@ -28,9 +28,10 @@ class Routes[F[_]: Async]( implicit val dns: Dns[F] = Dns.forSync[F] - private val healthRoutes = HttpRoutes.of[F] { + private val healthRoutes = HttpRoutes.strict[F] { case GET -> Root / "health" => Ok("ok") + } <+> HttpRoutes.of[F] { case GET -> Root / "sink-health" => service .sinksHealthy diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala index 9acff5b66..b11df74e1 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala @@ -32,6 +32,7 @@ import org.typelevel.ci._ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload import com.snowplowanalytics.snowplow.collector.core.model._ +import cats.effect.kernel.Async trait IService[F[_]] { def preflightResponse(req: Request[F]): F[Response[F]] @@ -54,7 +55,7 @@ object Service { val spAnonymousNuid = "00000000-0000-0000-0000-000000000000" } -class Service[F[_]: Sync]( +class Service[F[_]: Async]( config: Config[Any], sinks: Sinks[F], appInfo: AppInfo @@ -121,15 +122,14 @@ class Service[F[_]: Sync]( `Access-Control-Allow-Credentials`().toRaw1.some ).flatten responseHeaders = Headers(headerList ++ bounceLocationHeaders(config.cookieBounce, shouldBounce, request)) - _ <- if (!doNotTrack && !shouldBounce) sinkEvent(event, partitionKey) else Sync[F].unit - resp = buildHttpResponse( + _ <- if (!doNotTrack && !shouldBounce) Async[F].start(sinkEvent(event, partitionKey)) else Async[F].unit + } yield buildHttpResponse( queryParams = request.uri.query.params, headers = responseHeaders, redirect = redirect, pixelExpected = pixelExpected, shouldBounce = shouldBounce ) - } yield resp override def sinksHealthy: F[Boolean] = (sinks.good.isHealthy, sinks.bad.isHealthy).mapN(_ && _) @@ -323,7 +323,7 @@ class Service[F[_]: Sync]( ): F[Unit] = for { // Split events into Good and Bad - eventSplit <- Sync[F].delay(splitBatch.splitAndSerializePayload(event, sinks.good.maxBytes)) + eventSplit <- Async[F].delay(splitBatch.splitAndSerializePayload(event, sinks.good.maxBytes)) // Send events to respective sinks _ <- sinks.good.storeRawEvents(eventSplit.good, partitionKey) _ <- sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)