Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consume: add Avro support #25

Merged
merged 1 commit into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/kt
/quickfix
hkt
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ $ hkt topic -filter news -partitions
<details><summary>Produce messages</summary>

```sh
$ echo 'Alice wins Oscar' | kt produce -topic actor-news -literal
$ echo 'Alice wins Oscar' | hkt produce -topic actor-news -literal
{
"count": 1,
"partition": 0,
"startOffset": 0
}
$ echo 'Bob wins Oscar' | kt produce -tlsca myca.pem -tlscert myclientcert.pem -tlscertkey mycertkey.pem -topic actor-news -literal
$ echo 'Bob wins Oscar' | hkt produce -tlsca myca.pem -tlscert myclientcert.pem -tlscertkey mycertkey.pem -topic actor-news -literal
{
"count": 1,
"partition": 0,
"startOffset": 0
}
$ for i in {6..9} ; do echo Bourne sequel $i in production. | kt produce -topic actor-news -literal ;done
$ for i in {6..9} ; do echo Bourne sequel $i in production. | hkt produce -topic actor-news -literal ;done
{
"count": 1,
"partition": 0,
Expand Down
34 changes: 0 additions & 34 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,40 +284,6 @@ func decoderForType(typ string) (func(m json.RawMessage) ([]byte, error), error)

var nullJSON = json.RawMessage("null")

// encoderForType returns the function used to render raw message bytes
// as a JSON value for the codec named by typ.
//
// Supported codecs:
//   - "json": the bytes are checked to be valid JSON and passed through
//     unchanged; invalid JSON yields an error.
//   - "hex", "base64", "string": the bytes are converted to text and
//     emitted as a JSON string; a nil payload is emitted as JSON null.
//
// Any other codec name results in an error.
func encoderForType(typ string) (func([]byte) (json.RawMessage, error), error) {
	// asJSONString lifts a bytes-to-text conversion into an encoder that
	// quotes the text as a JSON string.
	asJSONString := func(toText func([]byte) string) func([]byte) (json.RawMessage, error) {
		return func(payload []byte) (json.RawMessage, error) {
			if payload == nil {
				return nullJSON, nil
			}
			quoted, err := json.Marshal(toText(payload))
			if err != nil {
				// marshaling a string cannot fail but be defensive.
				return nil, err
			}
			return json.RawMessage(quoted), nil
		}
	}

	switch typ {
	case "json":
		return func(payload []byte) (json.RawMessage, error) {
			// Decode into a throwaway value purely to validate the syntax.
			var probe json.RawMessage
			if err := json.Unmarshal(payload, &probe); err != nil {
				return nil, fmt.Errorf("invalid JSON value %q: %v", payload, err)
			}
			return json.RawMessage(payload), nil
		}, nil
	case "hex":
		return asJSONString(hex.EncodeToString), nil
	case "base64":
		return asJSONString(base64.StdEncoding.EncodeToString), nil
	case "string":
		return asJSONString(func(payload []byte) string {
			return string(payload)
		}), nil
	}
	return nil, fmt.Errorf(`unsupported decoder %#v, only json, string, hex and base64 are supported`, typ)
}

