Skip to content

Commit

Permalink
added back tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shraddha-ca committed Aug 3, 2023
1 parent 0c32218 commit 55274dd
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package com.cultureamp.kafka.connect.plugins.transforms
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.Field
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements
import org.apache.kafka.connect.transforms.util.SchemaUtil
import org.slf4j.LoggerFactory

/**
Expand Down Expand Up @@ -54,19 +57,28 @@ class RedShiftArrayTransformer<R : ConnectRecord<R>> : Transformation<R> {
}
}

private fun arrayToStringMapping(obj: Any) {
when (obj) {
is Array<*> -> objectMapper.writeValueAsString(obj)
else -> obj
/**
 * Maps a source field's schema to its RedShift-compatible target schema.
 *
 * ARRAY-typed fields are re-declared as STRING (the array value is later
 * JSON-serialised into that string); all other fields keep their schema
 * unchanged.
 *
 * Fix: the original always returned a *required* string schema, silently
 * dropping the optionality of an optional array field. The optional flag is
 * now carried over so null array values remain schema-valid downstream.
 */
private fun updateSchema(field: Field): Schema {
    if (field.schema().type() == Schema.Type.ARRAY) {
        val builder = SchemaBuilder.string()
        if (field.schema().isOptional) {
            builder.optional()
        }
        return builder.build()
    }
    return field.schema()
}

