diff --git a/CHANGELOG.md b/CHANGELOG.md index b9571043..ddb696fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- [#162](https://github.com/deviceinsight/kafkactl/pull/162) Fix `consumer-group` crashes when a group member has no assignment - [#159](https://github.com/deviceinsight/kafkactl/issues/159) Add ability to read config file from `$PWD/kafkactl.yml` diff --git a/internal/consumergroups/consumer-group-operation.go b/internal/consumergroups/consumer-group-operation.go index 84b59838..99e421df 100644 --- a/internal/consumergroups/consumer-group-operation.go +++ b/internal/consumergroups/consumer-group-operation.go @@ -127,14 +127,16 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon memberAssignment, err := member.GetMemberAssignment() - var assignedPartitions map[string][]int32 - if err != nil { - output.Warnf("failed to get group member assignment (%s, %s): %v", member.ClientHost, member.ClientId, err) - assignedPartitions = make(map[string][]int32) - } else { - assignedPartitions = filterAssignedPartitions(memberAssignment.Topics, topicPartitions) + output.Debugf("group=%s, protocolType=%s, state=%s", description.GroupId, description.ProtocolType, description.State) + return errors.Wrap(err, "failed to get group member assignment") } + if memberAssignment == nil { + output.Warnf("assignment does not exist for member=%s, host=%s, clientId=%s, group=%s", member.MemberId, member.ClientHost, member.ClientId, description.GroupId) + continue + } + + assignedPartitions := filterAssignedPartitions(memberAssignment.Topics, topicPartitions) consumerGroupDescription.Members = addMember(consumerGroupDescription.Members, member.ClientHost, member.ClientId, assignedPartitions) } @@ -454,6 +456,11 @@ func findAssignedTopics(admin sarama.ClusterAdmin, groupNames []string) (map[str return nil, errors.Wrap(err, "failed to get group member assignment") } + if assignment == nil { + output.Warnf("assignment does not exist for member=%s, host=%s, clientId=%s, group=%s", member.MemberId, member.ClientHost, member.ClientId, description.GroupId) + continue + } + for t := range assignment.Topics { if !util.ContainsString(topics, t) { topics = append(topics, t) @@ -498,6 +505,10 @@ func filterGroups(admin sarama.ClusterAdmin, groupNames []string, topic string) output.Debugf("group=%s, protocolType=%s, state=%s", description.GroupId, description.ProtocolType, description.State) return nil, errors.Wrap(err, "failed to get group member assignment") } + if assignment == nil { + output.Warnf("assignment does not exist for member=%s, host=%s, clientId=%s, group=%s", member.MemberId, member.ClientHost, member.ClientId, description.GroupId) + continue + } topics := make([]string, 0, len(assignment.Topics)) for t := range assignment.Topics {