From 62f2032cf3f18cf4acfa9103ebb542d7eba2ebb3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?=
Date: Wed, 26 Apr 2023 12:50:27 +0200
Subject: [PATCH] enrich-pubsub: check max message attributes length (close #782)

---
 .../snowplow/enrich/pubsub/Sink.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala
index 39f2dc1c3..c24ec93a7 100644
--- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala
@@ -28,8 +28,12 @@ import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerC
 import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink}
 import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output
 
+import java.nio.charset.StandardCharsets
+
 object Sink {
 
+  private val maxAttributeBytesLength = 1024
+
   private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
     Slf4jLogger.getLogger[F]
 
@@ -67,6 +71,13 @@ object Sink {
 
   private def sinkBatch[F[_]: Concurrent: Parallel, A](producer: PubsubProducer[F, A])(records: List[AttributedData[A]]): F[Unit] =
     records.parTraverse_ { r =>
-      producer.produce(r.data, r.attributes)
+      val attributes = dropOversizedAttributes(r)
+      producer.produce(r.data, attributes)
     }.void
+
+  private def dropOversizedAttributes[A](r: AttributedData[A]): Map[String, String] =
+    r.attributes.filter {
+      case (_, value) =>
+        value.getBytes(StandardCharsets.UTF_8).length <= maxAttributeBytesLength
+    }
 }
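
For reference, a minimal standalone sketch of the attribute-filtering behaviour this patch introduces. The object name, sample attributes, and main entry point are illustrative assumptions, not code from the repository; the 1024-byte threshold mirrors Pub/Sub's documented limit on attribute values, which maxAttributeBytesLength encodes above.

// Standalone sketch: drop attributes whose values exceed the Pub/Sub limit.
// Hypothetical example, not part of the patch itself.
import java.nio.charset.StandardCharsets

object DropOversizedAttributesSketch {
  // Pub/Sub's documented maximum size for an attribute value, in bytes.
  private val maxAttributeBytesLength = 1024

  // Keeps only attributes whose UTF-8 encoding fits within the limit.
  // Note the length is measured in bytes, not characters, so multi-byte
  // characters count more than once.
  def dropOversizedAttributes(attributes: Map[String, String]): Map[String, String] =
    attributes.filter {
      case (_, value) =>
        value.getBytes(StandardCharsets.UTF_8).length <= maxAttributeBytesLength
    }

  def main(args: Array[String]): Unit = {
    val attributes = Map(
      "small" -> "ok",             // 2 bytes, kept
      "big"   -> ("x" * 2000)      // 2000 bytes in UTF-8, dropped
    )
    // Prints: Map(small -> ok)
    println(dropOversizedAttributes(attributes))
  }
}

Design note: oversized attributes are silently dropped rather than failing the produce call, so one overlong attribute value cannot block the whole enriched event from being sunk.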