private fun targetPayload(sourceValue: Struct, targetSchema: Schema): Struct {

targetSchema::class.members.forEach { member -> arrayToStringMapping(member) }

val targetPayload = Struct(targetSchema)

/**
 * Builds a new Struct matching [sourceSchema] except that array-typed fields
 * are re-typed as strings, with their values JSON-serialised.
 *
 * Fields whose type is unchanged are copied through verbatim; a field whose
 * target type differs from its source type (i.e. an array remapped to a
 * string) has its value written out via the Jackson object mapper.
 */
private fun targetPayload(sourceValue: Struct, sourceSchema: Schema): Struct {
    // Rebuild the struct schema, swapping array fields for string fields.
    val schemaBuilder = SchemaUtil.copySchemaBasics(sourceSchema, SchemaBuilder.struct())
    sourceSchema.fields().forEach { sourceField ->
        schemaBuilder.field(sourceField.name(), updateSchema(sourceField))
    }
    val targetSchema = schemaBuilder.build()

    // Populate the target struct, JSON-encoding any value whose field type
    // was remapped.
    val result = Struct(targetSchema)
    targetSchema.fields().forEach { targetField ->
        val name = targetField.name()
        val value = sourceValue.get(name)
        val typeUnchanged =
            targetField.schema().type() == sourceSchema.field(name).schema().type()
        result.put(name, if (typeUnchanged) value else objectMapper.writeValueAsString(value))
    }
    return result
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.cultureamp.kafka.connect.plugins.transforms

import com.mongodb.kafka.connect.source.MongoSourceConfig
import com.mongodb.kafka.connect.source.json.formatter.JsonWriterSettingsProvider
import com.mongodb.kafka.connect.source.schema.AvroSchema
import com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue
import com.mongodb.kafka.connect.util.ClassHelper
import com.mongodb.kafka.connect.util.ConfigHelper
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaAndValue
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.source.SourceRecord
import org.junit.Before
import java.io.File
import java.nio.file.Files
import kotlin.test.Test
import kotlin.test.assertTrue

/**
 * Tests for RedShiftArrayTransformer.
 *
 * The transformer converts array-typed record fields into JSON string fields
 * so records can be loaded into RedShift, which has no array type. These tests
 * feed a BSON-derived Employee ECST payload (which contains an array field)
 * through the transformer and verify no array-typed fields survive.
 */
class RedShiftArrayTransformerTest {

    private lateinit var transformer: RedShiftArrayTransformer<SourceRecord>

    /**
     * Returns true when no top-level field of the record's value schema is
     * array-typed.
     *
     * Fix: the original reflected over `obj::class.members` — those are
     * KCallables and can never be `Array<*>` — and then returned `hasArray`
     * rather than its negation, so the assertion could never actually detect
     * a surviving array. We now inspect the transformed Struct's schema.
     */
    private fun hasNoArrays(record: SourceRecord): Boolean {
        val struct = record.value() as Struct
        return struct.schema().fields().none { field ->
            field.schema().type() == Schema.Type.ARRAY
        }
    }

    @Before
    fun setUp() {
        transformer = RedShiftArrayTransformer()
    }

    @Test
    fun `can transform ECST Employee data that has arrays into string fields`() {

        // The fixture contains "demographic_value_assignments", an array field.
        val avroRecord = payload("com/cultureamp/employee-data.employees-v1.json")
        val sourceRecord = SourceRecord(
            null,
            null,
            "employee data ecst test",
            avroRecord.schema(),
            avroRecord.value()
        )

        val transformedRecord = transformer.apply(sourceRecord)

        assertTrue(hasNoArrays(transformedRecord))
    }

    private val sourceSchema = AvroSchema.fromJson(fileContent("com/cultureamp/employee-data.employees-value-v1.avsc"))

    // Parses the named JSON fixture into a Connect SchemaAndValue using the
    // Avro source schema above.
    private fun payload(fileName: String): SchemaAndValue {
        val document = ConfigHelper.documentFromString(fileContent(fileName)).get()

        return BsonValueToSchemaAndValue(jsonWriterSettings)
            .toSchemaAndValue(sourceSchema, document.toBsonDocument())
    }

    // Reads a classpath resource as a String; fails fast with a clear message
    // when the resource is missing (original message had a stray debug "1").
    private fun fileContent(fileName: String): String {
        val url = this.javaClass.classLoader
            .getResource(fileName) ?: throw IllegalArgumentException("$fileName is not found")

        return String(Files.readAllBytes(File(url.file).toPath()))
    }

    private val jsonWriterSettings =
        ClassHelper.createInstance(
            MongoSourceConfig.OUTPUT_JSON_FORMATTER_CONFIG,
            "com.mongodb.kafka.connect.source.json.formatter.DefaultJson",
            JsonWriterSettingsProvider::class.java
        ).jsonWriterSettings
}
60 changes: 60 additions & 0 deletions src/test/resources/com/cultureamp/employee-data.employees-v1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{
"id": "c63526f8-dec7-4ef8-96d8-18756076f064",
"account_id": "0a05e2a3-7258-4cf5-a7f4-e21b08c030c5",
"employee_id": "c63526f8-dec7-4ef8-96d8-18756076f064",
"event_created_at": 1536899741117,
"body": {
"source": {
"string": ""
},
"employee_id": null,
"email": {
"string": "[email protected]"
},
"name": {
"string": "Test User 800702"
},
"preferred_name": null,
"birth_date": null,
"start_date": {
"string": "2017-02-24"
},
"end_date": null,
"locale": null,
"observer": false,
"gdpr_erasure_request_id": null,
"manager_assignment": null,
"erased": false,
"created_at": 1536899741113,
"updated_at": 1536899741117,
"deleted_at": null
},
"metadata": {
"correlation_id": {
"string": "b9098254-a1db-4114-9a39-baa17ab18fbf"
},
"causation_id": null,
"executor_id": {
"string": "379907ca-632c-4e83-89c4-9dbe0e759ad3"
},
"service": "Influx"
},
"demographic_value_assignments": [
{
"demographic_id": {
"string": "5c579970-684e-4911-a077-6bf407fb478d"
},
"demographic_value_id": {
"string": "427b936f-e932-4673-95a2-acd3e3b900b1"
}
},
{
"demographic_id": {
"string": "460f6b2d-03c5-46cf-ba55-aa14477a12dc"
},
"demographic_value_id": {
"string": "ecc0db2e-486e-4f4a-a54a-db21673e1a2b"
}
}
]
}
Loading

0 comments on commit 55274dd

Please sign in to comment.