Deadlock on disconnecting #2365

Open
3 of 8 tasks
jensleuschner92 opened this issue Nov 25, 2024 · 3 comments

Comments

@jensleuschner92

Description

Hey, we are looking for some help. Our application can connect to n Kafka server instances. These instances may point to the same server or to different ones. For each of these configurations we run a dedicated thread that instantiates its own Consumer instance and starts listening.

In some scenarios our application tells the client to close the connection, e.g. when we receive more messages than we can process in time, when configurations change, ...

Unfortunately, we are running into the same situation more and more often: our clients stop reading messages. For diagnostics we print stack traces, and each time they show the same picture: the application hangs in Close ... > ReleaseHandle. IMHO this looks like a possible deadlock.

Is there a known issue about this?

Thank you in advance for your help.

```
[Internal Frame, 'M-->U']
Confluent.Kafka.Impl.SafeKafkaHandle.ReleaseHandle(N/A )
Confluent.Kafka.Impl.SafeKafkaHandle.Dispose(N/A N/A )
Confluent.Kafka.Consumer`2.Dispose(N/A N/A )
Confluent.Kafka.Consumer`2.Close(N/A )
....
System.Threading.ExecutionContext.RunInternal(N/A N/A N/A N/A )
System.Threading.ExecutionContext.Run(N/A N/A N/A N/A )
System.Threading.ExecutionContext.Run(N/A N/A N/A )
System.Threading.ThreadHelper.ThreadStart(N/A )
[Internal Frame, 'U-->M']

Callstack for Thread 5836
[IL Method without Metadata]
[Internal Frame, 'M-->U']
Confluent.Kafka.Impl.SafeKafkaHandle.ReleaseHandle(N/A )
Confluent.Kafka.Impl.SafeKafkaHandle.Dispose(N/A N/A )
Confluent.Kafka.Consumer`2.Dispose(N/A N/A )
Confluent.Kafka.Consumer`2.Close(N/A )
...
System.Threading.ExecutionContext.RunInternal(N/A N/A N/A N/A )
System.Threading.ExecutionContext.Run(N/A N/A N/A N/A )
System.Threading.ExecutionContext.Run(N/A N/A N/A )
System.Threading.ThreadHelper.ThreadStart(N/A )
[Internal Frame, 'U-->M']

Callstack for Thread 5484
[IL Method without Metadata]
[Internal Frame, 'M-->U']
Confluent.Kafka.Impl.SafeKafkaHandle.ReleaseHandle(N/A )
Confluent.Kafka.Impl.SafeKafkaHandle.Dispose(N/A N/A )
Confluent.Kafka.Consumer`2.Dispose(N/A N/A )
Confluent.Kafka.Consumer`2.Close(N/A )
....
System.Threading.ExecutionContext.RunInternal(N/A N/A N/A N/A )
System.Threading.ExecutionContext.Run(N/A N/A N/A N/A )
System.Threading.ExecutionContext.Run(N/A N/A N/A )
System.Threading.ThreadHelper.ThreadStart(N/A )
```

How to reproduce

Confluent.Kafka version 2.5.2 (C#)
OS: Windows Server 2019

Create n threads; each thread should do the following x times:

  • Instantiate a Consumer (each thread gets its own instance).
  • Consume for a short while.
  • Call consumer.Close().
  • Call consumer.Dispose().
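A minimal C# sketch of these reproduction steps, in case it helps. It is not our production code: the broker address `localhost:9092`, the topic `repro-topic`, the per-thread group ids and the thread/iteration counts are placeholder assumptions. It only uses the public Confluent.Kafka consumer API (`ConsumerBuilder`, `Subscribe`, `Consume`, `Close`, `Dispose`) and needs a reachable broker to run.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;

// Hypothetical repro sketch: broker, topic, group ids and counts are placeholders.
class Repro
{
    const int ThreadCount = 8;    // "n" threads (assumed value)
    const int Iterations = 100;   // "x" iterations per thread (assumed value)

    static void Main()
    {
        var threads = new List<Thread>();
        for (int t = 0; t < ThreadCount; t++)
        {
            int id = t; // capture a per-thread copy for the lambda
            var thread = new Thread(() => Worker(id));
            threads.Add(thread);
            thread.Start();
        }

        // If the deadlock occurs, one of these Join calls never returns.
        foreach (var thread in threads)
            thread.Join();

        Console.WriteLine("All threads finished.");
    }

    static void Worker(int id)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",  // placeholder
            GroupId = $"repro-group-{id}",        // placeholder, one group per thread
            AutoOffsetReset = AutoOffsetReset.Earliest,
        };

        for (int i = 0; i < Iterations; i++)
        {
            // Each thread builds its own consumer; `using` disposes it at the end of the iteration.
            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe("repro-topic");   // placeholder topic

            // "Consume for a short while": ten polls of 100 ms each (result may be null on timeout).
            for (int j = 0; j < 10; j++)
                consumer.Consume(TimeSpan.FromMilliseconds(100));

            // Close() leaves the consumer group (committing offsets if auto-commit is enabled);
            // Dispose() then releases the native handle, which is where our threads hang.
            consumer.Close();
            Console.WriteLine($"thread {id}: iteration {i} closed");
        }
    }
}
```

Whether the hang depends on the threads sharing a group id or a broker is unknown, so it may be worth trying both variants.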

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • Confluent.Kafka nuget version.
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (with "debug" : "..." as necessary in configuration).
  • Provide broker log excerpts.
  • Critical issue.
@realShinchoku

same here

@realShinchoku

realShinchoku commented Dec 6, 2024

I found a workaround: re-subscribe when you get an error. It's not ideal, but it works. I wonder whether consumer.Commit() needs to be called before consumer.Unsubscribe(). I think it still deadlocks in the background: only consumer.Close() can stop a running Consume, but after that you cannot subscribe again.

```csharp
// `config`, `_logger` and `topics` are defined in the surrounding code (not shown).
new ConsumerBuilder<string, string>(new ConsumerConfig
    {
        BootstrapServers = config["Kafka:BootstrapServers"],
        GroupId = "your-group-id",
        AutoOffsetReset = AutoOffsetReset.Earliest
    })
    .SetErrorHandler((consumer, error) =>
    {
        // Re-subscribe only on unknown topic/partition errors.
        if (error.Code is ErrorCode.Local_UnknownPartition or ErrorCode.Local_UnknownTopic
            or ErrorCode.UnknownTopicOrPart or ErrorCode.UnknownTopicId or ErrorCode.InvalidPartitions)
        {
            _logger.LogInformation("{Consumer}: re-subscribing to topics", "your-group-id");
            // Does it need to commit here?
            consumer.Unsubscribe();
            consumer.Subscribe(topics);
        }
    })
    .Build();
```

@anchitj
Member

anchitj commented Dec 17, 2024

Can you please capture debug logs if it occurs again? You can get them by adding "debug": "all" to the config.
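With the .NET client this might look like the sketch below; the broker address and group id are placeholders. `Debug` is the `ClientConfig` property that maps to librdkafka's `debug` setting, and `SetLogHandler` routes the (very verbose) log lines to a destination of your choice, here the console.

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder
    GroupId = "your-group-id",            // placeholder
    // "all" enables every librdkafka debug context; a narrower list such as
    // "consumer,cgrp,topic,fetch" produces less output.
    Debug = "all",
};

using var consumer = new ConsumerBuilder<Ignore, string>(config)
    // Print each librdkafka log line so it can be attached to the issue.
    .SetLogHandler((_, log) => Console.WriteLine($"{log.Level} {log.Facility}: {log.Message}"))
    .Build();
```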

Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants