Skip to content

Commit

Permalink
Merge pull request #20 from OSGP/feature/FDP-2470
Browse files Browse the repository at this point in the history
FDP-2470: Avro Deserializer for multiple schemas
  • Loading branch information
sanderv authored Jul 17, 2024
2 parents e4ac1f3 + 91aa15c commit dd55998
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 62 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.jetbrains.kotlin.gradle.dsl.KotlinJvmProjectExtension
import java.net.URI

plugins {
id("io.spring.dependency-management") version "1.1.4" apply false
id("io.spring.dependency-management") version "1.1.5" apply false
kotlin("jvm") version "2.0.0" apply false
kotlin("plugin.spring") version "2.0.0" apply false
id("org.sonarqube") version "5.0.0.4638"
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.9-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
4 changes: 4 additions & 0 deletions kafka-avro/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

dependencies {
implementation("org.apache.kafka:kafka-clients")
implementation(libs.avro)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,31 @@ SPDX-License-Identifier: Apache-2.0
*/
package com.gxf.utilities.kafka.avro

import org.apache.avro.Schema
import org.apache.avro.message.BinaryMessageDecoder
import org.apache.avro.specific.SpecificRecord
import org.apache.avro.specific.SpecificData
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer
import org.slf4j.LoggerFactory

class AvroDeserializer<T : SpecificRecord>(private val decoder: BinaryMessageDecoder<T>) : Deserializer<T> {
class AvroDeserializer(deserializerSchemas: List<Schema>) : Deserializer<SpecificRecordBase> {
companion object {
private val logger = LoggerFactory.getLogger(AvroDeserializer::class.java)
}

private val decoder = BinaryMessageDecoder<SpecificRecordBase>(SpecificData(), null)

init {
// Add all schema's to the decoder
deserializerSchemas
.forEach { decoder.addSchema(it) }
}

/**
* Deserializes a Byte Array to an Avro specific record
* Deserializes a Byte Array to an Avro SpecificRecordBase
*/
override fun deserialize(topic: String, payload: ByteArray): T? {
override fun deserialize(topic: String, payload: ByteArray): SpecificRecordBase {
try {
logger.trace("Deserializing for {}", topic)
return decoder.decode(payload)
Expand Down
21 changes: 21 additions & 0 deletions kafka-avro/src/test/avro/AvroSchema1.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"type": "record",
"name": "AvroSchema1",
"namespace": "com.alliander.gxf.utilities.kafka.avro",
"fields": [
{
"name": "field1",
"type": {
"type": "string",
"avro.java.string": "String"
}
},
{
"name": "field2",
"type": {
"type": "string",
"avro.java.string": "String"
}
}
]
}
14 changes: 14 additions & 0 deletions kafka-avro/src/test/avro/AvroSchema2.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"type": "record",
"name": "AvroSchema2",
"namespace": "com.alliander.gxf.utilities.kafka.avro",
"fields": [
{
"name": "message2",
"type": {
"type": "string",
"avro.java.string": "String"
}
}
]
}
14 changes: 14 additions & 0 deletions kafka-avro/src/test/avro/AvroSchema3.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"type": "record",
"name": "AvroSchema3",
"namespace": "com.alliander.gxf.utilities.kafka.avro",
"fields": [
{
"name": "message3",
"type": {
"type": "string",
"avro.java.string": "String"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.gxf.utilities.kafka.avro

import com.alliander.gxf.utilities.kafka.avro.AvroSchema1
import com.alliander.gxf.utilities.kafka.avro.AvroSchema2
import com.alliander.gxf.utilities.kafka.avro.AvroSchema3
import org.apache.kafka.common.errors.SerializationException
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test

class AvroDeserializerTest {

@Test
fun avroDeserializerTest() {
val message1 = AvroSchema1("field no 1", "field no 2")
val message2 = AvroSchema2("message in a bottle")
val message3 = AvroSchema3("message in a bottle")
val deserializer = AvroDeserializer(listOf(AvroSchema1.getClassSchema(), AvroSchema2.getClassSchema()))

assertThat(deserializer.deserialize("topic1", message1.toByteBuffer().array()))
.isEqualTo(message1)
assertThat(deserializer.deserialize("topic2", message2.toByteBuffer().array()))
.isEqualTo(message2)

assertThatThrownBy({deserializer.deserialize("topic3", message3.toByteBuffer().array())})
.isInstanceOf(SerializationException::class.java)
.hasMessageContaining("Error deserializing Avro message for topic: topic3");

}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.gxf.utilities.kafka.avro

import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import com.alliander.gxf.utilities.kafka.avro.AvroSchema1
import com.alliander.gxf.utilities.kafka.avro.AvroSchema2
import org.assertj.core.api.AbstractIntegerAssert
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import java.io.ByteArrayOutputStream
Expand All @@ -24,55 +25,3 @@ class AvroEncoderTest {
assertThat(AvroEncoder.encoders.size).isEqualTo(2)
}
}

class AvroSchema1(private var field1: String, private var field2: String): SpecificRecordBase() {
override fun getSchema(): Schema = Schema.Parser()
.parse("{\"type\":\"record\",\"name\":\"AvroSchema1\",\"namespace\":\"com.alliander.gxf.utilities.kafka.avro\",\"fields\":[{\"name\":\"field1\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}, {\"name\":\"field2\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}")

override fun put(field: Int, value: Any?) {
when(field) {
0 -> {
if(value != null) {
field1 = value.toString()
}
}
1 -> {
if(value != null) {
field2 = value.toString()
}
}
else -> throw IndexOutOfBoundsException()
}
}

override fun get(field: Int): Any {
return when(field) {
0 -> field1
1 -> field2
else -> throw IndexOutOfBoundsException()
}
}
}

class AvroSchema2(private var message: String): SpecificRecordBase() {
override fun getSchema(): Schema = Schema.Parser()
.parse("{\"type\":\"record\",\"name\":\"AvroSchema2\",\"namespace\":\"com.alliander.gxf.utilities.kafka.avro\",\"fields\":[{\"name\":\"message\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}")

override fun put(field: Int, value: Any?) {
when(field) {
0 -> {
if(value != null) {
message = value.toString()
}
}
else -> throw IndexOutOfBoundsException()
}
}

override fun get(field: Int): Any {
return when(field) {
0 -> message
else -> throw IndexOutOfBoundsException()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.gxf.utilities.kafka.avro

import com.alliander.gxf.utilities.kafka.avro.AvroSchema1
import com.alliander.gxf.utilities.kafka.avro.AvroSchema2
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test

class AvroSerializerTest {
@Test
fun testEncodersCache() {
val message1 = AvroSchema1("field no 1", "field no 2")
val message2 = AvroSchema2("message in a bottle")
val serializer = AvroSerializer()

serializer.serialize("", message1)
serializer.serialize("", message2)

assertThat(AvroEncoder.encoders).containsKeys(AvroSchema1::class)
assertThat(AvroEncoder.encoders).containsKeys(AvroSchema2::class)
assertThat(AvroEncoder.encoders.size).isEqualTo(2)
}
}
4 changes: 2 additions & 2 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ dependencyResolutionManagement {
versionCatalogs {
create("libs") {
version("avro", "1.11.3")
version("msal", "1.16.0")
version("msal4j", "1.16.1")

library("avro", "org.apache.avro", "avro").versionRef("avro")
library("msal", "com.microsoft.azure", "msal4j").versionRef("msal")
library("msal", "com.microsoft.azure", "msal4j").versionRef("msal4j")
}

create("testLibs") {
Expand Down

0 comments on commit dd55998

Please sign in to comment.