diff --git a/README.md b/README.md index eb1bf995..bbc618dc 100644 --- a/README.md +++ b/README.md @@ -523,6 +523,8 @@ The `apply` subcommand can make changes, but under the following conditions: 8. Before applying, the tool checks the cluster ID against the expected value in the cluster config. This can help prevent errors around applying in the wrong cluster when multiple clusters are accessed through the same address, e.g `localhost:2181`. +9. If the `destructive` CLI argument is passed, `apply` deletes the settings that are + set on the broker but not set in configuration. The `reset-offsets` command can also make changes in the cluster and should be used carefully. diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 5e4e6ee4..9044268e 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -36,6 +36,7 @@ type applyCmdConfig struct { retentionDropStepDurationStr string skipConfirm bool ignoreFewerPartitionsError bool + destructive bool sleepLoopDuration time.Duration failFast bool @@ -107,6 +108,12 @@ func init() { false, "Don't return error when topic's config specifies fewer partitions than it currently has", ) + applyCmd.Flags().BoolVar( + &applyConfig.destructive, + "destructive", + false, + "Deletes topic settings from the broker if the settings are present on the broker but not in the config", + ) applyCmd.Flags().DurationVar( &applyConfig.sleepLoopDuration, "sleep-loop-duration", @@ -259,6 +266,7 @@ func applyTopic( RetentionDropStepDuration: applyConfig.retentionDropStepDuration, SkipConfirm: applyConfig.skipConfirm, IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError, + Destructive: applyConfig.destructive, SleepLoopDuration: applyConfig.sleepLoopDuration, TopicConfig: topicConfig, } diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 7823a251..2b629023 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -307,6 +307,7 @@ func rebalanceApplyTopic( AutoContinueRebalance: true, // to continue without prompts RetentionDropStepDuration: retentionDropStepDuration, // not needed for rebalance SkipConfirm: true, // to enforce action: rebalance + Destructive: false, // Irrelevant here SleepLoopDuration: rebalanceConfig.sleepLoopDuration, TopicConfig: topicConfig, } diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 20eede18..408dbf8b 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -37,6 +37,7 @@ type TopicApplierConfig struct { RetentionDropStepDuration time.Duration SkipConfirm bool IgnoreFewerPartitionsError bool + Destructive bool SleepLoopDuration time.Duration TopicConfig config.TopicConfig } @@ -392,6 +393,8 @@ func (t *TopicApplier) updateSettings( return err } + configEntries := []kafka.ConfigEntry{} + if len(diffKeys) > 0 { diffsTable, err := FormatSettingsDiff(topicSettings, topicInfo.Config, diffKeys) if err != nil { @@ -416,6 +419,23 @@ func (t *TopicApplier) updateSettings( ) } + configEntries, err = topicSettings.ToConfigEntries(diffKeys) + if err != nil { + return err + } + } + + if len(missingKeys) > 0 && t.config.Destructive { + log.Infof( + "Found %d key(s) set in cluster but missing from config to be deleted:\n%s", + len(missingKeys), + FormatMissingKeys(topicInfo.Config, missingKeys), + ) + + configEntries = append(configEntries, topicSettings.ToEmptyConfigEntries(missingKeys)...) + } + + if len(configEntries) > 0 { if t.config.DryRun { log.Infof("Skipping update because dryRun is set to true") return nil @@ -430,11 +450,6 @@ func (t *TopicApplier) updateSettings( } log.Infof("OK, updating") - configEntries, err := topicSettings.ToConfigEntries(diffKeys) - if err != nil { - return err - } - _, err = t.adminClient.UpdateTopicConfig( ctx, t.topicName, @@ -446,7 +461,7 @@ func (t *TopicApplier) updateSettings( } } - if len(missingKeys) > 0 { + if len(missingKeys) > 0 && !t.config.Destructive { log.Warnf( "Found %d key(s) set in cluster but missing from config:\n%s\nThese will be left as-is.", len(missingKeys), diff --git a/pkg/apply/apply_test.go b/pkg/apply/apply_test.go index 033cdff9..78d4c9ad 100644 --- a/pkg/apply/apply_test.go +++ b/pkg/apply/apply_test.go @@ -80,6 +80,27 @@ func TestApplyBasicUpdates(t *testing.T) { applier.topicConfig.Spec.ReplicationFactor = 3 err = applier.Apply(ctx) require.NotNil(t, err) + applier.topicConfig.Spec.ReplicationFactor = 2 + + // Settings are not deleted if Destructive is false. They are + // if it is true + delete(applier.topicConfig.Spec.Settings, "cleanup.policy") + err = applier.Apply(ctx) + require.NoError(t, err) + topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true) + require.NoError(t, err) + + assert.Equal(t, "delete", topicInfo.Config["cleanup.policy"]) + + applier.config.Destructive = true + err = applier.Apply(ctx) + require.NoError(t, err) + topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true) + require.NoError(t, err) + + _, present := topicInfo.Config["cleanup.policy"] + assert.False(t, present) + } func TestApplyPlacementUpdates(t *testing.T) { diff --git a/pkg/config/settings.go b/pkg/config/settings.go index 1a5c6621..bfbe6b1a 100644 --- a/pkg/config/settings.go +++ b/pkg/config/settings.go @@ -346,6 +346,22 @@ func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, erro return entries, nil } +// Produces a slice of kafka-go config entries with empty value. Thus used +// for deletion of the setting. +func (t TopicSettings) ToEmptyConfigEntries(keys []string) []kafka.ConfigEntry { + entries := []kafka.ConfigEntry{} + + if keys != nil { + for _, key := range keys { + entries = append( + entries, + kafka.ConfigEntry{ConfigName: key, ConfigValue: ""}, + ) + } + } + return entries +} + // HasKey returns whether the current settings instance contains the argument key. func (t TopicSettings) HasKey(key string) bool { _, ok := t[key]