Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Redshift generic transform for arrays to strings #11

Merged
merged 8 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ plugins {
}

// Package version
version = "0.5.0"
version = "0.6.0"

repositories {
// Use Maven Central for resolving dependencies.
Expand All @@ -31,6 +31,7 @@ dependencies {

// Kafka dependencies
implementation("org.apache.kafka:connect-api:$kafkaVersion")
implementation("org.apache.kafka:connect-json:$kafkaVersion")
implementation("org.apache.kafka:connect-transforms:$kafkaVersion")
implementation("org.apache.avro:avro:1.11.1")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.cultureamp.kafka.connect.plugins.transforms

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.Field
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.json.JsonConverter
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements
import org.apache.kafka.connect.transforms.util.SchemaUtil
import org.slf4j.LoggerFactory
import java.util.Collections

/**
* A generic custom transform for RedShift
*
* This transformer class manipulates fields in schemas by turning them from arrays into strings
* which is necessary for this:
* https://stackoverflow.com/questions/61360342/kafka-connect-flatten-transformation-of-a-postgres-record-with-array-field issue to be solved
* as RedShift does not support array types and arrays must be converted into strings.
* See https://docs.confluent.io/platform/current/connect/javadocs/javadoc/org/apache/kafka/connect/transforms/Transformation.html.
*
* @param R is ConnectRecord<R>.
* @constructor Creates a RedShiftArrayTransformer Transformation<R> for a given ConnectRecord<T>
*/
class RedShiftArrayTransformer<R : ConnectRecord<R>> : Transformation<R> {
    private val logger = LoggerFactory.getLogger(this::class.java.canonicalName)

    // Appears in Requirements.requireStruct error messages; keep stable for log searching.
    private val purpose = "RedShift™ JSON Array to String Transform"

    override fun configure(configs: MutableMap<String, *>?) {}

    override fun config(): ConfigDef = ConfigDef()

    override fun close() {}

    /**
     * Returns a copy of [record] whose array-typed top-level value fields are
     * re-encoded as JSON strings; all other fields pass through unchanged.
     * Key, topic, partition and timestamp are preserved.
     *
     * @throws org.apache.kafka.connect.errors.DataException if the record value is not a Struct.
     */
    override fun apply(record: R): R {
        try {
            val sourceValue = Requirements.requireStruct(record.value(), purpose)
            val targetPayload = targetPayload(sourceValue, record.valueSchema())

            return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                targetPayload.schema(),
                targetPayload,
                record.timestamp()
            )
        } catch (e: Exception) {
            logger.error("Exception: ", e)
            logger.error("Record Received: " + record.value())
            throw e
        }
    }

    /**
     * Maps an ARRAY field schema to a STRING schema — optional if the source
     * field is optional, so null array values stay representable. Any other
     * schema is returned unchanged.
     */
    private fun updateSchema(field: Field): Schema {
        if (field.schema().type() == Schema.Type.ARRAY) {
            val builder = SchemaBuilder.string()
            if (field.schema().isOptional) {
                builder.optional()
            }
            return builder.build()
        }
        return field.schema()
    }

    /**
     * Builds a new Struct conforming to [sourceSchema] with every array field
     * replaced by its JSON-text rendering; non-array fields are copied through.
     */
    private fun targetPayload(sourceValue: Struct, sourceSchema: Schema): Struct {
        val builder = SchemaUtil.copySchemaBasics(sourceSchema, SchemaBuilder.struct())
        for (field in sourceSchema.fields()) {
            builder.field(field.name(), updateSchema(field))
        }
        val newSchema = builder.build()
        val targetPayload = Struct(newSchema)
        for (field in newSchema.fields()) {
            val fieldVal = sourceValue.get(field.name())
            val fieldSchema = sourceSchema.field(field.name()).schema()
            if (field.schema().type() == fieldSchema.type()) {
                // Field type was not rewritten: copy the value through untouched.
                targetPayload.put(field.name(), fieldVal)
            } else if (fieldVal == null) {
                // Optional array with no value: keep null rather than crash in readTree(null).
                targetPayload.put(field.name(), null)
            } else {
                // Serialise the array to JSON bytes, normalise via Jackson, then strip
                // the quoting the converter places around nested arrays/objects so the
                // sink receives plain JSON text.
                // NOTE(review): the replace() calls will also rewrite string elements
                // that happen to contain `"[`, `]"`, `"{` or `}"` — fragile, but kept
                // as existing behaviour.
                val converted = jsonConverter.fromConnectData("", fieldSchema, fieldVal)
                var fieldString = objectMapper.readTree(converted).toString()
                fieldString = fieldString.replace("\"[", "[").replace("]\"", "]").replace("\"{", "{").replace("}\"", "}")
                targetPayload.put(field.name(), fieldString)
            }
        }
        return targetPayload
    }

    private val objectMapper = ObjectMapper()

    // Configured once at construction instead of on every record (previously the
    // converter was re-configured inside targetPayload for each apply() call).
    // schemas.enable = false: emit plain JSON without the Connect envelope.
    // isKey = false: this converter serialises record values, not keys.
    private val jsonConverter = JsonConverter().apply {
        configure(Collections.singletonMap("schemas.enable", false), false)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.cultureamp.kafka.connect.plugins.transforms

import com.mongodb.kafka.connect.source.MongoSourceConfig
import com.mongodb.kafka.connect.source.json.formatter.JsonWriterSettingsProvider
import com.mongodb.kafka.connect.source.schema.AvroSchema
import com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue
import com.mongodb.kafka.connect.util.ClassHelper
import com.mongodb.kafka.connect.util.ConfigHelper
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaAndValue
import org.apache.kafka.connect.source.SourceRecord
import org.junit.Before
import java.io.File
import java.nio.file.Files
import kotlin.test.Test
import kotlin.test.assertFalse
import kotlin.test.assertTrue

/**
 * Unit tests for the RedShiftArrayTransformer.
 *
 * Verifies that a record whose value schema contains array-typed fields is
 * transformed into one whose corresponding fields are strings, since
 * RedShift does not support array column types. See:
 * https://stackoverflow.com/questions/61360342/kafka-connect-flatten-transformation-of-a-postgres-record-with-array-field
 */
class RedShiftArrayTransformerTest {

    private lateinit var transformer: RedShiftArrayTransformer<SourceRecord>

    // True when no top-level field of the record's value schema is an ARRAY.
    private fun hasNoArrays(obj: SourceRecord): Boolean {

        var hasNoArray = true
        for (field in obj.valueSchema().fields()) {
            if (field.schema().type() == Schema.Type.ARRAY) {
                hasNoArray = false
            }
        }
        return hasNoArray
    }

    @Before
    fun setUp() {
        transformer = RedShiftArrayTransformer()
    }

    @Test
    fun `can transform ECST Employee data that has arrays into string fields`() {

        val avroRecord = payload("com/cultureamp/employee-data.employees-v1.json")
        val sourceRecord = SourceRecord(
            null,
            null,
            "employee data ecst test",
            avroRecord.schema(),
            avroRecord.value()
        )

        // Guard against a vacuous pass: the fixture must actually contain
        // array fields before the transform. (Previously this result was
        // computed but discarded.)
        assertFalse(hasNoArrays(sourceRecord))

        val transformedRecord = transformer.apply(sourceRecord)
        assertTrue(hasNoArrays(transformedRecord))
    }

    // Connect schema derived from the Avro schema fixture; shared by all payloads.
    private val sourceSchema = AvroSchema.fromJson(fileContent("com/cultureamp/employee-data.employees-value-v1.avsc"))

    // Parses a JSON fixture into a Connect SchemaAndValue using the Mongo BSON bridge.
    private fun payload(fileName: String): SchemaAndValue {
        val document = ConfigHelper.documentFromString(fileContent(fileName)).get()

        return BsonValueToSchemaAndValue(jsonWriterSettings)
            .toSchemaAndValue(sourceSchema, document.toBsonDocument())
    }

    // Reads a classpath resource as a UTF-8 string; fails fast if it is missing.
    private fun fileContent(fileName: String): String {
        val url = this.javaClass.classLoader
            .getResource(fileName) ?: throw IllegalArgumentException("$fileName is not found 1")

        return String(Files.readAllBytes(File(url.file).toPath()))
    }

    private val jsonWriterSettings =
        ClassHelper.createInstance(
            MongoSourceConfig.OUTPUT_JSON_FORMATTER_CONFIG,
            "com.mongodb.kafka.connect.source.json.formatter.DefaultJson",
            JsonWriterSettingsProvider::class.java
        ).jsonWriterSettings
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{
"id": "c63526f8-dec7-4ef8-96d8-18756076f064",
"account_id": "0a05e2a3-7258-4cf5-a7f4-e21b08c030c5",
"employee_id": "c63526f8-dec7-4ef8-96d8-18756076f064",
"event_created_at": 1536899741117,
"body": {
"source": {
"string": ""
},
"employee_id": null,
"email": {
"string": "[email protected]"
},
"name": {
"string": "Test User 800702"
},
"preferred_name": null,
"birth_date": null,
"start_date": {
"string": "2017-02-24"
},
"end_date": null,
"locale": null,
"observer": false,
"gdpr_erasure_request_id": null,
"manager_assignment": null,
"erased": false,
"created_at": 1536899741113,
"updated_at": 1536899741117,
"deleted_at": null
},
"metadata": {
"correlation_id": {
"string": "b9098254-a1db-4114-9a39-baa17ab18fbf"
},
"causation_id": null,
"executor_id": {
"string": "379907ca-632c-4e83-89c4-9dbe0e759ad3"
},
"service": "Influx"
},
"test_array_of_structs": [
{
"demographic_id": {
"string": "5c579970-684e-4911-a077-6bf407fb478d"
},
"demographic_value_id": {
"string": "427b936f-e932-4673-95a2-acd3e3b900b1"
}
},
{
"demographic_id": {
"string": "460f6b2d-03c5-46cf-ba55-aa14477a12dc"
},
"demographic_value_id": {
"string": "ecc0db2e-486e-4f4a-a54a-db21673e1a2b"
}
}
],
"test_string_array": [
"a", "b", "c"
],
"test_array_of_arrays": [
["a", "b", "c"],
["e"],
["f", "g"]
]
}
Loading
Loading