Synchronous Commit from confluent kafka python consumer client causes a consumer hang once in a while #1801

Open
KumarJayanti opened this issue Aug 20, 2024 · 0 comments


Description

We have observed that our consumer client occasionally hangs inside the commit call, at seemingly random times. This started only after upgrading to the 2.2.0 release; we did not observe the hang on 2.1.1.

Regardless of what happens on the broker or what mistakes the calling code makes, a consumer.commit() call should not hang. It is a cause of worry for our on-call engineers. Ideally, the call should raise an exception back to the calling consumer code if anything goes wrong.

Eventually the broker unassigns the partitions from the hung consumers. Because those consumers remain hung, their partitions are reassigned to the remaining clients in the consumer group, which hurts the overall scalability of our consumers.

How to reproduce

We do not have exact reproduction steps, since the hang seems to happen at random and only once in a while. Typically, 2 or 3 of the 5 consumer clients in the consumer group hang simultaneously.

Checklist

Please provide the following information:

  • [*] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.2.0 for both
  • [*] Apache Kafka broker version: an internal fork of Apache Kafka, so the exact version cannot be specified
  • [*] Client configuration: {security.protocol=SSL ssl.ca.location=/etc/identity/ca/cacerts.pem ssl.certificate.location=/etc/identity/client/certificates/client.pem ssl.key.location=/etc/identity/client/keys/client-key.pem session.timeout.ms=300000 heartbeat.interval.ms=100000 auto.offset.reset=earliest enable.auto.offset.store=False enable.auto.commit=False max.poll.interval.ms=2700000 fetch.min.bytes=40000 max.partition.fetch.bytes=50000 fetch.wait.max.ms=200 group.id=CSPM_ETL partition.assignment.strategy=cooperative-sticky}
  • [*] Operating system: RHEL9 Containers running Python 3.9
  • [*] Provide client logs : please see below
  • [*] Provide broker log excerpts : please see below
  • [*] Critical issue : yes
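
Since the hang cannot be reproduced on demand, one option is to run the consumers with librdkafka debug logging enabled so that the next occurrence leaves a trace. The sketch below only builds the configuration dictionary; `with_debug` and `base_config` are hypothetical names, only a few of the properties listed above are repeated, and the chosen debug contexts are an assumption about what might be relevant here.

```python
# Subset of the client configuration from the checklist (remaining properties omitted).
base_config = {
    "group.id": "CSPM_ETL",
    "enable.auto.commit": False,
    "enable.auto.offset.store": False,
    "partition.assignment.strategy": "cooperative-sticky",
}


def with_debug(config, contexts=("cgrp", "protocol", "consumer")):
    """Return a copy of config with librdkafka's `debug` property set.

    `debug` takes a comma-separated list of contexts; the default contexts
    used here are a guess at what could matter for a commit hang.
    """
    debug_config = dict(config)  # leave the original configuration untouched
    debug_config["debug"] = ",".join(contexts)
    return debug_config


debug_config = with_debug(base_config)
print(debug_config["debug"])
```

The resulting dictionary would be passed to the Consumer constructor in place of the normal configuration. Debug logging is verbose, so it may only be practical on a subset of the consumers.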

client code:

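from collections import defaultdict

def commit_messages(self, context, message_list):
    # ..... (code elided in the original report)
    # Group messages by partition
    messages_by_partition = defaultdict(list)
    for message in message_list:
        # Skip error events; only real messages are grouped for commit
        if not message.error():
            messages_by_partition[message.partition()].append(message)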

    self.logger.info(f"Partitions Assigned : {len(messages_by_partition)}")

    # Commit the offset of the last message from each partition
    for partition, partition_messages in messages_by_partition.items():
        if partition_messages:
            last_message = partition_messages[-1]
            self.consumer.commit(message=last_message, asynchronous=False)
            self.logger.info(f"commit partition {partition}")
            self.logger.info(f"offsets from: {partition_messages[0].offset()} to :{last_message.offset()}.")
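
As a stopgap on the application side, the blocking call could be run in a helper thread so that a hang surfaces as an exception instead of stalling the consumer loop silently. This is only a minimal sketch: `commit_with_deadline`, `CommitTimeout` and `timeout_s` are hypothetical names, it assumes the consumer object may be called from a second thread (which should be verified), and it does not unblock the hung commit itself; it just lets the caller log the condition and restart.

```python
import threading


class CommitTimeout(Exception):
    """Raised when a synchronous commit does not return within the deadline."""


def commit_with_deadline(consumer, message, timeout_s=30.0):
    """Call consumer.commit(message=..., asynchronous=False) with a deadline.

    Returns whatever commit() returns, re-raises any exception it raised,
    and raises CommitTimeout if the call is still running after timeout_s.
    """
    outcome = {}

    def _run():
        try:
            outcome["value"] = consumer.commit(message=message, asynchronous=False)
        except Exception as exc:  # hand the error back to the calling thread
            outcome["error"] = exc

    # Daemon thread: a commit that never returns must not block process exit.
    worker = threading.Thread(target=_run, name="commit-watchdog", daemon=True)
    worker.start()
    worker.join(timeout_s)

    if worker.is_alive():
        raise CommitTimeout(f"commit did not return within {timeout_s} s")
    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("value")
```

In `commit_messages` above, the direct `self.consumer.commit(...)` call would become `commit_with_deadline(self.consumer, last_message)`, with `CommitTimeout` handled by logging and exiting so that the pod is restarted.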

Log of the consumer stuck on commit call

8/20/24
12:42:29.500 AM 2024-08-20 00:42:29,500 - consumer_client.py - commit_messages - 187 - INFO - [] - Partitions Assigned : 1
• host = [...]
• k8s_container_name = [...]
• k8s_namespace = [...]
• k8s_pod_name = [cspm-678f4d4df7-4l942]
• source = [stdout]

Log on the Broker

  • There is no other relevant log before this time indicating any broker side issues
  • We had sessions with our broker infra team and could not really find anything relevant.

8/20/24
12:47:11.745 AM
2024-08-20 00:47:10,817 [executor-Heartbeat] INFO (kafka.coordinator.group.GroupCoordinator) - [GroupCoordinator 31]: Member cspm-678f4d4df7-4l942-0ac8570b-4891-4e0e-97d9-b7940829a1e6 in group CSPM_ETL has failed, removing it from the group
• host = [...]
• k8s_container_name = [kafkaserver-log]
• k8s_namespace = [...]
• k8s_pod_name = [kafka-31]
• source = [stdout]
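
For reference, the gap between the last client log line and the broker's removal of the member can be compared with the configured session.timeout.ms. The snippet below is a rough check only; it assumes both log timestamps come from synchronized clocks in the same time zone, and the variable names are illustrative.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S,%f"

# Timestamps copied from the two log excerpts above.
last_client_log = datetime.strptime("2024-08-20 00:42:29,500", FMT)
broker_removal = datetime.strptime("2024-08-20 00:47:10,817", FMT)

gap_s = (broker_removal - last_client_log).total_seconds()
session_timeout_s = 300000 / 1000  # session.timeout.ms from the client configuration

print(f"gap between logs: {gap_s:.3f} s")
print(f"session timeout:  {session_timeout_s:.0f} s")
```

The gap is about 281 s, which is shorter than the 300 s session timeout. If the broker expired the member exactly at the session timeout, that would suggest its last accepted heartbeat arrived roughly 19 s before the last client log line, although this is an inference rather than something the logs show directly.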

A web search shows a similar issue (in terms of observed behavior) in kafka-python, but it seems to have been resolved at least 4 years ago: dpkp/kafka-python#1728
