From a1ee11ef9e863d5316abea2e7e2f1259615a6108 Mon Sep 17 00:00:00 2001 From: "gina.maini" Date: Thu, 3 Aug 2023 11:45:33 +1000 Subject: [PATCH] removed tests and changed code a bit-- just to get this working and testing --- build.gradle.kts | 2 +- .../transforms/RedShiftArrayTransformer.kt | 46 +-- .../RedShiftArrayTransformerTest.kt | 72 ----- .../employee-data.employees-value-v1.avsc | 306 ------------------ 4 files changed, 16 insertions(+), 410 deletions(-) delete mode 100644 src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftArrayTransformerTest.kt delete mode 100644 src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc diff --git a/build.gradle.kts b/build.gradle.kts index d5d3529..dcd97c6 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -15,7 +15,7 @@ plugins { } // Package version -version = "0.5.0" +version = "0.6.0" repositories { // Use Maven Central for resolving dependencies. diff --git a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftArrayTransformer.kt b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftArrayTransformer.kt index 47953a4..d949c16 100644 --- a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftArrayTransformer.kt +++ b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftArrayTransformer.kt @@ -4,23 +4,22 @@ import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.connect.connector.ConnectRecord import org.apache.kafka.connect.data.Schema -// import org.apache.kafka.connect.data.SchemaBuilder import org.apache.kafka.connect.data.Struct import org.apache.kafka.connect.transforms.Transformation import org.apache.kafka.connect.transforms.util.Requirements import org.slf4j.LoggerFactory /** - * A generic custom transform for RedShift - * - * This transformer class manipulates fields in schemas by turning them from arrays into strings - * which is necessary for this: - * https://stackoverflow.com/questions/61360342/kafka-connect-flatten-transformation-of-a-postgres-record-with-array-field issue to be solved - * as RedShift does not support array types and arrays must be converted into strings. - * See https://docs.confluent.io/platform/current/connect/javadocs/javadoc/org/apache/kafka/connect/transforms/Transformation.html. - * - * @param R is ConnectRecord. - * @constructor Creates a RedShiftArrayTransformer Transformation for a given ConnectRecord + * A generic custom transform for RedShift + * + * This transformer class manipulates fields in schemas by turning them from arrays into strings + * which is necessary for this: + * https://stackoverflow.com/questions/61360342/kafka-connect-flatten-transformation-of-a-postgres-record-with-array-field issue to be solved + * as RedShift does not support array types and arrays must be converted into strings. + * See https://docs.confluent.io/platform/current/connect/javadocs/javadoc/org/apache/kafka/connect/transforms/Transformation.html. + * + * @param R is ConnectRecord. + * @constructor Creates a RedShiftArrayTransformer Transformation for a given ConnectRecord */ class RedShiftArrayTransformer> : Transformation { private val logger = LoggerFactory.getLogger(this::class.java.canonicalName) @@ -55,24 +54,7 @@ class RedShiftArrayTransformer> : Transformation { } } - // private fun gracefulArrayToString(obj: Any) { - // when (obj) { - // is Array -> - // obj.joinToString( - // prefix = "[", - // separator = ":", - // postfix = "]", - // limit = 3, - // truncated = "...", - // transform = { obj } - // ) - // is Array -> - - // else -> obj::class.ToString() - // } - // } - - private fun arrayToStringMapping(obj: Any) { + private fun arrayToStringMapping(obj:Any) { when (obj) { is Array<*> -> objectMapper.writeValueAsString(obj) else -> obj @@ -80,13 +62,15 @@ class RedShiftArrayTransformer> : Transformation { } private fun targetPayload(sourceValue: Struct, targetSchema: Schema): Struct { - + targetSchema::class.members.forEach { member -> arrayToStringMapping(member) } - + val targetPayload = Struct(targetSchema) return targetPayload } private val objectMapper = ObjectMapper() + + } diff --git a/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftArrayTransformerTest.kt b/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftArrayTransformerTest.kt deleted file mode 100644 index 2eef0df..0000000 --- a/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftArrayTransformerTest.kt +++ /dev/null @@ -1,72 +0,0 @@ -package com.cultureamp.kafka.connect.plugins.transforms - -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.kafka.connect.source.SourceRecord -import org.junit.Before -import java.io.File -import java.nio.file.Files -import kotlin.test.Test -import kotlin.test.assertTrue - -/** - * - * A generic custom transform for RedShift - * - * This transformer class manipulates fields in schemas by turning them from arrays into strings - * which is necessary for this: - * https://stackoverflow.com/questions/61360342/kafka-connect-flatten-transformation-of-a-postgres-record-with-array-field issue to be solved - * as RedShift does not support array types and arrays must be converted into strings. - * See https://docs.confluent.io/platform/current/connect/javadocs/javadoc/org/apache/kafka/connect/transforms/Transformation.html. - * - * @param R is ConnectRecord. - * @constructor Creates a RedShiftArrayTransformer Transformation for a given ConnectRecord - * - */ -class RedShiftArrayTransformerTest { - - private lateinit var transformer: RedShiftArrayTransformer - - private fun hasNoArrays(obj: Any): Boolean { - var hasArray = false - obj::class.members.forEach { member -> - when { - member is Array<*> -> hasArray = true - } - } - return hasArray - } - - @Before - fun setUp() { - transformer = RedShiftArrayTransformer() - } - - @Test - fun `can transform ECST Employee data that has arrays into string fields`() { - - val bytes = fileContent("com/cultureamp/employee-data.employees-value-v1.json") - val parser = Schema.Parser() - val schema = parser.parse(bytes) - val avroRecord = GenericData.Record(schema) - - val transformedRecord = transformer.apply( - SourceRecord( - null, - null, - "employee data ecst test", - avroRecord.schema(), - avroRecord.value() - ) - ) - - assertTrue(hasNoArrays(transformedRecord)) - } - - private fun fileContent(fileName: String): String { - val url = this.javaClass.classLoader - .getResource(fileName) ?: throw IllegalArgumentException("$fileName is not found 1") - - return String(Files.readAllBytes(File(url.file).toPath())) - } -} diff --git a/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc b/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc deleted file mode 100644 index 9395c05..0000000 --- a/src/test/resources/com/cultureamp/employee-data.employees-value-v1.avsc +++ /dev/null @@ -1,306 +0,0 @@ -{ - "type": "record", - "name": "Event", - "namespace": "com.cultureamp.employee.v1", - "fields": [ - { - "name": "id", - "type": { - "type": "string", - "logicalType": "uuid" - } - }, - { - "name": "event_type", - "type": { - "name": "event_type", - "type": "enum", - "namespace": "com.cultureamp.employee.v1", - "symbols": [ - "employee_added", - "employee_birth_date_changed", - "employee_demographic_values_adjusted", - "employee_email_changed", - "employee_employee_id_changed", - "employee_end_date_changed", - "employee_erased", - "employee_locale_changed", - "employee_manager_assigned", - "employee_name_changed", - "employee_name_locked", - "employee_name_unlocked", - "employee_observer_status_changed", - "employee_preferred_name_changed", - "employee_start_date_changed" - ] - } - }, - { - "name": "account_id", - "type": { - "type": "string", - "logicalType": "uuid" - } - }, - { - "name": "employee_id", - "type": { - "type": "string", - "logicalType": "uuid" - } - }, - { - "name": "event_created_at", - "type": { - "type": "long", - "logicalType": "timestamp-millis" - } - }, - { - "name": "body", - "type": { - "type": "record", - "name": "Employee", - "namespace": "com.cultureamp.employee.v1", - "fields": [ - { - "name": "source", - "type": [ - "null", - "string" - ], - "default": null - }, - { - "name": "employee_id", - "type": [ - "null", - "string" - ], - "default": null - }, - { - "name": "email", - "type": [ - "null", - "string" - ], - "default": null - }, - { - "name": "name", - "type": [ - "null", - "string" - ], - "default": null - }, - { - "name": "preferred_name", - "type": [ - "null", - "string" - ], - "default": null - }, - { - "name": "birth_date", - "type": [ - "null", - { - "type": "string", - "logicalType": "date" - } - ], - "default": null - }, - { - "name": "start_date", - "type": [ - "null", - { - "type": "string", - "logicalType": "date" - } - ], - "default": null - }, - { - "name": "end_date", - "type": [ - "null", - { - "type": "string", - "logicalType": "date" - } - ], - "default": null - }, - { - "name": "locale", - "type": [ - "null", - "string" - ], - "default": null - }, - { - "name": "observer", - "type": [ - "null", - "boolean" - ], - "default": null - }, - { - "name": "gdpr_erasure_request_id", - "type": [ - "null", - "string" - ], - "default": null - }, - { - "name": "demographic_value_assignments", - "type": { - "type": "array", - "items": { - "type": "record", - "name": "DemographicValueAssignment", - "namespace": "com.cultureamp.employee.v1", - "fields": [ - { - "name": "demographic_id", - "type": { - "type": "string", - "logicalType": "uuid" - } - }, - { - "name": "demographic_value_id", - "type": [ - "null", - { - "type": "string", - "logicalType": "uuid" - } - ], - "default": null - } - ] - } - } - }, - { - "name": "manager_assignment", - "type": [ - "null", - { - "type": "record", - "name": "ManagerAssignment", - "namespace": "com.cultureamp.employee.v1", - "fields": [ - { - "name": "manager_id", - "type": [ - "null", - { - "type": "string", - "logicalType": "uuid" - } - ], - "default": null - }, - { - "name": "demographic_id", - "type": { - "type": "string", - "logicalType": "uuid" - } - } - ] - } - ], - "default": null - }, - { - "name": "erased", - "type": "boolean" - }, - { - "name": "created_at", - "type": { - "type": "long", - "logicalType": "timestamp-millis" - } - }, - { - "name": "updated_at", - "type": { - "type": "long", - "logicalType": "timestamp-millis" - } - }, - { - "name": "deleted_at", - "type": [ - "null", - { - "type": "long", - "logicalType": "timestamp-millis" - } - ], - "default": null - } - ] - } - }, - { - "name": "metadata", - "type": { - "type": "record", - "name": "Metadata", - "namespace": "com.cultureamp.employee.v1", - "fields": [ - { - "name": "correlation_id", - "type": [ - "null", - { - "type": "string", - "logicalType": "uuid" - } - ], - "default": null - }, - { - "name": "causation_id", - "type": [ - "null", - { - "type": "string", - "logicalType": "uuid" - } - ], - "default": null - }, - { - "name": "executor_id", - "type": [ - "null", - { - "type": "string", - "logicalType": "uuid" - } - ], - "default": null - }, - { - "name": "service", - "type": "string" - } - ] - } - } - ] -} \ No newline at end of file