Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support --delete option for reset-offsets command #172

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
test010:
runs-on: ubuntu-latest
container:
image: cimg/go:1.19
image: cimg/go:1.21
env:
GO111MODULE: "on"
KAFKA_TOPICS_TEST_ZK_ADDR: zookeeper:2181
Expand All @@ -39,7 +39,7 @@ jobs:
- name: Go setup
uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: 1.21
- name: Display Go version
run: go version
- name: Run tests
Expand Down Expand Up @@ -132,7 +132,7 @@ jobs:
test270:
runs-on: ubuntu-latest
container:
image: cimg/go:1.19
image: cimg/go:1.21
env:
GO111MODULE: "on"
KAFKA_TOPICS_TEST_ZK_ADDR: zookeeper:2181
Expand All @@ -143,7 +143,7 @@ jobs:
- name: Go setup
uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: 1.21
- name: Display Go version
run: go version
- name: Run tests
Expand Down
44 changes: 40 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ docker-compose up -d
3. Apply the topic configs in [`examples/local-cluster/topics`](/examples/local-cluster/topics):

```
topicctl apply --skip-confirm examples/local-cluster/topics/*yaml
topicctl apply --skip-confirm examples/local-cluster/topics/*.yaml
```

4. Send some test messages to the `topic-default` topic:
Expand Down Expand Up @@ -225,13 +225,49 @@ subcommands interactively.
topicctl reset-offsets [topic] [group] [flags]
```

The `reset-offsets` subcommand allows resetting the offsets for a consumer group in a topic. There are 2 main approaches for setting the offsets:
The `reset-offsets` subcommand allows resetting the offsets
for a consumer group in a topic.
There are a few typical approaches for setting the offsets:

1. Use a combination of `--partitions`, `--offset`, `--to-earliest` and `--to-latest` flags. `--partitions` flag specifies a list of partitions to be reset e.g. `1,2,3 ...`. If not used, the command defaults to resetting consumer group offsets for ALL of the partitions. `--offset` flag indicates the specific value that all desired consumer group partitions will be set to. If not set, it will default to -2. Finally, `--to-earliest` flag resets offsets of consumer group members to earliest offsets of partitions while `--to-latest` resets offsets of consumer group members to latest offsets of partitions. However, only one of the `--to-earliest`, `--to-latest` and `--offset` flags can be used at a time. This approach is easy to use but lacks the ability for detailed offset configuration.
1. Use `--partitions` and combine it with one of the offset operators:
`--delete`, `--offset`, `--to-earliest` or `--to-latest`.
2. Use `--partition-offset-map` to pass specific offsets per partition.
For example, `1=5,2=10` means that the consumer group offset
for partition 1 must be set to 5, and partition 2 to offset 10.
This is mainly used for replays of specific traffic,
such as when a deploy has mishandled or corrupted state,
and the prior release must be rerun
starting at a specific offset per partition.
This is the most flexible approach for offset setting.

2. Use `--partition-offset-map` flag to specify a detailed offset configuration for individual partitions. For example, `1=5,2=10,7=12,...` means that the consumer group offset for partition 1 must be set to 5, partition 2 to offset 10, partition 7 to offset 12 and so on. This approach provides greater flexibility and fine-grained control for this operation. Note that `--partition-offset-map` flag is standalone and cannot be coupled with any of the previous flags.
Note that `--partition-offset-map` flag is standalone
and cannot be coupled with other flags.

##### Partition selection flags

At most one of the following may be selected:

* `--partitions` specifies a comma-separated list of partitions IDs.

If none of these are specified,
the command defaults to selecting ALL of the partitions.

##### Offset selection flags

At most one of the following may be selected:

* `--delete` removes stored group offsets.
This will generally have the same effect as `--to-earliest` or `--to-latest`,
depending on the consumer group configuration.
However, `--delete` is more reliable and convenient,
since `--to-earliest` in particular involves a race with message retention
that may require numerous attempts.
* `--offset` indicates the specific value that all selected
consumer group partitions will be set to.
* `--to-earliest` resets group offsets to oldest still-retained per partition.
* `--to-latest` resets group offsets to newest per partitions.

If none of these are specified, `--to-earliest` will be the default.

#### tail

Expand Down
205 changes: 136 additions & 69 deletions cmd/topicctl/subcmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ import (
"fmt"
"strconv"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/cli"
"github.com/segmentio/topicctl/pkg/groups"
"github.com/segmentio/topicctl/pkg/util"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var resetOffsetsCmd = &cobra.Command{
Use: "reset-offsets [topic name] [group name]",
Use: "reset-offsets <topic-name> <group-name>",
Short: "reset consumer group offsets",
Args: cobra.MinimumNArgs(2),
Args: cobra.ExactArgs(2),
PreRunE: resetOffsetsPreRun,
RunE: resetOffsetsRun,
}
Expand All @@ -27,6 +29,7 @@ type resetOffsetsCmdConfig struct {
partitionOffsetMap map[string]int64
toEarliest bool
toLatest bool
delete bool

shared sharedOptions
}
Expand Down Expand Up @@ -62,121 +65,185 @@ func init() {
"to-latest",
false,
"Resets offsets of consumer group members to latest offsets of partitions")
resetOffsetsCmd.Flags().BoolVar(
&resetOffsetsConfig.delete,
"delete",
false,
"Deletes offsets for the given consumer group")

addSharedFlags(resetOffsetsCmd, &resetOffsetsConfig.shared)
RootCmd.AddCommand(resetOffsetsCmd)
}

func resetOffsetsPreRun(cmd *cobra.Command, args []string) error {
resetOffsetSpecification := "You must choose only one of the following reset-offset specifications: --to-earliest, --to-latest, --offset."
offsetMapSpecification := "--partition-offset-map option cannot be coupled with any of the following options: --partitions, --to-earliest, --to-latest, --offset."

if len(resetOffsetsConfig.partitionOffsetMap) > 0 && (cmd.Flags().Changed("offset") ||
len(resetOffsetsConfig.partitions) > 0 ||
resetOffsetsConfig.toEarliest ||
resetOffsetsConfig.toLatest) {
return errors.New(offsetMapSpecification)
resetOffsetSpec := "You must choose only one of the following " +
"reset-offset specifications: --delete, --to-earliest, --to-latest, " +
"--offset, or --partition-offset-map."
offsetMapSpec := "--partition-offset-map option cannot be used with --partitions."

cfg := resetOffsetsConfig

numOffsetSpecs := numTrue(
cfg.toEarliest,
cfg.toLatest,
cfg.delete,
cmd.Flags().Changed("offset"),
len(cfg.partitionOffsetMap) > 0,
)

} else if resetOffsetsConfig.toEarliest && resetOffsetsConfig.toLatest {
return errors.New(resetOffsetSpecification)
if numOffsetSpecs > 1 {
return errors.New(resetOffsetSpec)
}

} else if cmd.Flags().Changed("offset") && (resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest) {
return errors.New(resetOffsetSpecification)
if len(cfg.partitionOffsetMap) > 0 && len(cfg.partitions) > 0 {
return errors.New(offsetMapSpec)
}
return resetOffsetsConfig.shared.validate()

return cfg.shared.validate()
}

func resetOffsetsRun(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

adminClient, err := resetOffsetsConfig.shared.getAdminClient(ctx, nil, true)
cfg := resetOffsetsConfig

adminClient, err := cfg.shared.getAdminClient(ctx, nil, true)
if err != nil {
return err
}

defer adminClient.Close()

connector := adminClient.GetConnector()

topic := args[0]
group := args[1]

topicInfo, err := adminClient.GetTopic(ctx, topic, false)
if err != nil {
return err
}
partitionIDsMap := map[int]struct{}{}

partitionIDsMap := make(map[int]struct{}, len(topicInfo.Partitions))
for _, partitionInfo := range topicInfo.Partitions {
partitionIDsMap[partitionInfo.ID] = struct{}{}
}
var resetOffsetsStrategy string
if resetOffsetsConfig.toLatest {
resetOffsetsStrategy = groups.LatestResetOffsetsStrategy
} else if resetOffsetsConfig.toEarliest {
resetOffsetsStrategy = groups.EarliestResetOffsetsStrategy

var strategy string

switch {
case resetOffsetsConfig.toLatest:
strategy = groups.LatestResetOffsetsStrategy
case resetOffsetsConfig.toEarliest:
strategy = groups.EarliestResetOffsetsStrategy
}
partitionOffsets := map[int]int64{}

if len(resetOffsetsConfig.partitionOffsetMap) > 0 {
for partition, offset := range resetOffsetsConfig.partitionOffsetMap {
var partitionID int
if partitionID, err = strconv.Atoi(partition); err != nil {
return fmt.Errorf("Partition value %s must be a number", partition)
}
if _, ok := partitionIDsMap[partitionID]; !ok {
return fmt.Errorf("Partition %d not found in topic %s", partitionID, topic)
}
// If explicit per-partition offsets were specified, set them now.
partitionOffsets, err := parsePartitionOffsetMap(partitionIDsMap, cfg.partitionOffsetMap)
if err != nil {
return err
}

partitionOffsets[partitionID] = offset
// Set explicit partitions (without offsets) if specified,
// otherwise operate on fetched partition info;
// these will only take effect of per-partition offsets were not specified.
partitions := cfg.partitions
if len(partitions) == 0 && len(partitionOffsets) == 0 {
convert := func(info admin.PartitionInfo) int { return info.ID }
partitions = convertSlice(topicInfo.Partitions, convert)
}

for _, partition := range partitions {
_, ok := partitionIDsMap[partition]
if !ok {
format := "Partition %d not found in topic %s"
return fmt.Errorf(format, partition, topic)
}

} else if len(resetOffsetsConfig.partitions) > 0 {
for _, partition := range resetOffsetsConfig.partitions {
if _, ok := partitionIDsMap[partition]; !ok {
return fmt.Errorf("Partition %d not found in topic %s", partition, topic)
}
if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest {
partitionOffsets[partition], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partition)
if err != nil {
return err
}
} else {
partitionOffsets[partition] = resetOffsetsConfig.offset
}

if strategy == "" {
partitionOffsets[partition] = cfg.offset
return nil
}
} else {
for _, partitionInfo := range topicInfo.Partitions {
if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest {
partitionOffsets[partitionInfo.ID], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partitionInfo.ID)
if err != nil {
return err
}
} else {
partitionOffsets[partitionInfo.ID] = resetOffsetsConfig.offset
}

offset, err := groups.GetEarliestOrLatestOffset(ctx, connector, topic, strategy, partition)
if err != nil {
return err
}

partitionOffsets[partition] = offset
}

log.Infof(
"This will reset the offsets for the following partitions in topic %s for group %s:\n%s",
"This will reset the offsets for the following partitions "+
"in topic %s for group %s:\n%s",
topic,
group,
groups.FormatPartitionOffsets(partitionOffsets),
)
log.Info(
"Please ensure that all other consumers are stopped, otherwise the reset might be overridden.",
)

log.Info("Please ensure that all other consumers are stopped, " +
"otherwise the reset might be overridden.")

ok, _ := util.Confirm("OK to continue?", false)
if !ok {
return errors.New("Stopping because of user response")
}

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.ResetOffsets(
ctx,
topic,
group,
partitionOffsets,
)

if resetOffsetsConfig.delete {
input := groups.DeleteOffsetsInput{
GroupID: group,
Topic: topic,
Partitions: partitions,
}

return cliRunner.DeleteOffsets(ctx, &input)
}

return cliRunner.ResetOffsets(ctx, topic, group, partitionOffsets)
}

func numTrue(bools ...bool) int {
var n int
for _, b := range bools {
if b {
n++
}
}

return n
}

func convertSlice[T1, T2 any](input []T1, fn func(T1) T2) []T2 {
out := make([]T2, len(input))

for i, v := range input {
out[i] = fn(v)
}

return out
}

func parsePartitionOffsetMap(partitionIDsMap map[int]struct{}, input map[string]int64) (map[int]int64, error) {
out := make(map[int]int64, len(input))

for partition, offset := range input {
partitionID, err := strconv.Atoi(partition)
if err != nil {
format := "Partition value %s must be an integer"
return nil, fmt.Errorf(format, partition)
}

_, ok := partitionIDsMap[partitionID]
if !ok {
format := "Partition %d not found"
return nil, fmt.Errorf(format, partitionID)
}

out[partitionID] = offset
}

return out, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/segmentio/topicctl

go 1.18
go 1.21

require (
github.com/aws/aws-sdk-go v1.44.208
Expand Down
Loading
Loading