Skip to content

Commit

Permalink
Merge pull request #162 from smuth4/fix/desc-cg-crash
Browse files Browse the repository at this point in the history
Fix  `describe consumer-group` crash when a group member has no assignments
  • Loading branch information
d-rk authored Sep 5, 2023
2 parents fb7aae0 + d2f078c commit b5bde67
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Fixed
- [#162](https://github.com/deviceinsight/kafkactl/pull/162) Fix `consumer-group` crashes when a group member has no assignment

- [#159](https://github.com/deviceinsight/kafkactl/issues/159) Add ability to read config file from `$PWD/kafkactl.yml`

Expand Down
23 changes: 17 additions & 6 deletions internal/consumergroups/consumer-group-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,16 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon

memberAssignment, err := member.GetMemberAssignment()

var assignedPartitions map[string][]int32

if err != nil {
output.Warnf("failed to get group member assignment (%s, %s): %v", member.ClientHost, member.ClientId, err)
assignedPartitions = make(map[string][]int32)
} else {
assignedPartitions = filterAssignedPartitions(memberAssignment.Topics, topicPartitions)
output.Debugf("group=%s, protocolType=%s, state=%s", description.GroupId, description.ProtocolType, description.State)
return errors.Wrap(err, "failed to get group member assignment")
}
if memberAssignment == nil {
output.Warnf("assignment does not exist for member=%s, host=%s, clientId=%s, group=%s", member.MemberId, member.ClientHost, member.ClientId, description.GroupId)
continue
}

assignedPartitions := filterAssignedPartitions(memberAssignment.Topics, topicPartitions)

consumerGroupDescription.Members = addMember(consumerGroupDescription.Members, member.ClientHost, member.ClientId, assignedPartitions)
}
Expand Down Expand Up @@ -454,6 +456,11 @@ func findAssignedTopics(admin sarama.ClusterAdmin, groupNames []string) (map[str
return nil, errors.Wrap(err, "failed to get group member assignment")
}

if assignment == nil {
output.Warnf("assignment does not exist for member=%s, host=%s, clientId=%s, group=%s", member.MemberId, member.ClientHost, member.ClientId, description.GroupId)
continue
}

for t := range assignment.Topics {
if !util.ContainsString(topics, t) {
topics = append(topics, t)
Expand Down Expand Up @@ -498,6 +505,10 @@ func filterGroups(admin sarama.ClusterAdmin, groupNames []string, topic string)
output.Debugf("group=%s, protocolType=%s, state=%s", description.GroupId, description.ProtocolType, description.State)
return nil, errors.Wrap(err, "failed to get group member assignment")
}
if assignment == nil {
output.Warnf("assignment does not exist for member=%s, host=%s, clientId=%s, group=%s", member.MemberId, member.ClientHost, member.ClientId, description.GroupId)
continue
}

topics := make([]string, 0, len(assignment.Topics))
for t := range assignment.Topics {
Expand Down

0 comments on commit b5bde67

Please sign in to comment.