Post rebase edits
davidsloan committed Nov 7, 2023
1 parent 5f64d48 commit 5e4359e
Showing 8 changed files with 27 additions and 35 deletions.
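The recurring change across these files: the Kafka Connect property map is converted from java.util.Map to an immutable Scala Map exactly once, at the task boundary, and the internal fromProps/make signatures now take Map[String, String], so the scattered .asJava/.asScala round-trips disappear. A minimal sketch of the pattern, with illustrative names rather than the actual connector code:

    import scala.jdk.CollectionConverters.MapHasAsScala

    object PropsBoundarySketch {
      // Downstream factory mirroring the new signatures, e.g.
      // S3SourceConfig.fromProps(props: Map[String, String]).
      def fromProps(props: Map[String, String]): Either[Throwable, String] =
        props.get("connect.s3.kcql").toRight(new IllegalArgumentException("missing connect.s3.kcql"))

      // Connect still hands the task a java.util.Map; convert it exactly once here.
      def start(javaProps: java.util.Map[String, String]): Unit = {
        val props: Map[String, String] = javaProps.asScala.toMap
        fromProps(props).fold(err => throw err, _ => ())
      }
    }
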
@@ -27,7 +27,6 @@ import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata
import io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask
import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager

import scala.jdk.CollectionConverters.MapHasAsJava
import scala.util.Try

object S3SinkTask {}
@@ -43,7 +42,7 @@ class S3SinkTask

def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[S3FileMetadata]] =
for {
config <- S3SinkConfig.fromProps(props.asJava)
config <- S3SinkConfig.fromProps(props)
s3Client <- AwsS3ClientCreator.make(config.s3Config)
storageInterface = new AwsS3StorageInterface(connectorTaskId, s3Client, config.batchDelete)
_ <- Try(setErrorRetryInterval(config.s3Config)).toEither
@@ -64,9 +64,9 @@ class S3SourceTask extends SourceTask with LazyLogging {
logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties")

val contextProperties = Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty)
val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap).asJava
val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap)
(for {
result <- S3SourceState.make(mergedProperties.asScala.toMap, contextOffsetFn)
result <- S3SourceState.make(mergedProperties, contextOffsetFn)
fiber <- result.partitionDiscoveryLoop.start
} yield {
s3SourceTaskState = result.state.some
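In the S3SourceTask change above, the merged properties now stay a Scala Map instead of bouncing back through .asJava. A hedged sketch of that merge step, with a stand-in for MapUtils.mergeProps (the real precedence rule is not visible in this diff; here context entries are assumed to win over task props):

    import scala.jdk.CollectionConverters.MapHasAsScala

    object MergePropsSketch {
      // Stand-in for MapUtils.mergeProps; the precedence shown is an assumption, not confirmed by the diff.
      def mergeProps(contextProps: Map[String, String], taskProps: Map[String, String]): Map[String, String] =
        taskProps ++ contextProps

      def merged(
        taskProps:      java.util.Map[String, String],
        contextConfigs: Option[java.util.Map[String, String]]
      ): Map[String, String] = {
        val contextProps = contextConfigs.map(_.asScala.toMap).getOrElse(Map.empty[String, String])
        mergeProps(contextProps, taskProps.asScala.toMap) // stays a Scala Map all the way down
      }
    }
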
@@ -40,13 +40,12 @@ import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties

import java.util
import scala.util.Try

object S3SourceConfig {

def fromProps(
props: util.Map[String, String],
props: Map[String, String],
): Either[Throwable, S3SourceConfig] =
S3SourceConfig(S3SourceConfigDefBuilder(props))

@@ -31,12 +31,11 @@ import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager
import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManagerState
import io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState

import java.util
import scala.jdk.CollectionConverters.IteratorHasAsScala

object S3SourceState extends StrictLogging {
def make(
props: util.Map[String, String],
props: Map[String, String],
contextOffsetFn: CloudLocation => Option[CloudLocation],
)(
implicit
@@ -62,15 +62,16 @@ abstract class CloudSinkTask[SM <: FileMetadata](

printAsciiHeader(manifest, sinkAsciiArtResource)

new ConnectorTaskIdCreator(connectorPrefix).fromProps(fallbackProps) match {
val scalaFallbackProps = fallbackProps.asScala.toMap
new ConnectorTaskIdCreator(connectorPrefix).fromProps(scalaFallbackProps) match {
case Left(value) => throw new IllegalArgumentException(value)
case Right(value) => connectorTaskId = value
}

logger.debug(s"[{}] CloudSinkTask.start", connectorTaskId.show)

val contextProps = Option(context).flatMap(c => Option(c.configs())).map(_.asScala.toMap).getOrElse(Map.empty)
val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap)
val props = MapUtils.mergeProps(contextProps, scalaFallbackProps)
val errOrWriterMan = createWriterMan(props)

errOrWriterMan.leftMap(throw _).foreach(writerManager = _)
@@ -19,51 +19,49 @@ import cats.implicits.catsSyntaxEitherId
import io.lenses.streamreactor.connect.cloud.common.source.config.distribution.PartitionHasher
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import scala.jdk.CollectionConverters._
class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey {
private val connectorName = "connectorName"
"ConnectorTaskId" should {
"create the instance" in {
val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:2", "name" -> connectorName)
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) shouldBe ConnectorTaskId(connectorName,
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) shouldBe ConnectorTaskId(connectorName,
2,
0,
).asRight[String]
}
"fail if max tasks is not valid integer" in {
val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:2a", "name" -> connectorName)
val actual = new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava)
val actual = new ConnectorTaskIdCreator(connectorPrefix).fromProps(from)
actual match {
case Left(e) => e.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting an integer but found:2a"
case Right(_) => fail("Should have failed")
}
}
"fail if task number is not a valid integer" in {
val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0a:2", "name" -> connectorName)
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match {
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match {
case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting an integer but found:0a"
case Right(_) => fail("Should have failed")
}
}
"fail if task number < 0" in {
val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "-1:2", "name" -> connectorName)
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match {
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match {
case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:-1"
case Right(value) => fail(s"Should have failed but got $value")
}

}
"fail if max tasks is zero" in {
val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:0", "name" -> connectorName)
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match {
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match {
case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:0"
case Right(value) => fail(s"Should have failed but got $value")
}
}
"fail if max tasks is negative" in {
val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:-1", "name" -> connectorName)
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match {
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match {
case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:-1"
case Right(value) => fail(s"Should have failed but got $value")
}
@@ -72,7 +70,7 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey {
"own the partitions when max task is 1" in {
val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:1", "name" -> connectorName)
val actual =
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava).getOrElse(fail("Should be valid"))
new ConnectorTaskIdCreator(connectorPrefix).fromProps(from).getOrElse(fail("Should be valid"))

Seq("/myTopic/", "/anotherTopic/", "/thirdTopic/")
.flatMap { value =>
@@ -89,12 +87,12 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey {
"b" -> "2",
TASK_INDEX -> "0:2",
"name" -> connectorName,
).asJava).getOrElse(fail("Should be valid"))
)).getOrElse(fail("Should be valid"))
val two = new ConnectorTaskIdCreator(connectorPrefix).fromProps(Map("a" -> "1",
"b" -> "2",
TASK_INDEX -> "1:2",
"name" -> connectorName,
).asJava).getOrElse(fail("Should be valid"))
)).getOrElse(fail("Should be valid"))

PartitionHasher.hash(2, "1") shouldBe 1
one.ownsDir("1") shouldBe false
@@ -111,17 +109,17 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey {
"b" -> "2",
TASK_INDEX -> "0:3",
"name" -> connectorName,
).asJava).getOrElse(fail("Should be valid"))
)).getOrElse(fail("Should be valid"))
val two = new ConnectorTaskIdCreator(connectorPrefix).fromProps(Map("a" -> "1",
"b" -> "2",
TASK_INDEX -> "1:3",
"name" -> connectorName,
).asJava).getOrElse(fail("Should be valid"))
)).getOrElse(fail("Should be valid"))
val three = new ConnectorTaskIdCreator(connectorPrefix).fromProps(Map("a" -> "1",
"b" -> "2",
TASK_INDEX -> "2:3",
"name" -> connectorName,
).asJava).getOrElse(fail("Should be valid"))
)).getOrElse(fail("Should be valid"))

PartitionHasher.hash(3, "1") shouldBe 1
one.ownsDir("1") shouldBe false
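The ownership assertions in ConnectorTaskIdTest above all follow one rule: a task owns a directory when hashing the directory name across maxTasks lands on that task's index, so each directory belongs to exactly one task. A sketch of that rule with a stand-in hasher (the real PartitionHasher implementation may differ):

    object TaskOwnershipSketch {
      // Stand-in for PartitionHasher.hash; illustrative only.
      def hash(maxTasks: Int, dir: String): Int = math.abs(dir.hashCode) % maxTasks

      final case class TaskId(name: String, maxTasks: Int, taskIndex: Int) {
        // With maxTasks = 2, only the task whose index equals hash(2, "1") owns "1",
        // matching the one.ownsDir("1") / two.ownsDir("1") assertions above.
        def ownsDir(dir: String): Boolean = hash(maxTasks, dir) == taskIndex
      }
    }
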
@@ -18,15 +18,11 @@ package io.lenses.streamreactor.connect.cloud.common.sink.config
import com.datamountaineer.streamreactor.common.config.base.traits.BaseConfig
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategyConfigKeys
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategySettings
import io.lenses.streamreactor.connect.cloud.common.sink.config.LocalStagingAreaConfigKeys
import io.lenses.streamreactor.connect.cloud.common.sink.config.LocalStagingAreaSettings
import org.apache.kafka.common.config.ConfigDef

import java.util
import scala.jdk.CollectionConverters.MapHasAsJava
import scala.jdk.CollectionConverters.MapHasAsScala

case class TestConfigDefBuilder(configDef: ConfigDef, props: util.Map[String, String])
case class TestConfigDefBuilder(configDef: ConfigDef, props: Map[String, String])
extends BaseConfig("connect.testing", configDef, props)
with PaddingStrategySettings
with LocalStagingAreaSettings {
@@ -51,7 +47,7 @@ object TestConfig {
val newMap = map + {
"connect.s3.kcql" -> "dummy value"
}
TestConfigDefBuilder(defineProps(new ConfigDef()), newMap.asJava)
TestConfigDefBuilder(defineProps(new ConfigDef()), newMap)
}

}
10 changes: 5 additions & 5 deletions project/Dependencies.scala
@@ -1,6 +1,6 @@
import Dependencies._
import Dependencies.*
import KafkaVersionAxis.kafkaVersionAxis
import sbt._
import sbt.*
import sbt.librarymanagement.InclExclRule

object Dependencies {
@@ -80,7 +80,7 @@ object Dependencies {
val jerseyCommonVersion = "3.1.1"

val calciteVersion = "1.34.0"
val awsSdkVersion = "2.20.153"
val awsSdkVersion = "2.20.153"

val azureDataLakeVersion = "12.17.0"
val azureIdentityVersion = "1.8.1"
@@ -176,7 +176,7 @@ object Dependencies {
}
}

import Versions._
import Versions.*

// functional libraries
val catsEffectKernel = "org.typelevel" %% "cats-effect-kernel" % catsEffectVersion
@@ -457,7 +457,7 @@ object Dependencies {

trait Dependencies {

import Versions._
import Versions.*

val loggingDeps: Seq[ModuleID] = Seq(
"org.apache.logging.log4j" % "log4j-api" % "2.20.0",
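The Dependencies.scala changes above are mechanical: the build's wildcard imports move from the Scala 2 underscore form to the Scala 3 star form (plus a whitespace re-alignment of awsSdkVersion). A minimal illustration, assuming a compiler that accepts the new syntax (Scala 3, or a recent Scala 2 with the source-3 migration flag enabled):

    // Illustrative stand-in; not the project's actual Versions object.
    object Versions { val exampleVersion = "0.0.0" }

    object ImportStyleDemo {
      import Versions.*                  // new style, as adopted in Dependencies.scala
      val v: String = exampleVersion     // brings in the same members as `import Versions._`
    }
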
