From afc96d8687a97086a641e6c2e94974b58344319c Mon Sep 17 00:00:00 2001
From: David Sloan <33483659+davidsloan@users.noreply.github.com>
Date: Wed, 27 Mar 2024 09:11:36 +0000
Subject: [PATCH 1/3] Feat/http tests (#1059)
* HTTP Sink testing
* HTTP Sink Functional Testing
* Allow not specifying headers (see the sketch after this list)
* Remove commented line
* Adjust default to 5 seconds
* Make naming more consistent
* Align http sink default flush configuration with the cloud connectors
* Fix from review
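
For illustration only (not part of this patch), a minimal sketch of building the reworked HttpSinkConfig without headers, based on the field order and helpers visible in the diff below; the endpoint and content values are placeholders:

    import cats.implicits.catsSyntaxOptionId
    import cats.implicits.none
    import io.lenses.streamreactor.connect.http.sink.client.HttpMethod
    import io.lenses.streamreactor.connect.http.sink.config.BatchConfig
    import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig

    object HttpSinkConfigSketch {
      // headers is now Option[Seq[(String, String)]], so it can simply be omitted with `none`
      val config: HttpSinkConfig = HttpSinkConfig(
        method           = HttpMethod.Post,
        endpoint         = "http://localhost:8080/example", // placeholder endpoint
        content          = "{{value}}",                      // template forwarding the record value
        authentication   = Option.empty,
        headers          = none,                             // previously a mandatory Seq()
        ssl              = Option.empty,
        batch            = BatchConfig(1L.some, none, none).some,
        errorThreshold   = none,
        uploadSyncPeriod = none,
      )

      // the JSON form is what goes into the connector's `connect.http.config` property
      val json: String = config.toJson
    }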
---
build.sbt | 4 +-
.../connect/S3CompressionTest.scala | 6 +-
.../lenses/streamreactor/connect/S3Test.scala | 5 +-
.../sink/extractors/MapExtractorTest.scala | 4 +
.../common/sink/extractors/MapExtractor.scala | 27 +-
.../connect/test/HttpConfiguration.scala | 55 ++++
.../connect/test/HttpSinkTest.scala | 235 ++++++++++++++++++
.../connect/test/WiremockContainer.scala | 25 ++
.../connect/http/sink/HttpSinkTaskIT.scala | 44 ++--
.../connect/http/sink/HttpSinkTask.scala | 2 +-
.../connect/http/sink/HttpWriterManager.scala | 4 +-
.../http/sink/commit/HttpCommitPolicy.scala | 6 +-
.../http/sink/config/HttpSinkConfig.scala | 20 +-
.../http/sink/HttpSinkConfigTest.scala | 30 ++-
.../streamreactor/connect/model/Order.scala | 4 +-
15 files changed, 410 insertions(+), 61 deletions(-)
create mode 100644 kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpConfiguration.scala
create mode 100644 kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala
create mode 100644 kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/WiremockContainer.scala
diff --git a/build.sbt b/build.sbt
index 76aa0a696..6420e7a86 100644
--- a/build.sbt
+++ b/build.sbt
@@ -268,7 +268,7 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
lazy val http = (project in file("kafka-connect-http"))
.dependsOn(common)
- //.dependsOn(`test-common` % "fun->compile")
+ .dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Seq(
@@ -285,7 +285,7 @@ lazy val http = (project in file("kafka-connect-http"))
.configureAssembly(false)
.configureTests(baseTestDeps ++ kafkaConnectHttpTestDeps)
.configureIntegrationTests(baseTestDeps ++ kafkaConnectHttpTestDeps)
- //.configureFunctionalTests(kafkaConnectS3FuncTestDeps)
+ .configureFunctionalTests()
.enablePlugins(PackPlugin, ProtocPlugin)
lazy val influxdb = (project in file("kafka-connect-influxdb"))
diff --git a/kafka-connect-aws-s3/src/fun/scala/io/lenses/streamreactor/connect/S3CompressionTest.scala b/kafka-connect-aws-s3/src/fun/scala/io/lenses/streamreactor/connect/S3CompressionTest.scala
index 7fc3429db..bfbe77c52 100644
--- a/kafka-connect-aws-s3/src/fun/scala/io/lenses/streamreactor/connect/S3CompressionTest.scala
+++ b/kafka-connect-aws-s3/src/fun/scala/io/lenses/streamreactor/connect/S3CompressionTest.scala
@@ -11,7 +11,6 @@ import _root_.io.lenses.streamreactor.connect.testcontainers.scalatest.StreamRea
import cats.effect.IO
import cats.effect.testing.scalatest.AsyncIOSpec
import cats.implicits._
-import com.datastax.driver.core.utils.UUIDs
import com.typesafe.scalalogging.LazyLogging
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.ProducerRecord
@@ -23,6 +22,7 @@ import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.prop.TableFor3
import software.amazon.awssdk.services.s3.model._
+import java.util.UUID
import scala.jdk.CollectionConverters.ListHasAsScala
import scala.util.Random
@@ -98,8 +98,8 @@ class S3CompressionTest
case (s3Client, producer) =>
IO {
// Write records to
- val order = Order(1, "OP-DAX-P-20150201-95.7", 94.2, 100, UUIDs.timeBased.toString)
- val record = order.toRecord(order)
+ val order = Order(1, "OP-DAX-P-20150201-95.7", 94.2, 100, UUID.randomUUID().toString)
+ val record = order.toRecord
producer.send(new ProducerRecord[String, GenericRecord](topic, record)).get
producer.flush()
diff --git a/kafka-connect-aws-s3/src/fun/scala/io/lenses/streamreactor/connect/S3Test.scala b/kafka-connect-aws-s3/src/fun/scala/io/lenses/streamreactor/connect/S3Test.scala
index 417f30c18..6c3b71e39 100644
--- a/kafka-connect-aws-s3/src/fun/scala/io/lenses/streamreactor/connect/S3Test.scala
+++ b/kafka-connect-aws-s3/src/fun/scala/io/lenses/streamreactor/connect/S3Test.scala
@@ -11,7 +11,6 @@ import _root_.io.lenses.streamreactor.connect.testcontainers.S3Container
import _root_.io.lenses.streamreactor.connect.testcontainers.SchemaRegistryContainer
import cats.effect.IO
import cats.effect.testing.scalatest.AsyncIOSpec
-import com.datastax.driver.core.utils.UUIDs
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
@@ -21,6 +20,8 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks
import software.amazon.awssdk.services.s3.model._
+import java.util.UUID
+
class S3Test
extends AsyncFlatSpec
with AsyncIOSpec
@@ -52,7 +53,7 @@ class S3Test
it should "sink records" in {
- val order = Order(1, "OP-DAX-P-20150201-95.7", 94.2, 100, UUIDs.timeBased.toString)
+ val order = Order(1, "OP-DAX-P-20150201-95.7", 94.2, 100, UUID.randomUUID().toString)
val resources = for {
s3Client <- createS3ClientResource(container.identity, container.getEndpointUrl)
diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractorTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractorTest.scala
index 7bcfc0ef0..5ed3c1fa1 100644
--- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractorTest.scala
+++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractorTest.scala
@@ -43,4 +43,8 @@ class MapExtractorTest extends AnyFlatSpec with Matchers {
mapOfMapsOfStringsSchema,
) should be(Right("2"))
}
+
+ "lookupFieldValueFromStruct" should "handle map of maps without schema" in {
+ MapExtractor.extractPathFromMap(mapOfMapsOfStrings, PartitionNamePath("c", "d"), null) should be(Right("2"))
+ }
}
diff --git a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala
index fadc2fc04..054239114 100644
--- a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala
+++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala
@@ -39,11 +39,20 @@ object MapExtractor extends LazyLogging {
fieldName: PartitionNamePath,
schema: Schema,
): Either[ExtractorError, String] = {
- val mapKey = fieldName.head
- Option(map.get(mapKey))
- .fold(ExtractorError(ExtractorErrorType.MissingValue).asLeft[String]) {
- ComplexTypeExtractor.extractComplexType(_, fieldName.tail, schema.valueSchema())
- }
+
+ val maybeSchema = Option(schema)
+
+ val mapKey = fieldName.head
+ val maybeMapValue = Option(map.get(mapKey))
+
+ (maybeMapValue, maybeSchema) match {
+ case (None, _) =>
+ ExtractorError(ExtractorErrorType.MissingValue).asLeft[String]
+ case (Some(mapValue), Some(sch)) =>
+ ComplexTypeExtractor.extractComplexType(mapValue, fieldName.tail, sch.valueSchema())
+ case (Some(mapValue), None) =>
+ WrappedComplexTypeExtractor.extractFromComplexType(mapValue, fieldName.tail)
+ }
}
private def extractPrimitive(
@@ -51,9 +60,9 @@ object MapExtractor extends LazyLogging {
fieldName: String,
mapSchema: Schema,
): Either[ExtractorError, String] =
- Option(mapSchema.valueSchema())
- .fold(ExtractorError(ExtractorErrorType.MissingValue).asLeft[String]) {
- PrimitiveExtractor.extractPrimitiveValue(map.get(fieldName), _)
- }
+ Option(mapSchema).flatMap(_.valueSchema().some) match {
+ case Some(sch) => PrimitiveExtractor.extractPrimitiveValue(map.get(fieldName), sch)
+ case None => WrappedPrimitiveExtractor.extractFromPrimitive(map.get(fieldName))
+ }
}
diff --git a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpConfiguration.scala b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpConfiguration.scala
new file mode 100644
index 000000000..57c3a8a86
--- /dev/null
+++ b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpConfiguration.scala
@@ -0,0 +1,55 @@
+package io.lenses.streamreactor.connect.test
+
+import _root_.io.lenses.streamreactor.connect.testcontainers.connect._
+import cats.implicits.catsSyntaxOptionId
+import cats.implicits.none
+import com.typesafe.scalalogging.LazyLogging
+import io.lenses.streamreactor.connect.http.sink.client.HttpMethod
+import io.lenses.streamreactor.connect.http.sink.config.BatchConfig
+import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
+
+trait HttpConfiguration extends LazyLogging {
+
+ def sinkConfig(
+ randomTestId: String,
+ endpointUrl: String,
+ httpMethod: String,
+ contentTemplate: String,
+ headerTemplates: Seq[(String, String)],
+ topicName: String,
+ converters: Map[String, String],
+ ): ConnectorConfiguration = {
+ val configMap: Map[String, ConfigValue[_]] = converters.view.mapValues(new ConfigValue[String](_)).toMap ++
+ Map(
+ "connector.class" -> ConfigValue("io.lenses.streamreactor.connect.http.sink.HttpSinkConnector"),
+ "tasks.max" -> ConfigValue(1),
+ "topics" -> ConfigValue(topicName),
+ "connect.http.config" -> ConfigValue(
+ HttpSinkConfig(
+ HttpMethod.withNameInsensitive(httpMethod),
+ endpoint = endpointUrl,
+ content = contentTemplate,
+ Option.empty,
+ headers = headerTemplates.some,
+ ssl = Option.empty,
+ batch = Option(BatchConfig(1L.some, none, none)),
+ errorThreshold = Option.empty,
+ uploadSyncPeriod = Option.empty,
+ ).toJson,
+ ),
+ )
+ debugLogConnectorConfig(configMap)
+ ConnectorConfiguration(
+ "connector" + randomTestId,
+ configMap,
+ )
+ }
+
+ private def debugLogConnectorConfig(configMap: Map[String, ConfigValue[_]]): Unit = {
+ logger.debug("Creating connector with configuration:")
+ configMap.foreachEntry {
+ case (k, v) => logger.debug(s" $k => ${v.underlying}")
+ }
+ logger.debug(s"End connector config.")
+ }
+}
diff --git a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala
new file mode 100644
index 000000000..e4690fd40
--- /dev/null
+++ b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala
@@ -0,0 +1,235 @@
+package io.lenses.streamreactor.connect.test
+
+import cats.effect.IO
+import cats.effect.kernel.Resource
+import cats.effect.testing.scalatest.AsyncIOSpec
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.client.WireMock._
+import com.github.tomakehurst.wiremock.http.RequestMethod
+import com.github.tomakehurst.wiremock.verification.LoggedRequest
+import com.typesafe.scalalogging.LazyLogging
+import io.confluent.kafka.serializers.KafkaAvroSerializer
+import io.confluent.kafka.serializers.KafkaJsonSerializer
+import io.lenses.streamreactor.connect.model.Order
+import io.lenses.streamreactor.connect.testcontainers.connect.KafkaConnectClient.createConnector
+import io.lenses.streamreactor.connect.testcontainers.scalatest.StreamReactorContainerPerSuite
+import org.apache.avro.generic.GenericRecord
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.StringSerializer
+import org.scalatest.funsuite.AsyncFunSuiteLike
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.BeforeAndAfter
+import org.scalatest.EitherValues
+
+import java.util.UUID
+import scala.jdk.CollectionConverters.ListHasAsScala
+
+class HttpSinkTest
+ extends AsyncFunSuiteLike
+ with BeforeAndAfter
+ with AsyncIOSpec
+ with Matchers
+ with StreamReactorContainerPerSuite
+ with LazyLogging
+ with EitherValues
+ with HttpConfiguration {
+
+ private val stringSerializer = classOf[StringSerializer]
+ private val stringProducer = createProducer[String, String](stringSerializer, stringSerializer)
+ private val orderProducer = createProducer[String, Order](stringSerializer, classOf[KafkaJsonSerializer[Order]])
+ private val avroOrderProducer = createProducer[String, GenericRecord](stringSerializer, classOf[KafkaAvroSerializer])
+ private val stringConverters = Map(
+ "value.converter" -> "org.apache.kafka.connect.storage.StringConverter",
+ "key.converter" -> "org.apache.kafka.connect.storage.StringConverter",
+ )
+
+ private val avroConverters = Map(
+ "value.converter" -> "io.confluent.connect.avro.AvroConverter",
+ "value.converter.schemas.enable" -> "true",
+ "value.converter.schema.registry.url" -> schemaRegistryContainer.map(_.schemaRegistryUrl).getOrElse(
+ fail("No SR url"),
+ ),
+ "key.converter" -> "org.apache.kafka.connect.storage.StringConverter",
+ )
+
+ private val jsonConverters = Map(
+ "value.converter" -> "org.apache.kafka.connect.json.JsonConverter",
+ "value.converter.schemas.enable" -> "false",
+ "key.converter" -> "org.apache.kafka.connect.storage.StringConverter",
+ )
+
+ private lazy val container: WiremockContainer = new WiremockContainer()
+ .withNetwork(network)
+
+ override val connectorModule: String = "http"
+ private var randomTestId = UUID.randomUUID().toString
+ private def topic = "topic" + randomTestId
+
+ before {
+ randomTestId = UUID.randomUUID().toString
+ }
+
+ override def beforeAll(): Unit = {
+ container.start()
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ container.stop()
+ }
+
+ test("static string template should be sent to endpoint") {
+
+ setUpWiremockResponse()
+
+ val record = "My First Record"
+ sendRecordWithProducer(stringProducer,
+ stringConverters,
+ randomTestId,
+ topic,
+ record,
+ "My Static Content Template",
+ ).asserting {
+ requests =>
+ requests.size should be(1)
+ val firstRequest = requests.head
+ firstRequest.getMethod should be(RequestMethod.POST)
+ new String(firstRequest.getBody) should be("My Static Content Template")
+ }
+ }
+
+ test("dynamic string template containing message content should be sent to endpoint") {
+
+ setUpWiremockResponse()
+
+ val record = "My First Record"
+ sendRecordWithProducer(stringProducer, stringConverters, randomTestId, topic, record, "{{value}}").asserting {
+ requests =>
+ requests.size should be(1)
+ val firstRequest = requests.head
+ firstRequest.getMethod should be(RequestMethod.POST)
+ new String(firstRequest.getBody) should be("My First Record")
+ }
+ }
+
+ test("dynamic string template containing json message fields should be sent to endpoint") {
+
+ setUpWiremockResponse()
+
+ sendRecordWithProducer[String, Order](
+ orderProducer,
+ jsonConverters,
+ randomTestId,
+ topic,
+ Order(1, "myOrder product", 1.3d, 10),
+ "product: {{value.product}}",
+ ).asserting {
+ requests =>
+ requests.size should be(1)
+ val firstRequest = requests.head
+ firstRequest.getMethod should be(RequestMethod.POST)
+ new String(firstRequest.getBody) should be("product: myOrder product")
+ }
+ }
+
+ test("dynamic string template containing whole json message should be sent to endpoint") {
+
+ setUpWiremockResponse()
+
+ sendRecordWithProducer[String, Order](
+ orderProducer,
+ stringConverters,
+ randomTestId,
+ topic,
+ Order(1, "myOrder product", 1.3d, 10),
+ "whole product message: {{value}}",
+ ).asserting {
+ requests =>
+ requests.size should be(1)
+ val firstRequest = requests.head
+ firstRequest.getMethod should be(RequestMethod.POST)
+ new String(firstRequest.getBody) should be(
+ "whole product message: {\"id\":1,\"product\":\"myOrder product\",\"price\":1.3,\"qty\":10,\"created\":null}",
+ )
+ }
+ }
+
+ test("dynamic string template containing avro message fields should be sent to endpoint") {
+
+ setUpWiremockResponse()
+
+ val order = Order(1, "myOrder product", 1.3d, 10, "March").toRecord
+ sendRecordWithProducer[String, GenericRecord](
+ avroOrderProducer,
+ avroConverters,
+ randomTestId,
+ topic,
+ order,
+ "product: {{value.product}}",
+ ).asserting {
+ requests =>
+ requests.size should be(1)
+ val firstRequest = requests.head
+ firstRequest.getMethod should be(RequestMethod.POST)
+ new String(firstRequest.getBody) should be("product: myOrder product")
+ }
+ }
+
+ private def setUpWiremockResponse(): Unit = {
+ WireMock.configureFor(container.getHost, container.getFirstMappedPort)
+ WireMock.reset()
+
+ val url = s"/$randomTestId"
+ stubFor(post(urlEqualTo(url))
+ .willReturn(aResponse.withHeader("Content-Type", "text/plain")
+ .withBody("Hello world!")))
+ ()
+ }
+
+ private def sendRecordWithProducer[K, V](
+ producer: Resource[IO, KafkaProducer[K, V]],
+ converters: Map[String, String],
+ randomTestId: String,
+ topic: String,
+ record: V,
+ contentTemplate: String,
+ ): IO[List[LoggedRequest]] =
+ producer.use {
+ producer =>
+ createConnectorResource(randomTestId, topic, contentTemplate, converters).use {
+ _ =>
+ IO {
+ sendRecord[K, V](topic, producer, record)
+ eventually {
+ verify(postRequestedFor(urlEqualTo(s"/$randomTestId")))
+ findAll(postRequestedFor(urlEqualTo(s"/$randomTestId"))).asScala.toList
+ }
+ }
+ }
+ }
+ private def sendRecord[K, V](topic: String, producer: KafkaProducer[K, V], record: V): Unit = {
+ producer.send(new ProducerRecord[K, V](topic, record)).get
+ producer.flush()
+ }
+
+ def createConnectorResource(
+ randomTestId: String,
+ topic: String,
+ contentTemplate: String,
+ converters: Map[String, String],
+ ): Resource[IO, String] =
+ createConnector(
+ sinkConfig(
+ randomTestId,
+ s"${container.getNetworkAliasUrl}/$randomTestId",
+ "post",
+ contentTemplate,
+ Seq(),
+ topic,
+ converters,
+ ),
+ )
+
+}
diff --git a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/WiremockContainer.scala b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/WiremockContainer.scala
new file mode 100644
index 000000000..33128ecab
--- /dev/null
+++ b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/WiremockContainer.scala
@@ -0,0 +1,25 @@
+package io.lenses.streamreactor.connect.test
+
+import org.testcontainers.containers.GenericContainer
+import org.testcontainers.containers.wait.strategy.Wait
+import org.testcontainers.utility.DockerImageName
+
+class WiremockContainer(
+ dockerImageName: DockerImageName = DockerImageName.parse("wiremock/wiremock"),
+ dockerTag: String = "3.4.2-1",
+ networkAlias: String = "wiremock",
+) extends GenericContainer[WiremockContainer](dockerImageName.withTag(dockerTag)) {
+
+ private val port = 8080
+
+ withNetworkAliases(networkAlias)
+ withExposedPorts(port)
+ waitingFor(Wait.forListeningPort())
+
+ def getNetworkAliasUrl: String =
+ s"http://$networkAlias:$port"
+
+ def getEndpointUrl: String =
+ s"http://$getHost:${getMappedPort(port)}"
+
+}
diff --git a/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala b/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala
index 200532b1a..267aaf562 100644
--- a/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala
+++ b/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala
@@ -16,7 +16,7 @@ import com.github.tomakehurst.wiremock.client.WireMock.equalToXml
import com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor
import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
import io.lenses.streamreactor.connect.http.sink.client.HttpMethod
-import io.lenses.streamreactor.connect.http.sink.config.BatchConfiguration
+import io.lenses.streamreactor.connect.http.sink.config.BatchConfig
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
@@ -51,13 +51,13 @@ class HttpSinkTaskIT extends AsyncFunSuite with AsyncIOSpec with Eventually {
(for {
server <- wireMockServer
configuration = HttpSinkConfig(
- authentication = Option.empty,
method = HttpMethod.Post,
endpoint = s"http://$Host:${server.port()}/awesome/endpoint",
content = "test",
- headers = Seq(),
- sslConfig = Option.empty,
- batch = BatchConfiguration(2L.some, none, none).some,
+ authentication = Option.empty,
+ headers = none,
+ ssl = Option.empty,
+ batch = BatchConfig(2L.some, none, none).some,
errorThreshold = none,
uploadSyncPeriod = none,
).toJson
@@ -78,13 +78,13 @@ class HttpSinkTaskIT extends AsyncFunSuite with AsyncIOSpec with Eventually {
(for {
server <- wireMockServer
config = HttpSinkConfig(
- authentication = Option.empty,
method = HttpMethod.Post,
endpoint = s"http://$Host:${server.port()}/awesome/endpoint/{{value.name}}",
content = "{salary: {{value.salary}}}",
- headers = Seq(),
- sslConfig = Option.empty,
- batch = BatchConfiguration(1L.some, none, none).some,
+ authentication = Option.empty,
+ headers = none,
+ ssl = Option.empty,
+ batch = BatchConfig(1L.some, none, none).some,
errorThreshold = none,
uploadSyncPeriod = none,
).toJson
@@ -120,13 +120,13 @@ class HttpSinkTaskIT extends AsyncFunSuite with AsyncIOSpec with Eventually {
(for {
server <- wireMockServer
configuration = HttpSinkConfig(
- authentication = Option.empty,
method = HttpMethod.Post,
endpoint = s"http://$Host:${server.port()}/awesome/endpoint/{{value.name}}",
content = "{salary: {{value.salary}}}",
- headers = Seq(),
- sslConfig = Option.empty,
- batch = BatchConfiguration(7L.some, none, none).some,
+ authentication = Option.empty,
+ headers = none,
+ ssl = Option.empty,
+ batch = BatchConfig(7L.some, none, none).some,
errorThreshold = none,
uploadSyncPeriod = none,
).toJson
@@ -168,9 +168,8 @@ class HttpSinkTaskIT extends AsyncFunSuite with AsyncIOSpec with Eventually {
(for {
server <- wireMockServer
configuration = HttpSinkConfig(
- authentication = Option.empty,
- method = HttpMethod.Post,
- endpoint = s"http://$Host:${server.port()}/awesome/endpoint/{{value.name}}",
+ method = HttpMethod.Post,
+ endpoint = s"http://$Host:${server.port()}/awesome/endpoint/{{value.name}}",
content =
s"""
|
@@ -178,9 +177,10 @@ class HttpSinkTaskIT extends AsyncFunSuite with AsyncIOSpec with Eventually {
| {{value.salary}}
| {{/message}}
| """.stripMargin,
- headers = Seq(),
- sslConfig = Option.empty,
- batch = BatchConfiguration(7L.some, none, none).some,
+ authentication = Option.empty,
+ headers = none,
+ ssl = Option.empty,
+ batch = BatchConfig(7L.some, none, none).some,
errorThreshold = none,
uploadSyncPeriod = none,
).toJson
@@ -210,14 +210,14 @@ class HttpSinkTaskIT extends AsyncFunSuite with AsyncIOSpec with Eventually {
(for {
server <- wireMockServer
configuration = HttpSinkConfig(
- Option.empty,
HttpMethod.Post,
s"http://$Host:${server.port()}/awesome/endpoint",
s"""
| Ultimately not important for this test""".stripMargin,
- Seq(),
Option.empty,
- BatchConfiguration(
+ none,
+ Option.empty,
+ BatchConfig(
1L.some,
none,
none,
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala
index 3d2625c03..9fd9accc4 100644
--- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala
@@ -62,7 +62,7 @@ class HttpSinkTask extends SinkTask with LazyLogging {
IO
.fromEither(parseConfig(propsAsScala.get(configProp)))
.flatMap { config =>
- val template = RawTemplate(config.endpoint, config.content, config.headers)
+ val template = RawTemplate(config.endpoint, config.content, config.headers.getOrElse(Seq.empty))
val writerManager = HttpWriterManager(sinkName, config, template, deferred)
val refUpdateCallback: Throwable => Unit =
(err: Throwable) => {
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala
index 09b831340..130ddac87 100644
--- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala
@@ -43,8 +43,8 @@ import scala.collection.immutable.Queue
object HttpWriterManager {
- val DefaultErrorThreshold = 5
- val DefaultUploadSyncPeriod = 5
+ private val DefaultErrorThreshold = 5
+ private val DefaultUploadSyncPeriod = 5000
def apply(
sinkName: String,
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitPolicy.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitPolicy.scala
index 67714c2cc..79e3b1bb5 100644
--- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitPolicy.scala
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitPolicy.scala
@@ -26,9 +26,9 @@ import java.time.Duration
object HttpCommitPolicy extends LazyLogging {
- private val defaultFlushSize = 100L
- private val defaultFlushInterval = Duration.ofSeconds(100)
- private val defaultFlushCount = 100L
+ private val defaultFlushSize = 500_000_000L
+ private val defaultFlushInterval = Duration.ofSeconds(3600)
+ private val defaultFlushCount = 50_000L
val Default: CommitPolicy =
CommitPolicy(FileSize(defaultFlushSize),
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala
index 8a8940d37..17f76e7ad 100644
--- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala
@@ -43,7 +43,7 @@ object HttpSinkConfig {
}
-case class BatchConfiguration(
+case class BatchConfig(
batchCount: Option[Long],
batchSize: Option[Long],
timeInterval: Option[Long],
@@ -59,21 +59,21 @@ case class BatchConfiguration(
}
}
-object BatchConfiguration {
+object BatchConfig {
- implicit val decoder: Decoder[BatchConfiguration] = deriveDecoder
- implicit val encoder: Encoder[BatchConfiguration] = deriveEncoder
+ implicit val decoder: Decoder[BatchConfig] = deriveDecoder
+ implicit val encoder: Encoder[BatchConfig] = deriveEncoder
}
case class HttpSinkConfig(
- authentication: Option[Authentication], // basic, oauth2, proxy
method: HttpMethod,
- endpoint: String, // tokenised
- content: String, // tokenised
- headers: Seq[(String, String)], // tokenised
- sslConfig: Option[SSLConfig],
- batch: Option[BatchConfiguration],
+ endpoint: String,
+ content: String,
+ authentication: Option[Authentication],
+ headers: Option[Seq[(String, String)]],
+ ssl: Option[SSLConfig],
+ batch: Option[BatchConfig],
errorThreshold: Option[Int],
uploadSyncPeriod: Option[Int],
) {
diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala
index 8919cf65b..f2276e45c 100644
--- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala
+++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala
@@ -14,30 +14,50 @@
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink
-
+import io.circe.parser.decode
+import cats.implicits.catsSyntaxOptionId
import cats.implicits.none
import io.lenses.streamreactor.connect.http.sink.client.BasicAuthentication
import io.lenses.streamreactor.connect.http.sink.client.HttpMethod.Put
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
+import org.scalatest.EitherValues
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers
-class HttpSinkConfigTest extends AnyFunSuiteLike with Matchers {
+class HttpSinkConfigTest extends AnyFunSuiteLike with Matchers with EitherValues {
test("should write config to json") {
HttpSinkConfig(
- Some(BasicAuthentication("user", "pass")),
Put,
"http://myaddress.example.com",
"\nDave\nJason\nHooray for Kafka Connect!\n",
- Seq("something" -> "somethingelse"),
+ Some(BasicAuthentication("user", "pass")),
+ Seq("something" -> "somethingelse").some,
none,
none,
none,
none,
).toJson should be(
- """{"authentication":{"username":"user","password":"pass","type":"BasicAuthentication"},"method":"Put","endpoint":"http://myaddress.example.com","content":"\nDave\nJason\nHooray for Kafka Connect!\n","headers":[["something","somethingelse"]],"sslConfig":null,"batch":null,"errorThreshold":null,"uploadSyncPeriod":null}""",
+ """{"method":"Put","endpoint":"http://myaddress.example.com","content":"\nDave\nJason\nHooray for Kafka Connect!\n","authentication":{"username":"user","password":"pass","type":"BasicAuthentication"},"headers":[["something","somethingelse"]],"ssl":null,"batch":null,"errorThreshold":null,"uploadSyncPeriod":null}""",
)
}
+ test("read minimal config") {
+ val minConfig =
+ """{"method":"Put","endpoint":"http://myaddress.example.com","content":"\nDave\nJason\nHooray for Kafka Connect!\n"}"""
+ val minConfigSink = decode[HttpSinkConfig](minConfig)
+ minConfigSink.value should be(
+ HttpSinkConfig(
+ Put,
+ "http://myaddress.example.com",
+ "\nDave\nJason\nHooray for Kafka Connect!\n",
+ none,
+ none,
+ none,
+ none,
+ none,
+ none,
+ ),
+ )
+ }
}
diff --git a/test-common/src/main/scala/io/lenses/streamreactor/connect/model/Order.scala b/test-common/src/main/scala/io/lenses/streamreactor/connect/model/Order.scala
index c079e58b7..8f6a80402 100644
--- a/test-common/src/main/scala/io/lenses/streamreactor/connect/model/Order.scala
+++ b/test-common/src/main/scala/io/lenses/streamreactor/connect/model/Order.scala
@@ -32,10 +32,10 @@ case class Order(
@BeanProperty created: String,
) {
- def toRecord(order: Order): GenericRecord = {
+ def toRecord: GenericRecord = {
val orderSchema = SchemaFor.apply[Order]
implicit val encoder = AvroEncoder[Order].withSchema(orderSchema)
- ToRecord.apply[Order].to(order)
+ ToRecord.apply[Order].to(this)
}
}
From 3ba9b9c5843884d336f86593e6fc7916468c0284 Mon Sep 17 00:00:00 2001
From: David Sloan <33483659+davidsloan@users.noreply.github.com>
Date: Thu, 28 Mar 2024 16:31:11 +0000
Subject: [PATCH 2/3] Security Concerns, Vulnerabilities and Dependency
Upgrades (#1060)
Addresses CVE vulnerabilities:
- upgrades to the latest versions of the Kafka and Confluent images (Kafka "3.6.1" -> Confluent "7.6.0")
- removes some unused dependencies
- improves the build pipeline so that new CVEs now cause a build failure
- upgrades any dependencies causing a CVE, or adds them to `suppression.xml`; these suppressions should be reviewed regularly
- updates the build pipelines to use newer versions of the build actions
- switches the ftp jsch dependency to one that is actively maintained
- updates the Scala 2.13 version to the latest available and fixes a minor implicits issue to enable this
- removes the version axis in favour of keeping the Kafka dependencies in the Dependencies file (a sketch follows this list)
- (for functional tests run from GitHub Actions, see connectImageVersion in build.yml for equivalent functionality; the Kafka version axis is no longer required now that we only build one distributable)
- all connector modules have a clean bill of health, with the exception of some false positives.
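
A sketch only (the actual layout of project/Dependencies.scala may differ) of pinning the versions directly in the Dependencies file instead of deriving them from the removed KafkaVersionAxis; the values come from the image versions named above, and their placement inside the Versions object is an assumption based on the `import Dependencies.Versions` added to build.sbt:

    // project/Dependencies.scala -- sketch, not the actual file contents
    object Dependencies {
      object Versions {
        // assumed values, taken from the commit message above
        val kafkaVersion:     String = "3.6.1"
        val confluentVersion: String = "7.6.0"
      }
    }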
---
.github/workflows/build.yml | 51 +++---
.github/workflows/release.yml | 18 +--
build.sbt | 12 +-
.../aws/s3/sink/S3AvroWriterManagerTest.scala | 5 +-
.../config/S3SinkConfigDefBuilderTest.scala | 2 +-
.../aws/s3/sink/config/S3SinkConfigTest.scala | 2 +-
.../source/config/S3SourceConfigTests.scala | 5 +-
.../connect/azure/documentdb/Json.scala | 2 +-
.../connect/cassandra/sink/Json.scala | 2 +-
.../cloud/common/source/CloudSourceTask.scala | 1 -
.../serialization/AvroSerializerTest.scala | 5 +-
.../connect/ftp/source/SFTPClient.scala | 6 +-
...geGCPStorageSinkConfigDefBuilderTest.scala | 2 +-
.../config/GCPStorageSinkConfigTest.scala | 2 +-
.../config/GCPStorageSourceConfigTest.scala | 5 +-
.../connect/http/sink/HttpSinkTask.scala | 2 +-
.../converters/SinkRecordConverterTest.scala | 3 +-
.../connect/mqtt/sink/MqttWriter.scala | 3 +-
.../converters/source/JacksonJson.scala | 2 +-
project/Dependencies.scala | 153 ++++++++----------
project/KafkaVersionAxis.scala | 20 ---
project/Settings.scala | 7 +-
project/plugins.sbt | 2 -
project/protoc.sbt | 4 +-
suppression.xml | 73 ++++++++-
.../connect/mongodb/JacksonJson.scala | 2 +-
26 files changed, 210 insertions(+), 181 deletions(-)
delete mode 100644 project/KafkaVersionAxis.scala
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index f215e1f99..261936476 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -22,9 +22,9 @@ jobs:
fun_matrix: ${{ steps.read-mods.outputs.fun-matrix }}
dep_check_matrix: ${{ steps.read-mods.outputs.dep-check-matrix }}
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Set up JDK 17
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
@@ -51,9 +51,9 @@ jobs:
timeout-minutes: 5
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Set up JDK 17
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
@@ -77,9 +77,9 @@ jobs:
matrix:
module: ${{fromJSON(needs.initiate.outputs.matrix)}}
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Set up JDK 17
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
@@ -89,7 +89,7 @@ jobs:
env:
JVM_OPTS: -Xmx3200m
- name: Publish test results
- uses: EnricoMi/publish-unit-test-result-action@v1
+ uses: EnricoMi/publish-unit-test-result-action@v2
if: always()
with:
files: "**/target/**/test-reports/*.xml"
@@ -107,9 +107,9 @@ jobs:
matrix:
module: ${{fromJSON(needs.initiate.outputs.it_matrix)}}
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Set up JDK 17
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
@@ -119,7 +119,7 @@ jobs:
env:
JVM_OPTS: -Xmx3200m
- name: Publish test results
- uses: EnricoMi/publish-unit-test-result-action@v1
+ uses: EnricoMi/publish-unit-test-result-action@v2
if: always()
with:
files: "**/target/**/it-reports/*.xml"
@@ -129,20 +129,18 @@ jobs:
build-and-cache-assembly:
needs:
- initiate
- - test
- - integration-test
timeout-minutes: 30
runs-on: ubuntu-latest
strategy:
matrix:
module: ${{fromJSON(needs.initiate.outputs.matrix)}}
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
name: Checkout repository
with:
fetch-depth: 0
- name: Set up JDK 17
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
@@ -162,7 +160,7 @@ jobs:
VERSION: ${{ steps.version.outputs.version }}
run: sbt "project ${{ matrix.module }};set assembly / test := {}" assembly
- name: Cache assembly
- uses: actions/cache/save@v3
+ uses: actions/cache/save@v4
with:
path: ~/**/target/libs/*.jar
key: assembly-${{ matrix.module }}-${{ github.run_id }}
@@ -178,15 +176,15 @@ jobs:
module: ${{fromJSON(needs.initiate.outputs.fun_matrix)}}
connectImageVersion: [7.3.1, 6.2.2]
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Set up JDK 17
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
cache: 'sbt'
- name: Restore assembly
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: ~/**/target/libs/*.jar
key: assembly-${{ matrix.module }}-${{ github.run_id }}
@@ -197,7 +195,7 @@ jobs:
JVM_OPTS: -Xmx3200m
CONNECT_IMAGE_VERSION: ${{matrix.connectImageVersion}}
- name: Publish test results
- uses: EnricoMi/publish-unit-test-result-action@v1
+ uses: EnricoMi/publish-unit-test-result-action@v2
if: always()
with:
files: "**/target/**/test-reports/*.xml"
@@ -215,17 +213,20 @@ jobs:
module: ${{fromJSON(needs.initiate.outputs.dep_check_matrix)}}
steps:
- name: Restore assembly
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: ~/**/target/libs/*.jar
key: assembly-${{ matrix.module }}-${{ github.run_id }}
fail-on-cache-miss: true
- name: Dependency Check
- uses: dependency-check/Dependency-Check_Action@1.1.0
+ uses: dependency-check/Dependency-Check_Action@main
with:
project: kafka-connect-${{matrix.module}}-deps
path: kafka-connect-${{matrix.module}}/target/libs/
format: 'HTML'
+ args: >-
+ --failOnCVSS 5
+ --suppression https://raw.githubusercontent.com/lensesio/stream-reactor/chore/update-kafka-versions/suppression.xml
- name: Upload Test results
uses: actions/upload-artifact@master
with:
@@ -243,21 +244,21 @@ jobs:
matrix:
module: ${{fromJSON(needs.initiate.outputs.matrix)}}
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Set up JDK 17
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
cache: 'sbt'
- name: Restore assembly
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: ~/**/target/libs/*.jar
key: assembly-${{ matrix.module }}-${{ github.run_id }}
fail-on-cache-miss: true
- name: Upload artifact
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
with:
name: assembly-${{ matrix.module }}-${{ github.run_id }}
path: ~/**/target/libs/*.jar
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 7d87748cf..331ca6e1d 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -14,7 +14,7 @@ jobs:
tag: ${{ steps.get_tag.outputs.tag }}
steps:
- name: Checkout repository
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
with:
fetch-depth: 0
@@ -47,7 +47,7 @@ jobs:
timeout-minutes: 15
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Install and Update Antivirus Software
run: |
# Install ClamAV
@@ -60,7 +60,7 @@ jobs:
clamscan --recursive --alert-broken --alert-encrypted \
--alert-encrypted-archive --alert-exceeds-max --detect-pua .
- name: Cache Antivirus Database
- uses: actions/cache/save@v3
+ uses: actions/cache/save@v4
with:
path: /var/lib/clamav
key: clamav-database-${{ github.run_id }}
@@ -75,9 +75,9 @@ jobs:
matrix:
module: ${{fromJSON(needs.initiate.outputs.dep_check_matrix)}}
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Restore assembly
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: ~/**/target/libs/*.jar
key: assembly-${{ matrix.module }}-${{ github.run_id }}
@@ -89,7 +89,7 @@ jobs:
sudo systemctl stop clamav-freshclam
sudo chmod 777 /var/lib/clamav
- name: Restore Antivirus Database
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: /var/lib/clamav
key: clamav-database-${{ github.run_id }}
@@ -121,16 +121,16 @@ jobs:
steps:
- name: Checkout repository
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Set up JDK 17
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
cache: 'sbt'
- name: Uncache assembly
- uses: actions/cache/restore@v3
+ uses: actions/cache/restore@v4
with:
path: |
~/**/target/libs/*.jar
diff --git a/build.sbt b/build.sbt
index 6420e7a86..8662b22a7 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,3 +1,4 @@
+import Dependencies.Versions
import Dependencies.globalExcludeDeps
import Dependencies.gson
import Settings.*
@@ -146,7 +147,7 @@ lazy val `azure-datalake` = (project in file("kafka-connect-azure-datalake"))
),
),
)
- .configureAssembly(false)
+ .configureAssembly(true)
.configureTests(baseTestDeps)
//.configureIntegrationTests(kafkaConnectAzureDatalakeTestDeps)
//.configureFunctionalTests(kafkaConnectAzureDatalakeFuncTestDeps)
@@ -325,14 +326,13 @@ lazy val jms = (project in file("kafka-connect-jms"))
),
Compile / PB.protoSources := Seq(sourceDirectory.value / "test" / "resources" / "example"),
Compile / PB.targets := Seq(
- PB.gens.java -> (Test / sourceManaged).value,
+ PB.gens.java(Versions.googleProtobufVersion) -> (Test / sourceManaged).value,
),
),
)
.configureAssembly(true)
.configureTests(kafkaConnectJmsTestDeps)
.configureIntegrationTests(kafkaConnectJmsTestDeps)
- //.configureFunctionalTests(kafkaConnectS3FuncTestDeps)
.disableParallel()
.enablePlugins(PackPlugin, ProtocPlugin)
@@ -449,12 +449,6 @@ addCommandAlias(
addCommandAlias("fullTest", ";test;it:test;fun:test")
addCommandAlias("fullCoverageTest", ";coverage;test;it:test;coverageReport;coverageAggregate")
-dependencyCheckFormats := Seq("XML", "HTML")
-dependencyCheckNodeAnalyzerEnabled := Some(false)
-dependencyCheckNodeAuditAnalyzerEnabled := Some(false)
-dependencyCheckNPMCPEAnalyzerEnabled := Some(false)
-dependencyCheckRetireJSAnalyzerEnabled := Some(false)
-
excludeDependencies ++= globalExcludeDeps
val generateModulesList = taskKey[Seq[File]]("generateModulesList")
diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala
index 39100e7d3..ea28b07b1 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala
@@ -35,6 +35,7 @@ import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
+import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.sink.WriterManagerCreator
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
@@ -70,8 +71,8 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
private val PathPrefix = "streamReactorBackups"
private val avroFormatReader = new AvroFormatReader
- private implicit val cloudLocationValidator = S3LocationValidator
- private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
+ private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator
+ private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
private def avroConfig = S3SinkConfig(
S3ConnectionConfig(
None,
diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala
index 2cbb40508..5a5949866 100644
--- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala
@@ -40,7 +40,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
val BucketName = "mybucket"
private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator
- private implicit val connectorTaskId = ConnectorTaskId("connector", 1, 0)
+ private implicit val connectorTaskId: ConnectorTaskId = ConnectorTaskId("connector", 1, 0)
"S3SinkConfigDefBuilder" should "respect defined properties" in {
val props = Map(
diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigTest.scala
index f3d01e622..0c69e83e8 100644
--- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigTest.scala
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigTest.scala
@@ -25,7 +25,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
class S3SinkConfigTest extends AnyFunSuite with Matchers {
- private implicit val connectorTaskId = ConnectorTaskId("connector", 1, 0)
+ private implicit val connectorTaskId: ConnectorTaskId = ConnectorTaskId("connector", 1, 0)
private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator
test("envelope and CSV storage is not allowed") {
val props = Map(
diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigTests.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigTests.scala
index cb87a7ffa..1557c6556 100644
--- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigTests.scala
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigTests.scala
@@ -19,6 +19,7 @@ import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._
import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.TaskIndexKey
+import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions.ExcludeIndexes
@@ -29,8 +30,8 @@ import scala.concurrent.duration._
class S3SourceConfigTests extends AnyFunSuite with Matchers with TaskIndexKey with CloudSourceSettingsKeys {
- implicit val taskId = ConnectorTaskId("test", 1, 1)
- implicit val validator = S3LocationValidator
+ implicit val taskId: ConnectorTaskId = ConnectorTaskId("test", 1, 1)
+ implicit val validator: CloudLocationValidator = S3LocationValidator
test("default recursive levels is 0") {
S3SourceConfig.fromProps(
diff --git a/kafka-connect-azure-documentdb/src/main/scala/io/lenses/streamreactor/connect/azure/documentdb/Json.scala b/kafka-connect-azure-documentdb/src/main/scala/io/lenses/streamreactor/connect/azure/documentdb/Json.scala
index 6a9a0f53e..0f5a2931d 100644
--- a/kafka-connect-azure-documentdb/src/main/scala/io/lenses/streamreactor/connect/azure/documentdb/Json.scala
+++ b/kafka-connect-azure-documentdb/src/main/scala/io/lenses/streamreactor/connect/azure/documentdb/Json.scala
@@ -20,7 +20,7 @@ import org.json4s._
import org.json4s.native.JsonMethods._
object Json {
- implicit val formats = DefaultFormats
+ implicit val formats: Formats = DefaultFormats
val mapper = new ObjectMapper
diff --git a/kafka-connect-cassandra/src/test/scala/io/lenses/streamreactor/connect/cassandra/sink/Json.scala b/kafka-connect-cassandra/src/test/scala/io/lenses/streamreactor/connect/cassandra/sink/Json.scala
index b3507d4ef..cf1c664d6 100644
--- a/kafka-connect-cassandra/src/test/scala/io/lenses/streamreactor/connect/cassandra/sink/Json.scala
+++ b/kafka-connect-cassandra/src/test/scala/io/lenses/streamreactor/connect/cassandra/sink/Json.scala
@@ -19,7 +19,7 @@ import org.json4s._
import org.json4s.native.JsonMethods._
object Json {
- implicit val formats = DefaultFormats
+ implicit val formats: Formats = DefaultFormats
def parseJson(json: String): JValue =
parse(json)
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CloudSourceTask.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CloudSourceTask.scala
index 0820ddc96..51083e131 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CloudSourceTask.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CloudSourceTask.scala
@@ -45,7 +45,6 @@ import org.apache.kafka.connect.source.SourceTask
import java.util
import java.util.Collections
-import scala.jdk.CollectionConverters.MapHasAsScala
import scala.jdk.CollectionConverters._
abstract class CloudSourceTask[MD <: FileMetadata, C <: CloudSourceConfig[MD], CT]
extends SourceTask
diff --git a/kafka-connect-common/src/test/scala/io/lenses/streamreactor/common/serialization/AvroSerializerTest.scala b/kafka-connect-common/src/test/scala/io/lenses/streamreactor/common/serialization/AvroSerializerTest.scala
index 3bb2f35e9..cdeb963cd 100644
--- a/kafka-connect-common/src/test/scala/io/lenses/streamreactor/common/serialization/AvroSerializerTest.scala
+++ b/kafka-connect-common/src/test/scala/io/lenses/streamreactor/common/serialization/AvroSerializerTest.scala
@@ -17,6 +17,7 @@ package io.lenses.streamreactor.common.serialization
import com.sksamuel.avro4s.AvroSchema
import com.sksamuel.avro4s.RecordFormat
+import org.apache.avro.Schema
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@@ -25,8 +26,8 @@ import java.io.ByteArrayOutputStream
class AvroSerializerTest extends AnyWordSpec with Matchers {
- implicit val recordFormat = RecordFormat[Book]
- implicit val avroSchema = AvroSchema[Book]
+ implicit val recordFormat: RecordFormat[Book] = RecordFormat[Book]
+ implicit val avroSchema: Schema = AvroSchema[Book]
"AvroSerializer" should {
"read and write from and to Avro" in {
diff --git a/kafka-connect-ftp/src/main/scala/io/lenses/streamreactor/connect/ftp/source/SFTPClient.scala b/kafka-connect-ftp/src/main/scala/io/lenses/streamreactor/connect/ftp/source/SFTPClient.scala
index f92abf043..6460772f7 100644
--- a/kafka-connect-ftp/src/main/scala/io/lenses/streamreactor/connect/ftp/source/SFTPClient.scala
+++ b/kafka-connect-ftp/src/main/scala/io/lenses/streamreactor/connect/ftp/source/SFTPClient.scala
@@ -203,13 +203,13 @@ class SFTPClient extends FTPClient with StrictLogging {
.map(lsEntry => createFtpFile(lsEntry))
}
- private def transformToLsEntry(file: Any): ChannelSftp#LsEntry =
+ private def transformToLsEntry(file: Any): ChannelSftp.LsEntry =
file match {
- case lsEntry: ChannelSftp#LsEntry => lsEntry
+ case lsEntry: ChannelSftp.LsEntry => lsEntry
case unknown: Any => throw new ClassCastException(s"SFTPClient Error obtaining LsEntry. Unknown type $unknown")
}
- private def createFtpFile(lsEntry: ChannelSftp#LsEntry) = {
+ private def createFtpFile(lsEntry: ChannelSftp.LsEntry) = {
val ftpFile: FTPFile = new FTPFile()
ftpFile.setType(0)
ftpFile.setName(lsEntry.getFilename)
diff --git a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageGCPStorageSinkConfigDefBuilderTest.scala b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageGCPStorageSinkConfigDefBuilderTest.scala
index e23483f16..888d1a1f6 100644
--- a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageGCPStorageSinkConfigDefBuilderTest.scala
+++ b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageGCPStorageSinkConfigDefBuilderTest.scala
@@ -43,7 +43,7 @@ class GCPStorageGCPStorageSinkConfigDefBuilderTest
val BucketName = "mybucket"
private implicit val cloudLocationValidator: CloudLocationValidator = GCPStorageLocationValidator
- private implicit val connectorTaskId = ConnectorTaskId("connector", 1, 0)
+ private implicit val connectorTaskId: ConnectorTaskId = ConnectorTaskId("connector", 1, 0)
"GCPSinkConfigDefBuilder" should "respect defined properties" in {
val props = Map(
diff --git a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfigTest.scala b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfigTest.scala
index 8ad0fba25..59c02130d 100644
--- a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfigTest.scala
+++ b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfigTest.scala
@@ -24,7 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
class GCPStorageSinkConfigTest extends AnyFunSuite with Matchers {
- private implicit val connectorTaskId = ConnectorTaskId("connector", 1, 0)
+ private implicit val connectorTaskId: ConnectorTaskId = ConnectorTaskId("connector", 1, 0)
private implicit val cloudLocationValidator: CloudLocationValidator = GCPStorageLocationValidator
test("envelope and CSV storage is not allowed") {
val props = Map(
diff --git a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfigTest.scala b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfigTest.scala
index 376f568f4..4bdfc0a73 100644
--- a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfigTest.scala
+++ b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfigTest.scala
@@ -16,6 +16,7 @@
package io.lenses.streamreactor.connect.gcp.storage.source.config
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
+import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.gcp.storage.model.location.GCPStorageLocationValidator
import org.apache.kafka.common.config.ConfigException
import org.scalatest.EitherValues
@@ -24,8 +25,8 @@ import org.scalatest.matchers.should.Matchers._
class GCPStorageSourceConfigTest extends AnyFunSuite with EitherValues {
- val taskId = ConnectorTaskId("name", 1, 1)
- implicit val validator = GCPStorageLocationValidator
+ val taskId = ConnectorTaskId("name", 1, 1)
+ implicit val validator: CloudLocationValidator = GCPStorageLocationValidator
test("fromProps should reject configuration when no kcql string is provided") {
val props = Map[String, String]()
val result = GCPStorageSourceConfig.fromProps(taskId, props)
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala
index 9fd9accc4..83d9c8ca0 100644
--- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala
@@ -43,7 +43,7 @@ import scala.jdk.CollectionConverters.MapHasAsScala
class HttpSinkTask extends SinkTask with LazyLogging {
private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
- implicit val runtime = IORuntime.global
+ implicit val runtime: IORuntime = IORuntime.global
private var maybeTemplate: Option[TemplateType] = Option.empty
private var maybeWriterManager: Option[HttpWriterManager] = Option.empty
private var maybeSinkName: Option[String] = Option.empty
diff --git a/kafka-connect-mongodb/src/test/scala/io/lenses/streamreactor/connect/mongodb/converters/SinkRecordConverterTest.scala b/kafka-connect-mongodb/src/test/scala/io/lenses/streamreactor/connect/mongodb/converters/SinkRecordConverterTest.scala
index eb6f70957..9af863ea0 100644
--- a/kafka-connect-mongodb/src/test/scala/io/lenses/streamreactor/connect/mongodb/converters/SinkRecordConverterTest.scala
+++ b/kafka-connect-mongodb/src/test/scala/io/lenses/streamreactor/connect/mongodb/converters/SinkRecordConverterTest.scala
@@ -19,6 +19,7 @@ import io.lenses.streamreactor.connect.mongodb.config.MongoConfig
import io.lenses.streamreactor.connect.mongodb.config.MongoConfigConstants
import io.lenses.streamreactor.connect.mongodb.config.MongoSettings
import org.bson.Document
+import org.json4s.Formats
import org.json4s.jackson.JsonMethods._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@@ -35,7 +36,7 @@ import scala.util.Try
class SinkRecordConverterTest extends AnyWordSpec with Matchers {
- implicit val jsonFormats = org.json4s.DefaultFormats
+ implicit val jsonFormats: Formats = org.json4s.DefaultFormats
// create java.util.Date from iso date string.
def createDate(isoDate: String): java.util.Date = {
diff --git a/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/sink/MqttWriter.scala b/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/sink/MqttWriter.scala
index 85ecf58e1..297259651 100644
--- a/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/sink/MqttWriter.scala
+++ b/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/sink/MqttWriter.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.sink.SinkRecord
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.json4s.DefaultFormats
+import org.json4s.Formats
import org.json4s.native.JsonMethods._
import scala.annotation.nowarn
@@ -48,7 +49,7 @@ class MqttWriter(client: MqttClient, settings: MqttSinkSettings, convertersMap:
with ErrorHandler {
//initialize error tracker
- implicit val formats = DefaultFormats
+ implicit val formats: Formats = DefaultFormats
initialize(settings.maxRetries, settings.errorPolicy)
val mappings: Map[String, Set[Kcql]] = settings.kcql.groupBy(k => k.getSource)
val kcql = settings.kcql
diff --git a/kafka-connect-sql-common/src/test/scala/io/lenses/streamreactor/connect/converters/source/JacksonJson.scala b/kafka-connect-sql-common/src/test/scala/io/lenses/streamreactor/connect/converters/source/JacksonJson.scala
index ad44edff2..64e15220f 100644
--- a/kafka-connect-sql-common/src/test/scala/io/lenses/streamreactor/connect/converters/source/JacksonJson.scala
+++ b/kafka-connect-sql-common/src/test/scala/io/lenses/streamreactor/connect/converters/source/JacksonJson.scala
@@ -22,7 +22,7 @@ import org.json4s.jackson.Serialization.write
object JacksonJson {
//implicit val formats: DefaultFormats.type = DefaultFormats
- implicit val formats = Serialization.formats(NoTypeHints)
+ implicit val formats: Formats = Serialization.formats(NoTypeHints)
/*def toJson(value: Map[Symbol, Any]): String = {
toJson(value map { case (k,v) => k.name -> v})
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index e52862e75..64036a2ce 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -1,33 +1,27 @@
import Dependencies._
-import KafkaVersionAxis.kafkaVersionAxis
import sbt._
import sbt.librarymanagement.InclExclRule
object Dependencies {
- val kafkaVersion: String = kafkaVersionAxis.kafkaVersion
-
- val confluentVersion: String = kafkaVersionAxis.confluentPlatformVersion
-
val globalExcludeDeps: Seq[InclExclRule] = Seq(
- "org.jboss.logging" % "commons-logging-jboss-logging",
- "org.jboss.logging" % "jboss-logging",
- "org.jboss.logging" % "jboss-logging-annotations",
- "org.jboss.logmanager" % "jboss-logmanager-embedded",
- "org.jboss.sif4j" % "sIf4j-jboss-logmanager",
+ "org.jboss.logging" % "*",
+ "org.jboss.logmanager" % "*",
+ "org.jboss.sif4j" % "*",
"commons-logging" % "commons-logging",
"log4j" % "log4j",
"org.slf4j" % "slf4j-log4j12",
"org.apache.logging.log4j" % "log4j",
- //"org.apache.logging.log4j" % "log4j-api",
"org.apache.logging.log4j" % "log4j-core",
"org.apache.logging.log4j" % "log4j-slf4j-impl",
"com.sun.jersey" % "*",
+ "org.jline" % "*",
+ "org.codehaus.janino" % "*",
)
// scala versions
val scalaOrganization = "org.scala-lang"
- val scalaVersion = "2.13.10"
+ val scalaVersion = "2.13.13"
val supportedScalaVersions: Seq[String] = List(Dependencies.scalaVersion)
val commonResolvers: Seq[MavenRepository] = Seq(
@@ -43,10 +37,11 @@ object Dependencies {
object Versions {
// libraries versions
val scalatestVersion = "3.2.17"
- val scalaCheckPlusVersion = "3.1.0.0"
val scalatestPlusScalaCheckVersion = "3.1.0.0-RC2"
val scalaCheckVersion = "1.17.0"
- val randomDataGeneratorVersion = "2.8"
+
+ val kafkaVersion: String = "3.6.1"
+ val confluentVersion: String = "7.6.0"
val enumeratumVersion = "1.7.3"
@@ -55,21 +50,15 @@ object Dependencies {
val avroVersion = "1.11.3"
val avro4sVersion = "4.1.1"
- val catsVersion = "2.9.0"
val catsEffectVersion = "3.4.8"
val `cats-effect-testing` = "1.5.0"
val antlr4Version: String = "4.13.1"
- val urlValidatorVersion = "1.7"
val circeVersion = "0.15.0-M1"
val circeGenericExtrasVersion = "0.14.3"
- val circeJsonSchemaVersion = "0.2.0"
- val shapelessVersion = "2.3.10"
- // build plugins versions
- val silencerVersion = "1.7.1"
- val kindProjectorVersion = "0.13.2"
+ // build plugins version
val betterMonadicForVersion = "0.3.1"
val logbackVersion = "1.4.14"
@@ -83,24 +72,22 @@ object Dependencies {
val calciteVersion = "1.34.0"
val awsSdkVersion = "2.25.6"
- val azureDataLakeVersion = "12.18.1"
- val azureIdentityVersion = "1.11.1"
+ val azureDataLakeVersion = "12.18.3"
+ val azureIdentityVersion = "1.11.4"
+ val azureCoreVersion = "1.47.0"
val gcpStorageVersion = "2.32.1"
- val guavaVersion = "31.0.1-jre"
- val javaxBindVersion = "2.3.1"
- val jacksonVersion = "2.16.2"
- val json4sVersion = "4.0.6"
+ val jacksonVersion = "2.17.0"
+ val json4sVersion = "4.0.7"
val mockitoScalaVersion = "1.17.30"
- val snakeYamlVersion = "2.0"
- val openCsvVersion = "5.7.1"
+ val openCsvVersion = "5.9"
+ val jsonSmartVersion = "2.5.1"
val xzVersion = "1.9"
val lz4Version = "1.8.0"
- val californiumVersion = "3.5.0"
val bouncyCastleVersion = "1.70"
- val nettyVersion = "4.1.71.Final"
+ val nettyVersion = "4.1.108.Final"
val cassandraDriverVersion = "3.11.3"
val jsonPathVersion = "2.7.0"
@@ -111,36 +98,35 @@ object Dependencies {
val influxVersion = "6.8.0"
- val jmsApiVersion = "3.1.0"
- val activeMqVersion = "6.0.1"
- val protocVersion = "3.11.4"
- val googleProtobufVersion = "3.21.12"
- val protobufCompilerPluginVersion = "0.11.12"
+ val jmsApiVersion = "3.1.0"
+ val activeMqVersion = "6.0.1"
+ val protocVersion = "3.11.4"
+ val googleProtobufVersion = "3.25.3"
val mqttVersion = "1.2.5"
- val httpClientVersion = "4.5.14"
- val commonsBeanUtilsVersion = "1.9.4"
- val commonsNetVersion = "3.9.0"
- val commonsCodecVersion = "1.15"
- val commonsIOVersion = "2.11.0"
- val commonsLang3Version = "3.14.0"
- val jschVersion = "0.1.55"
+ val commonsNetVersion = "3.10.0"
+ val commonsCodecVersion = "1.15"
+ val commonsCompressVersion = "1.26.0"
+ val commonsConfigVersion = "2.10.1"
+ val commonsIOVersion = "2.11.0"
+ val commonsHttpVersion = "4.5.14"
+ val commonsLang3Version = "3.14.0"
+ val jschVersion = "0.2.17"
val minaVersion = "2.2.1"
val betterFilesVersion = "3.9.2"
val ftpServerVersion = "1.2.0"
val fakeSftpServerVersion = "2.0.0"
- val zookeeperServerVersion = "3.8.1"
-
val mongoDbVersion = "3.12.12"
val jedisVersion = "4.4.0"
val gsonVersion = "2.10.1"
- val nimbusJoseJwtVersion = "9.30.2"
- val hadoopVersion = "3.3.6"
+ val nimbusJoseJwtVersion = "9.37.3"
+ val hadoopVersion = "3.4.0"
+ val hadoopShadedProtobufVersion = "1.2.0"
trait ElasticVersions {
val elastic4sVersion, elasticSearchVersion, jnaVersion: String
@@ -167,12 +153,9 @@ object Dependencies {
val catsEffectStd = "org.typelevel" %% "cats-effect-std" % catsEffectVersion
val catsEffect = "org.typelevel" %% "cats-effect" % catsEffectVersion
- val urlValidator = "commons-validator" % "commons-validator" % urlValidatorVersion
-
val circeGeneric = "io.circe" %% "circe-generic" % circeVersion
val circeGenericExtras = "io.circe" %% "circe-generic-extras" % circeGenericExtrasVersion
val circeParser = "io.circe" %% "circe-parser" % circeVersion
- val circeRefined = "io.circe" %% "circe-refined" % circeVersion
val circe: Seq[ModuleID] = Seq(circeGeneric, circeParser, circeGenericExtras)
val betterMonadicFor = addCompilerPlugin("com.olegpy" %% "better-monadic-for" % Versions.betterMonadicForVersion)
@@ -183,7 +166,6 @@ object Dependencies {
val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion
val log4jToSlf4j = "org.slf4j" % "log4j-over-slf4j" % "2.0.9"
val jclToSlf4j = "org.slf4j" % "jcl-over-slf4j" % "2.0.9"
- val slf4jApi = "org.slf4j" % "slf4j-api" % "2.0.9"
// testing
val scalatest = "org.scalatest" %% "scalatest" % scalatestVersion
@@ -233,21 +215,17 @@ object Dependencies {
val bouncyTls = "org.bouncycastle" % "bctls-jdk15on" % bouncyCastleVersion
val bouncyCastle: Seq[ModuleID] = Seq(bouncyProv, bouncyUtil, bouncyPkix, bouncyBcpg, bouncyTls)
- lazy val avro = "org.apache.avro" % "avro" % avroVersion
- lazy val avroProtobuf = "org.apache.avro" % "avro-protobuf" % avroVersion
- lazy val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion
- lazy val avro4sJson = "com.sksamuel.avro4s" %% "avro4s-json" % avro4sVersion
- lazy val avro4sProtobuf = "com.sksamuel.avro4s" %% "avro4s-protobuf" % avro4sVersion
+ lazy val avro = "org.apache.avro" % "avro" % avroVersion
+ lazy val avroProtobuf = "org.apache.avro" % "avro-protobuf" % avroVersion
+ lazy val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion
+ lazy val avro4sJson = "com.sksamuel.avro4s" %% "avro4s-json" % avro4sVersion
val `wiremock` = "org.wiremock" % "wiremock" % wiremockVersion
val jerseyCommon = "org.glassfish.jersey.core" % "jersey-common" % jerseyCommonVersion
- lazy val parquetAvro: ModuleID = "org.apache.parquet" % "parquet-avro" % parquetVersion
- lazy val parquetHadoop: ModuleID = "org.apache.parquet" % "parquet-hadoop" % parquetVersion
- lazy val parquetColumn: ModuleID = "org.apache.parquet" % "parquet-column" % parquetVersion
- lazy val parquetEncoding: ModuleID = "org.apache.parquet" % "parquet-encoding" % parquetVersion
- lazy val parquetHadoopBundle: ModuleID = "org.apache.parquet" % "parquet-hadoop-bundle" % parquetVersion
+ lazy val parquetAvro: ModuleID = "org.apache.parquet" % "parquet-avro" % parquetVersion
+ lazy val parquetHadoop: ModuleID = "org.apache.parquet" % "parquet-hadoop" % parquetVersion
lazy val hadoopCommon: ModuleID = hiveExcludes("org.apache.hadoop" % "hadoop-common" % hadoopVersion)
.excludeAll(ExclusionRule(organization = "javax.servlet"))
@@ -266,6 +244,8 @@ object Dependencies {
hiveExcludes("org.apache.hadoop" % "hadoop-mapreduce-client" % hadoopVersion)
lazy val hadoopMapReduceClientCore: ModuleID =
hiveExcludes("org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion)
+ lazy val hadoopShadedProtobuf: ModuleID =
+ hiveExcludes("org.apache.hadoop.thirdparty" % "hadoop-shaded-protobuf_3_21" % hadoopShadedProtobufVersion)
lazy val calciteCore = hiveExcludes("org.apache.calcite" % "calcite-core" % calciteVersion)
.excludeAll(ExclusionRule(organization = "io.swagger"))
@@ -278,15 +258,14 @@ object Dependencies {
lazy val calciteLinq4J = "org.apache.calcite" % "calcite-linq4j" % calciteVersion
- lazy val s3Sdk = "software.amazon.awssdk" % "s3" % awsSdkVersion
- lazy val stsSdk = "software.amazon.awssdk" % "sts" % awsSdkVersion
- lazy val javaxBind = "javax.xml.bind" % "jaxb-api" % javaxBindVersion
+ lazy val s3Sdk = "software.amazon.awssdk" % "s3" % awsSdkVersion
+ lazy val stsSdk = "software.amazon.awssdk" % "sts" % awsSdkVersion
lazy val azureDataLakeSdk: ModuleID = "com.azure" % "azure-storage-file-datalake" % azureDataLakeVersion
lazy val azureIdentity: ModuleID = "com.azure" % "azure-identity" % azureIdentityVersion
+ lazy val azureCore: ModuleID = "com.azure" % "azure-core" % azureCoreVersion
lazy val gcpStorageSdk = "com.google.cloud" % "google-cloud-storage" % gcpStorageVersion
- lazy val guava = "com.google.guava" % "guava" % guavaVersion
lazy val json4sNative = "org.json4s" %% "json4s-native" % json4sVersion
lazy val json4sJackson = "org.json4s" %% "json4s-jackson" % json4sVersion
@@ -298,15 +277,15 @@ object Dependencies {
val jacksonModuleScala: ModuleID =
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion
val woodstoxCore: ModuleID =
- "com.fasterxml.woodstox" % "woodstox-core" % "6.5.0"
+ "com.fasterxml.woodstox" % "woodstox-core" % "6.6.1"
+ val jsonSmart: ModuleID =
+ "net.minidev" % "json-smart" % jsonSmartVersion
- lazy val snakeYaml = "org.yaml" % "snakeyaml" % snakeYamlVersion
- lazy val openCsv = "com.opencsv" % "opencsv" % openCsvVersion
+ lazy val openCsv = "com.opencsv" % "opencsv" % openCsvVersion
lazy val cassandraDriver = "com.datastax.cassandra" % "cassandra-driver-core" % cassandraDriverVersion
lazy val jsonPath = "com.jayway.jsonpath" % "json-path" % jsonPathVersion
- lazy val nettyAll = "io.netty" % "netty-all" % nettyVersion
lazy val nettyCommon = "io.netty" % "netty-common" % nettyVersion
lazy val nettyHandler = "io.netty" % "netty-handler" % nettyVersion
lazy val nettyHandlerProxy = "io.netty" % "netty-handler-proxy" % nettyVersion
@@ -346,19 +325,18 @@ object Dependencies {
lazy val mqttClient = "org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % mqttVersion
- lazy val httpClient = "org.apache.httpcomponents" % "httpclient" % httpClientVersion
- lazy val commonsBeanUtils = "commons-beanutils" % "commons-beanutils" % commonsBeanUtilsVersion
- lazy val commonsNet = "commons-net" % "commons-net" % commonsNetVersion
- lazy val commonsCodec = "commons-codec" % "commons-codec" % commonsCodecVersion
- lazy val commonsIO = "commons-io" % "commons-io" % commonsIOVersion
- lazy val commonsLang3 = "org.apache.commons" % "commons-lang3" % commonsLang3Version
- lazy val jsch = "com.jcraft" % "jsch" % jschVersion
- lazy val mina = "org.apache.mina" % "mina-core" % minaVersion
- lazy val betterFiles = "com.github.pathikrit" %% "better-files" % betterFilesVersion
- lazy val ftpServer = "org.apache.ftpserver" % "ftpserver-core" % ftpServerVersion
- lazy val fakeSftpServer = "com.github.stefanbirkner" % "fake-sftp-server-lambda" % fakeSftpServerVersion
-
- lazy val zookeeperServer = "org.apache.zookeeper" % "zookeeper" % zookeeperServerVersion
+ lazy val commonsNet = "commons-net" % "commons-net" % commonsNetVersion
+ lazy val commonsCodec = "commons-codec" % "commons-codec" % commonsCodecVersion
+ lazy val commonsIO = "commons-io" % "commons-io" % commonsIOVersion
+ lazy val commonsLang3 = "org.apache.commons" % "commons-lang3" % commonsLang3Version
+ lazy val commonsCompress = "org.apache.commons" % "commons-compress" % commonsCompressVersion
+ lazy val commonsConfig = "org.apache.commons" % "commons-configuration2" % commonsConfigVersion
+ lazy val commonsHttp = "org.apache.httpcomponents" % "httpclient" % commonsHttpVersion
+ lazy val jsch = "com.github.mwiede" % "jsch" % jschVersion
+ lazy val mina = "org.apache.mina" % "mina-core" % minaVersion
+ lazy val betterFiles = "com.github.pathikrit" %% "better-files" % betterFilesVersion
+ lazy val ftpServer = "org.apache.ftpserver" % "ftpserver-core" % ftpServerVersion
+ lazy val fakeSftpServer = "com.github.stefanbirkner" % "fake-sftp-server-lambda" % fakeSftpServerVersion
lazy val mongoDb = "org.mongodb" % "mongo-java-driver" % mongoDbVersion
@@ -403,7 +381,7 @@ trait Dependencies {
import Versions._
val loggingDeps: Seq[ModuleID] = Seq(
- "org.apache.logging.log4j" % "log4j-api" % "2.20.0",
+ "org.apache.logging.log4j" % "log4j-api" % "2.22.1",
"org.apache.logging.log4j" % "log4j-to-slf4j" % "2.20.0",
log4jToSlf4j,
jclToSlf4j,
@@ -448,6 +426,8 @@ trait Dependencies {
//Specific modules dependencies
val baseDeps: Seq[ModuleID] = loggingDeps ++ Seq(
+ jacksonDatabind,
+ commonsCompress,
avro4s,
catsEffectKernel,
catsEffect,
@@ -466,6 +446,8 @@ trait Dependencies {
hadoopMapReduce,
hadoopMapReduceClient,
hadoopMapReduceClientCore,
+ hadoopShadedProtobuf,
+ commonsConfig,
openCsv,
jacksonCore,
jacksonDatabind,
@@ -474,6 +456,7 @@ trait Dependencies {
)
val kafkaConnectS3Deps: Seq[ModuleID] = Seq(
+ commonsIO,
s3Sdk,
stsSdk,
)
@@ -483,6 +466,7 @@ trait Dependencies {
val kafkaConnectAzureDatalakeDeps: Seq[ModuleID] = Seq(
azureDataLakeSdk,
azureIdentity,
+ azureCore,
)
val kafkaConnectGcpStorageDeps: Seq[ModuleID] = Seq(
@@ -518,6 +502,8 @@ trait Dependencies {
confluentProtobufConverter,
protoc,
googleProtobuf,
+ googleProtobufJava,
+ gson,
)
val kafkaConnectJmsTestDeps: Seq[ModuleID] = baseTestDeps ++ Seq(
@@ -533,6 +519,7 @@ trait Dependencies {
elastic4sCore(v.elastic4sVersion),
jna(v.jnaVersion),
elasticSearch(v.elasticSearchVersion),
+ commonsHttp,
//elasticSearchAnalysis(v.elasticSearchVersion)
)
diff --git a/project/KafkaVersionAxis.scala b/project/KafkaVersionAxis.scala
deleted file mode 100644
index bcc17f218..000000000
--- a/project/KafkaVersionAxis.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-import KafkaVersionAxis.kafkaToConfluentVersion
-
-case class KafkaVersionAxis(kafkaVersion: String) {
-
- val confluentPlatformVersion: String =
- kafkaToConfluentVersion.getOrElse(kafkaVersion, throw new IllegalStateException("unexpected kafka version"))
-
-}
-
-object KafkaVersionAxis {
- private val kafkaToConfluentVersion = Map(
- "2.8.1" -> "6.2.2",
- "3.3.0" -> "7.3.1",
- )
-
- // KafkaVersion could later be a parameter
- val kafkaVersion: String = "3.3.0"
- val kafkaVersionAxis: KafkaVersionAxis = KafkaVersionAxis(kafkaVersion)
-
-}
diff --git a/project/Settings.scala b/project/Settings.scala
index 2aeda20e0..ae39e7417 100644
--- a/project/Settings.scala
+++ b/project/Settings.scala
@@ -1,11 +1,14 @@
+import Dependencies.Versions.kafkaVersion
import Dependencies.betterMonadicFor
import Dependencies.globalExcludeDeps
import Dependencies.googleProtobuf
import Dependencies.googleProtobufJava
import Dependencies.hadoopCommon
import Dependencies.hadoopMapReduceClientCore
+import Dependencies.jsonSmart
import Dependencies.nettyCodecSocks
import Dependencies.nettyHandlerProxy
+import Dependencies.nimbusJoseJwt
import Dependencies.woodstoxCore
import com.eed3si9n.jarjarabrams.ShadeRule
import com.simplytyped.Antlr4Plugin
@@ -128,7 +131,7 @@ object Settings extends Dependencies {
("Git-Commit-Hash", "git rev-parse HEAD".!!.trim),
("Git-Repo", "git config --get remote.origin.url".!!.trim),
("Git-Tag", sys.env.getOrElse("SNAPSHOT_TAG", "n/a")),
- ("Kafka-Version", KafkaVersionAxis.kafkaVersion),
+ ("Kafka-Version", kafkaVersion),
("StreamReactor-Version", artifactVersion),
("StreamReactor-Docs", "https://docs.lenses.io/5.0/integrations/connectors/stream-reactor/"),
),
@@ -207,6 +210,8 @@ object Settings extends Dependencies {
hadoopCommon,
hadoopMapReduceClientCore,
woodstoxCore,
+ jsonSmart,
+ nimbusJoseJwt,
) ++ nettyDepOverrides ++ avroOverrides,
),
)
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 024734ace..17beed20a 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -6,8 +6,6 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.9.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-license-report" % "1.2.0")
-addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "4.0.0")
-
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.14")
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
diff --git a/project/protoc.sbt b/project/protoc.sbt
index 9a9957da6..4fc0772e9 100644
--- a/project/protoc.sbt
+++ b/project/protoc.sbt
@@ -1,3 +1 @@
-addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6")
-
-libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.13"
+addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.7")
diff --git a/suppression.xml b/suppression.xml
index 1c2ba01b2..03a8bac7f 100644
--- a/suppression.xml
+++ b/suppression.xml
@@ -2,14 +2,75 @@
-    <suppress>
-        <packageUrl regex="true">^pkg:maven/net\.minidev/json\-smart@.*$</packageUrl>
-        <vulnerabilityName>CVE-2021-31684</vulnerabilityName>
-    </suppress>
+    <suppress>
+        <notes><![CDATA[
+        file name: kafka-connect-aws-s3-assembly-6.3-SNAPSHOT.jar (shaded: org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_21:1.2.0)
+        ]]></notes>
+        <packageUrl regex="true">^pkg:maven/org\.apache\.hadoop\.thirdparty/hadoop\-shaded\-protobuf_3_21@.*$</packageUrl>
+        <cpe>cpe:/a:apache:hadoop</cpe>
+    </suppress>
+    <suppress>
+        <cve>CVE-2023-22551</cve>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/com\.azure/azure\-json@.*$</packageUrl>
+        <cve>CVE-2023-36052</cve>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/com\.azure/azure\-identity@.*$</packageUrl>
+        <cve>CVE-2023-36415</cve>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/com\.azure/azure\-identity@.*$</packageUrl>
+        <cpe>cpe:/a:microsoft:azure_sdk_for_java</cpe>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/com\.azure/azure\-core\-http\-netty@.*$</packageUrl>
+        <cve>CVE-2023-36052</cve>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/com\.azure/azure\-core@.*$</packageUrl>
+        <cpe>cpe:/a:microsoft:azure_cli</cpe>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/com\.azure/azure\-core@.*$</packageUrl>
+        <cpe>cpe:/a:microsoft:azure_sdk_for_java</cpe>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/com\.azure/azure\-identity@.*$</packageUrl>
+        <cpe>cpe:/a:microsoft:azure_cli</cpe>
+    </suppress>
\ No newline at end of file
diff --git a/test-common/src/main/scala/io/lenses/streamreactor/connect/mongodb/JacksonJson.scala b/test-common/src/main/scala/io/lenses/streamreactor/connect/mongodb/JacksonJson.scala
index 0402686a7..ace8677aa 100644
--- a/test-common/src/main/scala/io/lenses/streamreactor/connect/mongodb/JacksonJson.scala
+++ b/test-common/src/main/scala/io/lenses/streamreactor/connect/mongodb/JacksonJson.scala
@@ -19,7 +19,7 @@ import org.json4s._
import org.json4s.native.JsonMethods._
object Json {
- implicit val formats = DefaultFormats
+ implicit val formats: Formats = DefaultFormats
def parseJson(json: String): JValue =
parse(json)
From bce4a33627bfed3a4e8bea4df573b4518c87e9c1 Mon Sep 17 00:00:00 2001
From: David Sloan <33483659+davidsloan@users.noreply.github.com>
Date: Tue, 2 Apr 2024 09:40:02 +0100
Subject: [PATCH 3/3] Chore/scala steward merge (#1102)
* Update logback-classic, logback-core to 1.5.3
* Update cassandra-driver-core to 3.11.5
* Update testcontainers-scala-cassandra, ... to 0.40.17
* Update sbt-assembly to 2.1.5
* Update google-cloud-storage to 2.36.1
* Update influxdb-client-java to 6.12.0
* Update json-path to 2.9.0
* Update avro4s-core, avro4s-json to 4.1.2
* Update elastic4s-client-esjava, ... to 7.17.4
* Update sbt-license-report to 1.6.1
* Update commons-codec to 1.16.1
* Update commons-io to 2.16.0
* Update sbt-header to 5.10.0
* Update jna to 3.3.0
* Update jna to 4.5.2
* Update activemq-broker, activemq-client to 6.1.0
* Update commons-compress to 1.26.1
* Update connect-json, kafka-clients to 3.7.0
* Update log4j-api to 2.23.1
* Update log4j-to-slf4j to 2.23.1
* Update mina-core to 2.2.3
* Update bcpg-jdk15on, bcpkix-jdk15on, ... to 1.77
* Update elasticsearch to 7.17.19
* Update jersey-common to 3.1.5
* Update mongo-java-driver to 3.12.14
* Update sbt to 1.9.9
* Update scalafmt-core to 2.6.4
* Update scalatest to 3.2.18
* Update sbt-scoverage to 2.0.11
* Update jcl-over-slf4j, log4j-over-slf4j to 2.0.12
* Update cassandra, elasticsearch, kafka, ... to 1.19.7
* Update cats-effect, cats-effect-kernel, ... to 3.4.11
* Update sbt-pack to 0.19
* Update jedis to 4.4.7
* Update s3, sts to 2.25.21
* Update wiremock to 3.5.2
* Formatting
* updating sbt sonatypeOssRepos
* Fix compile issue
* Formatting
* Fixes
* Fix branch name
---------
Co-authored-by: Scala Steward
---
.github/workflows/build.yml | 6 +-
.scalafmt.conf | 2 +-
.../streamreactor/connect/MqttContainer.scala | 3 +-
.../MqttManagerConnectionFailureTest.scala | 2 +
kafka-connect-redis/project/build.properties | 1 -
project/Dependencies.scala | 88 +++++++++----------
project/assembly.sbt | 2 +-
project/build.properties | 2 +-
project/plugins.sbt | 11 ++-
.../KafkaConnectContainer.scala | 7 +-
10 files changed, 65 insertions(+), 59 deletions(-)
delete mode 100644 kafka-connect-redis/project/build.properties
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 261936476..698e40cad 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -218,6 +218,10 @@ jobs:
path: ~/**/target/libs/*.jar
key: assembly-${{ matrix.module }}-${{ github.run_id }}
fail-on-cache-miss: true
+ - name: Extract branch name
+ shell: bash
+ run: echo "branch=${GITHUB_HEAD_REF:-${GITHUB_REF#refs/heads/}}" >> $GITHUB_OUTPUT
+ id: extract_branch
- name: Dependency Check
uses: dependency-check/Dependency-Check_Action@main
with:
@@ -226,7 +230,7 @@ jobs:
format: 'HTML'
args: >-
--failOnCVSS 5
- --suppression https://raw.githubusercontent.com/lensesio/stream-reactor/chore/update-kafka-versions/suppression.xml
+ --suppression https://raw.githubusercontent.com/lensesio/stream-reactor/${{ steps.extract_branch.outputs.branch }}/suppression.xml
- name: Upload Test results
uses: actions/upload-artifact@master
with:
diff --git a/.scalafmt.conf b/.scalafmt.conf
index 127293901..0083ded42 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,4 +1,4 @@
-version=2.6.1
+version=2.6.4
maxColumn = 120
preset = IntelliJ
align.preset = most
diff --git a/kafka-connect-mqtt/src/fun/scala/io/lenses/streamreactor/connect/MqttContainer.scala b/kafka-connect-mqtt/src/fun/scala/io/lenses/streamreactor/connect/MqttContainer.scala
index e637100f8..f62ce2a87 100644
--- a/kafka-connect-mqtt/src/fun/scala/io/lenses/streamreactor/connect/MqttContainer.scala
+++ b/kafka-connect-mqtt/src/fun/scala/io/lenses/streamreactor/connect/MqttContainer.scala
@@ -8,6 +8,7 @@ import _root_.io.lenses.streamreactor.connect.testcontainers.S3Authentication
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.utility.DockerImageName
+import org.testcontainers.utility.MountableFile
class MqttContainer(
dockerImage: DockerImageName,
@@ -17,7 +18,7 @@ class MqttContainer(
val identity: S3Authentication = RandomAuthentication(),
) extends GenericContainer[MqttContainer](dockerImage.withTag(dockerTag)) {
- withFileSystemBind(this.getClass.getResource("/mosquitto.config").getPath, "/mosquitto/config/mosquitto.conf")
+ withCopyToContainer(MountableFile.forClasspathResource("/mosquitto.config"), "/mosquitto/config/mosquitto.conf")
withNetworkAliases(networkAlias)
withExposedPorts(port)
waitingFor(Wait.forListeningPort())
diff --git a/kafka-connect-mqtt/src/it/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManagerConnectionFailureTest.scala b/kafka-connect-mqtt/src/it/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManagerConnectionFailureTest.scala
index 475ab3fa5..4a9fd76cd 100644
--- a/kafka-connect-mqtt/src/it/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManagerConnectionFailureTest.scala
+++ b/kafka-connect-mqtt/src/it/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManagerConnectionFailureTest.scala
@@ -17,6 +17,7 @@ import org.testcontainers.containers.Network
import java.util
import java.util.UUID
+import scala.annotation.nowarn
class MqttManagerConnectionFailureTest extends AnyWordSpec with ForAllTestContainer with Matchers {
@@ -37,6 +38,7 @@ class MqttManagerConnectionFailureTest extends AnyWordSpec with ForAllTestContainer with Matchers {
// mqtt broker port will be mapped to a different host network port upon restart
// using a proxy container to overcome this
+ @nowarn("cat=deprecation")
val proxy = toxiProxyContainer.container.getProxy(mqttContainer.container, mqttPort)
val mqttProxyUrl = s"tcp://${proxy.getContainerIpAddress}:${proxy.getProxyPort}"
diff --git a/kafka-connect-redis/project/build.properties b/kafka-connect-redis/project/build.properties
deleted file mode 100644
index c8fcab543..000000000
--- a/kafka-connect-redis/project/build.properties
+++ /dev/null
@@ -1 +0,0 @@
-sbt.version=1.6.2
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 64036a2ce..12267eb68 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -24,23 +24,23 @@ object Dependencies {
val scalaVersion = "2.13.13"
val supportedScalaVersions: Seq[String] = List(Dependencies.scalaVersion)
- val commonResolvers: Seq[MavenRepository] = Seq(
- Resolver sonatypeRepo "public",
- Resolver typesafeRepo "releases",
- Resolver.mavenLocal,
- "confluent" at "https://packages.confluent.io/maven/",
- "typesafe" at "https://repo.typesafe.com/typesafe/releases/",
- "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
- "jitpack" at "https://jitpack.io",
- )
+ val commonResolvers: Seq[MavenRepository] = Resolver.sonatypeOssRepos("public") ++
+ Seq(
+ Resolver typesafeRepo "releases",
+ Resolver.mavenLocal,
+ "confluent" at "https://packages.confluent.io/maven/",
+ "typesafe" at "https://repo.typesafe.com/typesafe/releases/",
+ "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
+ "jitpack" at "https://jitpack.io",
+ )
object Versions {
// libraries versions
- val scalatestVersion = "3.2.17"
+ val scalatestVersion = "3.2.18"
val scalatestPlusScalaCheckVersion = "3.1.0.0-RC2"
val scalaCheckVersion = "1.17.0"
- val kafkaVersion: String = "3.6.1"
+ val kafkaVersion: String = "3.7.0"
val confluentVersion: String = "7.6.0"
val enumeratumVersion = "1.7.3"
@@ -48,9 +48,9 @@ object Dependencies {
val http4sVersion = "1.0.0-M32"
val http4sJdkVersion = "1.0.0-M1"
val avroVersion = "1.11.3"
- val avro4sVersion = "4.1.1"
+ val avro4sVersion = "4.1.2"
- val catsEffectVersion = "3.4.8"
+ val catsEffectVersion = "3.4.11"
val `cats-effect-testing` = "1.5.0"
val antlr4Version: String = "4.13.1"
@@ -61,21 +61,21 @@ object Dependencies {
// build plugins version
val betterMonadicForVersion = "0.3.1"
- val logbackVersion = "1.4.14"
+ val logbackVersion = "1.5.3"
val scalaLoggingVersion = "3.9.5"
- val wiremockVersion = "3.3.1"
+ val wiremockVersion = "3.5.2"
val parquetVersion = "1.13.1"
- val jerseyCommonVersion = "3.1.1"
+ val jerseyCommonVersion = "3.1.5"
val calciteVersion = "1.34.0"
- val awsSdkVersion = "2.25.6"
+ val awsSdkVersion = "2.25.21"
val azureDataLakeVersion = "12.18.3"
val azureIdentityVersion = "1.11.4"
val azureCoreVersion = "1.47.0"
- val gcpStorageVersion = "2.32.1"
+ val gcpStorageVersion = "2.36.1"
val jacksonVersion = "2.17.0"
val json4sVersion = "4.0.7"
@@ -86,42 +86,42 @@ object Dependencies {
val xzVersion = "1.9"
val lz4Version = "1.8.0"
- val bouncyCastleVersion = "1.70"
+ val bouncyCastleVersion = "1.77"
val nettyVersion = "4.1.108.Final"
- val cassandraDriverVersion = "3.11.3"
- val jsonPathVersion = "2.7.0"
+ val cassandraDriverVersion = "3.11.5"
+ val jsonPathVersion = "2.9.0"
val azureDocumentDbVersion = "2.6.5"
- val testcontainersScalaVersion = "0.40.14"
- val testcontainersVersion = "1.17.6"
+ val testcontainersScalaVersion = "0.40.17"
+ val testcontainersVersion = "1.19.7"
- val influxVersion = "6.8.0"
+ val influxVersion = "6.12.0"
val jmsApiVersion = "3.1.0"
- val activeMqVersion = "6.0.1"
+ val activeMqVersion = "6.1.0"
val protocVersion = "3.11.4"
val googleProtobufVersion = "3.25.3"
val mqttVersion = "1.2.5"
val commonsNetVersion = "3.10.0"
- val commonsCodecVersion = "1.15"
- val commonsCompressVersion = "1.26.0"
+ val commonsCodecVersion = "1.16.1"
+ val commonsCompressVersion = "1.26.1"
val commonsConfigVersion = "2.10.1"
- val commonsIOVersion = "2.11.0"
+ val commonsIOVersion = "2.16.0"
val commonsHttpVersion = "4.5.14"
val commonsLang3Version = "3.14.0"
val jschVersion = "0.2.17"
- val minaVersion = "2.2.1"
+ val minaVersion = "2.2.3"
val betterFilesVersion = "3.9.2"
val ftpServerVersion = "1.2.0"
val fakeSftpServerVersion = "2.0.0"
- val mongoDbVersion = "3.12.12"
+ val mongoDbVersion = "3.12.14"
- val jedisVersion = "4.4.0"
+ val jedisVersion = "4.4.7"
val gsonVersion = "2.10.1"
val nimbusJoseJwtVersion = "9.37.3"
@@ -135,13 +135,13 @@ object Dependencies {
object Elastic6Versions extends ElasticVersions() {
override val elastic4sVersion: String = "6.7.8"
override val elasticSearchVersion: String = "6.8.23"
- override val jnaVersion: String = "3.0.9"
+ override val jnaVersion: String = "3.3.0"
}
object Elastic7Versions extends ElasticVersions {
- override val elastic4sVersion: String = "7.17.2"
- override val elasticSearchVersion: String = "7.17.2"
- override val jnaVersion: String = "4.5.1"
+ override val elastic4sVersion: String = "7.17.4"
+ override val elasticSearchVersion: String = "7.17.19"
+ override val jnaVersion: String = "4.5.2"
}
}
@@ -164,8 +164,8 @@ object Dependencies {
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
lazy val logbackCore = "ch.qos.logback" % "logback-core" % logbackVersion
val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion
- val log4jToSlf4j = "org.slf4j" % "log4j-over-slf4j" % "2.0.9"
- val jclToSlf4j = "org.slf4j" % "jcl-over-slf4j" % "2.0.9"
+ val log4jToSlf4j = "org.slf4j" % "log4j-over-slf4j" % "2.0.12"
+ val jclToSlf4j = "org.slf4j" % "jcl-over-slf4j" % "2.0.12"
// testing
val scalatest = "org.scalatest" %% "scalatest" % scalatestVersion
@@ -208,11 +208,11 @@ object Dependencies {
val http4sCirce = "org.http4s" %% "http4s-circe" % http4sVersion
val http4s: Seq[ModuleID] = Seq(http4sDsl, http4sJdkClient, http4sCirce)
- val bouncyProv = "org.bouncycastle" % "bcprov-jdk15on" % bouncyCastleVersion
- val bouncyUtil = "org.bouncycastle" % "bcutil-jdk15on" % bouncyCastleVersion
- val bouncyPkix = "org.bouncycastle" % "bcpkix-jdk15on" % bouncyCastleVersion
- val bouncyBcpg = "org.bouncycastle" % "bcpg-jdk15on" % bouncyCastleVersion
- val bouncyTls = "org.bouncycastle" % "bctls-jdk15on" % bouncyCastleVersion
+ val bouncyProv = "org.bouncycastle" % "bcprov-jdk18on" % bouncyCastleVersion
+ val bouncyUtil = "org.bouncycastle" % "bcutil-jdk18on" % bouncyCastleVersion
+ val bouncyPkix = "org.bouncycastle" % "bcpkix-jdk18on" % bouncyCastleVersion
+ val bouncyBcpg = "org.bouncycastle" % "bcpg-jdk18on" % bouncyCastleVersion
+ val bouncyTls = "org.bouncycastle" % "bctls-jdk18on" % bouncyCastleVersion
val bouncyCastle: Seq[ModuleID] = Seq(bouncyProv, bouncyUtil, bouncyPkix, bouncyBcpg, bouncyTls)
lazy val avro = "org.apache.avro" % "avro" % avroVersion
@@ -381,8 +381,8 @@ trait Dependencies {
import Versions._
val loggingDeps: Seq[ModuleID] = Seq(
- "org.apache.logging.log4j" % "log4j-api" % "2.22.1",
- "org.apache.logging.log4j" % "log4j-to-slf4j" % "2.20.0",
+ "org.apache.logging.log4j" % "log4j-api" % "2.23.1",
+ "org.apache.logging.log4j" % "log4j-to-slf4j" % "2.23.1",
log4jToSlf4j,
jclToSlf4j,
logback,
diff --git a/project/assembly.sbt b/project/assembly.sbt
index f55a62c9c..d83c88302 100644
--- a/project/assembly.sbt
+++ b/project/assembly.sbt
@@ -1 +1 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")
diff --git a/project/build.properties b/project/build.properties
index c8fcab543..04267b14a 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.6.2
+sbt.version=1.9.9
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 17beed20a..98315970e 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,16 +1,15 @@
// Activate the following only when needed to use specific tasks like `whatDependsOn` etc...
//addDependencyTreePlugin
-addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
-addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")
-addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.9.0")
-addSbtPlugin("com.typesafe.sbt" % "sbt-license-report" % "1.2.0")
+addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
+addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.11")
+addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")
-addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.14")
+addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.19")
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
-addSbtPlugin("com.typesafe.sbt" % "sbt-license-report" % "1.2.0")
+addSbtPlugin("com.github.sbt" % "sbt-license-report" % "1.6.1")
//addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.13.2")
diff --git a/test-common/src/main/scala/io/lenses/streamreactor/connect/testcontainers/KafkaConnectContainer.scala b/test-common/src/main/scala/io/lenses/streamreactor/connect/testcontainers/KafkaConnectContainer.scala
index 619df81a5..83f8eee8c 100644
--- a/test-common/src/main/scala/io/lenses/streamreactor/connect/testcontainers/KafkaConnectContainer.scala
+++ b/test-common/src/main/scala/io/lenses/streamreactor/connect/testcontainers/KafkaConnectContainer.scala
@@ -23,6 +23,7 @@ import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.utility.DockerImageName
+import org.testcontainers.utility.MountableFile
import java.time.Duration
@@ -51,13 +52,13 @@ class KafkaConnectContainer(
}
providedJars.foreach {
jarLocation =>
- withFileSystemBind(
- jarLocation,
+ withCopyToContainer(
+ MountableFile.forHostPath(jarLocation),
s"/usr/share/java/kafka/${jarLocation.substring(jarLocation.lastIndexOf("/"))}",
)
}
- connectPluginPath.foreach(f => withFileSystemBind(f, "/usr/share/plugins"))
+ connectPluginPath.foreach(f => withCopyToContainer(MountableFile.forHostPath(f), "/usr/share/plugins"))
if (schemaRegistryContainer.isDefined) {
withEnv("CONNECT_KEY_CONVERTER", "io.confluent.connect.avro.AvroConverter")