Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make avro json codec configurable (fixes #123) #136

Merged
merged 1 commit into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- [#123](https://github.com/deviceinsight/kafkactl/issues/123) Make avro json codec configurable and switch default to standard json

## 2.5.0 - 2022-08-19
### Added
- [#105](https://github.com/deviceinsight/kafkactl/issues/105) Add replication factor to `get topics`
- [#108](https://github.com/deviceinsight/kafkactl/issues/108) Fix "system root pool not available on Windows" by migrating to go 1.19
- Add `clone` command for `consumer-group` and `topic`

### Fixed
- [#108](https://github.com/deviceinsight/kafkactl/issues/108) Fix "system root pool not available on Windows" by migrating to go 1.19
- Print topic configs when yaml and json output format used with `-c` flag

## 2.4.0 - 2022-06-23
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ contexts:
# optional: avro schema registry
avro:
schemaRegistry: localhost:8081
# optional: codec used for JSON (de)serialization; one of: standard, avro (defaults to standard)
# see: https://github.com/deviceinsight/kafkactl/issues/123
jsonCodec: avro

# optional: default protobuf messages search paths
protobuf:
Expand Down
116 changes: 116 additions & 0 deletions cmd/produce/produce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,122 @@ func TestProduceAvroMessageWithHeadersIntegration(t *testing.T) {
testutil.AssertEquals(t, fmt.Sprintf("key1:value1,key\\:2:value\\:2#test-key#%s", value), kafkaCtl.GetStdOut())
}

// TestProduceAvroMessageOmitDefaultValue checks that a record may omit an
// optional union field and that the consumed value reports it as null.
func TestProduceAvroMessageOmitDefaultValue(t *testing.T) {

	testutil.StartIntegrationTest(t)

	// Schema with a required field and an optional field defaulting to null.
	schema := `{
"name": "CreateUserProfileWallet",
"namespace": "Messaging.Contracts.WalletManager.Commands",
"type": "record",
"fields": [
{ "name": "CurrencyCode", "type": "string" },
{ "name": "ExpiresOn", "type": ["null", "string"], "default": null}
]
}`
	// The optional "ExpiresOn" field is deliberately omitted.
	record := `{
"CurrencyCode": "EUR"
}`

	topic := testutil.CreateAvroTopic(t, "produce-avro-topic", "", schema)
	ctl := testutil.CreateKafkaCtlCommand()

	if _, err := ctl.Execute("produce", topic, "--value", record); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}
	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", ctl.GetStdOut())

	if _, err := ctl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	consumed := ctl.GetStdOut()
	testutil.AssertContainSubstring(t, `"CurrencyCode":"EUR"`, consumed)
	testutil.AssertContainSubstring(t, `"ExpiresOn":null`, consumed)
}

// TestProduceAvroMessageWithUnionStandardJson checks that with the default
// (standard) json codec a union field is written and read back as a plain
// JSON value rather than avro-json's {"type": value} wrapper.
func TestProduceAvroMessageWithUnionStandardJson(t *testing.T) {

	testutil.StartIntegrationTest(t)

	schema := `{
"name": "CreateUserProfileWallet",
"namespace": "Messaging.Contracts.WalletManager.Commands",
"type": "record",
"fields": [
{ "name": "CurrencyCode", "type": "string" },
{ "name": "ExpiresOn", "type": ["null", "string"], "default": null}
]
}`

	// Standard JSON: the union field is given directly, not wrapped.
	record := `{
"CurrencyCode": "EUR",
"ExpiresOn": "2022-12-12"
}`

	topic := testutil.CreateAvroTopic(t, "produce-topic", "", schema)
	ctl := testutil.CreateKafkaCtlCommand()

	if _, err := ctl.Execute("produce", topic, "--value", record); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}
	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", ctl.GetStdOut())

	if _, err := ctl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	consumed := ctl.GetStdOut()
	testutil.AssertContainSubstring(t, `"CurrencyCode":"EUR"`, consumed)
	testutil.AssertContainSubstring(t, `"ExpiresOn":"2022-12-12"`, consumed)
}

// TestProduceAvroMessageWithUnionAvroJson checks that when the codec is
// switched to "avro" via the AVRO_JSONCODEC environment variable, union
// fields are produced and consumed in avro-json notation ({"type": value}).
func TestProduceAvroMessageWithUnionAvroJson(t *testing.T) {

	testutil.StartIntegrationTest(t)

	valueSchema := `{
"name": "CreateUserProfileWallet",
"namespace": "Messaging.Contracts.WalletManager.Commands",
"type": "record",
"fields": [
{ "name": "CurrencyCode", "type": "string" },
{ "name": "ExpiresOn", "type": ["null", "string"], "default": null}
]
}`

	// Avro-json notation: the union value is wrapped with its type name.
	value := `{
"CurrencyCode": "EUR",
"ExpiresOn": {"string": "2022-12-12"}
}`

	// t.Setenv registers a cleanup that restores the previous value when the
	// test finishes, so the codec selection cannot leak into later tests
	// (the previous os.Setenv call was never undone).
	t.Setenv("AVRO_JSONCODEC", "avro")

	topicName := testutil.CreateAvroTopic(t, "produce-topic", "", valueSchema)

	kafkaCtl := testutil.CreateKafkaCtlCommand()

	if _, err := kafkaCtl.Execute("produce", topicName, "--value", value); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

	if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--exit"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	stdout := kafkaCtl.GetStdOut()
	testutil.AssertContainSubstring(t, `"CurrencyCode":"EUR"`, stdout)
	testutil.AssertContainSubstring(t, `"ExpiresOn":{"string":"2022-12-12"}`, stdout)
}

func TestProduceTombstoneIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)
Expand Down
2 changes: 2 additions & 0 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {
_ = os.Setenv(env.ClientID, "my-client")
_ = os.Setenv(env.KafkaVersion, "2.0.1")
_ = os.Setenv(env.AvroSchemaRegistry, "registry:8888")
_ = os.Setenv(env.AvroJSONCodec, "avro")
_ = os.Setenv(env.ProtobufProtoSetFiles, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset")
_ = os.Setenv(env.ProtobufImportPaths, "/usr/include/protobuf /usr/lib/protobuf")
_ = os.Setenv(env.ProtobufProtoFiles, "message.proto other.proto")
Expand Down Expand Up @@ -94,6 +95,7 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {
testutil.AssertEquals(t, "my-client", viper.GetString("contexts.default.clientID"))
testutil.AssertEquals(t, "2.0.1", viper.GetString("contexts.default.kafkaVersion"))
testutil.AssertEquals(t, "registry:8888", viper.GetString("contexts.default.avro.schemaRegistry"))
testutil.AssertEquals(t, "avro", viper.GetString("contexts.default.avro.jsonCodec"))
testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset", viper.GetStringSlice("contexts.default.protobuf.protosetFiles")[0])
testutil.AssertEquals(t, "/usr/include/protobuf", viper.GetStringSlice("contexts.default.protobuf.importPaths")[0])
testutil.AssertEquals(t, "message.proto", viper.GetStringSlice("contexts.default.protobuf.protoFiles")[0])
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/golang/protobuf v1.4.2
github.com/jhump/protoreflect v1.10.1
github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891
github.com/linkedin/goavro/v2 v2.10.0
github.com/linkedin/goavro/v2 v2.11.2-0.20220819205120-9a4764661614
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.0.1-0.20200629195214-2c5a0d300f8b
github.com/spf13/pflag v1.0.5
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891 h1:FADDInP
github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891/go.mod h1:yITyTTMx2IS5mpfZjQ64gJhL5U5RvcorFBu+z4/euXg=
github.com/linkedin/goavro/v2 v2.10.0 h1:eTBIRoInBM88gITGXYtUSqqxLTFXfOsJBiX8ZMW0o4U=
github.com/linkedin/goavro/v2 v2.10.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/linkedin/goavro/v2 v2.11.2-0.20220819205120-9a4764661614 h1:57IiYlL+Q6pWb+rCX7HuCLMQndj0HxGSedf6c1skI9U=
github.com/linkedin/goavro/v2 v2.11.2-0.20220819205120-9a4764661614/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.4 h1:8KGKTcQQGm0Kv7vEbKFErAoAOFyyacLStRtQSeYtvkY=
Expand Down Expand Up @@ -278,12 +280,16 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q=
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down Expand Up @@ -505,6 +511,8 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/gotestsum v1.8.1 h1:C6dYd5K39WAv52jikEUuWgyMqJDhY90eauUjsFzwluc=
gotest.tools/gotestsum v1.8.1/go.mod h1:ctqdxBSCPv80kAFjYvFNpPntBrE5HAQnLiOKBGLmOBs=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
Expand Down
4 changes: 4 additions & 0 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"time"

"github.com/deviceinsight/kafkactl/internal/helpers/avro"

"github.com/Shopify/sarama"
"github.com/deviceinsight/kafkactl/internal/helpers"
"github.com/deviceinsight/kafkactl/internal/helpers/protobuf"
Expand Down Expand Up @@ -63,6 +65,7 @@ type ClientContext struct {
ClientID string
KafkaVersion sarama.KafkaVersion
AvroSchemaRegistry string
AvroJSONCodec avro.JSONCodec
Protobuf protobuf.SearchContext
Producer ProducerConfig
}
Expand Down Expand Up @@ -104,6 +107,7 @@ func CreateClientContext() (ClientContext, error) {
return context, err
}
context.AvroSchemaRegistry = viper.GetString("contexts." + context.Name + ".avro.schemaRegistry")
context.AvroJSONCodec = avro.ParseJSONCodec(viper.GetString("contexts." + context.Name + ".avro.jsonCodec"))
context.Protobuf.ProtosetFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protosetFiles")
context.Protobuf.ProtoImportPaths = viper.GetStringSlice("contexts." + context.Name + ".protobuf.importPaths")
context.Protobuf.ProtoFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protoFiles")
Expand Down
15 changes: 12 additions & 3 deletions internal/consume/AvroMessageDeserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/deviceinsight/kafkactl/internal/helpers/avro"

"github.com/Shopify/sarama"
"github.com/deviceinsight/kafkactl/output"
"github.com/deviceinsight/kafkactl/util"
Expand All @@ -16,14 +18,15 @@ import (
type AvroMessageDeserializer struct {
topic string
avroSchemaRegistry string
jsonCodec avro.JSONCodec
registry *CachingSchemaRegistry
}

func CreateAvroMessageDeserializer(topic string, avroSchemaRegistry string) (AvroMessageDeserializer, error) {
func CreateAvroMessageDeserializer(topic string, avroSchemaRegistry string, jsonCodec avro.JSONCodec) (AvroMessageDeserializer, error) {

var err error

deserializer := AvroMessageDeserializer{topic: topic, avroSchemaRegistry: avroSchemaRegistry}
deserializer := AvroMessageDeserializer{topic: topic, avroSchemaRegistry: avroSchemaRegistry, jsonCodec: jsonCodec}

deserializer.registry, err = CreateCachingSchemaRegistry(deserializer.avroSchemaRegistry)

Expand Down Expand Up @@ -131,7 +134,13 @@ func (deserializer AvroMessageDeserializer) decode(rawData []byte, flags Flags,
return decodedValue{}, errors.Errorf("failed to find avro schema for subject: %s id: %d (%v)", subject, schemaID, err)
}

avroCodec, err := goavro.NewCodec(schema)
var avroCodec *goavro.Codec

if deserializer.jsonCodec == avro.Avro {
avroCodec, err = goavro.NewCodec(schema)
} else {
avroCodec, err = goavro.NewCodecForStandardJSONFull(schema)
}

if err != nil {
return decodedValue{}, errors.Wrap(err, "failed to parse avro schema")
Expand Down
2 changes: 1 addition & 1 deletion internal/consume/consume-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (operation *Operation) Consume(topic string, flags Flags) error {
var deserializers MessageDeserializerChain

if clientContext.AvroSchemaRegistry != "" {
deserializer, err := CreateAvroMessageDeserializer(topic, clientContext.AvroSchemaRegistry)
deserializer, err := CreateAvroMessageDeserializer(topic, clientContext.AvroSchemaRegistry, clientContext.AvroJSONCodec)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions internal/env/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
ClientID = "CLIENTID"
KafkaVersion = "KAFKAVERSION"
AvroSchemaRegistry = "AVRO_SCHEMAREGISTRY"
AvroJSONCodec = "AVRO_JSONCODEC"
ProtobufProtoSetFiles = "PROTOBUF_PROTOSETFILES"
ProtobufImportPaths = "PROTOBUF_IMPORTPATHS"
ProtobufProtoFiles = "PROTOBUF_PROTOFILES"
Expand All @@ -38,6 +39,7 @@ var Variables = []string{
ClientID,
KafkaVersion,
AvroSchemaRegistry,
AvroJSONCodec,
ProtobufProtoSetFiles,
ProtobufImportPaths,
ProtobufProtoFiles,
Expand Down
33 changes: 33 additions & 0 deletions internal/helpers/avro/JsonCodec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package avro

import (
"strings"

"github.com/deviceinsight/kafkactl/output"
)

// JSONCodec selects how avro values are (de)serialized to and from JSON.
type JSONCodec int

const (
	// Standard selects plain/standard JSON notation (the default).
	Standard JSONCodec = iota
	// Avro selects avro-json notation, where union values are wrapped
	// as {"type": value}.
	Avro
)

// String returns the configuration value for the codec ("standard" or
// "avro"). Unknown values map to "standard" instead of panicking — the
// original slice-index lookup would panic for any out-of-range codec —
// which mirrors ParseJSONCodec's fallback.
func (codec JSONCodec) String() string {
	switch codec {
	case Avro:
		return "avro"
	default:
		return "standard"
	}
}

// ParseJSONCodec maps a configuration value to the corresponding JSONCodec.
// Matching is case-insensitive. An empty value selects the default
// (Standard); any other unrecognized value emits a warning and also falls
// back to Standard.
func ParseJSONCodec(codec string) JSONCodec {

	normalized := strings.ToLower(codec)

	if normalized == "avro" {
		return Avro
	}
	if normalized != "" && normalized != "standard" {
		output.Warnf("unable to parse avro json codec: %s", codec)
	}
	return Standard
}
1 change: 1 addition & 0 deletions internal/k8s/k8s-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func parsePodEnvironment(context internal.ClientContext) []string {
envVariables = appendStringIfDefined(envVariables, env.ClientID, context.ClientID)
envVariables = appendStringIfDefined(envVariables, env.KafkaVersion, context.KafkaVersion.String())
envVariables = appendStringIfDefined(envVariables, env.AvroSchemaRegistry, context.AvroSchemaRegistry)
envVariables = appendStringIfDefined(envVariables, env.AvroJSONCodec, context.AvroJSONCodec.String())
envVariables = appendStrings(envVariables, env.ProtobufProtoSetFiles, context.Protobuf.ProtosetFiles)
envVariables = appendStrings(envVariables, env.ProtobufImportPaths, context.Protobuf.ProtoImportPaths)
envVariables = appendStrings(envVariables, env.ProtobufProtoFiles, context.Protobuf.ProtoFiles)
Expand Down
4 changes: 4 additions & 0 deletions internal/k8s/k8s-operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"github.com/deviceinsight/kafkactl/internal/helpers/avro"

"github.com/Shopify/sarama"
"github.com/deviceinsight/kafkactl/internal"
"github.com/deviceinsight/kafkactl/internal/env"
Expand All @@ -29,6 +31,7 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) {
context.ClientID = "my-client"
context.KafkaVersion = sarama.V2_0_1_0
context.AvroSchemaRegistry = "registry:8888"
context.AvroJSONCodec = avro.Avro
context.Protobuf.ProtosetFiles = []string{"/usr/include/protosets/ps1.protoset", "/usr/lib/ps2.protoset"}
context.Protobuf.ProtoImportPaths = []string{"/usr/include/protobuf", "/usr/lib/protobuf"}
context.Protobuf.ProtoFiles = []string{"message.proto", "other.proto"}
Expand Down Expand Up @@ -65,6 +68,7 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) {
testutil.AssertEquals(t, "my-client", envMap[env.ClientID])
testutil.AssertEquals(t, "2.0.1", envMap[env.KafkaVersion])
testutil.AssertEquals(t, "registry:8888", envMap[env.AvroSchemaRegistry])
testutil.AssertEquals(t, "avro", envMap[env.AvroJSONCodec])
testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset", envMap[env.ProtobufProtoSetFiles])
testutil.AssertEquals(t, "/usr/include/protobuf /usr/lib/protobuf", envMap[env.ProtobufImportPaths])
testutil.AssertEquals(t, "message.proto other.proto", envMap[env.ProtobufProtoFiles])
Expand Down
Loading