Skip to content

Commit

Permalink
enrich-pubsub: check max message attributes length (close #782)
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Apr 26, 2023
1 parent 97c8c10 commit 62f2032
Showing 1 changed file with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerC
import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output

import java.nio.charset.StandardCharsets

object Sink {

private val maxAttributeBytesLength = 1024

private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

Expand Down Expand Up @@ -67,6 +71,13 @@ object Sink {

private def sinkBatch[F[_]: Concurrent: Parallel, A](producer: PubsubProducer[F, A])(records: List[AttributedData[A]]): F[Unit] =
records.parTraverse_ { r =>
producer.produce(r.data, r.attributes)
val attributes = dropOversizedAttributes(r)
producer.produce(r.data, attributes)
}.void

private def dropOversizedAttributes[A](r: AttributedData[A]): Map[String, String] =
r.attributes.filter {
case (_, value) =>
value.getBytes(StandardCharsets.UTF_8).length <= maxAttributeBytesLength
}
}

0 comments on commit 62f2032

Please sign in to comment.