diff --git a/pom.xml b/pom.xml index 05a68dc..4123913 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ io.kcache kwack - 0.6.0 + 0.7.0 jar kwack diff --git a/src/main/java/io/kcache/kwack/KwackEngine.java b/src/main/java/io/kcache/kwack/KwackEngine.java index 562c389..044ef9a 100644 --- a/src/main/java/io/kcache/kwack/KwackEngine.java +++ b/src/main/java/io/kcache/kwack/KwackEngine.java @@ -22,6 +22,7 @@ import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import io.confluent.kafka.serializers.KafkaJsonDeserializerConfig; import io.kcache.CacheUpdateHandler; import io.kcache.KafkaCache; import io.kcache.KafkaCacheConfig; @@ -438,7 +439,7 @@ private Tuple2 deserialize(boolean isKey, String topic, byte[] Tuple2 schema = isKey ? getKeySchema(serde, topic) : getValueSchema(serde, topic); - Deserializer deserializer = getDeserializer(schema); + Deserializer deserializer = getDeserializer(isKey, schema); if (serde.usesExternalSchema() || config.getSkipBytes() > 0) { try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { @@ -492,11 +493,11 @@ private Tuple2 deserialize(boolean isKey, String topic, byte[] return Tuple.of(ctx, object); } - public Deserializer getDeserializer(Tuple2 schema) { - return deserializers.computeIfAbsent(schema, this::createDeserializer); + public Deserializer getDeserializer(boolean isKey, Tuple2 schema) { + return deserializers.computeIfAbsent(schema, k -> createDeserializer(isKey, schema)); } - private Deserializer createDeserializer(Tuple2 schema) { + private Deserializer createDeserializer(boolean isKey, Tuple2 schema) { if (schema._2 != null) { ParsedSchema parsedSchema = schema._2; SchemaRegistryClient schemaRegistry = null; @@ -517,18 +518,28 @@ private Deserializer createDeserializer(Tuple2 schema) { originals.put(AbstractKafkaSchemaSerDeConfig.USE_SCHEMA_ID, schema._1.getId()); break; } + Deserializer deserializer = null; switch (parsedSchema.schemaType()) { case "AVRO": // This allows BigDecimal to be passed through unchanged originals.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true); - return new KafkaAvroDeserializer(schemaRegistry, originals); + deserializer = new KafkaAvroDeserializer(schemaRegistry); + break; case "JSON": - return new KafkaJsonSchemaDeserializer<>(schemaRegistry, originals); + // Set the type to null so JsonNode is produced + // Otherwise the type defaults to Object.class which produces a LinkedHashMap + originals.put(KafkaJsonDeserializerConfig.JSON_KEY_TYPE, null); + originals.put(KafkaJsonDeserializerConfig.JSON_VALUE_TYPE, null); + deserializer = new KafkaJsonSchemaDeserializer<>(schemaRegistry); + break; case "PROTOBUF": - return new KafkaProtobufDeserializer<>(schemaRegistry, originals); + deserializer = new KafkaProtobufDeserializer<>(schemaRegistry); + break; default: throw new IllegalArgumentException("Illegal type " + parsedSchema.schemaType()); } + deserializer.configure(originals, isKey); + return deserializer; } else { switch (schema._1.getSerdeType()) { case STRING: diff --git a/src/test/java/io/kcache/kwack/AbstractSchemaTest.java b/src/test/java/io/kcache/kwack/AbstractSchemaTest.java index 38a80b1..38c4fda 100644 --- a/src/test/java/io/kcache/kwack/AbstractSchemaTest.java +++ b/src/test/java/io/kcache/kwack/AbstractSchemaTest.java @@ -16,14 +16,17 @@ protected Properties createProducerProps(String schemaRegistryUrl) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(SCHEMA_REGISTRY_URL, schemaRegistryUrl); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - org.apache.kafka.common.serialization.BytesSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKeySerializer()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getValueSerializer()); return props; } protected abstract String getTopic(); + protected Class getKeySerializer() { + return org.apache.kafka.common.serialization.BytesSerializer.class; + } + protected abstract Class getValueSerializer(); protected KafkaProducer createProducer(Properties props) { @@ -31,10 +34,20 @@ protected KafkaProducer createProducer(Properties props) { } protected void produce(KafkaProducer producer, String topic, Object[] objects) { - ProducerRecord record; - for (Object object : objects) { - byte[] bytes = ByteBuffer.allocate(4).putInt(object.hashCode()).array(); - record = new ProducerRecord<>(topic, Bytes.wrap(bytes), object); + produce(producer, topic, null, objects); + } + + protected void produce(KafkaProducer producer, String topic, Object[] keys, Object[] values) { + ProducerRecord record; + for (int i = 0; i < values.length; i++) { + Object value = values[i]; + Object key; + if (keys != null) { + key = keys[i]; + } else { + key = Bytes.wrap(ByteBuffer.allocate(4).putInt(value.hashCode()).array()); + } + record = new ProducerRecord<>(topic, key, value); producer.send(record); } } diff --git a/src/test/java/io/kcache/kwack/AvroKeyTest.java b/src/test/java/io/kcache/kwack/AvroKeyTest.java new file mode 100644 index 0000000..9f1df41 --- /dev/null +++ b/src/test/java/io/kcache/kwack/AvroKeyTest.java @@ -0,0 +1,216 @@ +package io.kcache.kwack; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; +import io.reactivex.rxjava3.core.Observable; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.junit.jupiter.api.Test; + +public class AvroKeyTest extends AbstractSchemaTest { + + @Override + protected Properties createProducerProps(String schemaRegistryUrl) { + Properties props = super.createProducerProps(schemaRegistryUrl); + props.put(KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true); + return props; + } + + private Schema createSimpleSchema() { + return new Schema.Parser().parse( + "{\"namespace\": \"namespace\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"test\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f1\", \"type\": \"string\"},\n" + + " {\"name\": \"f2\", \"type\": \"int\"}\n" + + "]\n" + + "}"); + } + + private IndexedRecord createSimpleRecord() { + return createSimpleRecord(123); + } + + private IndexedRecord createSimpleRecord(int f2) { + Schema schema = createSimpleSchema(); + GenericRecord avroRecord = new GenericData.Record(schema); + avroRecord.put("f1", "hi"); + avroRecord.put("f2", f2); + return avroRecord; + } + + private Schema createEnumSchema() { + String enumSchema = "{\"name\": \"Kind\",\"namespace\": \"example.avro\",\n" + + " \"type\": \"enum\",\n" + + " \"symbols\" : [\"ONE\", \"TWO\", \"THREE\"]\n" + + "}"; + Schema.Parser parser = new Schema.Parser(); + return parser.parse(enumSchema); + } + + private Schema createFixedSchema() { + String fixedSchema = "{\"name\": \"Fixed\",\n" + + " \"type\": \"fixed\",\n" + + " \"size\" : 4\n" + + "}"; + Schema.Parser parser = new Schema.Parser(); + return parser.parse(fixedSchema); + } + + private Schema createComplexSchema() { + return new Schema.Parser().parse( + "{\"namespace\": \"namespace\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"test\",\n" + + " \"fields\": [\n" + + " {\"name\": \"null\", \"type\": \"null\"},\n" + + " {\"name\": \"boolean\", \"type\": \"boolean\"},\n" + + " {\"name\": \"int\", \"type\": \"int\"},\n" + + " {\"name\": \"long\", \"type\": \"long\"},\n" + + " {\"name\": \"float\", \"type\": \"float\"},\n" + + " {\"name\": \"double\", \"type\": \"double\"},\n" + + " {\"name\": \"bytes\", \"type\": \"bytes\"},\n" + + " {\"name\": \"string\", \"type\": \"string\", \"aliases\": [\"string_alias\"]},\n" + + " {\"name\": \"enum\",\n" + + " \"type\": {\n" + + " \"name\": \"Kind\",\n" + + " \"type\": \"enum\",\n" + + " \"symbols\" : [\"ONE\", \"TWO\", \"THREE\"]\n" + + " }\n" + + " },\n" + + " {\"name\": \"array\",\n" + + " \"type\": {\n" + + " \"type\": \"array\",\n" + + " \"items\" : \"string\"\n" + + " }\n" + + " },\n" + + " {\"name\": \"map\",\n" + + " \"type\": {\n" + + " \"type\": \"map\",\n" + + " \"values\" : \"string\"\n" + + " }\n" + + " },\n" + + " {\"name\": \"nullable_string\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"union\", \"type\": [\"null\", \"string\", \"int\"]},\n" + + " {\"name\": \"fixed\",\n" + + " \"type\": {\n" + + " \"name\": \"Fixed\",\n" + + " \"type\": \"fixed\",\n" + + " \"size\" : 4\n" + + " }\n" + + " },\n" + + " {\"name\": \"decimal\", \"type\": {\"type\": \"bytes\",\n" + + " \"logicalType\": \"decimal\", \"precision\": 5, \"scale\": 2}},\n" + + " {\"name\": \"uuid\", \"type\": {\"type\": \"string\", \"logicalType\": \"uuid\"}},\n" + + " {\"name\": \"date\", \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},\n" + + " {\"name\": \"time\", \"type\": {\"type\": \"int\", \"logicalType\": \"time-millis\"}},\n" + + " {\"name\": \"timestamp\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}\n" + + "]\n" + + "}"); + } + + private IndexedRecord createComplexRecord() { + Schema enumSchema = createEnumSchema(); + Schema fixedSchema = createFixedSchema(); + Schema schema = createComplexSchema(); + GenericRecord avroRecord = new GenericData.Record(schema); + avroRecord.put("null", null); + avroRecord.put("boolean", true); + avroRecord.put("int", 1); + avroRecord.put("long", 2L); + avroRecord.put("float", 3.0f); + avroRecord.put("double", 4.0d); + avroRecord.put("bytes", ByteBuffer.wrap(new byte[]{0, 1, 2})); + avroRecord.put("string", "testUser"); + avroRecord.put("enum", new GenericData.EnumSymbol(enumSchema, "ONE")); + avroRecord.put("array", ImmutableList.of("hi", "there")); + avroRecord.put("map", ImmutableMap.of("bye", "there")); + avroRecord.put("nullable_string", "zap"); + avroRecord.put("union", 123); + avroRecord.put("fixed", new GenericData.Fixed(fixedSchema, new byte[]{0, 0, 0, 0})); + avroRecord.put("decimal", new BigDecimal("123.45")); + avroRecord.put("uuid", UUID.fromString("d21998e8-8737-432e-a83c-13768dabd821")); + avroRecord.put("date", LocalDate.of(2024, 1, 1)); + avroRecord.put("time", LocalTime.of(8, 30, 30)); + avroRecord.put("timestamp", Instant.ofEpochSecond(1234567890L)); + return avroRecord; + } + + @Test + public void testComplexKey() throws IOException { + IndexedRecord key = createComplexRecord(); + IndexedRecord value = createSimpleRecord(); + Properties producerProps = createProducerProps(MOCK_URL); + KafkaProducer producer = createProducer(producerProps); + produce(producer, getTopic(), new Object[] { key }, new Object[] { value }); + producer.close(); + + engine.init(); + Observable> obs = engine.start(); + List> lm = Lists.newArrayList(obs.blockingIterable().iterator()); + Map row = lm.get(0); + Map m = (Map) row.get("rowkey"); + assertNull(m.get("null")); + assertEquals(true, m.get("boolean")); + assertEquals(1, m.get("int")); + assertEquals(2L, m.get("long")); + assertEquals(3.0f, m.get("float")); + assertEquals(4.0d, m.get("double")); + assertEquals(Base64.getEncoder().encodeToString(new byte[]{0, 1, 2}), m.get("bytes")); + assertEquals("testUser", m.get("string")); + assertEquals("ONE", m.get("enum")); + assertEquals(ImmutableList.of("hi", "there"), m.get("array")); + assertEquals(ImmutableMap.of("bye", "there"), m.get("map")); + assertEquals("zap", m.get("nullable_string")); + assertEquals(123, m.get("union")); + assertEquals(Base64.getEncoder().encodeToString(new byte[]{0, 0, 0, 0}), m.get("fixed")); + assertEquals(new BigDecimal("123.45"), m.get("decimal")); + assertEquals(UUID.fromString("d21998e8-8737-432e-a83c-13768dabd821"), m.get("uuid")); + assertEquals(LocalDate.of(2024, 1, 1), m.get("date")); + assertEquals(LocalTime.of(8, 30, 30), m.get("time")); + assertEquals(Timestamp.from(Instant.ofEpochSecond(1234567890L)), m.get("timestamp")); + } + + @Override + protected String getTopic() { + return "test-avro"; + } + + @Override + protected Class getKeySerializer() { + return io.confluent.kafka.serializers.KafkaAvroSerializer.class; + } + + @Override + protected Class getValueSerializer() { + return io.confluent.kafka.serializers.KafkaAvroSerializer.class; + } + + @Override + protected void injectKwackProperties(Properties props) { + super.injectKwackProperties(props); + props.put(KwackConfig.KEY_SERDES_CONFIG, getTopic() + "=latest"); + } +}