
Large Number of Consumers Prevent Test Completion (Kafka Driver) #416

Open
ColinSullivan1 opened this issue Aug 26, 2024 · 2 comments

ColinSullivan1 commented Aug 26, 2024

Hello OpenMessaging maintainers - thank you for your work on this project!

I'm running some very large benchmarks (20k consumers) spread across 8 large machines to simulate operation at scale. As the test nears completion (I suspect when results are aggregated), there are numerous consumer errors, which cause the WorkloadGenerator to time out.

Even with smaller tests, I see timeouts due to consumer errors at the end of the run.

For example, the aggregate high-level stats look fine:

15:30:11.050 [main] INFO WorkloadGenerator - Pub rate  8983.0 msg/s /  8.8 MB/s | Pub err     0.0 err/s | Cons rate  8983.0 msg/s /  8.8 MB/s | Backlog:  0.0 K | Pub Latency (ms) avg:  2.0 - 50%:  2.0 - 99%:  2.5 - 99.9%:  6.7 - Max: 31.0 | Pub Delay Latency (us) avg: 57.1 - 50%: 57.0 - 99%: 61.0 - 99.9%: 65.0 - Max: 1462.0
15:30:21.169 [main] INFO WorkloadGenerator - Pub rate  8988.9 msg/s /  8.8 MB/s | Pub err     0.0 err/s | Cons rate  8988.5 msg/s /  8.8 MB/s | Backlog:  0.0 K | Pub Latency (ms) avg:  2.0 - 50%:  2.0 - 99%:  2.5 - 99.9%:  4.6 - Max: 18.3 | Pub Delay Latency (us) avg: 57.3 - 50%: 57.0 - 99%: 64.0 - 99.9%: 67.0 - Max: 124.0
15:30:21.255 [main] INFO WorkloadGenerator - ----- Aggregated Pub Latency (ms) avg:  2.0 - 50%:  2.0 - 95%:  2.3 - 99%:  2.5 - 99.9%:  6.7 - 99.99%: 18.3 - Max: 61.4 | Pub Delay (us)  avg: 57.9 - 50%: 57.0 - 95%: 60.0 - 99%: 64.0 - 99.9%: 70.0 - 99.99%: 1462.0 - Max: 20531.0

There are no errors, no backlog, and steady throughput at the rate I've specified. However, the test still times out and the consumer logs show errors. Note that with high-throughput tests and fewer consumers, I do not see the issue.

Example Test Setup

Driver:

name: MyTestDriver
driverClass: io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkDriver

# Kafka client-specific configuration
replicationFactor: 3
reset: true

topicConfig: |
  min.insync.replicas=2

commonConfig: |
  bootstrap.servers=myserver:9092
  # add additional time for large scale tests.
  request.timeout.ms=480000
  socket.connection.setup.timeout.ms=30000
  socket.connection.setup.timeout.max.ms=60000

producerConfig: |
  acks=all
  linger.ms=1
  batch.size=1048576

consumerConfig: |
  auto.offset.reset=earliest
  enable.auto.commit=false
  max.partition.fetch.bytes=10485760
  # try to fix the issue of consumer not consuming messages
  max.poll.records=50
  max.poll.interval.ms=300000
  #session.timeout.ms = 90000
  #group.min.session.timeout.ms=6000
  #group.max.session.timeout.ms=92000

Workload:

name: services-1

# overprovision partitions to rule out contention
partitionsPerTopic: 15
messageSize: 1024
payloadFile: "payload/payload-1Kb.data"
subscriptionsPerTopic: 1
consumerPerSubscription: 12
producersPerTopic: 1
producerRate: 9000
consumerBacklogSizeGB: 0
warmupDurationMinutes: 1
testDurationMinutes: 3

Consumer Errors

Some of the consumer errors include:

15:24:15.049 [pool-116-thread-1] ERROR ConsumerCoordinator - [Consumer clientId=consumer-sub-000-Pp8hAjg-115, groupId=sub-000-Pp8hAjg] Offset commit with offsets {test-topic-0000022-tQSNfvA-0=OffsetAndMetadata{offset=2082, leaderEpoch=null, metadata=''}} failed

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException

15:24:14.921 [pool-13-thread-1] ERROR ConsumerCoordinator - [Consumer clientId=consumer-sub-000-trBz_PI-12, groupId=sub-000-trBz_PI] Offset commit with offsets {test-topic-0000141-ZzmWoJA-0=OffsetAndMetadata{offset=2080, leaderEpoch=null, metadata=''}} failed
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
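
These appear to be asynchronous offset commits failing around rebalances and disconnects; the exception message itself suggests retrying. For context only, a minimal sketch of such a fallback (illustrative names, not the driver's actual code) might look like:

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.KafkaException;

// Hypothetical fragment: commit asynchronously, and if the commit fails with
// a retriable error, fall back to one bounded synchronous retry. The callback
// runs on the polling thread, so calling commitSync from it is safe for a
// single-threaded consumer.
static void commitWithRetry(Consumer<String, byte[]> consumer) {
    consumer.commitAsync((offsets, exception) -> {
        if (exception instanceof RetriableCommitFailedException) {
            try {
                consumer.commitSync(Duration.ofSeconds(5));
            } catch (KafkaException retryFailed) {
                // Give up; a later successful commit will cover these offsets.
            }
        }
    });
}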

I've attached output and logs of a test that exhibits these symptoms.

benchmark-output.txt
sanitized-benchmark-worker.log

Would you have any suggestions for running the benchmarks with extremely large numbers of consumers? Happy to provide more information if you need it.

Edit: Perhaps all of the rebalancing is blocking some requests during shutdown, while consumers are closing?

Thanks!

@ColinSullivan1 ColinSullivan1 changed the title Errors with Large Consumer Counts Prevent Test Completion (Kafka Driver) Large Number of Consumers Prevent Test Completion (Kafka Driver) Aug 27, 2024

ColinSullivan1 commented Aug 27, 2024

FYI, I tried setting the internal property internal.leave.group.on.close=false, which didn't seem to make a difference on the system I was benchmarking.
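
For reference, the property went into the driver's consumerConfig block, along these lines:

consumerConfig: |
  auto.offset.reset=earliest
  enable.auto.commit=false
  internal.leave.group.on.close=false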

Adding some instrumentation showed that the Kafka driver's close API was taking up to 800 milliseconds per call to complete. Waiting for all consumers to close serially would take quite a long time.

One more data point: as an experiment, I commented out consumer.close() in the Kafka driver, and the test ran to completion.

I'm wondering if gathering stats and writing the output file before closing the consumers would work, followed by invoking the consumer.close() calls from an executor to parallelize the work that occurs during close.

Another option might be to use the close API variant that accepts a duration, though with 20k consumers I'm not sure that would help enough on its own.
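
A minimal sketch combining both ideas, assuming direct access to the driver's consumer list (the helper name and pool size are illustrative, not the actual driver code):

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.Consumer;

// Hypothetical helper: close all benchmark consumers in parallel instead of
// one at a time. Assumes each consumer's poll loop has already exited --
// KafkaConsumer is not thread-safe, so only one thread may touch a given
// consumer at a time.
static void closeAllInParallel(List<Consumer<String, byte[]>> consumers) {
    ExecutorService pool = Executors.newFixedThreadPool(64); // pool size is a guess
    try {
        CompletableFuture.allOf(consumers.stream()
                .map(c -> CompletableFuture.runAsync(
                        // Bound each close so one slow consumer cannot stall shutdown.
                        () -> c.close(Duration.ofSeconds(10)), pool))
                .toArray(CompletableFuture[]::new))
                .join();
    } finally {
        pool.shutdown();
    }
}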

wdyt?

ColinSullivan1 commented

I notice the worker.stopAll() API is called in WorkloadGenerator.run() before the method exits (and before the results file is generated). This prevents the results from being generated: worker.stopAll() takes a very long time to complete and the benchmark times out. Note that worker.stopAll() is also called later during workload shutdown.

Removing this line allows me to generate the results file. The benchmark still times out when the subsequent worker.stopAll() call is made, but at least I can get results.

Is worker.stopAll() necessary in WorkloadGenerator.run()?
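
A rough sketch of the reordering, assuming the general shape of WorkloadGenerator.run() (collectResults() is a hypothetical stand-in for the existing stats-gathering code):

// Hypothetical tail of WorkloadGenerator.run():
TestResult result = collectResults(); // gather stats and build the result first
// worker.stopAll();                  // removed here; the workload shutdown
                                      // path already calls stopAll() later
return result;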
