Skip to content

Commit

Permalink
JSON conversion for array, strings and array of strings
Browse files Browse the repository at this point in the history
cleanup
  • Loading branch information
shraddha-ca committed Aug 4, 2023
1 parent 55274dd commit d0913bd
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 14 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {

// Kafka dependencies
implementation("org.apache.kafka:connect-api:$kafkaVersion")
implementation("org.apache.kafka:connect-json:$kafkaVersion")
implementation("org.apache.kafka:connect-transforms:$kafkaVersion")
implementation("org.apache.avro:avro:1.11.1")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import org.apache.kafka.connect.data.Field
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.json.JsonConverter
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements
import org.apache.kafka.connect.transforms.util.SchemaUtil
import org.slf4j.LoggerFactory
import java.util.Collections

/**
* A generic custom transform for RedShift
Expand Down Expand Up @@ -46,7 +48,7 @@ class RedShiftArrayTransformer<R : ConnectRecord<R>> : Transformation<R> {
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
targetPayload.schema(),
targetPayload,
record.timestamp()
)
Expand All @@ -65,6 +67,8 @@ class RedShiftArrayTransformer<R : ConnectRecord<R>> : Transformation<R> {
}

private fun targetPayload(sourceValue: Struct, sourceSchema: Schema): Struct {
val props = Collections.singletonMap("schemas.enable", false)
jsonConverter.configure(props, true)
val builder = SchemaUtil.copySchemaBasics(sourceSchema, SchemaBuilder.struct())
for (field in sourceSchema.fields()) {
builder.field(field.name(), updateSchema(field))
Expand All @@ -73,14 +77,18 @@ class RedShiftArrayTransformer<R : ConnectRecord<R>> : Transformation<R> {
val targetPayload = Struct(newSchema)
for (field in newSchema.fields()) {
val fieldVal = sourceValue.get(field.name())
if (field.schema().type() == sourceSchema.field(field.name()).schema().type()) {
val fieldSchema = sourceSchema.field(field.name()).schema()
if (field.schema().type() == fieldSchema.type()) {
targetPayload.put(field.name(), fieldVal)
} else {
targetPayload.put(field.name(), objectMapper.writeValueAsString(fieldVal))
val converted = jsonConverter.fromConnectData("", fieldSchema, fieldVal)
val fieldString = objectMapper.writeValueAsString(objectMapper.readTree(converted))
targetPayload.put(field.name(), fieldString)
}
}
return targetPayload
}

private val objectMapper = ObjectMapper()
private val jsonConverter = JsonConverter()
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.mongodb.kafka.connect.source.schema.AvroSchema
import com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue
import com.mongodb.kafka.connect.util.ClassHelper
import com.mongodb.kafka.connect.util.ConfigHelper
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaAndValue
import org.apache.kafka.connect.source.SourceRecord
import org.junit.Before
Expand All @@ -32,14 +33,15 @@ class RedShiftArrayTransformerTest {

private lateinit var transformer: RedShiftArrayTransformer<SourceRecord>

private fun hasNoArrays(obj: Any): Boolean {
var hasArray = false
obj::class.members.forEach { member ->
when {
member is Array<*> -> hasArray = true
private fun hasNoArrays(obj: SourceRecord): Boolean {

var hasNoArray = true
for (field in obj.valueSchema().fields()) {
if (field.schema().type() == Schema.Type.ARRAY) {
hasNoArray = false
}
}
return hasArray
return hasNoArray
}

@Before
Expand All @@ -60,9 +62,9 @@ class RedShiftArrayTransformerTest {
)

val transformedRecord = transformer.apply(sourceRecord)
System.out.println(sourceRecord)
System.out.println(transformedRecord)

// System.out.println(sourceRecord)
// System.out.println(transformedRecord)
hasNoArrays(sourceRecord)
assertTrue(hasNoArrays(transformedRecord))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
},
"service": "Influx"
},
"demographic_value_assignments": [
"test_array_of_structs": [
{
"demographic_id": {
"string": "5c579970-684e-4911-a077-6bf407fb478d"
Expand All @@ -56,5 +56,13 @@
"string": "ecc0db2e-486e-4f4a-a54a-db21673e1a2b"
}
}
],
"test_string_array": [
"a", "b", "c"
],
"test_array_of_arrays": [
["a", "b", "c"],
["e"],
["f", "g"]
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@
}
},
{
"name": "demographic_value_assignments",
"name": "test_array_of_structs",
"type": {
"type": "array",
"items": {
Expand Down Expand Up @@ -243,6 +243,28 @@
]
}
}
},
{
"name": "test_string_array",
"type": [
"null",
{
"type": "array",
"items": "string"
}
],
"default": null
},
{
"name": "test_array_of_arrays",
"type": [
"null",
{
"type" : "array",
"items" : "string"
}
],
"default": null
}
]
}

0 comments on commit d0913bd

Please sign in to comment.