func min(x, y int64) int64 {
if x < y {
return x
Expand Down
103 changes: 90 additions & 13 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
Expand All @@ -13,12 +15,14 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/heetch/avro/avroregistry"
"github.com/linkedin/goavro/v2"
)

type consumeCmd struct {
commonFlags

topic string
offsets string
timeout time.Duration
valueCodecType string
keyCodecType string
Expand All @@ -33,6 +37,9 @@ type consumeCmd struct {
encodeKey func([]byte) (json.RawMessage, error)
client sarama.Client
consumer sarama.Consumer

registryURL string
registry *avroregistry.Registry
}

// consumedMessage defines the format that's used to
Expand All @@ -48,13 +55,15 @@ type consumedMessage struct {
func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) {
cmd.commonFlags.addFlags(flags)
cmd.partitioners = []string{"sarama"}

flags.Var(listFlag{&cmd.partitioners}, "partitioners", "Comma-separated list of partitioners to consider when using the key flag. See below for details")
flags.DurationVar(&cmd.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).")
flags.StringVar(&cmd.keyStr, "key", "", "Print only messages with this key. Note: this relies on the producer using one of the partitioning algorithms specified with the -partitioners argument")
flags.BoolVar(&cmd.pretty, "pretty", true, "Control output pretty printing.")
flags.BoolVar(&cmd.follow, "f", false, "Follow topic by waiting new messages (default is to stop at end of topic)")
flags.StringVar(&cmd.valueCodecType, "valuecodec", "json", "Present message value as (json|string|hex|base64), defaults to json.")
flags.StringVar(&cmd.valueCodecType, "valuecodec", "json", "Present message value as (json|string|hex|base64|avro), defaults to json.")
flags.StringVar(&cmd.keyCodecType, "keycodec", "string", "Present message key as (string|hex|base64), defaults to string.")
flags.StringVar(&cmd.registryURL, "registry", "", "The Avro schema registry server URL.")

flags.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage: hkt consume [flags] TOPIC [OFFSETS]")
Expand All @@ -65,7 +74,8 @@ func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) {

func (cmd *consumeCmd) environFlags() map[string]string {
return map[string]string{
"brokers": "KT_BROKERS",
"brokers": "KT_BROKERS",
"registry": "KT_REGISTRY",
}
}

Expand All @@ -88,18 +98,28 @@ func (cmd *consumeCmd) run(args []string) error {
sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
var err error
cmd.encodeValue, err = encoderForType(cmd.valueCodecType)
if cmd.valueCodecType == "avro" {
if cmd.registryURL == "" {
return fmt.Errorf("-registry or $KT_REGISTRY is required for avro codec type")
}
cmd.registry, err = avroregistry.New(avroregistry.Params{ServerURL: cmd.registryURL})
if err != nil {
return fmt.Errorf("cannot make Avro registry client: %v", err)
}
}
cmd.encodeValue, err = cmd.encoderForType(cmd.valueCodecType)
if err != nil {
return fmt.Errorf("bad -valuecodec argument: %v", err)
}
if cmd.keyCodecType == "json" {
// JSON for keys is not a good idea.
return fmt.Errorf("JSON key codec not supported")
}
cmd.encodeKey, err = encoderForType(cmd.keyCodecType)
cmd.encodeKey, err = cmd.encoderForType(cmd.keyCodecType)
if err != nil {
return fmt.Errorf("bad -keycodec argument: %v", err)
}

offsets, err := parseOffsets(offsetsStr, time.Now())
if err != nil {
return err
Expand All @@ -108,16 +128,14 @@ func (cmd *consumeCmd) run(args []string) error {
if err != nil {
return fmt.Errorf("bad -partitioners argument: %v", err)
}
c, err := cmd.newClient()
cmd.client, err = cmd.newClient()
if err != nil {
return err
}
cmd.client = c
consumer, err := sarama.NewConsumerFromClient(cmd.client)
cmd.consumer, err = sarama.NewConsumerFromClient(cmd.client)
if err != nil {
return fmt.Errorf("cannot create kafka consumer: %v", err)
}
cmd.consumer = consumer
defer logClose("consumer", cmd.consumer)
cmd.allPartitions, err = cmd.consumer.Partitions(cmd.topic)
if err != nil {
Expand Down Expand Up @@ -145,10 +163,7 @@ func (cmd *consumeCmd) run(args []string) error {
if err != nil {
return fmt.Errorf("cannot resolve offsets: %v", err)
}
if err := cmd.consume(resolvedOffsets, limits); err != nil {
return err
}
return nil
return cmd.consume(resolvedOffsets, limits)
}

func (cmd *consumeCmd) newClient() (sarama.Client, error) {
Expand Down Expand Up @@ -291,6 +306,68 @@ func (cmd *consumeCmd) newConsumedMessage(m *sarama.ConsumerMessage) (consumedMe
return result, nil
}

func (cmd *consumeCmd) encoderForType(typ string) (func([]byte) (json.RawMessage, error), error) {
var enc func([]byte) string
switch typ {
case "json":
return func(data []byte) (json.RawMessage, error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point I'd probably consider pushing the generation of this function (and the one like it below) out to named functions of their own, just because the switch statement is getting a bit long.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. I will do it in another PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to "simplify" the switch using a map with dedicated method per type: 0e4a242 If you think it's okay, I can propose the PR (it's based on this branch).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I don't think it's too unwieldy currently - not sure that splitting it up will help readability.

if err := json.Unmarshal(data, new(json.RawMessage)); err != nil {
return nil, fmt.Errorf("invalid JSON value %q: %v", data, err)
}
return json.RawMessage(data), nil
}, nil
case "hex":
enc = hex.EncodeToString
case "base64":
enc = base64.StdEncoding.EncodeToString
case "string":
enc = func(data []byte) string {
return string(data)
}
case "avro":
return cmd.encodeAvro, nil
default:
return nil, fmt.Errorf(`unsupported decoder %#v, only json, string, hex, base64 and avro are supported`, typ)
}
return func(data []byte) (json.RawMessage, error) {
if data == nil {
return nullJSON, nil
}
data1, err := json.Marshal(enc(data))
if err != nil {
// marshaling a string cannot fail but be defensive.
return nil, err
}
return json.RawMessage(data1), nil
}, nil
}

func (cmd *consumeCmd) encodeAvro(data []byte) (json.RawMessage, error) {
dec := cmd.registry.Decoder()
id, body := dec.DecodeSchemaID(data)
if body == nil {
return nil, fmt.Errorf("cannot decode schema id")
}
// TODO: cache the schema
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO's are great, but I like them to be converted into issues, so that they can be scheduled and not forgotten when we don't regularly visit the code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Done (#26).

schema, err := dec.SchemaForID(context.Background(), id)
if err != nil {
return nil, fmt.Errorf("cannot get schema for id %d: %v", id, err)
}
codec, err := goavro.NewCodec(schema.String())
if err != nil {
return nil, fmt.Errorf("cannot create codec from schema %s", schema.String())
}
native, _, err := codec.NativeFromBinary(body)
if err != nil {
return nil, fmt.Errorf("cannot convert native from binary: %v", err)
}
textual, err := codec.TextualFromNative(nil, native)
if err != nil {
return nil, fmt.Errorf("cannot convert textual from native: %v", err)
}
return json.RawMessage(textual), nil
}

// mergeConsumers merges all the given channels in timestamp order
// until all existing messages have been received; it then produces
// messages as soon as they're received.
Expand Down
Loading