Skip to content

Commit

Permalink
LC-191 Workaround Kafka Connect SMTs returning POJO (#1155)
Browse files Browse the repository at this point in the history
* LC-191 Workaround for bad Kafka Connect SMT practices

Some users have implemented SMTs that deviate from recommended best practices, returning Plain Old Java Objects (POJOs) instead of constructing Connect structs. Consequently, such SMTs encounter compatibility issues with the JsonConverter, which is designed to handle Connect data structures.

To address this, the proposed changes implement a mechanism to detect the presence of POJOs within the pipeline. Upon detection, the system leverages the Jackson Json writer to marshal the payload into JSON format, ensuring compatibility with the JsonConverter.

This code would only work for Json storage format and not for Avro/Parquet. We don't plan workarounds for those scenarios

* Suppression of invalid CVE warning. (#1138)

* Suppression of invalid CVE warning.

* Further suppressions

* Address the PR comments

---------

Co-authored-by: stheppi <[email protected]>
Co-authored-by: David Sloan <[email protected]>
  • Loading branch information
3 people authored Apr 16, 2024
1 parent 05c6aff commit da04836
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import io.lenses.streamreactor.connect.aws.s3.config._
import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata
import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
Expand All @@ -48,12 +47,15 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadP
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.NoOpPaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.UsersSchemaDecimal
import org.apache.kafka.connect.data.Struct
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.jdk.CollectionConverters._

class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {

Expand Down Expand Up @@ -247,4 +249,92 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
s"""{"name":"sam","title":"mr","salary":100.430000000000000000}""",
)
}

"json sink" should "write single json record when the input is not best practice: Array of POJO" in {

val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
val config = S3SinkConfig(
S3ConnectionConfig(
None,
Some(s3Container.identity.identity),
Some(s3Container.identity.credential),
AuthMode.Credentials,
),
bucketOptions = Seq(
CloudSinkBucketOptions(
TopicName.some,
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(1)),
formatSelection = JsonFormatSelection,
keyNamer = new CloudKeyNamer(
JsonFormatSelection,
defaultPartitionSelection(Values),
new OffsetFileNamer(
identity[String],
JsonFormatSelection.extension,
),
new PaddingService(Map[String, PaddingStrategy](
"partition" -> NoOpPaddingStrategy,
"offset" -> LeftPadPaddingStrategy(12, 0),
)),
),
localStagingArea = LocalStagingArea(localRoot),
partitionSelection = defaultPartitionSelection(Values),
dataStorage = DataStorageSettings.disabled,
), // JsonS3Format
),
offsetSeekerOptions = OffsetSeekerOptions(5),
compressionCodec,
batchDelete = true,
)

val sink = writerManagerCreator.from(config)
val topic = Topic(TopicName)
val offset = Offset(1)
val listOfPojo: java.util.List[Pojo] = List(
new Pojo("sam", "mr", 100.43),
new Pojo("laura", "ms", 429.06),
).asJava

sink.write(
TopicPartitionOffset(topic, 1, offset),
MessageDetail(NullSinkData(None),
ArraySinkData(listOfPojo, None),
Map.empty[String, SinkData],
None,
topic,
1,
offset,
),
)
sink.close()

listBucketPath(BucketName, "streamReactorBackups/myTopic/1/").size should be(1)

remoteFileAsString(BucketName, "streamReactorBackups/myTopic/1/1.json") should be(
"""[{"name":"sam","title":"mr","salary":100.43},{"name":"laura","title":"ms","salary":429.06}]""",
)
}
}

