diff --git a/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java b/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java index 4f9d649e772..34ad6d578c1 100644 --- a/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java +++ b/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java @@ -306,6 +306,7 @@ public class ProtobufData { private boolean useWrapperForRawPrimitives; private boolean generateStructForNulls; private boolean generateIndexForUnions; + private boolean useJsonFieldNames; public ProtobufData() { this(new ProtobufDataConfig.Builder().with( @@ -332,6 +333,7 @@ public ProtobufData(ProtobufDataConfig protobufDataConfig) { this.useWrapperForRawPrimitives = protobufDataConfig.useWrapperForRawPrimitives(); this.generateStructForNulls = protobufDataConfig.generateStructForNulls(); this.generateIndexForUnions = protobufDataConfig.generateIndexForUnions(); + this.useJsonFieldNames = protobufDataConfig.useJsonFieldNames(); } /** @@ -519,10 +521,14 @@ private Object fromConnectData( throw new DataException("Invalid message name: " + scopedStructName); } for (Field field : schema.fields()) { - String fieldName = scrubName(field.name()); - Object fieldCtx = getFieldType(ctx, fieldName); + String fieldName = field.name(); + if (useJsonFieldNames) { + fieldName = fromJsonCase(fieldName); + } + String scrubbedFieldName = scrubName(fieldName); + Object fieldCtx = getFieldType(ctx, scrubbedFieldName); Object connectFieldVal = ignoreDefaultForNullables - ? struct.getWithoutDefault(field.name()) : struct.get(field); + ? struct.getWithoutDefault(fieldName) : struct.get(field); Object fieldValue = fromConnectData( fieldCtx, field.schema(), @@ -539,10 +545,10 @@ private Object fromConnectData( fieldValue = union.getValue(); } else { fieldDescriptor = messageBuilder.getDescriptorForType() - .findFieldByName(fieldName); + .findFieldByName(scrubbedFieldName); } if (fieldDescriptor == null) { - throw new DataException("Cannot find field with name " + fieldName); + throw new DataException("Cannot find field with name " + scrubbedFieldName); } if (fieldValue != null) { messageBuilder.setField(fieldDescriptor, fieldValue); @@ -727,12 +733,16 @@ private MessageDefinition messageDefinitionFromConnectSchema( String fieldTag = fieldSchema.parameters() != null ? fieldSchema.parameters() .get(PROTOBUF_TYPE_TAG) : null; int tag = fieldTag != null ? Integer.parseInt(fieldTag) : index++; + String fieldName = field.name(); + if (useJsonFieldNames) { + fieldName = fromJsonCase(fieldName); + } FieldDefinition fieldDef = fieldDefinitionFromConnectSchema( ctx, schema, message, fieldSchema, - scrubName(field.name()), + scrubName(fieldName), tag ); if (fieldDef != null) { @@ -763,6 +773,22 @@ private MessageDefinition messageDefinitionFromConnectSchema( return message.build(); } + private static String fromJsonCase(final String str) { + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < str.length(); i++) { + char c = str.charAt(i); + if (Character.isUpperCase(c)) { + if (i != 0) { + sb.append("_"); + } + sb.append(Character.toLowerCase(c)); + } else { + sb.append(c); + } + } + return sb.toString(); + } + private void oneofDefinitionFromConnectSchema( FromConnectContext ctx, DynamicSchema.Builder schema, @@ -1363,7 +1389,7 @@ private void setStructField( Struct result, FieldDescriptor fieldDescriptor ) { - final String fieldName = fieldDescriptor.getName(); + final String fieldName = useJsonFieldNames ? fieldDescriptor.getJsonName() : fieldDescriptor.getName(); final Field field = schema.field(fieldName); if ((isPrimitiveOrRepeated(fieldDescriptor) && !isOptional(fieldDescriptor)) || (generateStructForNulls || message.hasField(fieldDescriptor))) { @@ -1425,7 +1451,8 @@ private SchemaBuilder toConnectSchema( // Already added field as oneof continue; } - builder.field(fieldDescriptor.getName(), toConnectSchema(ctx, fieldDescriptor)); + final String fieldName = useJsonFieldNames ? fieldDescriptor.getJsonName() : fieldDescriptor.getName(); + builder.field(fieldName, toConnectSchema(ctx, fieldDescriptor)); } } diff --git a/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufDataConfig.java b/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufDataConfig.java index c90def4ebf3..34ea304ed40 100644 --- a/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufDataConfig.java +++ b/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufDataConfig.java @@ -69,6 +69,11 @@ public class ProtobufDataConfig extends AbstractDataConfig { public static final String GENERATE_INDEX_FOR_UNIONS_DOC = "Whether to suffix union" + "names with an underscore followed by an index"; + public static final String JSON_FIELD_NAMES_CONFIG = "json.field.names"; + public static final boolean JSON_FIELD_NAMES_DEFAULT = false; + public static final String JSON_FIELD_NAMES_DOC = "Whether to convert protobuf field names " + + "to camelcase for internal data representation and vice-versa."; + public static ConfigDef baseConfigDef() { return AbstractDataConfig.baseConfigDef() .define(ENHANCED_PROTOBUF_SCHEMA_SUPPORT_CONFIG, @@ -112,7 +117,12 @@ public static ConfigDef baseConfigDef() { ConfigDef.Type.BOOLEAN, GENERATE_INDEX_FOR_UNIONS_DEFAULT, ConfigDef.Importance.LOW, - GENERATE_INDEX_FOR_UNIONS_DOC + GENERATE_INDEX_FOR_UNIONS_DOC) + .define(JSON_FIELD_NAMES_CONFIG, + ConfigDef.Type.BOOLEAN, + JSON_FIELD_NAMES_DEFAULT, + ConfigDef.Importance.LOW, + JSON_FIELD_NAMES_DOC ); } @@ -156,6 +166,8 @@ public boolean generateIndexForUnions() { return this.getBoolean(GENERATE_INDEX_FOR_UNIONS_CONFIG); } + public boolean useJsonFieldNames() { return this.getBoolean(JSON_FIELD_NAMES_CONFIG); } + public static class Builder { private final Map props = new HashMap<>(); diff --git a/protobuf-converter/src/test/java/io/confluent/connect/protobuf/ProtobufConverterTest.java b/protobuf-converter/src/test/java/io/confluent/connect/protobuf/ProtobufConverterTest.java index 5b9fb3d4a93..c81197c32ac 100644 --- a/protobuf-converter/src/test/java/io/confluent/connect/protobuf/ProtobufConverterTest.java +++ b/protobuf-converter/src/test/java/io/confluent/connect/protobuf/ProtobufConverterTest.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.ListValue; import com.google.protobuf.Timestamp; @@ -28,21 +29,16 @@ import io.confluent.kafka.serializers.protobuf.test.KeyTimestampValueOuterClass.KeyTimestampValue; import io.confluent.kafka.serializers.protobuf.test.TestMessageProtos.TestMessage2; import io.confluent.kafka.serializers.protobuf.test.TimestampValueOuterClass.TimestampValue; -import java.util.List; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; + +import java.util.*; + +import org.apache.kafka.connect.data.*; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import io.confluent.connect.protobuf.test.Key; import io.confluent.connect.protobuf.test.KeyValue; @@ -281,6 +277,23 @@ public void testFromConnectDataForValue() { assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length)); } + @Test + public void testFromConnectDataForValueUseJsonFieldNames() { + final byte[] expected = HELLO_WORLD_MESSAGE.toByteArray(); + + Map configs = new HashMap<>(SR_CONFIG); + configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true); + converter.configure(configs, false); + + SchemaAndValue schemaAndValue = getExpectedTestMessageWithJsonFieldNames(); + + byte[] result = converter.fromConnectData("my-topic", + schemaAndValue.schema(), schemaAndValue.value() + ); + + assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length)); + } + @Test public void testFromConnectDataForValueWithNamespace() { final byte[] expected = HELLO_WORLD_MESSAGE.toByteArray(); @@ -531,6 +544,44 @@ public void testToConnectDataForValue() throws Exception { assertEquals(expected, result); } + @Test + public void testToConnectDataForValueUseJsonFieldNames() throws Exception { + Map configs = new HashMap<>(SR_CONFIG); + configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true); + converter.configure(configs, false); + // extra byte for message index + final byte[] input = concat(new byte[]{0, 0, 0, 0, 1, 0}, HELLO_WORLD_MESSAGE.toByteArray()); + schemaRegistry.register("my-topic-value", getSchema(TestMessage.getDescriptor())); + SchemaAndValue result = converter.toConnectData("my-topic", input); + + SchemaAndValue expected = getExpectedTestMessageWithJsonFieldNames(); + + assertEquals(expected.schema(), result.schema()); + assertEquals(expected, result); + } + + private SchemaAndValue getExpectedTestMessageWithJsonFieldNames() { + Struct testMessageStruct = getTestMessageStruct(TEST_MSG_STRING, 123); + Schema testMessageSchema = getTestMessageSchema(); + + final SchemaBuilder builder = SchemaBuilder.struct(); + builder.name("TestMessage").version(1); + List values = new ArrayList<>(); + for (Field field : testMessageSchema.fields()) { + String jsonFieldName = TestMessage.getDescriptor() + .findFieldByName(field.name()).getJsonName(); + builder.field(jsonFieldName, field.schema()); + values.add(testMessageStruct.get(field)); + } + final Schema jsonSchema = builder.build(); + final Struct jsonStruct = new Struct(jsonSchema); + final Iterator valuesIt = values.iterator(); + for (Field field : jsonSchema.fields()) { + jsonStruct.put(field, valuesIt.next()); + } + return new SchemaAndValue(jsonSchema, jsonStruct); + } + @Test public void testToConnectDataForValueWithSecondMessage() throws Exception { converter.configure(SR_CONFIG, false);