diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala
index 39f2dc1c3..c24ec93a7 100644
--- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala
@@ -28,8 +28,12 @@ import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerC
 import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink}
 import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output
 
+import java.nio.charset.StandardCharsets
+
 object Sink {
 
+  private val maxAttributeBytesLength = 1024
+
   private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
     Slf4jLogger.getLogger[F]
 
@@ -67,6 +71,13 @@ object Sink {
 
   private def sinkBatch[F[_]: Concurrent: Parallel, A](producer: PubsubProducer[F, A])(records: List[AttributedData[A]]): F[Unit] =
     records.parTraverse_ { r =>
-      producer.produce(r.data, r.attributes)
+      val attributes = dropOversizedAttributes(r)
+      producer.produce(r.data, attributes)
     }.void
+
+  private def dropOversizedAttributes[A](r: AttributedData[A]): Map[String, String] =
+    r.attributes.filter {
+      case (_, value) =>
+        value.getBytes(StandardCharsets.UTF_8).length <= maxAttributeBytesLength
+    }
 }
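
For context, here is a minimal, self-contained sketch of the new guard's behavior, using a simplified stand-in for AttributedData (the real type lives in com.snowplowanalytics.snowplow.enrich.common.fs2) and a hypothetical DropOversizedAttributesSketch object. The 1024-byte threshold matches Pub/Sub's documented limit on message attribute values; the guard filters out any attribute whose value would exceed it before the message is produced.

// Illustrative sketch only; AttributedData is simplified and
// DropOversizedAttributesSketch is a hypothetical wrapper for demonstration.
import java.nio.charset.StandardCharsets

object DropOversizedAttributesSketch {
  final case class AttributedData[A](data: A, attributes: Map[String, String])

  // Pub/Sub rejects attribute values larger than 1024 bytes.
  private val maxAttributeBytesLength = 1024

  // Same logic as the patch: keep only attributes whose UTF-8 byte length
  // fits within the limit.
  def dropOversizedAttributes[A](r: AttributedData[A]): Map[String, String] =
    r.attributes.filter {
      case (_, value) =>
        value.getBytes(StandardCharsets.UTF_8).length <= maxAttributeBytesLength
    }

  def main(args: Array[String]): Unit = {
    val record = AttributedData(
      data = "payload",
      attributes = Map(
        "small" -> "ok",
        "big"   -> "x" * 2000 // 2000 bytes in UTF-8, over the 1024-byte limit
      )
    )
    // Only "small" survives; "big" is silently dropped.
    println(dropOversizedAttributes(record)) // Map(small -> ok)
  }
}

Note that the check measures UTF-8 byte length rather than String length: Pub/Sub's limit is expressed in bytes, and multi-byte characters make character count an unreliable proxy.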