Skip to content

Commit

Permalink
removed tests and changed code a bit-- just to get this working and t…
Browse files Browse the repository at this point in the history
…esting
  • Loading branch information
ginamaini committed Aug 3, 2023
1 parent 9388a85 commit a1ee11e
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 410 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ plugins {
}

// Package version
version = "0.5.0"
version = "0.6.0"

repositories {
// Use Maven Central for resolving dependencies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@ import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.Schema
// import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements
import org.slf4j.LoggerFactory

/**
* A generic custom transform for RedShift
*
* This transformer class manipulates fields in schemas by turning them from arrays into strings
* which is necessary for this:
* https://stackoverflow.com/questions/61360342/kafka-connect-flatten-transformation-of-a-postgres-record-with-array-field issue to be solved
* as RedShift does not support array types and arrays must be converted into strings.
* See https://docs.confluent.io/platform/current/connect/javadocs/javadoc/org/apache/kafka/connect/transforms/Transformation.html.
*
* @param R is ConnectRecord<R>.
* @constructor Creates a RedShiftArrayTransformer Transformation<R> for a given ConnectRecord<T>
* A generic custom transform for RedShift
*
* This transformer class manipulates fields in schemas by turning them from arrays into strings
* which is necessary for this:
* https://stackoverflow.com/questions/61360342/kafka-connect-flatten-transformation-of-a-postgres-record-with-array-field issue to be solved
* as RedShift does not support array types and arrays must be converted into strings.
* See https://docs.confluent.io/platform/current/connect/javadocs/javadoc/org/apache/kafka/connect/transforms/Transformation.html.
*
* @param R is ConnectRecord<R>.
* @constructor Creates a RedShiftArrayTransformer Transformation<R> for a given ConnectRecord<T>
*/
class RedShiftArrayTransformer<R : ConnectRecord<R>> : Transformation<R> {
private val logger = LoggerFactory.getLogger(this::class.java.canonicalName)
Expand Down Expand Up @@ -55,38 +54,23 @@ class RedShiftArrayTransformer<R : ConnectRecord<R>> : Transformation<R> {
}
}

// private fun gracefulArrayToString(obj: Any) {
// when (obj) {
// is Array<CharSequence> ->
// obj.joinToString(
// prefix = "[",
// separator = ":",
// postfix = "]",
// limit = 3,
// truncated = "...",
// transform = { obj }
// )
// is Array<Struct> ->

// else -> obj::class.ToString()
// }
// }

private fun arrayToStringMapping(obj: Any) {
private fun arrayToStringMapping(obj:Any) {
when (obj) {
is Array<*> -> objectMapper.writeValueAsString(obj)
else -> obj
}
}

private fun targetPayload(sourceValue: Struct, targetSchema: Schema): Struct {

targetSchema::class.members.forEach { member -> arrayToStringMapping(member) }

val targetPayload = Struct(targetSchema)

return targetPayload
}

private val objectMapper = ObjectMapper()


}

This file was deleted.

Loading

0 comments on commit a1ee11e

Please sign in to comment.