Skip to content

Commit

Permalink
Merge pull request #184 from rafiramadhana/177-describe-default-config
Browse files Browse the repository at this point in the history
Add include defaults when describing topics
  • Loading branch information
d-rk authored Jan 16, 2024
2 parents c032438 + 529b639 commit 0ac3c55
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- [#184](https://github.com/deviceinsight/kafkactl/pull/184) Added option to show default configs when describing topics

## 3.5.1 - 2023-11-10

## 3.5.0 - 2023-11-10
Expand Down
2 changes: 1 addition & 1 deletion cmd/describe/describe-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func newDescribeTopicCmd() *cobra.Command {
}

cmdDescribeTopic.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "output format. One of: json|yaml|wide")
cmdDescribeTopic.Flags().BoolVarP(&flags.PrintConfigs, "print-configs", "c", true, "print configs")
cmdDescribeTopic.Flags().StringVarP((*string)(&flags.PrintConfigs), "print-configs", "c", "no_defaults", "print configs. One of none|no_defaults|all")
cmdDescribeTopic.Flags().BoolVarP(&flags.SkipEmptyPartitions, "skip-empty", "s", false, "show only partitions that have a messages")

return cmdDescribeTopic
Expand Down
68 changes: 68 additions & 0 deletions cmd/describe/describe-topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,69 @@ import (
"strings"
"testing"

"github.com/deviceinsight/kafkactl/internal"
"github.com/deviceinsight/kafkactl/internal/topic"

"github.com/deviceinsight/kafkactl/testutil"
)

func TestDescribeTopicConfigsIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)

prefix := "describe-t-configs-"

topicName1 := testutil.CreateTopic(t, prefix, "--config", "retention.ms=3600000")

kafkaCtl := testutil.CreateKafkaCtlCommand()
kafkaCtl.Verbose = false

