diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala index ccb2c87972..132c411c43 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala @@ -27,7 +27,6 @@ import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata import io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager -import scala.jdk.CollectionConverters.MapHasAsJava import scala.util.Try object S3SinkTask {} @@ -43,7 +42,7 @@ class S3SinkTask def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[S3FileMetadata]] = for { - config <- S3SinkConfig.fromProps(props.asJava) + config <- S3SinkConfig.fromProps(props) s3Client <- AwsS3ClientCreator.make(config.s3Config) storageInterface = new AwsS3StorageInterface(connectorTaskId, s3Client, config.batchDelete) _ <- Try(setErrorRetryInterval(config.s3Config)).toEither diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala index 3746532df8..3732f8622e 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala @@ -64,9 +64,9 @@ class S3SourceTask extends SourceTask with LazyLogging { logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties") val contextProperties = Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty) - val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap).asJava + val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap) (for { - result <- S3SourceState.make(mergedProperties.asScala.toMap, contextOffsetFn) + result <- S3SourceState.make(mergedProperties, contextOffsetFn) fiber <- result.partitionDiscoveryLoop.start } yield { s3SourceTaskState = result.state.some diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala index 62e4107c2c..850f48b6e8 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala @@ -40,13 +40,12 @@ import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties -import java.util import scala.util.Try object S3SourceConfig { def fromProps( - props: util.Map[String, String], + props: Map[String, String], ): Either[Throwable, S3SourceConfig] = S3SourceConfig(S3SourceConfigDefBuilder(props)) diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/state/S3SourceBuilder.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/state/S3SourceBuilder.scala index bdd2e99bc5..20469e9c9d 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/state/S3SourceBuilder.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/state/S3SourceBuilder.scala @@ -31,12 +31,11 @@ import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManagerState import io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState -import java.util import scala.jdk.CollectionConverters.IteratorHasAsScala object S3SourceState extends StrictLogging { def make( - props: util.Map[String, String], + props: Map[String, String], contextOffsetFn: CloudLocation => Option[CloudLocation], )( implicit diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala index c0c1efd712..c7455bfdb1 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala @@ -62,7 +62,8 @@ abstract class CloudSinkTask[SM <: FileMetadata]( printAsciiHeader(manifest, sinkAsciiArtResource) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(fallbackProps) match { + val scalaFallbackProps = fallbackProps.asScala.toMap + new ConnectorTaskIdCreator(connectorPrefix).fromProps(scalaFallbackProps) match { case Left(value) => throw new IllegalArgumentException(value) case Right(value) => connectorTaskId = value } @@ -70,7 +71,7 @@ abstract class CloudSinkTask[SM <: FileMetadata]( logger.debug(s"[{}] CloudSinkTask.start", connectorTaskId.show) val contextProps = Option(context).flatMap(c => Option(c.configs())).map(_.asScala.toMap).getOrElse(Map.empty) - val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap) + val props = MapUtils.mergeProps(contextProps, scalaFallbackProps) val errOrWriterMan = createWriterMan(props) errOrWriterMan.leftMap(throw _).foreach(writerManager = _) diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/config/ConnectorTaskIdTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/config/ConnectorTaskIdTest.scala index 45b37714e4..30675843fc 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/config/ConnectorTaskIdTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/config/ConnectorTaskIdTest.scala @@ -19,21 +19,19 @@ import cats.implicits.catsSyntaxEitherId import io.lenses.streamreactor.connect.cloud.common.source.config.distribution.PartitionHasher import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec - -import scala.jdk.CollectionConverters._ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { private val connectorName = "connectorName" "ConnectorTaskId" should { "create the instance" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:2", "name" -> connectorName) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) shouldBe ConnectorTaskId(connectorName, + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) shouldBe ConnectorTaskId(connectorName, 2, 0, ).asRight[String] } "fail if max tasks is not valid integer" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:2a", "name" -> connectorName) - val actual = new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) + val actual = new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) actual match { case Left(e) => e.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting an integer but found:2a" case Right(_) => fail("Should have failed") @@ -41,14 +39,14 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { } "fail if task number is not a valid integer" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0a:2", "name" -> connectorName) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match { + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match { case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting an integer but found:0a" case Right(_) => fail("Should have failed") } } "fail if task number < 0" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "-1:2", "name" -> connectorName) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match { + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match { case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:-1" case Right(value) => fail(s"Should have failed but got $value") } @@ -56,14 +54,14 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { } "fail if max tasks is zero" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:0", "name" -> connectorName) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match { + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match { case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:0" case Right(value) => fail(s"Should have failed but got $value") } } "fail if max tasks is negative" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:-1", "name" -> connectorName) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match { + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match { case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:-1" case Right(value) => fail(s"Should have failed but got $value") } @@ -72,7 +70,7 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { "own the partitions when max task is 1" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:1", "name" -> connectorName) val actual = - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava).getOrElse(fail("Should be valid")) + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from).getOrElse(fail("Should be valid")) Seq("/myTopic/", "/anotherTopic/", "/thirdTopic/") .flatMap { value => @@ -89,12 +87,12 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { "b" -> "2", TASK_INDEX -> "0:2", "name" -> connectorName, - ).asJava).getOrElse(fail("Should be valid")) + )).getOrElse(fail("Should be valid")) val two = new ConnectorTaskIdCreator(connectorPrefix).fromProps(Map("a" -> "1", "b" -> "2", TASK_INDEX -> "1:2", "name" -> connectorName, - ).asJava).getOrElse(fail("Should be valid")) + )).getOrElse(fail("Should be valid")) PartitionHasher.hash(2, "1") shouldBe 1 one.ownsDir("1") shouldBe false @@ -111,17 +109,17 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { "b" -> "2", TASK_INDEX -> "0:3", "name" -> connectorName, - ).asJava).getOrElse(fail("Should be valid")) + )).getOrElse(fail("Should be valid")) val two = new ConnectorTaskIdCreator(connectorPrefix).fromProps(Map("a" -> "1", "b" -> "2", TASK_INDEX -> "1:3", "name" -> connectorName, - ).asJava).getOrElse(fail("Should be valid")) + )).getOrElse(fail("Should be valid")) val three = new ConnectorTaskIdCreator(connectorPrefix).fromProps(Map("a" -> "1", "b" -> "2", TASK_INDEX -> "2:3", "name" -> connectorName, - ).asJava).getOrElse(fail("Should be valid")) + )).getOrElse(fail("Should be valid")) PartitionHasher.hash(3, "1") shouldBe 1 one.ownsDir("1") shouldBe false diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/TestConfigDefBuilder.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/TestConfigDefBuilder.scala index 4bc8da6ef7..8cc6f2e627 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/TestConfigDefBuilder.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/TestConfigDefBuilder.scala @@ -18,15 +18,11 @@ package io.lenses.streamreactor.connect.cloud.common.sink.config import com.datamountaineer.streamreactor.common.config.base.traits.BaseConfig import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategyConfigKeys import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategySettings -import io.lenses.streamreactor.connect.cloud.common.sink.config.LocalStagingAreaConfigKeys -import io.lenses.streamreactor.connect.cloud.common.sink.config.LocalStagingAreaSettings import org.apache.kafka.common.config.ConfigDef -import java.util -import scala.jdk.CollectionConverters.MapHasAsJava import scala.jdk.CollectionConverters.MapHasAsScala -case class TestConfigDefBuilder(configDef: ConfigDef, props: util.Map[String, String]) +case class TestConfigDefBuilder(configDef: ConfigDef, props: Map[String, String]) extends BaseConfig("connect.testing", configDef, props) with PaddingStrategySettings with LocalStagingAreaSettings { @@ -51,7 +47,7 @@ object TestConfig { val newMap = map + { "connect.s3.kcql" -> "dummy value" } - TestConfigDefBuilder(defineProps(new ConfigDef()), newMap.asJava) + TestConfigDefBuilder(defineProps(new ConfigDef()), newMap) } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e339e2921b..c36b74f455 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,6 +1,6 @@ -import Dependencies._ +import Dependencies.* import KafkaVersionAxis.kafkaVersionAxis -import sbt._ +import sbt.* import sbt.librarymanagement.InclExclRule object Dependencies { @@ -80,7 +80,7 @@ object Dependencies { val jerseyCommonVersion = "3.1.1" val calciteVersion = "1.34.0" - val awsSdkVersion = "2.20.153" + val awsSdkVersion = "2.20.153" val azureDataLakeVersion = "12.17.0" val azureIdentityVersion = "1.8.1" @@ -176,7 +176,7 @@ object Dependencies { } } - import Versions._ + import Versions.* // functional libraries val catsEffectKernel = "org.typelevel" %% "cats-effect-kernel" % catsEffectVersion @@ -457,7 +457,7 @@ object Dependencies { trait Dependencies { - import Versions._ + import Versions.* val loggingDeps: Seq[ModuleID] = Seq( "org.apache.logging.log4j" % "log4j-api" % "2.20.0",