Skip to content

Commit

Permalink
filter topics
Browse files Browse the repository at this point in the history
  • Loading branch information
mpicque committed Jan 15, 2025
1 parent 9a58bfd commit bfe819d
Showing 1 changed file with 89 additions and 6 deletions.
95 changes: 89 additions & 6 deletions consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,32 @@ import (
"github.com/IBM/sarama"
cli "github.com/jawher/mow.cli"
"log"
"regexp"
"sort"
"strings"
)

func consumerGroupsCmd(c *cli.Cmd) {
// add unique & sorted flags
var (
sorted = c.BoolOpt("s sorted", false, "sort the consumer groups (ascending)")
sorted = c.BoolOpt("s sorted", false, "sort the consumer groups (ascending)")
listOffsets = c.BoolOpt("l list-offsets", false, "list the offsets for each consumer groups")
filter = c.StringOpt("f filter", "", "filter the consumer groups (regex)")
topics = c.StringsOpt("t topics", nil, "topic(s) to consume from")
)

c.Action = func() {
cfg := config(*useSSL, *sslCAFile, *sslCertFile, *sslKeyFile)
consumerGroups(cfg, splitFlatten(*bootstrapServers), sorted)
consumerGroups(cfg, splitFlatten(*bootstrapServers), sorted, listOffsets, filter, topics)
}
}

func consumerGroups(config *sarama.Config, bootstrapServers []string, sorted *bool) {
fmt.Printf("Listing consumer groups for all topics on broker(s) %q\n", strings.Join(bootstrapServers, ", "))
func consumerGroups(config *sarama.Config, bootstrapServers []string, sorted *bool, listOffsets *bool, filter *string, topicsFilter *[]string) {
if topicsFilter != nil {
fmt.Printf("Listing consumer groups for topics %q on broker(s) %q\n", strings.Join(*topicsFilter, ", "), strings.Join(bootstrapServers, ", "))
} else {
fmt.Printf("Listing consumer groups for all topics on broker(s) %q\n", strings.Join(bootstrapServers, ", "))
}

newAdmin, err := sarama.NewClusterAdmin(bootstrapServers, config)
die(err)
Expand All @@ -33,19 +41,94 @@ func consumerGroups(config *sarama.Config, bootstrapServers []string, sorted *bo
}
}()

topicPartitions := make(map[string][]int32)
if *listOffsets {
topics, err := newAdmin.ListTopics()
die(err)

// convert topics to map[string][]int32{} (topic -> partitions)
for topicName, topicDetail := range topics {
if topics != nil && !arrayContains(topicsFilter, topicName) {
continue
}
numPartitions := int(topicDetail.NumPartitions)
partitions := make([]int32, numPartitions)
for i := 0; i < numPartitions; i++ {
partitions[i] = int32(i)
}
topicPartitions[topicName] = partitions
}
}

consumerGroupsMap, err := newAdmin.ListConsumerGroups()
die(err)

consumerGroups := keys(consumerGroupsMap, sorted)
if *filter != "" {
consumerGroups = filterSlice(consumerGroups, *filter)
}

fmt.Printf("Consumer groups:\n")
// the consumer group name is the key
for _, consumerGroup := range consumerGroups {
fmt.Printf(" %s\n", consumerGroup)
logged := false

if *listOffsets {
offsets, err := newAdmin.ListConsumerGroupOffsets(consumerGroup, topicPartitions)
die(err)

// print only if any partition offset is not -1
for topic, partitions := range offsets.Blocks {
if topicsFilter != nil && !arrayContains(topicsFilter, topic) {
continue
}
// if all partitions are -1, skip this topic
partitionsWithOffsets := len(partitions)
for _, offset := range partitions {
if offset.Offset == -1 {
partitionsWithOffsets--
}
}
if partitionsWithOffsets == 0 {
// no partition with offset, skip this topic
continue
}

if !logged {
fmt.Printf(" %s\n", consumerGroup)
logged = true
}
fmt.Printf(" Topic %s:\n", topic)
for partition, offset := range partitions {
fmt.Printf(" - partition %d: %d\n", partition, offset.Offset)
}
}
}
}
fmt.Printf("\nTotal: %d\n", len(consumerGroups))
}

func arrayContains(topics *[]string, topic string) bool {
if len(*topics) == 0 {
return false
}
for _, t := range *topics {
if t == topic {
return true
}
}
return false
}

func filterSlice(groups []string, filterRegex string) []string {
var filtered []string
for _, consumerGroup := range groups {
if match, _ := regexp.MatchString(filterRegex, consumerGroup); match {
filtered = append(filtered, consumerGroup)
}
}
return filtered
}

// extract all keys of the map, sort the keys if sorted flag is set
func keys(groups map[string]string, sorted *bool) []string {
var keys []string
Expand Down

0 comments on commit bfe819d

Please sign in to comment.