Skip to content

Commit

Permalink
Resolving a weird edge case in case of a poison pill message being re… (
Browse files Browse the repository at this point in the history
#3534)

Signed-off-by: Patrick Assuied <[email protected]>
Co-authored-by: Patrick Assuied <[email protected]>
  • Loading branch information
yaron2 and passuied authored Sep 5, 2024
1 parent f375594 commit 75dcdef
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions common/component/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
} else {
for {
select {
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
// Make sure the check for session context done happens before the next message is processed.
// There is a possibility that the pod takes some time to shutdown and in case of a poison pill message, the `retry` would get interrupted (as expected),
// but the next message would be processed as a result,
// therefore dropping the poison pill message regardless of resiliency policy.
case <-session.Context().Done():
return nil
case message, ok := <-claim.Messages():
if !ok {
return nil
Expand All @@ -89,11 +98,6 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}
Expand Down

0 comments on commit 75dcdef

Please sign in to comment.