Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FDP-2357: AvroSerializer can handle all Avro messages #17

Merged
merged 8 commits into from
Jul 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,41 @@ SPDX-License-Identifier: Apache-2.0
package com.gxf.utilities.kafka.avro

import org.apache.avro.message.BinaryMessageEncoder
import org.apache.avro.specific.SpecificRecord
import org.apache.avro.specific.SpecificData
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Serializer
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream
import kotlin.reflect.KClass

class AvroSerializer<T : SpecificRecordBase> : Serializer<T> {
private val encoders: HashMap<KClass<out T>, BinaryMessageEncoder<T>> = HashMap()

class AvroSerializer<T : SpecificRecord>(private val encoder: BinaryMessageEncoder<T>) : Serializer<T> {
companion object {
private val logger = LoggerFactory.getLogger(AvroSerializer::class.java)
}

private fun getEncoder(message: T): BinaryMessageEncoder<T> {
val existingEncoder = encoders[message::class]

if(existingEncoder != null) {
return existingEncoder
}

val newEncoder = BinaryMessageEncoder<T>(SpecificData(), message.schema)
encoders[message::class] = newEncoder
return newEncoder
}

/**
* Serializes a Byte Array to an Avro specific record
*/
override fun serialize(topic: String?, data: T): ByteArray {
try {
logger.trace("Serializing for {}", topic)
val outputStream = ByteArrayOutputStream()
val encoder = getEncoder(data)
encoder.encode(data, outputStream)
return outputStream.toByteArray()
} catch (ex: Exception) {
Expand Down
Loading