Skip to content

Commit

Permalink
updated code and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shraddha-ca committed Sep 12, 2023
1 parent 4097a4a commit 250fd45
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.util.Collections
* @param R is ConnectRecord<R>.
* @constructor Creates a RedShiftArrayTransformer Transformation<R> for a given ConnectRecord<T>
*/
class RedShiftArrayTransformer<R : ConnectRecord<R>> : Transformation<R> {
class RedShiftComplexDataTypeTransformer<R : ConnectRecord<R>> : Transformation<R> {
private val logger = LoggerFactory.getLogger(this::class.java.canonicalName)
private val purpose = "RedShift™ JSON Array to String Transform"

Expand Down Expand Up @@ -60,7 +60,7 @@ class RedShiftArrayTransformer<R : ConnectRecord<R>> : Transformation<R> {
}

private fun updateSchema(field: Field): Schema {
if (field.schema().type() == Schema.Type.ARRAY) {
if (field.schema().type() == Schema.Type.ARRAY || field.schema().type() == Schema.Type.MAP) {
return SchemaBuilder.string().build()
}
return field.schema()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,27 @@ import kotlin.test.assertTrue
* See https://docs.confluent.io/platform/current/connect/javadocs/javadoc/org/apache/kafka/connect/transforms/Transformation.html.
*
* @param R is ConnectRecord<R>.
* @constructor Creates a RedShiftArrayTransformer Transformation<R> for a given ConnectRecord<T>
* @constructor Creates a RedShiftComplexDataTypeTransformer Transformation<R> for a given ConnectRecord<T>
*
*/
class RedShiftArrayTransformerTest {
class RedShiftComplexDataTypeTransformerTest {

private lateinit var transformer: RedShiftArrayTransformer<SourceRecord>
private lateinit var transformer: RedShiftComplexDataTypeTransformer<SourceRecord>

private fun hasNoArrays(obj: SourceRecord): Boolean {
private fun hasNoComplexTypes(obj: SourceRecord): Boolean {

var hasNoArray = true
var hasNoComplexTypes = true
for (field in obj.valueSchema().fields()) {
if (field.schema().type() == Schema.Type.ARRAY) {
hasNoArray = false
if (field.schema().type() == Schema.Type.ARRAY || field.schema().type() == Schema.Type.MAP) {
hasNoComplexTypes = false
}
}
return hasNoArray
return hasNoComplexTypes
}

@Before
fun setUp() {
transformer = RedShiftArrayTransformer()
transformer = RedShiftComplexDataTypeTransformer()
}

@Test
Expand All @@ -62,8 +62,8 @@ class RedShiftArrayTransformerTest {
)

val transformedRecord = transformer.apply(sourceRecord)
hasNoArrays(sourceRecord)
assertTrue(hasNoArrays(transformedRecord))
hasNoComplexTypes(sourceRecord)
assertTrue(hasNoComplexTypes(transformedRecord))
}

private val sourceSchema = AvroSchema.fromJson(fileContent("com/cultureamp/employee-data.employees-value-v1.avsc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@
}
}
],
"test_map": {
"added_users_count": 0,
"ignored_new_demographics_count": 0,
"ignored_users_count": 0,
"inactive_updated_users_count": 0,
"reactivated_users_count": 0,
"removed_users_count": 0,
"updated_users_count": 0
},
"test_string_array": [
"a", "b", "c"
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,17 @@
}
],
"default": null
},
{
"name": "test_map",
"type": [
"null",
{
"type": "map",
"values": "int"
}
],
"default": null
}
]
}

0 comments on commit 250fd45

Please sign in to comment.