Consumer much slower than producer #1307

Open
6 tasks done
bichselb opened this issue Oct 1, 2024 · 5 comments

Comments

@bichselb

bichselb commented Oct 1, 2024

Description

In my very simple implementation, I can easily produce 1 million messages per second, but can only consume around 100k messages per second. How can I speed up the consumer to be as fast as the producer? I would like to avoid parallelization, as this is also not needed for good producer performance.

Both my producer and consumer follow the README examples very closely.

How to reproduce

I put my whole experimental setup here: https://github.com/bichselb/confluent-kafka-go-performance/tree/slow

My consumer is a straightforward adaptation of the README example:

package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// The timer starts before the client is created and subscribed.
	start := time.Now()

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "kafka:29092",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}

	err = c.Subscribe("myTopic", nil)
	if err != nil {
		panic(err)
	}

	messagesPerBatch := 1_000_000
	nReceived := 0

	for {
		msg, err := c.ReadMessage(10 * time.Second)
		if err == nil {
			nReceived++
			// Report the running average once per completed batch. Doing this
			// only on success avoids repeating the report after an error.
			if nReceived%messagesPerBatch == 0 {
				elapsed := time.Since(start)
				perSecond := int(float64(nReceived) / elapsed.Seconds())
				fmt.Printf("Received %d messages within %.2f seconds (average %d messages/sec)\n", nReceived, elapsed.Seconds(), perSecond)
			}
		} else if kerr, ok := err.(kafka.Error); ok && kerr.IsTimeout() {
			// No message arrived within the timeout: treat the topic as drained.
			// The checked type assertion avoids a panic on non-kafka errors.
			fmt.Printf("Timeout after %d messages\n", nReceived)
			break
		} else {
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}

Checklist

Please provide the following information:

@bichselb
Author

bichselb commented Oct 9, 2024

I was able to make the consumer almost as fast as the producer by setting various configuration properties specified by librdkafka (most importantly fetch.message.max.bytes).

This is my fixed setup: https://github.com/bichselb/confluent-kafka-go-performance/tree/fast
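
For readers who do not want to open the linked branch, here is a minimal sketch of what such fetch tuning could look like. The property names are librdkafka consumer settings, but every value below is an illustrative assumption and not taken from the linked repository; `fetchTuning` and `merge` are hypothetical helpers. The sketch uses only the standard library, so the resulting map would still have to be copied key by key into a `kafka.ConfigMap` (for instance with its `SetKey` method).

```go
package main

import (
	"fmt"
	"sort"
)

// fetchTuning returns librdkafka consumer properties that control how much
// data is fetched and buffered locally. The names are real librdkafka
// properties; the values are assumptions chosen for illustration only.
func fetchTuning() map[string]any {
	return map[string]any{
		"fetch.message.max.bytes":    10 * 1024 * 1024, // max bytes per partition per fetch (assumed value)
		"queued.min.messages":        1_000_000,        // messages to keep prefetched per partition (assumed value)
		"queued.max.messages.kbytes": 1024 * 1024,      // cap on the local prefetch queue in KiB (assumed value)
	}
}

// merge returns a new map holding base plus extra; keys in extra win.
func merge(base, extra map[string]any) map[string]any {
	out := make(map[string]any, len(base)+len(extra))
	for k, v := range base {
		out[k] = v
	}
	for k, v := range extra {
		out[k] = v
	}
	return out
}

func main() {
	// Base settings are the ones from the consumer in the issue description.
	cfg := merge(map[string]any{
		"bootstrap.servers": "kafka:29092",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	}, fetchTuning())

	// Print the merged configuration in a stable order.
	keys := make([]string, 0, len(cfg))
	for k := range cfg {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s = %v\n", k, cfg[k])
	}
}
```

Which values actually help depends on message size, partition count, and available memory, so they are best found by measurement.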

@OneCricketeer

Note: your start time is inclusive of the time to actually connect and configure the client, so the measured rate will be lower than the true msg/s number.

@bichselb
Author

bichselb commented Oct 16, 2024

Thanks @OneCricketeer for the hint. However, this is not the reason for the slowness (the client configuration time is negligible compared to sending 1 million messages).

@bichselb
Author

I am leaving this issue open in case someone knows how to close the remaining gap between the producer (~1 million messages/sec) and the consumer (~800k messages/sec).

Feel free to close if this remaining gap is expected.

@OneCricketeer

OneCricketeer commented Oct 17, 2024

Have you tried ZSTD compression? https://blog.cloudflare.com/squeezing-the-firehose/

Have you used tools like iperf3 to check max network speeds? Have you monitored CPU/memory/disk/IO rates while consuming?

There's not much information to go on from the shown code alone.

Besides, Kafka has a built-in consumer perf script. It uses Java, but it'll still give you a theoretical limit of how fast you could consume.


2 participants