// default --print-configs=no_defaults
if _, err := kafkaCtl.Execute("describe", "topic", topicName1, "-o", "yaml"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

describedTopic, err := topic.FromYaml(kafkaCtl.GetStdOut())
if err != nil {
t.Fatalf("failed to read yaml: %v", err)
}

configKeys := getConfigKeys(describedTopic.Configs)

testutil.AssertArraysEquals(t, []string{"retention.ms"}, configKeys)
testutil.AssertEquals(t, "3600000", describedTopic.Configs[0].Value)

// explicitly without defaults
if _, err := kafkaCtl.Execute("describe", "topic", topicName1, "-c", "no_defaults", "-o", "yaml"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

describedTopic, err = topic.FromYaml(kafkaCtl.GetStdOut())
if err != nil {
t.Fatalf("failed to read yaml: %v", err)
}

configKeys = getConfigKeys(describedTopic.Configs)

testutil.AssertArraysEquals(t, []string{"retention.ms"}, configKeys)
testutil.AssertEquals(t, "3600000", describedTopic.Configs[0].Value)

// all configs
if _, err := kafkaCtl.Execute("describe", "topic", topicName1, "-c", "all", "-o", "yaml"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

describedTopic, err = topic.FromYaml(kafkaCtl.GetStdOut())
if err != nil {
t.Fatalf("failed to read yaml: %v", err)
}

configKeys = getConfigKeys(describedTopic.Configs)

testutil.AssertContains(t, "retention.ms", configKeys)
testutil.AssertContains(t, "cleanup.policy", configKeys)
}

func TestDescribeTopicAutoCompletionIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)
Expand All @@ -30,3 +90,11 @@ func TestDescribeTopicAutoCompletionIntegration(t *testing.T) {
testutil.AssertContains(t, topicName2, outputLines)
testutil.AssertContains(t, topicName3, outputLines)
}

func getConfigKeys(configs []internal.Config) []string {
keys := make([]string, len(configs))
for i, config := range configs {
keys[i] = config.Name
}
return keys
}
4 changes: 2 additions & 2 deletions internal/broker/broker-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (operation *Operation) GetBrokers(flags GetBrokersFlags) error {
Name: fmt.Sprint(broker.ID()),
}

if configs, err = internal.ListConfigs(&admin, brokerConfig); err != nil {
if configs, err = internal.ListConfigs(&admin, brokerConfig, false); err != nil {
return err
}

Expand Down Expand Up @@ -153,7 +153,7 @@ func (operation *Operation) DescribeBroker(id int32, flags DescribeBrokerFlags)
Name: fmt.Sprint(broker.ID()),
}

if configs, err = internal.ListConfigs(&admin, brokerConfig); err != nil {
if configs, err = internal.ListConfigs(&admin, brokerConfig, false); err != nil {
return err
}

Expand Down
13 changes: 9 additions & 4 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,9 @@ func TopicExists(client *sarama.Client, name string) (bool, error) {
return false, nil
}

func ListConfigs(admin *sarama.ClusterAdmin, resource sarama.ConfigResource) ([]Config, error) {
func ListConfigs(admin *sarama.ClusterAdmin, resource sarama.ConfigResource, includeDefaults bool) ([]Config, error) {

var (
configs = make([]Config, 0)
configEntries []sarama.ConfigEntry
err error
)
Expand All @@ -321,15 +320,21 @@ func ListConfigs(admin *sarama.ClusterAdmin, resource sarama.ConfigResource) ([]
return nil, errors.Wrap(err, fmt.Sprintf("failed to describe %v config", getResourceTypeName(resource.Type)))
}

return listConfigsFromEntries(configEntries, includeDefaults), nil
}

func listConfigsFromEntries(configEntries []sarama.ConfigEntry, includeDefaults bool) []Config {
var configs = make([]Config, 0)

for _, configEntry := range configEntries {

if !configEntry.Default && configEntry.Source != sarama.SourceDefault {
if includeDefaults || (!configEntry.Default && configEntry.Source != sarama.SourceDefault) {
entry := Config{Name: configEntry.Name, Value: configEntry.Value}
configs = append(configs, entry)
}
}

return configs, nil
return configs
}

func getResourceTypeName(resourceType sarama.ConfigResourceType) string {
Expand Down
83 changes: 83 additions & 0 deletions internal/common-operation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package internal

import (
"reflect"
"testing"

"github.com/IBM/sarama"
)

func TestListConfigsFromEntries(t *testing.T) {
testCases := []struct {
name string
entries []sarama.ConfigEntry
includeDefaults bool
configs []Config
}{
{
name: "not include defaults, empty entries",
entries: []sarama.ConfigEntry{},
configs: []Config{},
},
{
name: "not include defaults",
entries: []sarama.ConfigEntry{
{
Name: "non_default",
Value: "ND",
Default: false,
Source: sarama.SourceUnknown,
},
{
Name: "default",
Value: "D",
Default: true,
Source: sarama.SourceDefault,
},
},
configs: []Config{
{Name: "non_default", Value: "ND"},
},
},
{
name: "include defaults, empty entries",
entries: []sarama.ConfigEntry{},
configs: []Config{},
includeDefaults: true,
},
{
name: "include defaults",
entries: []sarama.ConfigEntry{
{
Name: "non_default",
Value: "ND",
Default: false,
Source: sarama.SourceUnknown,
},
{
Name: "default",
Value: "D",
Default: true,
Source: sarama.SourceDefault,
},
},
configs: []Config{
{Name: "non_default", Value: "ND"},
{Name: "default", Value: "D"},
},
includeDefaults: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
configs := listConfigsFromEntries(tc.entries, tc.includeDefaults)

if len(configs) > 0 &&
len(tc.configs) > 0 &&
!reflect.DeepEqual(configs, tc.configs) {
t.Fatalf("expect: %v, got %v", tc.configs, configs)
}
})
}
}
38 changes: 28 additions & 10 deletions internal/topic/topic-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ type requestedTopicFields struct {
partitionLeader bool
partitionReplicas bool
partitionISRs bool
config bool
config PrintConfigsParam
}

var allFields = requestedTopicFields{partitionID: true, partitionOffset: true, partitionLeader: true, partitionReplicas: true, partitionISRs: true, config: true}
var allFields = requestedTopicFields{
partitionID: true, partitionOffset: true, partitionLeader: true,
partitionReplicas: true, partitionISRs: true, config: NonDefaultConfigs,
}

type GetTopicsFlags struct {
OutputFormat string
Expand All @@ -60,8 +63,16 @@ type AlterTopicFlags struct {
Configs []string
}

type PrintConfigsParam string

const (
NoConfigs PrintConfigsParam = "none"
AllConfigs PrintConfigsParam = "all"
NonDefaultConfigs PrintConfigsParam = "no_defaults"
)

type DescribeTopicFlags struct {
PrintConfigs bool
PrintConfigs PrintConfigsParam
SkipEmptyPartitions bool
OutputFormat string
}
Expand Down Expand Up @@ -161,7 +172,10 @@ func (operation *Operation) DescribeTopic(topic string, flags DescribeTopicFlags
return errors.Wrap(err, "failed to create cluster admin")
}

if t, err = readTopic(&client, &admin, topic, allFields); err != nil {
fields := allFields
fields.config = flags.PrintConfigs

if t, err = readTopic(&client, &admin, topic, fields); err != nil {
return errors.Wrap(err, "failed to read topic")
}

Expand All @@ -170,7 +184,7 @@ func (operation *Operation) DescribeTopic(topic string, flags DescribeTopicFlags

func (operation *Operation) printTopic(topic Topic, flags DescribeTopicFlags) error {

if !flags.PrintConfigs {
if flags.PrintConfigs == NoConfigs {
topic.Configs = nil
}

Expand Down Expand Up @@ -401,7 +415,11 @@ func (operation *Operation) AlterTopic(topic string, flags AlterTopicFlags) erro
}

if flags.ValidateOnly {
describeFlags := DescribeTopicFlags{PrintConfigs: len(flags.Configs) > 0}
printConfigs := NoConfigs
if len(flags.Configs) > 0 {
printConfigs = NonDefaultConfigs
}
describeFlags := DescribeTopicFlags{PrintConfigs: printConfigs}
return operation.printTopic(t, describeFlags)
}
return nil
Expand Down Expand Up @@ -472,7 +490,7 @@ func (operation *Operation) CloneTopic(sourceTopic, targetTopic string) error {
requestedFields := requestedTopicFields{
partitionID: true,
partitionReplicas: true,
config: true,
config: NonDefaultConfigs,
}

if t, err = readTopic(&client, &admin, sourceTopic, requestedFields); err != nil {
Expand Down Expand Up @@ -581,7 +599,7 @@ func (operation *Operation) GetTopics(flags GetTopicsFlags) error {
} else if flags.OutputFormat == "compact" {
tableWriter.Initialize()
} else if flags.OutputFormat == "wide" {
requestedFields = requestedTopicFields{partitionID: true, partitionReplicas: true, config: true}
requestedFields = requestedTopicFields{partitionID: true, partitionReplicas: true, config: NonDefaultConfigs}
if err := tableWriter.WriteHeader("TOPIC", "PARTITIONS", "REPLICATION FACTOR", "CONFIGS"); err != nil {
return err
}
Expand Down Expand Up @@ -725,14 +743,14 @@ func readTopic(client *sarama.Client, admin *sarama.ClusterAdmin, name string, r
return top.Partitions[i].ID < top.Partitions[j].ID
})

if requestedFields.config {
if requestedFields.config != NoConfigs {

topicConfig := sarama.ConfigResource{
Type: sarama.TopicResource,
Name: name,
}

if top.Configs, err = internal.ListConfigs(admin, topicConfig); err != nil {
if top.Configs, err = internal.ListConfigs(admin, topicConfig, requestedFields.config == AllConfigs); err != nil {
return top, err
}
}
Expand Down

0 comments on commit 0ac3c55

Please sign in to comment.