diff --git a/CHANGELOG.md b/CHANGELOG.md
index 33de464e..d3f11cf4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,11 +5,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
+### Changed
+- [#123](https://github.com/deviceinsight/kafkactl/issues/123) Make avro json codec configurable and switch default to standard json
 
 ## 2.5.0 - 2022-08-19
+### Added
 - [#105](https://github.com/deviceinsight/kafkactl/issues/105) Add replication factor to `get topics`
-- [#108](https://github.com/deviceinsight/kafkactl/issues/108) Fix "system root pool not available on Windows" by migrating to go 1.19
 - Add `clone` command for `consumer-group` and `topic`
+
+### Fixed
+- [#108](https://github.com/deviceinsight/kafkactl/issues/108) Fix "system root pool not available on Windows" by migrating to go 1.19
 - Print topic configs when yaml and json output format used with `-c` flag
 
 ## 2.4.0 - 2022-06-23
diff --git a/README.md b/README.md
index 4f222e13..32a3b3b2 100644
--- a/README.md
+++ b/README.md
@@ -123,6 +123,9 @@ contexts:
     # optional: avro schema registry
     avro:
       schemaRegistry: localhost:8081
+      # optional: configure codec for (de)serialization as standard,avro (defaults to standard)
+      # see: https://github.com/deviceinsight/kafkactl/issues/123
+      jsonCodec: avro
 
     # optional: default protobuf messages search paths
     protobuf:
diff --git a/cmd/produce/produce_test.go b/cmd/produce/produce_test.go
index a0668b2e..50c7c6a2 100644
--- a/cmd/produce/produce_test.go
+++ b/cmd/produce/produce_test.go
@@ -92,6 +92,122 @@ func TestProduceAvroMessageWithHeadersIntegration(t *testing.T) {
 	testutil.AssertEquals(t, fmt.Sprintf("key1:value1,key\\:2:value\\:2#test-key#%s", value), kafkaCtl.GetStdOut())
 }
 
+func TestProduceAvroMessageOmitDefaultValue(t *testing.T) {
+
+	testutil.StartIntegrationTest(t)
+
+	valueSchema := `{
+		"name": "CreateUserProfileWallet",
+		"namespace": "Messaging.Contracts.WalletManager.Commands",
+		"type": "record",
+		"fields": [
+			{ "name": "CurrencyCode", "type": "string" },
+			{ "name": "ExpiresOn", "type": ["null", "string"], "default": null}
+		]
+	}`
+	value := `{
+		"CurrencyCode": "EUR"
+	}`
+
+	topicName := testutil.CreateAvroTopic(t, "produce-avro-topic", "", valueSchema)
+
+	kafkaCtl := testutil.CreateKafkaCtlCommand()
+
+	if _, err := kafkaCtl.Execute("produce", topicName, "--value", value); err != nil {
+		t.Fatalf("failed to execute command: %v", err)
+	}
+
+	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())
+
+	if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--exit"); err != nil {
+		t.Fatalf("failed to execute command: %v", err)
+	}
+
+	stdout := kafkaCtl.GetStdOut()
+	testutil.AssertContainSubstring(t, `"CurrencyCode":"EUR"`, stdout)
+	testutil.AssertContainSubstring(t, `"ExpiresOn":null`, stdout)
+}
+
+func TestProduceAvroMessageWithUnionStandardJson(t *testing.T) {
+
+	testutil.StartIntegrationTest(t)
+
+	valueSchema := `{
+		"name": "CreateUserProfileWallet",
+		"namespace": "Messaging.Contracts.WalletManager.Commands",
+		"type": "record",
+		"fields": [
+			{ "name": "CurrencyCode", "type": "string" },
+			{ "name": "ExpiresOn", "type": ["null", "string"], "default": null}
+		]
+	}`
+
+	value := `{
+		"CurrencyCode": "EUR",
+		"ExpiresOn": "2022-12-12"
+	}`
+
+	topicName := testutil.CreateAvroTopic(t, "produce-topic", "", valueSchema)
+
+	kafkaCtl := testutil.CreateKafkaCtlCommand()
+
+	if _, err := kafkaCtl.Execute("produce", topicName, "--value", value); err != nil {
+		t.Fatalf("failed to execute command: %v", err)
+	}
+
+	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())
+
+	if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--exit"); err != nil {
+		t.Fatalf("failed to execute command: %v", err)
+	}
+
+	stdout := kafkaCtl.GetStdOut()
+	testutil.AssertContainSubstring(t, `"CurrencyCode":"EUR"`, stdout)
+	testutil.AssertContainSubstring(t, `"ExpiresOn":"2022-12-12"`, stdout)
+}
+
+func TestProduceAvroMessageWithUnionAvroJson(t *testing.T) {
+
+	testutil.StartIntegrationTest(t)
+
+	valueSchema := `{
+		"name": "CreateUserProfileWallet",
+		"namespace": "Messaging.Contracts.WalletManager.Commands",
+		"type": "record",
+		"fields": [
+			{ "name": "CurrencyCode", "type": "string" },
+			{ "name": "ExpiresOn", "type": ["null", "string"], "default": null}
+		]
+	}`
+
+	value := `{
+		"CurrencyCode": "EUR",
+		"ExpiresOn": {"string": "2022-12-12"}
+	}`
+
+	if err := os.Setenv("AVRO_JSONCODEC", "avro"); err != nil {
+		t.Fatalf("unable to set env variable: %v", err)
+	}
+
+	topicName := testutil.CreateAvroTopic(t, "produce-topic", "", valueSchema)
+
+	kafkaCtl := testutil.CreateKafkaCtlCommand()
+
+	if _, err := kafkaCtl.Execute("produce", topicName, "--value", value); err != nil {
+		t.Fatalf("failed to execute command: %v", err)
+	}
+
+	testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())
+
+	if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--exit"); err != nil {
+		t.Fatalf("failed to execute command: %v", err)
+	}
+
+	stdout := kafkaCtl.GetStdOut()
+	testutil.AssertContainSubstring(t, `"CurrencyCode":"EUR"`, stdout)
+	testutil.AssertContainSubstring(t, `"ExpiresOn":{"string":"2022-12-12"}`, stdout)
+}
+
 func TestProduceTombstoneIntegration(t *testing.T) {
 
 	testutil.StartIntegrationTest(t)
diff --git a/cmd/root_test.go b/cmd/root_test.go
index 3873434b..52c6af54 100644
--- a/cmd/root_test.go
+++ b/cmd/root_test.go
@@ -56,6 +56,7 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {
 	_ = os.Setenv(env.ClientID, "my-client")
 	_ = os.Setenv(env.KafkaVersion, "2.0.1")
 	_ = os.Setenv(env.AvroSchemaRegistry, "registry:8888")
+	_ = os.Setenv(env.AvroJSONCodec, "avro")
 	_ = os.Setenv(env.ProtobufProtoSetFiles, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset")
 	_ = os.Setenv(env.ProtobufImportPaths, "/usr/include/protobuf /usr/lib/protobuf")
 	_ = os.Setenv(env.ProtobufProtoFiles, "message.proto other.proto")
@@ -94,6 +95,7 @@
 	testutil.AssertEquals(t, "my-client", viper.GetString("contexts.default.clientID"))
 	testutil.AssertEquals(t, "2.0.1", viper.GetString("contexts.default.kafkaVersion"))
 	testutil.AssertEquals(t, "registry:8888", viper.GetString("contexts.default.avro.schemaRegistry"))
+	testutil.AssertEquals(t, "avro", viper.GetString("contexts.default.avro.jsonCodec"))
 	testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset", viper.GetStringSlice("contexts.default.protobuf.protosetFiles")[0])
 	testutil.AssertEquals(t, "/usr/include/protobuf", viper.GetStringSlice("contexts.default.protobuf.importPaths")[0])
 	testutil.AssertEquals(t, "message.proto", viper.GetStringSlice("contexts.default.protobuf.protoFiles")[0])
diff --git a/go.mod b/go.mod
index 8ad1fc7b..6a3b2d15 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@ require (
 	github.com/golang/protobuf v1.4.2
 	github.com/jhump/protoreflect v1.10.1
 	github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891
-	github.com/linkedin/goavro/v2 v2.10.0
+	github.com/linkedin/goavro/v2 v2.11.2-0.20220819205120-9a4764661614
 	github.com/pkg/errors v0.9.1
 	github.com/spf13/cobra v1.0.1-0.20200629195214-2c5a0d300f8b
 	github.com/spf13/pflag v1.0.5
diff --git a/go.sum b/go.sum
index fc5d4f2a..17bd7596 100644
--- a/go.sum
+++ b/go.sum
@@ -190,6 +190,8 @@ github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891 h1:FADDInP
 github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891/go.mod h1:yITyTTMx2IS5mpfZjQ64gJhL5U5RvcorFBu+z4/euXg=
 github.com/linkedin/goavro/v2 v2.10.0 h1:eTBIRoInBM88gITGXYtUSqqxLTFXfOsJBiX8ZMW0o4U=
 github.com/linkedin/goavro/v2 v2.10.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
+github.com/linkedin/goavro/v2 v2.11.2-0.20220819205120-9a4764661614 h1:57IiYlL+Q6pWb+rCX7HuCLMQndj0HxGSedf6c1skI9U=
+github.com/linkedin/goavro/v2 v2.11.2-0.20220819205120-9a4764661614/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
 github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
 github.com/magiconair/properties v1.8.4 h1:8KGKTcQQGm0Kv7vEbKFErAoAOFyyacLStRtQSeYtvkY=
@@ -278,12 +280,16 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
 github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q=
+github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
@@ -505,6 +511,8 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gotest.tools/gotestsum v1.8.1 h1:C6dYd5K39WAv52jikEUuWgyMqJDhY90eauUjsFzwluc=
 gotest.tools/gotestsum v1.8.1/go.mod h1:ctqdxBSCPv80kAFjYvFNpPntBrE5HAQnLiOKBGLmOBs=
 gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
diff --git a/internal/common-operation.go b/internal/common-operation.go
index 7fa8e709..736f06c8 100644
--- a/internal/common-operation.go
+++ b/internal/common-operation.go
@@ -10,6 +10,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/deviceinsight/kafkactl/internal/helpers/avro"
+
 	"github.com/Shopify/sarama"
 	"github.com/deviceinsight/kafkactl/internal/helpers"
 	"github.com/deviceinsight/kafkactl/internal/helpers/protobuf"
@@ -63,6 +65,7 @@ type ClientContext struct {
 	ClientID           string
 	KafkaVersion       sarama.KafkaVersion
 	AvroSchemaRegistry string
+	AvroJSONCodec      avro.JSONCodec
 	Protobuf           protobuf.SearchContext
 	Producer           ProducerConfig
 }
@@ -104,6 +107,7 @@ func CreateClientContext() (ClientContext, error) {
 		return context, err
 	}
 	context.AvroSchemaRegistry = viper.GetString("contexts." + context.Name + ".avro.schemaRegistry")
+	context.AvroJSONCodec = avro.ParseJSONCodec(viper.GetString("contexts." + context.Name + ".avro.jsonCodec"))
 	context.Protobuf.ProtosetFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protosetFiles")
 	context.Protobuf.ProtoImportPaths = viper.GetStringSlice("contexts." + context.Name + ".protobuf.importPaths")
 	context.Protobuf.ProtoFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protoFiles")
diff --git a/internal/consume/AvroMessageDeserializer.go b/internal/consume/AvroMessageDeserializer.go
index b57633ec..2934ec70 100644
--- a/internal/consume/AvroMessageDeserializer.go
+++ b/internal/consume/AvroMessageDeserializer.go
@@ -6,6 +6,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/deviceinsight/kafkactl/internal/helpers/avro"
+
 	"github.com/Shopify/sarama"
 	"github.com/deviceinsight/kafkactl/output"
 	"github.com/deviceinsight/kafkactl/util"
@@ -16,14 +18,15 @@ import (
 type AvroMessageDeserializer struct {
 	topic              string
 	avroSchemaRegistry string
+	jsonCodec          avro.JSONCodec
 	registry           *CachingSchemaRegistry
 }
 
-func CreateAvroMessageDeserializer(topic string, avroSchemaRegistry string) (AvroMessageDeserializer, error) {
+func CreateAvroMessageDeserializer(topic string, avroSchemaRegistry string, jsonCodec avro.JSONCodec) (AvroMessageDeserializer, error) {
 
 	var err error
 
-	deserializer := AvroMessageDeserializer{topic: topic, avroSchemaRegistry: avroSchemaRegistry}
+	deserializer := AvroMessageDeserializer{topic: topic, avroSchemaRegistry: avroSchemaRegistry, jsonCodec: jsonCodec}
 
 	deserializer.registry, err = CreateCachingSchemaRegistry(deserializer.avroSchemaRegistry)
 
@@ -131,7 +134,13 @@
 		return decodedValue{}, errors.Errorf("failed to find avro schema for subject: %s id: %d (%v)", subject, schemaID, err)
 	}
 
-	avroCodec, err := goavro.NewCodec(schema)
+	var avroCodec *goavro.Codec
+
+	if deserializer.jsonCodec == avro.Avro {
+		avroCodec, err = goavro.NewCodec(schema)
+	} else {
+		avroCodec, err = goavro.NewCodecForStandardJSONFull(schema)
+	}
 
 	if err != nil {
 		return decodedValue{}, errors.Wrap(err, "failed to parse avro schema")
diff --git a/internal/consume/consume-operation.go b/internal/consume/consume-operation.go
index 8e102df5..678d1e62 100644
--- a/internal/consume/consume-operation.go
+++ b/internal/consume/consume-operation.go
@@ -77,7 +77,7 @@ func (operation *Operation) Consume(topic string, flags Flags) error {
 	var deserializers MessageDeserializerChain
 
 	if clientContext.AvroSchemaRegistry != "" {
-		deserializer, err := CreateAvroMessageDeserializer(topic, clientContext.AvroSchemaRegistry)
+		deserializer, err := CreateAvroMessageDeserializer(topic, clientContext.AvroSchemaRegistry, clientContext.AvroJSONCodec)
 		if err != nil {
 			return err
 		}
diff --git a/internal/env/variables.go b/internal/env/variables.go
index 967ef3ae..7946b441 100644
--- a/internal/env/variables.go
+++ b/internal/env/variables.go
@@ -15,6 +15,7 @@ const (
 	ClientID              = "CLIENTID"
 	KafkaVersion          = "KAFKAVERSION"
 	AvroSchemaRegistry    = "AVRO_SCHEMAREGISTRY"
+	AvroJSONCodec         = "AVRO_JSONCODEC"
 	ProtobufProtoSetFiles = "PROTOBUF_PROTOSETFILES"
 	ProtobufImportPaths   = "PROTOBUF_IMPORTPATHS"
 	ProtobufProtoFiles    = "PROTOBUF_PROTOFILES"
@@ -38,6 +39,7 @@ var Variables = []string{
 	ClientID,
 	KafkaVersion,
 	AvroSchemaRegistry,
+	AvroJSONCodec,
 	ProtobufProtoSetFiles,
 	ProtobufImportPaths,
 	ProtobufProtoFiles,
diff --git a/internal/helpers/avro/JsonCodec.go b/internal/helpers/avro/JsonCodec.go
new file mode 100644
index 00000000..398b5116
--- /dev/null
+++ b/internal/helpers/avro/JsonCodec.go
@@ -0,0 +1,33 @@
+package avro
+
+import (
+	"strings"
+
+	"github.com/deviceinsight/kafkactl/output"
+)
+
+type JSONCodec int
+
+const (
+	Standard JSONCodec = iota
+	Avro
+)
+
+func (codec JSONCodec) String() string {
+	return []string{"standard", "avro"}[codec]
+}
+
+func ParseJSONCodec(codec string) JSONCodec {
+
+	switch strings.ToLower(codec) {
+	case "":
+		fallthrough
+	case "standard":
+		return Standard
+	case "avro":
+		return Avro
+	default:
+		output.Warnf("unable to parse avro json codec: %s", codec)
+		return Standard
+	}
+}
diff --git a/internal/k8s/k8s-operation.go b/internal/k8s/k8s-operation.go
index 94752141..047a77fb 100644
--- a/internal/k8s/k8s-operation.go
+++ b/internal/k8s/k8s-operation.go
@@ -154,6 +154,7 @@ func parsePodEnvironment(context internal.ClientContext) []string {
 	envVariables = appendStringIfDefined(envVariables, env.ClientID, context.ClientID)
 	envVariables = appendStringIfDefined(envVariables, env.KafkaVersion, context.KafkaVersion.String())
 	envVariables = appendStringIfDefined(envVariables, env.AvroSchemaRegistry, context.AvroSchemaRegistry)
+	envVariables = appendStringIfDefined(envVariables, env.AvroJSONCodec, context.AvroJSONCodec.String())
 	envVariables = appendStrings(envVariables, env.ProtobufProtoSetFiles, context.Protobuf.ProtosetFiles)
 	envVariables = appendStrings(envVariables, env.ProtobufImportPaths, context.Protobuf.ProtoImportPaths)
 	envVariables = appendStrings(envVariables, env.ProtobufProtoFiles, context.Protobuf.ProtoFiles)
diff --git a/internal/k8s/k8s-operation_test.go b/internal/k8s/k8s-operation_test.go
index 4f760380..2ad7d1f7 100644
--- a/internal/k8s/k8s-operation_test.go
+++ b/internal/k8s/k8s-operation_test.go
@@ -5,6 +5,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/deviceinsight/kafkactl/internal/helpers/avro"
+
 	"github.com/Shopify/sarama"
 	"github.com/deviceinsight/kafkactl/internal"
 	"github.com/deviceinsight/kafkactl/internal/env"
@@ -29,6 +31,7 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) {
 	context.ClientID = "my-client"
 	context.KafkaVersion = sarama.V2_0_1_0
 	context.AvroSchemaRegistry = "registry:8888"
+	context.AvroJSONCodec = avro.Avro
 	context.Protobuf.ProtosetFiles = []string{"/usr/include/protosets/ps1.protoset", "/usr/lib/ps2.protoset"}
 	context.Protobuf.ProtoImportPaths = []string{"/usr/include/protobuf", "/usr/lib/protobuf"}
 	context.Protobuf.ProtoFiles = []string{"message.proto", "other.proto"}
@@ -65,6 +68,7 @@
 	testutil.AssertEquals(t, "my-client", envMap[env.ClientID])
 	testutil.AssertEquals(t, "2.0.1", envMap[env.KafkaVersion])
 	testutil.AssertEquals(t, "registry:8888", envMap[env.AvroSchemaRegistry])
+	testutil.AssertEquals(t, "avro", envMap[env.AvroJSONCodec])
 	testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset", envMap[env.ProtobufProtoSetFiles])
 	testutil.AssertEquals(t, "/usr/include/protobuf /usr/lib/protobuf", envMap[env.ProtobufImportPaths])
 	testutil.AssertEquals(t, "message.proto other.proto", envMap[env.ProtobufProtoFiles])
diff --git a/internal/producer/AvroMessageSerializer.go b/internal/producer/AvroMessageSerializer.go
index df5d4e19..cf0a64fd 100644
--- a/internal/producer/AvroMessageSerializer.go
+++ b/internal/producer/AvroMessageSerializer.go
@@ -3,6 +3,8 @@ package producer
 import (
 	"encoding/binary"
 
+	"github.com/deviceinsight/kafkactl/internal/helpers/avro"
+
 	"github.com/Shopify/sarama"
 	"github.com/deviceinsight/kafkactl/util"
 	schemaregistry "github.com/landoop/schema-registry"
@@ -13,14 +15,15 @@
 type AvroMessageSerializer struct {
 	topic              string
 	avroSchemaRegistry string
+	jsonCodec          avro.JSONCodec
 	client             *schemaregistry.Client
 }
 
-func CreateAvroMessageSerializer(topic string, avroSchemaRegistry string) (AvroMessageSerializer, error) {
+func CreateAvroMessageSerializer(topic string, avroSchemaRegistry string, jsonCodec avro.JSONCodec) (AvroMessageSerializer, error) {
 
 	var err error
 
-	serializer := AvroMessageSerializer{topic: topic, avroSchemaRegistry: avroSchemaRegistry}
+	serializer := AvroMessageSerializer{topic: topic, avroSchemaRegistry: avroSchemaRegistry, jsonCodec: jsonCodec}
 
 	serializer.client, err = schemaregistry.NewClient(serializer.avroSchemaRegistry)
 
@@ -62,18 +65,24 @@
 		}
 	}
 
-	codec, err := goavro.NewCodec(schema.Schema)
+	var avroCodec *goavro.Codec
+
+	if serializer.jsonCodec == avro.Avro {
+		avroCodec, err = goavro.NewCodec(schema.Schema)
+	} else {
+		avroCodec, err = goavro.NewCodecForStandardJSONFull(schema.Schema)
+	}
 
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to parse avro schema")
 	}
 
-	native, _, err := codec.NativeFromTextual(rawData)
+	native, _, err := avroCodec.NativeFromTextual(rawData)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to convert value to avro data")
 	}
 
-	data, err := codec.BinaryFromNative(nil, native)
+	data, err := avroCodec.BinaryFromNative(nil, native)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to convert value to avro data")
 	}
diff --git a/internal/producer/producer-operation.go b/internal/producer/producer-operation.go
index 5b639957..ac2ab9b7 100644
--- a/internal/producer/producer-operation.go
+++ b/internal/producer/producer-operation.go
@@ -78,7 +78,7 @@ func (operation *Operation) Produce(topic string, flags Flags) error {
 	serializers := MessageSerializerChain{topic: topic}
 
 	if clientContext.AvroSchemaRegistry != "" {
-		serializer, err := CreateAvroMessageSerializer(topic, clientContext.AvroSchemaRegistry)
+		serializer, err := CreateAvroMessageSerializer(topic, clientContext.AvroSchemaRegistry, clientContext.AvroJSONCodec)
 		if err != nil {
 			return err
 		}
diff --git a/testutil/test_util.go b/testutil/test_util.go
index 36570e29..d4f0940d 100644
--- a/testutil/test_util.go
+++ b/testutil/test_util.go
@@ -13,6 +13,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/deviceinsight/kafkactl/internal/env"
+
 	"github.com/Shopify/sarama"
 	"github.com/deviceinsight/kafkactl/cmd"
 	"github.com/deviceinsight/kafkactl/internal"
@@ -65,6 +67,12 @@ func init() {
 	if err := os.Setenv("KAFKA_CTL_CONFIG", filepath.Join(rootDir, configFile)); err != nil {
 		panic(err)
 	}
+
+	for _, variable := range env.Variables {
+		if err := os.Setenv(variable, ""); err != nil {
+			panic(err)
+		}
+	}
 }
 
 func StartUnitTest(t *testing.T) {
@@ -192,6 +200,12 @@ func AssertErrorContains(t *testing.T, expected string, err error) {
 	}
 }
 
+func AssertContainSubstring(t *testing.T, expected, actual string) {
+	if !strings.Contains(actual, expected) {
+		t.Fatalf("expected string to contain: %s actual: %s", expected, actual)
+	}
+}
+
 func AssertContains(t *testing.T, expected string, array []string) {
 	if !util.ContainsString(array, expected) {
 		t.Fatalf("expected array to contain: %s\narray: %v", expected, array)