//create a class with the following fields
//.put("name", "sam").put("title", "mr").put("salary", 100.43),
class Pojo {
private var name: String = _
private var title: String = _
private var salary: Double = _

def this(name: String, title: String, salary: Double) = {
this()
this.name = name
this.title = title
this.salary = salary
}

def getName: String = name
def setName(name: String): Unit = this.name = name
def getTitle: String = title
def setTitle(title: String): Unit = this.title = title
def getSalary: Double = salary
def setSalary(salary: Double): Unit = this.salary = salary
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink.conversion

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
Expand All @@ -23,26 +25,42 @@ import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSink
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.json.JsonConverter

import java.nio.ByteBuffer

import scala.jdk.CollectionConverters._
object ToJsonDataConverter {

private val jacksonJson: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

def convertMessageValueToByteArray(converter: JsonConverter, topic: Topic, data: SinkData): Array[Byte] =
data match {
case data: PrimitiveSinkData => converter.fromConnectData(topic.value, data.schema().orNull, data.safeValue)
case StructSinkData(structVal) => converter.fromConnectData(topic.value, data.schema().orNull, structVal)
case MapSinkData(map, schema) => converter.fromConnectData(topic.value, schema.orNull, map)
case ArraySinkData(array, schema) => converter.fromConnectData(topic.value, schema.orNull, array)
case ByteArraySinkData(_, _) => throw new IllegalStateException("Cannot currently write byte array as json")
case NullSinkData(schema) => converter.fromConnectData(topic.value, schema.orNull, null)
case other => throw new IllegalStateException(s"Unknown SinkData type, ${other.getClass.getSimpleName}")
case StructSinkData(structVal) => converter.fromConnectData(topic.value, data.schema().orNull, structVal)
case MapSinkData(map, schema) => converter.fromConnectData(topic.value, schema.orNull, map)
case ArraySinkData(array, _) if isPojo(array) =>
val json = jacksonJson.writeValueAsString(array)
json.getBytes()
case ArraySinkData(array, schema) =>
converter.fromConnectData(topic.value, schema.orNull, array)
case ByteArraySinkData(_, _) => throw new IllegalStateException("Cannot currently write byte array as json")
case NullSinkData(schema) => converter.fromConnectData(topic.value, schema.orNull, null)
case other => throw new IllegalStateException(s"Unknown SinkData type, ${other.getClass.getSimpleName}")
}

def convert(data: SinkData): Any = data match {
case data: PrimitiveSinkData => data.safeValue
case ByteArraySinkData(bArray, _) => ByteBuffer.wrap(bArray)
case data => data.value
}

/**
* This is a workaround to help some of the customers who use Kafka Connect SMT ignoring the best practices
*/
private def isPojo(array: java.util.List[_]) =
array.size() > 0 && array.asScala.exists {
case _: Struct => false
case _ => true
}
}
30 changes: 30 additions & 0 deletions suppression.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,34 @@
<packageUrl regex="true">^pkg:maven/com\.azure/azure\-identity@.*$</packageUrl>
<cpe>cpe:/a:microsoft:azure_cli</cpe>
</suppress>

<!--This CVE is not valid, verified by the project author.
https://github.com/JodaOrg/joda-time/issues/780
-->
<suppress>
<notes><![CDATA[
file name: kafka-connect-common-assembly-6.4-SNAPSHOT.jar (shaded: joda-time:joda-time:2.10.8)
]]></notes>
<packageUrl regex="true">^pkg:maven/joda\-time/joda\-time@.*$</packageUrl>
<vulnerabilityName>CVE-2024-23080</vulnerabilityName>
</suppress>

<!-- Similar to the above, there seems to be insufficient evidence for this one
https://vulners.com/cve/CVE-2024-23081
https://vulners.com/cve/CVE-2024-23082
-->
<suppress>
<notes><![CDATA[
file name: kafka-connect-gcp-storage-assembly-6.4-SNAPSHOT.jar (shaded: org.threeten:threetenbp:1.6.8)
]]></notes>
<packageUrl regex="true">^pkg:maven/org\.threeten/threetenbp@.*$</packageUrl>
<vulnerabilityName>CVE-2024-23081</vulnerabilityName>
</suppress>
<suppress>
<notes><![CDATA[
file name: kafka-connect-gcp-storage-assembly-6.4-SNAPSHOT.jar (shaded: org.threeten:threetenbp:1.6.8)
]]></notes>
<packageUrl regex="true">^pkg:maven/org\.threeten/threetenbp@.*$</packageUrl>
<vulnerabilityName>CVE-2024-23082</vulnerabilityName>
</suppress>
</suppressions>

0 comments on commit da04836

Please sign in to comment.