Skip to content

Commit

Permalink
Merge pull request #136 from deviceinsight/feature/avro_standard_json
Browse files Browse the repository at this point in the history
make avro json codec configurable (fixes #123)
  • Loading branch information
d-rk authored Sep 30, 2022
2 parents eddc3ee + fb87769 commit 7d27df1
Show file tree
Hide file tree
Showing 16 changed files with 222 additions and 12 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- [#123](https://github.com/deviceinsight/kafkactl/issues/123) Make avro json codec configurable and switch default to standard json

## 2.5.0 - 2022-08-19
### Added
- [#105](https://github.com/deviceinsight/kafkactl/issues/105) Add replication factor to `get topics`
- [#108](https://github.com/deviceinsight/kafkactl/issues/108) Fix "system root pool not available on Windows" by migrating to go 1.19
- Add `clone` command for `consumer-group` and `topic`

### Fixed
- [#108](https://github.com/deviceinsight/kafkactl/issues/108) Fix "system root pool not available on Windows" by migrating to go 1.19
- Print topic configs when yaml and json output format used with `-c` flag

## 2.4.0 - 2022-06-23
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ contexts:
# optional: avro schema registry
avro:
schemaRegistry: localhost:8081
# optional: configure codec for (de)serialization as standard,avro (defaults to standard)
# see: https://github.com/deviceinsight/kafkactl/issues/123
jsonCodec: avro

# optional: default protobuf messages search paths
protobuf:
Expand Down
116 changes: 116 additions & 0 deletions cmd/produce/produce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,122 @@ func TestProduceAvroMessageWithHeadersIntegration(t *testing.T) {
testutil.AssertEquals(t, fmt.Sprintf("key1:value1,key\\:2:value\\:2#test-key#%s", value), kafkaCtl.GetStdOut())
}

func TestProduceAvroMessageOmitDefaultValue(t *testing.T) {

testutil.StartIntegrationTest(t)

valueSchema := `{
"name": "CreateUserProfileWallet",
"namespace": "Messaging.Contracts.WalletManager.Commands",
"type": "record",
"fields": [
{ "name": "CurrencyCode", "type": "string" },
{ "name": "ExpiresOn", "type": ["null", "string"], "default": null}
]
}`
value := `{
"CurrencyCode": "EUR"
}`

topicName := testutil.CreateAvroTopic(t, "produce-avro-topic", "", valueSchema)

kafkaCtl := testutil.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("produce", topicName, "--value", value); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--exit"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

stdout := kafkaCtl.GetStdOut()
testutil.AssertContainSubstring(t, `"CurrencyCode":"EUR"`, stdout)
testutil.AssertContainSubstring(t, `"ExpiresOn":null`, stdout)
}

func TestProduceAvroMessageWithUnionStandardJson(t *testing.T) {

testutil.StartIntegrationTest(t)

valueSchema := `{
"name": "CreateUserProfileWallet",
"namespace": "Messaging.Contracts.WalletManager.Commands",
"type": "record",
"fields": [
{ "name": "CurrencyCode", "type": "string" },
{ "name": "ExpiresOn", "type": ["null", "string"], "default": null}
]
}`

value := `{
"CurrencyCode": "EUR",
"ExpiresOn": "2022-12-12"
}`

topicName := testutil.CreateAvroTopic(t, "produce-topic", "", valueSchema)

kafkaCtl := testutil.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("produce", topicName, "--value", value); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--exit"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

stdout := kafkaCtl.GetStdOut()
testutil.AssertContainSubstring(t, `"CurrencyCode":"EUR"`, stdout)
testutil.AssertContainSubstring(t, `"ExpiresOn":"2022-12-12"`, stdout)
}

func TestProduceAvroMessageWithUnionAvroJson(t *testing.T) {

testutil.StartIntegrationTest(t)

valueSchema := `{
"name": "CreateUserProfileWallet",
"namespace": "Messaging.Contracts.WalletManager.Commands",
"type": "record",
"fields": [
{ "name": "CurrencyCode", "type": "string" },
{ "name": "ExpiresOn", "type": ["null", "string"], "default": null}
]
}`

value := `{
"CurrencyCode": "EUR",
"ExpiresOn": {"string": "2022-12-12"}
}`

if err := os.Setenv("AVRO_JSONCODEC", "avro"); err != nil {
t.Fatalf("unable to set env variable: %v", err)
}

topicName := testutil.CreateAvroTopic(t, "produce-topic", "", valueSchema)

kafkaCtl := testutil.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("produce", topicName, "--value", value); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--exit"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

stdout := kafkaCtl.GetStdOut()
testutil.AssertContainSubstring(t, `"CurrencyCode":"EUR"`, stdout)
testutil.AssertContainSubstring(t, `"ExpiresOn":{"string":"2022-12-12"}`, stdout)
}

func TestProduceTombstoneIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)
Expand Down
2 changes: 2 additions & 0 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {
_ = os.Setenv(env.ClientID, "my-client")
_ = os.Setenv(env.KafkaVersion, "2.0.1")
_ = os.Setenv(env.AvroSchemaRegistry, "registry:8888")
_ = os.Setenv(env.AvroJSONCodec, "avro")
_ = os.Setenv(env.ProtobufProtoSetFiles, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset")
_ = os.Setenv(env.ProtobufImportPaths, "/usr/include/protobuf /usr/lib/protobuf")
_ = os.Setenv(env.ProtobufProtoFiles, "message.proto other.proto")
Expand Down Expand Up @@ -94,6 +95,7 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {
testutil.AssertEquals(t, "my-client", viper.GetString("contexts.default.clientID"))
testutil.AssertEquals(t, "2.0.1", viper.GetString("contexts.default.kafkaVersion"))
testutil.AssertEquals(t, "registry:8888", viper.GetString("contexts.default.avro.schemaRegistry"))
testutil.AssertEquals(t, "avro", viper.GetString("contexts.default.avro.jsonCodec"))
testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset", viper.GetStringSlice("contexts.default.protobuf.protosetFiles")[0])
testutil.AssertEquals(t, "/usr/include/protobuf", viper.GetStringSlice("contexts.default.protobuf.importPaths")[0])
testutil.AssertEquals(t, "message.proto", viper.GetStringSlice("contexts.default.protobuf.protoFiles")[0])
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/golang/protobuf v1.4.2
github.com/jhump/protoreflect v1.10.1
github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891
github.com/linkedin/goavro/v2 v2.10.0
github.com/linkedin/goavro/v2 v2.11.2-0.20220819205120-9a4764661614
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.0.1-0.20200629195214-2c5a0d300f8b
github.com/spf13/pflag v1.0.5
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891 h1:FADDInP
github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891/go.mod h1:yITyTTMx2IS5mpfZjQ64gJhL5U5RvcorFBu+z4/euXg=
github.com/linkedin/goavro/v2 v2.10.0 h1:eTBIRoInBM88gITGXYtUSqqxLTFXfOsJBiX8ZMW0o4U=
github.com/linkedin/goavro/v2 v2.10.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/linkedin/goavro/v2 v2.11.2-0.20220819205120-9a4764661614 h1:57IiYlL+Q6pWb+rCX7HuCLMQndj0HxGSedf6c1skI9U=
github.com/linkedin/goavro/v2 v2.11.2-0.20220819205120-9a4764661614/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.4 h1:8KGKTcQQGm0Kv7vEbKFErAoAOFyyacLStRtQSeYtvkY=
Expand Down Expand Up @@ -278,12 +280,16 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q=
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down Expand Up @@ -505,6 +511,8 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/gotestsum v1.8.1 h1:C6dYd5K39WAv52jikEUuWgyMqJDhY90eauUjsFzwluc=
gotest.tools/gotestsum v1.8.1/go.mod h1:ctqdxBSCPv80kAFjYvFNpPntBrE5HAQnLiOKBGLmOBs=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
Expand Down
4 changes: 4 additions & 0 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"time"

"github.com/deviceinsight/kafkactl/internal/helpers/avro"

"github.com/Shopify/sarama"
"github.com/deviceinsight/kafkactl/internal/helpers"
"github.com/deviceinsight/kafkactl/internal/helpers/protobuf"
Expand Down Expand Up @@ -63,6 +65,7 @@ type ClientContext struct {
ClientID string
KafkaVersion sarama.KafkaVersion
AvroSchemaRegistry string
AvroJSONCodec avro.JSONCodec
Protobuf protobuf.SearchContext
Producer ProducerConfig
}
Expand Down Expand Up @@ -104,6 +107,7 @@ func CreateClientContext() (ClientContext, error) {
return context, err
}
context.AvroSchemaRegistry = viper.GetString("contexts." + context.Name + ".avro.schemaRegistry")
context.AvroJSONCodec = avro.ParseJSONCodec(viper.GetString("contexts." + context.Name + ".avro.jsonCodec"))
context.Protobuf.ProtosetFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protosetFiles")
context.Protobuf.ProtoImportPaths = viper.GetStringSlice("contexts." + context.Name + ".protobuf.importPaths")
context.Protobuf.ProtoFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protoFiles")
Expand Down
15 changes: 12 additions & 3 deletions internal/consume/AvroMessageDeserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/deviceinsight/kafkactl/internal/helpers/avro"

"github.com/Shopify/sarama"
"github.com/deviceinsight/kafkactl/output"
"github.com/deviceinsight/kafkactl/util"
Expand All @@ -16,14 +18,15 @@ import (
type AvroMessageDeserializer struct {
topic string
avroSchemaRegistry string
jsonCodec avro.JSONCodec
registry *CachingSchemaRegistry
}

func CreateAvroMessageDeserializer(topic string, avroSchemaRegistry string) (AvroMessageDeserializer, error) {
func CreateAvroMessageDeserializer(topic string, avroSchemaRegistry string, jsonCodec avro.JSONCodec) (AvroMessageDeserializer, error) {

var err error

deserializer := AvroMessageDeserializer{topic: topic, avroSchemaRegistry: avroSchemaRegistry}
deserializer := AvroMessageDeserializer{topic: topic, avroSchemaRegistry: avroSchemaRegistry, jsonCodec: jsonCodec}

deserializer.registry, err = CreateCachingSchemaRegistry(deserializer.avroSchemaRegistry)

Expand Down Expand Up @@ -131,7 +134,13 @@ func (deserializer AvroMessageDeserializer) decode(rawData []byte, flags Flags,
return decodedValue{}, errors.Errorf("failed to find avro schema for subject: %s id: %d (%v)", subject, schemaID, err)
}

avroCodec, err := goavro.NewCodec(schema)
var avroCodec *goavro.Codec

if deserializer.jsonCodec == avro.Avro {
avroCodec, err = goavro.NewCodec(schema)
} else {
avroCodec, err = goavro.NewCodecForStandardJSONFull(schema)
}

if err != nil {
return decodedValue{}, errors.Wrap(err, "failed to parse avro schema")
Expand Down
2 changes: 1 addition & 1 deletion internal/consume/consume-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (operation *Operation) Consume(topic string, flags Flags) error {
var deserializers MessageDeserializerChain

if clientContext.AvroSchemaRegistry != "" {
deserializer, err := CreateAvroMessageDeserializer(topic, clientContext.AvroSchemaRegistry)
deserializer, err := CreateAvroMessageDeserializer(topic, clientContext.AvroSchemaRegistry, clientContext.AvroJSONCodec)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions internal/env/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
ClientID = "CLIENTID"
KafkaVersion = "KAFKAVERSION"
AvroSchemaRegistry = "AVRO_SCHEMAREGISTRY"
AvroJSONCodec = "AVRO_JSONCODEC"
ProtobufProtoSetFiles = "PROTOBUF_PROTOSETFILES"
ProtobufImportPaths = "PROTOBUF_IMPORTPATHS"
ProtobufProtoFiles = "PROTOBUF_PROTOFILES"
Expand All @@ -38,6 +39,7 @@ var Variables = []string{
ClientID,
KafkaVersion,
AvroSchemaRegistry,
AvroJSONCodec,
ProtobufProtoSetFiles,
ProtobufImportPaths,
ProtobufProtoFiles,
Expand Down
33 changes: 33 additions & 0 deletions internal/helpers/avro/JsonCodec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package avro

import (
"strings"

"github.com/deviceinsight/kafkactl/output"
)

type JSONCodec int

const (
Standard JSONCodec = iota
Avro
)

func (codec JSONCodec) String() string {
return []string{"standard", "avro"}[codec]
}

func ParseJSONCodec(codec string) JSONCodec {

switch strings.ToLower(codec) {
case "":
fallthrough
case "standard":
return Standard
case "avro":
return Avro
default:
output.Warnf("unable to parse avro json codec: %s", codec)
return Standard
}
}
1 change: 1 addition & 0 deletions internal/k8s/k8s-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func parsePodEnvironment(context internal.ClientContext) []string {
envVariables = appendStringIfDefined(envVariables, env.ClientID, context.ClientID)
envVariables = appendStringIfDefined(envVariables, env.KafkaVersion, context.KafkaVersion.String())
envVariables = appendStringIfDefined(envVariables, env.AvroSchemaRegistry, context.AvroSchemaRegistry)
envVariables = appendStringIfDefined(envVariables, env.AvroJSONCodec, context.AvroJSONCodec.String())
envVariables = appendStrings(envVariables, env.ProtobufProtoSetFiles, context.Protobuf.ProtosetFiles)
envVariables = appendStrings(envVariables, env.ProtobufImportPaths, context.Protobuf.ProtoImportPaths)
envVariables = appendStrings(envVariables, env.ProtobufProtoFiles, context.Protobuf.ProtoFiles)
Expand Down
4 changes: 4 additions & 0 deletions internal/k8s/k8s-operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"github.com/deviceinsight/kafkactl/internal/helpers/avro"

"github.com/Shopify/sarama"
"github.com/deviceinsight/kafkactl/internal"
"github.com/deviceinsight/kafkactl/internal/env"
Expand All @@ -29,6 +31,7 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) {
context.ClientID = "my-client"
context.KafkaVersion = sarama.V2_0_1_0
context.AvroSchemaRegistry = "registry:8888"
context.AvroJSONCodec = avro.Avro
context.Protobuf.ProtosetFiles = []string{"/usr/include/protosets/ps1.protoset", "/usr/lib/ps2.protoset"}
context.Protobuf.ProtoImportPaths = []string{"/usr/include/protobuf", "/usr/lib/protobuf"}
context.Protobuf.ProtoFiles = []string{"message.proto", "other.proto"}
Expand Down Expand Up @@ -65,6 +68,7 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) {
testutil.AssertEquals(t, "my-client", envMap[env.ClientID])
testutil.AssertEquals(t, "2.0.1", envMap[env.KafkaVersion])
testutil.AssertEquals(t, "registry:8888", envMap[env.AvroSchemaRegistry])
testutil.AssertEquals(t, "avro", envMap[env.AvroJSONCodec])
testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset", envMap[env.ProtobufProtoSetFiles])
testutil.AssertEquals(t, "/usr/include/protobuf /usr/lib/protobuf", envMap[env.ProtobufImportPaths])
testutil.AssertEquals(t, "message.proto other.proto", envMap[env.ProtobufProtoFiles])
Expand Down
Loading

0 comments on commit 7d27df1

Please sign in to comment.