Skip to content

Commit

Permalink
Standardize error handling for cases where assignment might be nil
Browse files Browse the repository at this point in the history
  • Loading branch information
smuth4 committed Sep 4, 2023
1 parent 1a4da61 commit d2f078c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
### Fixed
- [#162](https://github.com/deviceinsight/kafkactl/pull/162) Fix `consumer-group` crash when a group member has no assignment
- [#162](https://github.com/deviceinsight/kafkactl/pull/162) Fix `consumer-group` crashes when a group member has no assignment

## 3.2.0 - 2023-08-17

Expand Down
25 changes: 17 additions & 8 deletions internal/consumergroups/consumer-group-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,16 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon

memberAssignment, err := member.GetMemberAssignment()

var assignedPartitions map[string][]int32

if err != nil {
output.Warnf("failed to get group member assignment (%s, %s): %v", member.ClientHost, member.ClientId, err)
assignedPartitions = make(map[string][]int32)
} else if memberAssignment == nil {
output.Warnf("client (host: %s, ID: %s) has no group member assignment", member.ClientHost, member.ClientId)
} else {
assignedPartitions = filterAssignedPartitions(memberAssignment.Topics, topicPartitions)
output.Debugf("group=%s, protocolType=%s, state=%s", description.GroupId, description.ProtocolType, description.State)
return errors.Wrap(err, "failed to get group member assignment")
}
if memberAssignment == nil {
output.Warnf("assignment does not exist for member=%s, host=%s, clientId=%s, group=%s", member.MemberId, member.ClientHost, member.ClientId, description.GroupId)
continue
}

assignedPartitions := filterAssignedPartitions(memberAssignment.Topics, topicPartitions)

consumerGroupDescription.Members = addMember(consumerGroupDescription.Members, member.ClientHost, member.ClientId, assignedPartitions)
}
Expand Down Expand Up @@ -456,6 +456,11 @@ func findAssignedTopics(admin sarama.ClusterAdmin, groupNames []string) (map[str
return nil, errors.Wrap(err, "failed to get group member assignment")
}

if assignment == nil {
output.Warnf("assignment does not exist for member=%s, host=%s, clientId=%s, group=%s", member.MemberId, member.ClientHost, member.ClientId, description.GroupId)
continue
}

for t := range assignment.Topics {
if !util.ContainsString(topics, t) {
topics = append(topics, t)
Expand Down Expand Up @@ -500,6 +505,10 @@ func filterGroups(admin sarama.ClusterAdmin, groupNames []string, topic string)
output.Debugf("group=%s, protocolType=%s, state=%s", description.GroupId, description.ProtocolType, description.State)
return nil, errors.Wrap(err, "failed to get group member assignment")
}
if assignment == nil {
output.Warnf("assignment does not exist for member=%s, host=%s, clientId=%s, group=%s", member.MemberId, member.ClientHost, member.ClientId, description.GroupId)
continue
}

topics := make([]string, 0, len(assignment.Topics))
for t := range assignment.Topics {
Expand Down

0 comments on commit d2f078c

Please sign in to comment.