diff --git a/.gitignore b/.gitignore index 3fa5ddc..75ee31f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /kt /quickfix +hkt diff --git a/README.md b/README.md index 1b1198b..c2c3a3d 100644 --- a/README.md +++ b/README.md @@ -38,19 +38,19 @@ $ hkt topic -filter news -partitions
Produce messages ```sh -$ echo 'Alice wins Oscar' | kt produce -topic actor-news -literal +$ echo 'Alice wins Oscar' | hkt produce -topic actor-news -literal { "count": 1, "partition": 0, "startOffset": 0 } -$ echo 'Bob wins Oscar' | kt produce -tlsca myca.pem -tlscert myclientcert.pem -tlscertkey mycertkey.pem -topic actor-news -literal +$ echo 'Bob wins Oscar' | hkt produce -tlsca myca.pem -tlscert myclientcert.pem -tlscertkey mycertkey.pem -topic actor-news -literal { "count": 1, "partition": 0, "startOffset": 0 } -$ for i in {6..9} ; do echo Bourne sequel $i in production. | kt produce -topic actor-news -literal ;done +$ for i in {6..9} ; do echo Bourne sequel $i in production. | hkt produce -topic actor-news -literal ;done { "count": 1, "partition": 0, diff --git a/common.go b/common.go index b5cec49..369eb7e 100644 --- a/common.go +++ b/common.go @@ -284,40 +284,6 @@ func decoderForType(typ string) (func(m json.RawMessage) ([]byte, error), error) var nullJSON = json.RawMessage("null") -func encoderForType(typ string) (func([]byte) (json.RawMessage, error), error) { - var enc func([]byte) string - switch typ { - case "json": - return func(data []byte) (json.RawMessage, error) { - if err := json.Unmarshal(data, new(json.RawMessage)); err != nil { - return nil, fmt.Errorf("invalid JSON value %q: %v", data, err) - } - return json.RawMessage(data), nil - }, nil - case "hex": - enc = hex.EncodeToString - case "base64": - enc = base64.StdEncoding.EncodeToString - case "string": - enc = func(data []byte) string { - return string(data) - } - default: - return nil, fmt.Errorf(`unsupported decoder %#v, only json, string, hex and base64 are supported`, typ) - } - return func(data []byte) (json.RawMessage, error) { - if data == nil { - return nullJSON, nil - } - data1, err := json.Marshal(enc(data)) - if err != nil { - // marshaling a string cannot fail but be defensive. - return nil, err - } - return json.RawMessage(data1), nil - }, nil -} - func min(x, y int64) int64 { if x < y { return x diff --git a/consume.go b/consume.go index aea3690..64af863 100644 --- a/consume.go +++ b/consume.go @@ -3,6 +3,8 @@ package main import ( "bytes" "context" + "encoding/base64" + "encoding/hex" "encoding/json" "flag" "fmt" @@ -13,12 +15,14 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/heetch/avro/avroregistry" + "github.com/linkedin/goavro/v2" ) type consumeCmd struct { commonFlags + topic string - offsets string timeout time.Duration valueCodecType string keyCodecType string @@ -33,6 +37,9 @@ type consumeCmd struct { encodeKey func([]byte) (json.RawMessage, error) client sarama.Client consumer sarama.Consumer + + registryURL string + registry *avroregistry.Registry } // consumedMessage defines the format that's used to @@ -48,13 +55,15 @@ type consumedMessage struct { func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) { cmd.commonFlags.addFlags(flags) cmd.partitioners = []string{"sarama"} + flags.Var(listFlag{&cmd.partitioners}, "partitioners", "Comma-separated list of partitioners to consider when using the key flag. See below for details") flags.DurationVar(&cmd.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).") flags.StringVar(&cmd.keyStr, "key", "", "Print only messages with this key. Note: this relies on the producer using one of the partitioning algorithms specified with the -partitioners argument") flags.BoolVar(&cmd.pretty, "pretty", true, "Control output pretty printing.") flags.BoolVar(&cmd.follow, "f", false, "Follow topic by waiting new messages (default is to stop at end of topic)") - flags.StringVar(&cmd.valueCodecType, "valuecodec", "json", "Present message value as (json|string|hex|base64), defaults to json.") + flags.StringVar(&cmd.valueCodecType, "valuecodec", "json", "Present message value as (json|string|hex|base64|avro), defaults to json.") flags.StringVar(&cmd.keyCodecType, "keycodec", "string", "Present message key as (string|hex|base64), defaults to string.") + flags.StringVar(&cmd.registryURL, "registry", "", "The Avro schema registry server URL.") flags.Usage = func() { fmt.Fprintln(os.Stderr, "Usage: hkt consume [flags] TOPIC [OFFSETS]") @@ -65,7 +74,8 @@ func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) { func (cmd *consumeCmd) environFlags() map[string]string { return map[string]string{ - "brokers": "KT_BROKERS", + "brokers": "KT_BROKERS", + "registry": "KT_REGISTRY", } } @@ -88,7 +98,16 @@ func (cmd *consumeCmd) run(args []string) error { sarama.Logger = log.New(os.Stderr, "", log.LstdFlags) } var err error - cmd.encodeValue, err = encoderForType(cmd.valueCodecType) + if cmd.valueCodecType == "avro" { + if cmd.registryURL == "" { + return fmt.Errorf("-registry or $KT_REGISTRY is required for avro codec type") + } + cmd.registry, err = avroregistry.New(avroregistry.Params{ServerURL: cmd.registryURL}) + if err != nil { + return fmt.Errorf("cannot make Avro registry client: %v", err) + } + } + cmd.encodeValue, err = cmd.encoderForType(cmd.valueCodecType) if err != nil { return fmt.Errorf("bad -valuecodec argument: %v", err) } @@ -96,10 +115,11 @@ func (cmd *consumeCmd) run(args []string) error { // JSON for keys is not a good idea. return fmt.Errorf("JSON key codec not supported") } - cmd.encodeKey, err = encoderForType(cmd.keyCodecType) + cmd.encodeKey, err = cmd.encoderForType(cmd.keyCodecType) if err != nil { return fmt.Errorf("bad -keycodec argument: %v", err) } + offsets, err := parseOffsets(offsetsStr, time.Now()) if err != nil { return err @@ -108,16 +128,14 @@ func (cmd *consumeCmd) run(args []string) error { if err != nil { return fmt.Errorf("bad -partitioners argument: %v", err) } - c, err := cmd.newClient() + cmd.client, err = cmd.newClient() if err != nil { return err } - cmd.client = c - consumer, err := sarama.NewConsumerFromClient(cmd.client) + cmd.consumer, err = sarama.NewConsumerFromClient(cmd.client) if err != nil { return fmt.Errorf("cannot create kafka consumer: %v", err) } - cmd.consumer = consumer defer logClose("consumer", cmd.consumer) cmd.allPartitions, err = cmd.consumer.Partitions(cmd.topic) if err != nil { @@ -145,10 +163,7 @@ func (cmd *consumeCmd) run(args []string) error { if err != nil { return fmt.Errorf("cannot resolve offsets: %v", err) } - if err := cmd.consume(resolvedOffsets, limits); err != nil { - return err - } - return nil + return cmd.consume(resolvedOffsets, limits) } func (cmd *consumeCmd) newClient() (sarama.Client, error) { @@ -291,6 +306,68 @@ func (cmd *consumeCmd) newConsumedMessage(m *sarama.ConsumerMessage) (consumedMe return result, nil } +func (cmd *consumeCmd) encoderForType(typ string) (func([]byte) (json.RawMessage, error), error) { + var enc func([]byte) string + switch typ { + case "json": + return func(data []byte) (json.RawMessage, error) { + if err := json.Unmarshal(data, new(json.RawMessage)); err != nil { + return nil, fmt.Errorf("invalid JSON value %q: %v", data, err) + } + return json.RawMessage(data), nil + }, nil + case "hex": + enc = hex.EncodeToString + case "base64": + enc = base64.StdEncoding.EncodeToString + case "string": + enc = func(data []byte) string { + return string(data) + } + case "avro": + return cmd.encodeAvro, nil + default: + return nil, fmt.Errorf(`unsupported decoder %#v, only json, string, hex, base64 and avro are supported`, typ) + } + return func(data []byte) (json.RawMessage, error) { + if data == nil { + return nullJSON, nil + } + data1, err := json.Marshal(enc(data)) + if err != nil { + // marshaling a string cannot fail but be defensive. + return nil, err + } + return json.RawMessage(data1), nil + }, nil +} + +func (cmd *consumeCmd) encodeAvro(data []byte) (json.RawMessage, error) { + dec := cmd.registry.Decoder() + id, body := dec.DecodeSchemaID(data) + if body == nil { + return nil, fmt.Errorf("cannot decode schema id") + } + // TODO: cache the schema + schema, err := dec.SchemaForID(context.Background(), id) + if err != nil { + return nil, fmt.Errorf("cannot get schema for id %d: %v", id, err) + } + codec, err := goavro.NewCodec(schema.String()) + if err != nil { + return nil, fmt.Errorf("cannot create codec from schema %s", schema.String()) + } + native, _, err := codec.NativeFromBinary(body) + if err != nil { + return nil, fmt.Errorf("cannot convert native from binary: %v", err) + } + textual, err := codec.TextualFromNative(nil, native) + if err != nil { + return nil, fmt.Errorf("cannot convert textual from native: %v", err) + } + return json.RawMessage(textual), nil +} + // mergeConsumers merges all the given channels in timestamp order // until all existing messages have been received; it then produces // messages as soon as they're received. diff --git a/consume_test.go b/consume_test.go index acfc321..1c6b3a7 100644 --- a/consume_test.go +++ b/consume_test.go @@ -1,13 +1,23 @@ package main import ( + "context" + "encoding/binary" + "encoding/json" + "net/http" + "net/http/httptest" + "os" "sort" + "strings" "testing" "time" "github.com/Shopify/sarama" qt "github.com/frankban/quicktest" "github.com/google/go-cmp/cmp" + "github.com/heetch/avro" + "github.com/heetch/avro/avroregistry" + "gopkg.in/retry.v1" ) func TestParseOffsets(t *testing.T) { @@ -733,15 +743,19 @@ type tPartitionConsumer struct { } func (pc tPartitionConsumer) AsyncClose() {} + func (pc tPartitionConsumer) Close() error { return pc.closeErr } + func (pc tPartitionConsumer) HighWaterMarkOffset() int64 { return pc.highWaterMarkOffset } + func (pc tPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage { return pc.messages } + func (pc tPartitionConsumer) Errors() <-chan *sarama.ConsumerError { return pc.errors } @@ -783,12 +797,17 @@ func TestConsumeParseArgsUsesEnvVar(t *testing.T) { c := qt.New(t) defer c.Done() - c.Setenv("KT_BROKERS", "hans:2000") + registry := "localhost:8084" + broker := "hans:2000" + + c.Setenv("KT_BROKERS", broker) + c.Setenv("KT_REGISTRY", registry) cmd0, _, err := parseCmd("hkt", "consume") c.Assert(err, qt.Equals, nil) cmd := cmd0.(*consumeCmd) - c.Assert(cmd.brokers(), qt.DeepEquals, []string{"hans:2000"}) + c.Assert(cmd.brokers(), qt.DeepEquals, []string{broker}) + c.Assert(cmd.registryURL, qt.Equals, registry) } // brokers default to localhost:9092 @@ -797,23 +816,78 @@ func TestConsumeParseArgsDefault(t *testing.T) { defer c.Done() c.Setenv("KT_BROKERS", "") + c.Setenv("KT_REGISTRY", "") + cmd0, _, err := parseCmd("hkt", "consume") c.Assert(err, qt.Equals, nil) cmd := cmd0.(*consumeCmd) c.Assert(cmd.brokers(), qt.DeepEquals, []string{"localhost:9092"}) + c.Assert(cmd.registryURL, qt.Equals, "") } func TestConsumeParseArgsFlagsOverrideEnv(t *testing.T) { c := qt.New(t) defer c.Done() + registry := "localhost:8084" + broker := "hans:2000" + // command line arg wins c.Setenv("KT_BROKERS", "BLABB") + c.Setenv("KT_REGISTRY", "BLABB") - cmd0, _, err := parseCmd("hkt", "consume", "-brokers", "hans:2000") + cmd0, _, err := parseCmd("hkt", "consume", "-brokers", broker, "-registry", registry) c.Assert(err, qt.Equals, nil) cmd := cmd0.(*consumeCmd) - c.Assert(cmd.brokers(), qt.DeepEquals, []string{"hans:2000"}) + c.Assert(cmd.brokers(), qt.DeepEquals, []string{broker}) + c.Assert(cmd.registryURL, qt.Equals, registry) +} + +func TestConsumeAvroMessage(t *testing.T) { + c := qt.New(t) + defer c.Done() + + type record struct { + A int + B int + } + + // In the byte slice below: + // 80: A=40 + // 40: B=20 + rec := record{A: 40, B: 20} + data := []byte{80, 40} + + _, wType, err := avro.Marshal(rec) + c.Assert(err, qt.IsNil) + + reg := newTestRegistry(c) + schemaID := reg.register(c, wType) + + cmd := consumeCmd{registry: reg.registry} + + enc, err := cmd.encoderForType("string") + c.Assert(err, qt.IsNil) + cmd.encodeKey = enc + + enc, err = cmd.encoderForType("avro") + c.Assert(err, qt.IsNil) + cmd.encodeValue = enc + + msg := &sarama.ConsumerMessage{ + Key: []byte("foo"), + Value: createAvroMessage(schemaID, data), + Partition: 1, + Offset: 0, + } + + consumed, err := cmd.newConsumedMessage(msg) + c.Assert(err, qt.IsNil) + + var got record + err = json.Unmarshal(consumed.Value, &got) + c.Assert(err, qt.IsNil) + c.Assert(got, qt.DeepEquals, rec) } func T(s string) time.Time { @@ -845,3 +919,76 @@ func positionAtTime(t time.Time) position { anchor: anchorAtTime(t), } } + +type testRegistry struct { + registry *avroregistry.Registry + srv *httptest.Server + faked bool + schema string + sub string + url string +} + +func newTestRegistry(c *qt.C) *testRegistry { + ctx := context.Background() + reg := &testRegistry{ + sub: randomString(10), + url: os.Getenv("KT_REGISTRY"), + } + // If KT_REGISTRY is not explicitly set, we use a fake server. + if reg.url == "" { + reg.faked = true + reg.srv = httptest.NewServer(http.HandlerFunc(reg.fakeServerHandler)) + reg.url = reg.srv.URL + } + var err error + reg.registry, err = avroregistry.New(avroregistry.Params{ + ServerURL: reg.url, + RetryStrategy: retry.Regular{}, + }) + c.Assert(err, qt.IsNil) + c.Defer(func() { + err := reg.registry.DeleteSubject(ctx, reg.sub) + c.Check(err, qt.IsNil) + if reg.srv != nil { + reg.srv.Close() + } + }) + return reg +} + +func (reg *testRegistry) register(c *qt.C, schema *avro.Type) int64 { + if reg.faked { + reg.schema = schema.String() + return 1 + } + id, err := reg.registry.Register(context.Background(), reg.sub, schema) + c.Assert(err, qt.IsNil) + return id +} + +func (reg *testRegistry) fakeServerHandler(w http.ResponseWriter, r *http.Request) { + var body []byte + if r.Method == http.MethodGet && strings.HasPrefix(r.RequestURI, "/schemas/ids") { + var err error + body, err = json.Marshal(struct { + Schema string `json:"schema"` + }{reg.schema}) + if err != nil { + panic(err) + } + } + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/vnd.schemaregistry.v1+json") + w.Write(body) +} + +// createAvroMessage is a helper to create Avro message. +// See https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format. +func createAvroMessage(schemaID int64, data []byte) []byte { + b := []byte{0} // magic byte + id := make([]byte, 4) // 4-byte schema id + binary.BigEndian.PutUint32(id, uint32(schemaID)) + b = append(b, id...) + return append(b, data...) +} diff --git a/go.mod b/go.mod index 2fa6e9d..4d3955b 100644 --- a/go.mod +++ b/go.mod @@ -2,12 +2,15 @@ module github.com/heetch/hkt require ( github.com/Shopify/sarama v1.24.0 - github.com/frankban/quicktest v1.4.1 - github.com/google/go-cmp v0.3.0 - github.com/kr/pretty v0.1.0 + github.com/frankban/quicktest v1.7.2 + github.com/google/go-cmp v0.3.1 + github.com/heetch/avro v0.0.0-20200303141118-76474e818869 + github.com/klauspost/cpuid v1.2.3 // indirect + github.com/linkedin/goavro/v2 v2.9.7 github.com/rogpeppe/go-internal v1.5.0 golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e + gopkg.in/retry.v1 v1.0.3 ) go 1.12 diff --git a/go.sum b/go.sum index aa3ba6e..8e80210 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,6 @@ -github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s= -github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/sarama v1.24.0 h1:99vo5VAgQybHwZwiOy/RX/S3i0somjGxur3pLeheqzI= github.com/Shopify/sarama v1.24.0/go.mod h1:fGP8eQ6PugKEI0iUETYYtnP6d1pH/bdDMTel1X5ajsU= -github.com/Shopify/toxiproxy v2.1.3+incompatible h1:awiJqUYH4q4OmoBiRccJykjd7B+w0loJi2keSna4X/M= -github.com/Shopify/toxiproxy v2.1.3+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -14,47 +11,55 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.2.2/go.mod h1:Qh/WofXFeiAFII1aEBu529AtJo6Zg2VHscnEsbBnJ20= github.com/frankban/quicktest v1.4.1 h1:Wv2VwvNn73pAdFIVUQRXYDFp31lXKbqblIXo/Q5GPSg= github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ= -github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= -github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk= +github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.1-0.20190312032427-6f77996f0c42/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/heetch/avro v0.0.0-20200303141118-76474e818869 h1:6ABC0oqCJ9dx94caqZ7F5yctOkXEx4uqGZyN4v+KERE= +github.com/heetch/avro v0.0.0-20200303141118-76474e818869/go.mod h1:eBhK6bQm237ZFemHrXS94a7me11pJnBJpqtcjpgNoj0= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs= github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs= +github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= -github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehOGrM= +github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw= github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165 h1:nkcn14uNmFEuGCb2mBZbBb24RdNRL08b/wb+xBOYpuk= -github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/rogpeppe/go-internal v1.3.2 h1:XU784Pr0wdahMY2bYcyK6N1KuaRAdLtqD4qd8D18Bfs= -github.com/rogpeppe/go-internal v1.3.2/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.4.0 h1:LUa41nrWTQNGhzdsZ5lTnkwbNjj6rXTdazA1cSdjkOY= -github.com/rogpeppe/go-internal v1.4.0/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a h1:3QH7VyOaaiUHNrA9Se4YQIRkDTCw1EJls9xTUCaCeRM= +github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ= github.com/rogpeppe/go-internal v1.5.0 h1:Usqs0/lDK/NqTkvrmKSwA/3XkZAs7ZAW/eLeQ2MVBTw= github.com/rogpeppe/go-internal v1.5.0/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/gogen-avro/v7 v7.2.1 h1:laf1RaIs397v8rAhLGtpznjOIuXYQDJ/7ij0dAss4Gg= +github.com/rogpeppe/gogen-avro/v7 v7.2.1/go.mod h1:awhtQwpFg18PdUpdnOFr0ceVLYAn/oDCa/HE1hdbk50= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= -golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4 h1:Vk3wNqEZwyGyei9yq5ekj7frek2u7HUfffJ1/opblzc= -golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= @@ -62,12 +67,11 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2eP golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e h1:EfdBzeKbFSvOjoIqSZcfS8wp0FBLokGBEs9lz1OtSg0= -golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e h1:nFYrTHrdrAOpShe27kaFHjsqYSEQ0KWqdWLu3xuZJts= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= @@ -75,8 +79,11 @@ gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hr gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/goidentity.v3 v3.0.0 h1:1duIyWiTaYvVx3YX2CYtpJbUFd7/UuPYCfgXtQ3VTbI= gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010= gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= +gopkg.in/retry.v1 v1.0.3 h1:a9CArYczAVv6Qs6VGoLMio99GEs7kY9UzSF9+LD+iGs= +gopkg.in/retry.v1 v1.0.3/go.mod h1:FJkXmWiMaAo7xB+xhvDF59zhfjDWyzmyAxiT4dB688g= diff --git a/main.go b/main.go index 12cf159..119586b 100644 --- a/main.go +++ b/main.go @@ -12,11 +12,11 @@ type command interface { run(args []string) error } -var usageMessage = `kt is a tool for Kafka. +var usageMessage = `hkt is a tool for Kafka. Usage: - kt command [arguments] + hkt command [arguments] The commands are: @@ -26,9 +26,9 @@ The commands are: group consumer group information and modification. admin basic cluster administration. -Use "kt [command] -help" for for information about the command. +Use "hkt [command] -help" for for information about the command. -More at https://github.com/fgeller/kt +More at https://github.com/heetch/hkt ` var commands = map[string]command{ diff --git a/offsetresolve.go b/offsetresolve.go index f53bbb6..7705961 100644 --- a/offsetresolve.go +++ b/offsetresolve.go @@ -399,10 +399,3 @@ func unixMilliseconds(t time.Time) int64 { ns := time.Duration(t.UnixNano()) return int64(ns / time.Millisecond) } - -func fromUnixMilliseconds(ts int64) time.Time { - if ts <= 0 { - return time.Time{} - } - return time.Unix(ts/1000, (ts%1000)*1e6) -} diff --git a/offsetresolve_test.go b/offsetresolve_test.go index ed05fc7..3401463 100644 --- a/offsetresolve_test.go +++ b/offsetresolve_test.go @@ -13,32 +13,6 @@ import ( var epoch = mustParseTime("2010-01-02T12:00:00+02:00") -var resolveTestData = ` -0 100 12:00 +2s +3s 13:00 +2s -1 50 09:00 10:00 - -all=0:newest -0 100 104 -1 50 52 - -all=0: -0 100 104 -1 50 52 - -all=:newest -0 100 104 -1 50 52 - -0=: -0 100 104 - -0=100:103 -0 100 103 - ----- - -` - type resolveTestGroup struct { times map[int32][]time.Time offsets map[int32]int64 diff --git a/produce.go b/produce.go index 9c3e66c..a5141b2 100644 --- a/produce.go +++ b/produce.go @@ -280,21 +280,21 @@ Examples: Send a single message with a specific key: - $ echo '{"key": "id-23", "value": "ola", "partition": 0}' | kt produce -topic greetings + $ echo '{"key": "id-23", "value": "ola", "partition": 0}' | hkt produce -topic greetings Sent message to partition 0 at offset 3. - $ kt consume -topic greetings -timeout 1s -offsets 0:3- + $ hkt consume -topic greetings -timeout 1s -offsets 0:3- {"partition":0,"offset":3,"key":"id-23","message":"ola"} Keep reading input from stdin until interrupted (via ^C). - $ kt produce -topic greetings + $ hkt produce -topic greetings hello. Sent message to partition 0 at offset 4. bonjour. Sent message to partition 0 at offset 5. - $ kt consume -topic greetings -timeout 1s -offsets 0:4- + $ hkt consume -topic greetings -timeout 1s -offsets 0:4- {"partition":0,"offset":4,"key":"hello.","message":"hello."} {"partition":0,"offset":5,"key":"bonjour.","message":"bonjour."} `