Skip to content

Commit

Permalink
Kinesis allow larger batches when sqs buffer is enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Dec 13, 2024
1 parent 5e47261 commit e9b4d58
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ class KinesisSink[F[_]: Sync] private (
writeBatchToKinesisWithRetries(batch, minBackoff, maxRetries)
// Kinesis not healthy and SQS buffer defined
case Some(sqs) =>
writeBatchToSqsWithRetries(batch, sqs, minBackoff, maxRetries)
val (big, small) = batch.partition(_.payload.size > 192000) // TODO: grab 192000 from the config
if (small.nonEmpty) writeBatchToSqsWithRetries(small, sqs, minBackoff, maxRetries)
if (big.nonEmpty) writeBatchToKinesisWithRetries(small, minBackoff, maxRetries)
}

def writeBatchToKinesisWithRetries(batch: List[Events], nextBackoff: Long, retriesLeft: Int): Unit = {
Expand Down Expand Up @@ -498,11 +500,9 @@ object KinesisSink {

clients.map {
case (kinesisClient, sqsClientAndName) =>
val maxBytes =
if (sqsClientAndName.isDefined) sinkConfig.config.sqsMaxBytes else sinkConfig.config.maxBytes
val ks =
new KinesisSink(
maxBytes,
sinkConfig.config.maxBytes,
kinesisClient,
sinkConfig.config,
sinkConfig.buffer,
Expand Down

0 comments on commit e9b4d58

Please sign in to comment.