Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reset-offsets: add --delete option #169

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
test010:
runs-on: ubuntu-latest
container:
image: cimg/go:1.19
image: cimg/go:1.21
env:
GO111MODULE: "on"
KAFKA_TOPICS_TEST_ZK_ADDR: zookeeper:2181
Expand All @@ -39,7 +39,7 @@ jobs:
- name: Go setup
uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: 1.21
- name: Display Go version
run: go version
- name: Run tests
Expand Down Expand Up @@ -124,7 +124,7 @@ jobs:
test271:
runs-on: ubuntu-latest
container:
image: cimg/go:1.19
image: cimg/go:1.21
env:
GO111MODULE: "on"
KAFKA_TOPICS_TEST_ZK_ADDR: zookeeper:2181
Expand All @@ -135,7 +135,7 @@ jobs:
- name: Go setup
uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: 1.21
- name: Display Go version
run: go version
- name: Run tests
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.19 as builder
FROM --platform=$BUILDPLATFORM golang:1.21 as builder
ENV SRC github.com/segmentio/topicctl
ENV CGO_ENABLED=0

Expand Down
65 changes: 57 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ docker-compose up -d
3. Apply the topic configs in [`examples/local-cluster/topics`](/examples/local-cluster/topics):

```
topicctl apply --skip-confirm examples/local-cluster/topics/*yaml
topicctl apply --skip-confirm examples/local-cluster/topics/*.yaml
```

4. Send some test messages to the `topic-default` topic:
Expand Down Expand Up @@ -205,13 +205,62 @@ subcommands interactively.
topicctl reset-offsets [topic] [group] [flags]
```

The `reset-offsets` subcommand allows resetting the offsets for a consumer group in a topic. There are 2 main approaches for setting the offsets:

1. Use a combination of `--partitions`, `--offset`, `--to-earliest` and `--to-latest` flags. `--partitions` flag specifies a list of partitions to be reset e.g. `1,2,3 ...`. If not used, the command defaults to resetting consumer group offsets for ALL of the partitions. `--offset` flag indicates the specific value that all desired consumer group partitions will be set to. If not set, it will default to -2. Finally, `--to-earliest` flag resets offsets of consumer group members to earliest offsets of partitions while `--to-latest` resets offsets of consumer group members to latest offsets of partitions. However, only one of the `--to-earliest`, `--to-latest` and `--offset` flags can be used at a time. This approach is easy to use but lacks the ability for detailed offset configuration.

2. Use `--partition-offset-map` flag to specify a detailed offset configuration for individual partitions. For example, `1=5,2=10,7=12,...` means that the consumer group offset for partition 1 must be set to 5, partition 2 to offset 10, partition 7 to offset 12 and so on. This approach provides greater flexibility and fine-grained control for this operation. Note that `--partition-offset-map` flag is standalone and cannot be coupled with any of the previous flags.


The `reset-offsets` subcommand allows resetting the offsets
for a consumer group in a topic.
There are a few typical approaches for setting the offsets:

1. Use `--delete` alongside `--before-earliest`:
This will unblock consumers which are stuck on an offsets
which are no longer in range,
without affecting healthy consumers.
Typically this follows an outage or sustained slow consumption.
2. Use one of the partition selectors:
`--before-earliest`, `--after-latest`, or `--partitions`,
and combine it with one of the offset operators:
`--delete`, `--offset`, `--to-earliest` or `--to-latest`.
Aside from `--to-latest`, this is a legacy approach that is largely
superseded by approach 1.
3. Use `--partition-offset-map` to pass specific offsets per partition.
For example, `1=5,2=10` means that the consumer group offset
for partition 1 must be set to 5, and partition 2 to offset 10.
This is mainly used for replays of specific traffic,
such as when a deploy has mishandled or corrupted state,
and the prior release must be rerun
starting at a specific offset per partition.
This is the most flexible approach for offset setting.

Note that `--partition-offset-map` flag is standalone
and cannot be coupled with other flags.

##### Partition selection flags

At most one of the following may be selected:

* `--partitions` specifies a comma-separated list of partitions IDs.
* `--before-earliest` selects partitions whose group offset is older
than the oldest still-retained offset.
* `--after-latest` selects partitions whose group offset is newer
than the newest offset that has been published to the topic.

If none of these are specified,
the command defaults to selecting ALL of the partitions.

##### Offset selection flags

At most one of the following may be selected:

* `--delete` removes stored group offsets.
This will generally have the same effect as `--to-earliest` or `--to-latest`,
depending on the consumer group configuration.
However, `--delete` is more reliable and convenient,
since `--to-earliest` in particular involves a race with message retention
that may require numerous attempts.
* `--offset` indicates the specific value that all selected
consumer group partitions will be set to.
* `--to-earliest` resets group offsets to oldest still-retained per partition.
* `--to-latest` resets group offsets to newest per partitions.

If none of these are specified, `--to-earliest` will be the default.

#### tail

Expand Down
9 changes: 2 additions & 7 deletions cmd/topicctl/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func partitionsCmd() *cobra.Command {
Use: "partitions [optional: topics]",
Short: "Get all partitions information for topics",
Args: cobra.MinimumNArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, topics []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

Expand All @@ -239,11 +239,6 @@ func partitionsCmd() *cobra.Command {
}
defer adminClient.Close()

topics := []string{}
for _, arg := range args {
topics = append(topics, arg)
}

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.GetPartitions(
ctx,
Expand All @@ -264,7 +259,7 @@ func partitionsCmd() *cobra.Command {
&partitionsConfig.summary,
"summary",
false,
fmt.Sprintf("Display summary of partitions"),
"Display summary of partitions",
)

return partitionsCommand
Expand Down
9 changes: 5 additions & 4 deletions cmd/topicctl/subcmd/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package subcmd
import (
"context"
"fmt"
"github.com/spf13/cobra"
"os"
"os/signal"
"path/filepath"
"strconv"
"syscall"
"time"

"github.com/spf13/cobra"

"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/apply"
"github.com/segmentio/topicctl/pkg/cli"
Expand Down Expand Up @@ -102,6 +103,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}

ctx = context.WithValue(ctx, "progress", rebalanceCtxStruct)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -147,11 +149,10 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
}

// iterate through each topic config and initiate rebalance
topicConfigs := []config.TopicConfig{}
topicErrorDict := make(map[string]error)
for _, topicFile := range topicFiles {
// do not consider invalid topic yaml files for rebalance
topicConfigs, err = config.LoadTopicsFile(topicFile)
topicConfigs, err := config.LoadTopicsFile(topicFile)
if err != nil {
log.Errorf("Invalid topic yaml file: %s", topicFile)
continue
Expand Down Expand Up @@ -205,8 +206,8 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
errorTopics += 1
log.Errorf("topic: %s rebalance failed with error: %v", thisTopicName, thisTopicError)
} else {
log.Infof("topic: %s rebalance is successful", thisTopicName)
successTopics += 1
log.Infof("topic: %s rebalance is successful", thisTopicName)
}
}

Expand Down
Loading
Loading