Kafka Admin Tool (kat) provides an interface to perform many admin operations on Kafka in a straightforward manner.
Install via Homebrew:
brew tap gojek/stable
brew install kat

Or via Go:
go install github.com/gojek/kat
- Clone the repo
- Run `make all` to run lint checks, unit tests and build the project
- Manual testing: running `docker-compose up -d` will create 2 local Kafka clusters. Commands can be run against these clusters for testing (see the example below).
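As a quick sanity check after `docker-compose up -d`, a command can be pointed at one of the local clusters. The broker address below is an assumption; check the docker-compose file for the ports actually exposed on your machine.

```sh
# Hypothetical smoke test against a local cluster started by docker-compose
# (localhost:9092 is an assumed listener address).
kat topic list --broker-list "localhost:9092"
```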
kat supports the following operations:
- List Topics
- Describe Topics
- Delete Topics
- List Consumer Groups for a topic
- Increase Replication Factor
- Reassign Partitions
- Show Topic Configs
- Alter Topic Configs
- Mirror Topic Configs from Source to Destination Cluster
- Display the various args accepted by each command and the corresponding defaults
kat --help
kat <cmd> --help
- List all the topics in a cluster
kat topic list --broker-list <"broker1:9092,broker2:9092">
- List all topics with a particular replication factor
kat topic list --broker-list <"broker1:9092,broker2:9092"> --replication-factor <replication factor>
- List all topics with last write time before given time (unused/stale topics)
kat topic list --broker-list <"broker1:9092,broker2:9092"> --last-write=<epoch time> --data-dir=<kafka logs directory>
- List topics with size less than or equal to the given size
kat topic list --broker-list <"broker1:9092,broker2:9092"> --size=<size in bytes>
Topic throughput metrics and last-modified time are not available in the topic metadata response from Kafka. Hence, this tool has a custom implementation that SSHes into all the brokers and filters through the Kafka logs directory to find the topics that were not written to after the given time.
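kat performs this check itself over SSH; the snippet below is only a rough illustration of the kind of per-broker filtering it automates, assuming a typical data directory layout (the path, host and epoch value are placeholders).

```sh
# Rough illustration (not kat's actual implementation): find log segment files
# under an assumed Kafka data directory that were last modified before the
# given epoch; topic-partition directories whose segments all show up here are
# candidates for stale topics.
ssh broker1 'find /var/lib/kafka/data -name "*.log" ! -newermt "@1700000000"'
```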
- Describe metadata for topics
kat topic describe --broker-list <"broker1:9092,broker2:9092"> --topics <"topic1,topic2">
- Delete the topics that match the given topic-whitelist regex
kat topic delete --broker-list <"broker1:9092,broker2:9092"> --topic-whitelist=<*test*>
- Delete the topics that do not match the given topic-blacklist regex
kat topic delete --broker-list <"broker1:9092,broker2:9092"> --topic-blacklist=<*test*>
- Delete the topics that are not modified since the last-write epoch time and match the topic-whitelist regex
kat topic delete --broker-list <"broker1:9092,broker2:9092"> --last-write=<epoch time> --data-dir=<kafka logs directory> --topic-whitelist=<*test*>
- Delete the topics that are not modified since the last-write epoch time and do not match the topic-blacklist regex
kat topic delete --broker-list <"broker1:9092,broker2:9092"> --last-write=<epoch time> --data-dir=<kafka logs directory> --topic-blacklist=<*test*>
- List all the consumer groups that are subscribed to a given topic
kat consumergroup list -b <"broker1:9092,broker2:9092"> -t <topic-name>
- Increase the replication factor of topics that match given regex
kat topic increase-replication-factor --broker-list <"broker1:9092,broker2:9092"> --zookeeper <"zookeeper1,zookeeper2"> --topics <"topic1|topic2.*"> --replication-factor <r> --num-of-brokers <n> --batch <b> --timeout-per-batch <t> --poll-interval <p> --throttle <t>
- Reassign partitions for topics that match given regex
kat topic reassign-partitions --broker-list <"broker1:9092,broker2:9092"> --zookeeper <"zookeeper1,zookeeper2"> --topics <"topic1|topic2.*"> --broker-ids <i,j,k> --batch <b> --timeout-per-batch <t> --poll-interval <p> --throttle <t>
- Show config for topics
kat topic config show --topics <"topic1,topic2"> --broker-list <"broker1:9092,broker2:9092">
- Alter config for topics
kat topic config alter --topics <"topic1,topic2"> --broker-list <"broker1:9092,broker2:9092"> --config <"retention.ms=500000000,segment.bytes=1000000000">
- Mirror all configs for topics present in both source and destination cluster
kat mirror --source-broker-ips=<"broker1:9092,broker2:9092"> --destination-broker-ips=<"broker3,broker4">
- Mirror configs for topics present in both source and destination cluster, with some configs as exception
kat mirror --source-broker-ips=<"broker1:9092,broker2:9092"> --destination-broker-ips=<"broker3,broker4"> --exclude-configs=<"retention.ms,segment.bytes">
- Mirror configs for topics present in source cluster, but not in destination cluster
kat mirror --source-broker-ips=<"broker1:9092,broker2:9092"> --destination-broker-ips=<"broker3,broker4"> --exclude-configs=<"retention.ms,segment.bytes"> --create-topics
- Mirror configs for topics, with increase in partition count if there is a difference
kat mirror --source-broker-ips=<"broker1:9092,broker2:9092"> --destination-broker-ips=<"broker3,broker4"> --exclude-configs=<"retention.ms,segment.bytes"> --create-topics --increase-partitions
- Preview changes that will be applied on the destination cluster after mirroring
kat mirror --source-broker-ips=<"broker1:9092,broker2:9092"> --destination-broker-ips=<"broker3,broker4"> --exclude-configs=<"retention.ms,segment.bytes"> --create-topics --increase-partitions --dry-run
Increasing the replication factor and reassigning partitions are not one-step processes. At a high level, the following steps need to be executed:
- Generating the reassignment.json file
- Executing the `kafka-reassign-partitions` command
- Verifying the status of the reassignment

For the `increase-replication-factor` command:
- Topics are split into batches of the number passed in the `batch` arg.
- A reassignment json file is created for each batch.
  - This file is created using a custom round-robin mechanism that assigns leaders and ISRs per partition.
- The `kafka-reassign-partitions` command is executed for each batch.
- Status is polled every `poll-interval` until the `timeout-per-batch` is reached. If the timeout is breached, the command exits. Once the replication factor for all partitions in the batch has been increased, the next batch is processed.
- The reassignment.json and rollback.json files for all the batches are stored in the /tmp directory. In case of any failure, running `kafka-reassign-partitions` with the rollback.json of the failed batch will restore the state of those partitions (illustrated below).
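The file names below are assumptions about what kat writes under /tmp; the JSON structure shown is the standard format consumed by `kafka-reassign-partitions`, and the last command sketches how a failed batch could be rolled back manually.

```sh
# Assumed file name; kat stores reassignment.json and rollback.json per batch under /tmp.
cat /tmp/rollback.json
# {"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[1,2,3]}]}

# Restore the previous assignment of a failed batch with the stock Kafka tool
# (may be kafka-reassign-partitions.sh depending on the distribution; the
# zookeeper address is a placeholder).
kafka-reassign-partitions --zookeeper zookeeper1:2181 \
  --reassignment-json-file /tmp/rollback.json --execute
```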
For the `reassign-partitions` command:
- Topics are split into multi-level batches according to the `topic-batch-size` and `partition-batch-size` args.
  - The reassignment is first split into topic-level batches according to the `topic-batch-size`.
  - Each batch is then divided into sub-batches according to the `partition-batch-size`.
  - This ensures that at any point of time at most `partition-batch-size` partitions are being moved in the cluster.
- A reassignment json file is created for each batch.
  - This is created using the `--generate` flag provided by the kafka cli tool.
- The `kafka-reassign-partitions` command is executed for each batch.
- Status is polled every `poll-interval` until the `timeout-per-batch` is reached. If the timeout is breached, the command exits. Once all partitions in the batch are migrated, the next batch is processed.
- The reassignment.json and rollback.json files for all the batches are stored in the /tmp directory. In case of any failure, running `kafka-reassign-partitions` with the rollback.json of the failed batch will restore the state of those partitions.
- The partition reassignment tool also supports graceful shutdown and resume (see the sketch after this list).
  - If a SIGINT (Ctrl+C) is sent to an ongoing reassignment job, it stops execution after the current batch completes.
  - To resume the job from its previous state, add the `--resume` flag to the command along with all the input flags provided to the previously stopped job.
  - The resume flag assumes that the cluster state has remained the same between the previously stopped job and the resumption job. If new topics were added during that interval, it's possible the new topics skip reassignment.
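A hedged sketch of the stop-and-resume flow described above; all flag values are placeholders, and only the flags already shown in the reassign-partitions usage plus `--resume` are used.

```sh
# Start a reassignment; sending SIGINT (Ctrl+C) stops it after the current batch.
kat topic reassign-partitions --broker-list "broker1:9092,broker2:9092" \
  --zookeeper "zookeeper1,zookeeper2" --topics "topic1|topic2.*" --broker-ids 1,2,3 \
  --batch 2 --timeout-per-batch 300 --poll-interval 5 --throttle 50000000

# Resume later by re-running the same command with --resume added
# (assumes the cluster state has not changed in between).
kat topic reassign-partitions --broker-list "broker1:9092,broker2:9092" \
  --zookeeper "zookeeper1,zookeeper2" --topics "topic1|topic2.*" --broker-ids 1,2,3 \
  --batch 2 --timeout-per-batch 300 --poll-interval 5 --throttle 50000000 --resume
```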
Future scope:
- Add support for more admin operations
- Beautify the response of list and show config commands. Add custom features to ui pkg
- Fetch values from a kat config file instead of passing everything as cmd args
To contribute:
- Raise an issue to clarify scope/questions, followed by a PR
- Follow Go guidelines for development
- Ensure `make` succeeds
Thanks to all the Contributors.
Licensed under the Apache License, Version 2.0