diff --git a/build.sbt b/build.sbt
index ccc0db598..82a9ac914 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,9 +1,8 @@
import Dependencies.globalExcludeDeps
import Dependencies.gson
-
-import Settings._
+import Settings.*
import sbt.Keys.libraryDependencies
-import sbt._
+import sbt.*
import sbt.Project.projectToLocalProject
import java.io.File
@@ -22,6 +21,7 @@ lazy val subProjects: Seq[Project] = Seq(
elastic7,
ftp,
`gcp-storage`,
+ http,
influxdb,
jms,
mongodb,
@@ -246,6 +246,28 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
.configureFunctionalTests()
.enablePlugins(PackPlugin)
+lazy val http = (project in file("kafka-connect-http"))
+ .dependsOn(common)
+ //.dependsOn(`test-common` % "fun->compile")
+ .settings(
+ settings ++
+ Seq(
+ name := "kafka-connect-http",
+      description := "Kafka Connect compatible connectors to move data between Kafka and HTTP",
+ libraryDependencies ++= baseDeps ++ kafkaConnectHttpDeps,
+ publish / skip := true,
+ packExcludeJars := Seq(
+ "scala-.*\\.jar",
+ "zookeeper-.*\\.jar",
+ ),
+ ),
+ )
+ .configureAssembly(false)
+ .configureTests(baseTestDeps ++ kafkaConnectHttpTestDeps)
+ .configureIntegrationTests(baseTestDeps ++ kafkaConnectHttpTestDeps)
+ //.configureFunctionalTests(kafkaConnectS3FuncTestDeps)
+ .enablePlugins(PackPlugin, ProtocPlugin)
+
lazy val influxdb = (project in file("kafka-connect-influxdb"))
.dependsOn(common)
.settings(
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CloudCommitPolicy.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CloudCommitPolicy.scala
new file mode 100644
index 000000000..94ba7c545
--- /dev/null
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CloudCommitPolicy.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.cloud.common.sink.commit
+
+import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushCount
+import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushInterval
+import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushSize
+
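+/**
+ * Default commit policy for cloud sinks: a file is flushed once it reaches the default
+ * flush size, the default flush interval elapses, or the default record count is hit.
+ */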
+object CloudCommitPolicy {
+ val Default: CommitPolicy =
+ CommitPolicy(FileSize(defaultFlushSize), Interval(defaultFlushInterval), Count(defaultFlushCount))
+
+}
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/CloudSinkBucketOptions.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/CloudSinkBucketOptions.scala
index 04173ff12..c6e3895a5 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/CloudSinkBucketOptions.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/CloudSinkBucketOptions.scala
@@ -24,6 +24,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.config.FormatSelection
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.CloudCommitPolicy
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
import io.lenses.streamreactor.connect.cloud.common.sink.config.kcqlprops.CloudSinkProps
@@ -114,7 +115,7 @@ case class CloudSinkBucketOptions(
formatSelection: FormatSelection,
keyNamer: KeyNamer,
partitionSelection: PartitionSelection,
- commitPolicy: CommitPolicy = CommitPolicy.Default,
+ commitPolicy: CommitPolicy = CloudCommitPolicy.Default,
localStagingArea: LocalStagingArea,
dataStorage: DataStorageSettings,
) extends WithTransformableDataStorage
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala
index 4f514169b..1b0202683 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala
@@ -17,12 +17,7 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors
import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
-import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
-import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
-import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
-import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSinkData
-import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
-import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
+import io.lenses.streamreactor.connect.cloud.common.formats.writer._
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
/**
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala
index 994d6c183..e82198c2b 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala
@@ -30,10 +30,10 @@ import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionDisplay.KeysAndValues
import io.lenses.streamreactor.connect.cloud.common.sink.config.DatePartitionField
+import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.config.HeaderPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.KeyPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionField
-import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionSelection
import io.lenses.streamreactor.connect.cloud.common.sink.config.TopicPartitionField
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala
index add5f8a72..83ae3663c 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala
@@ -18,8 +18,8 @@ package io.lenses.streamreactor.connect.cloud.common.sink.writer
import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
-import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.FormatWriter
+import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
@@ -27,14 +27,10 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.NonFatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
-import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitContext
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.CloudCommitContext
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager
-import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
-import io.lenses.streamreactor.connect.cloud.common.storage.NonExistingFileError
-import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
-import io.lenses.streamreactor.connect.cloud.common.storage.UploadFailedError
-import io.lenses.streamreactor.connect.cloud.common.storage.ZeroByteFileError
+import io.lenses.streamreactor.connect.cloud.common.storage._
import org.apache.kafka.connect.data.Schema
import java.io.File
@@ -155,7 +151,7 @@ class Writer[SM <: FileMetadata](
def shouldFlush: Boolean =
writeState match {
case Writing(commitState, _, file, uncommittedOffset) => commitPolicy.shouldFlush(
- CommitContext(
+ CloudCommitContext(
topicPartition.withOffset(uncommittedOffset),
commitState.recordCount,
commitState.lastKnownFileSize,
diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicyTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicyTest.scala
index 52eb218ba..8944c7ca7 100644
--- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicyTest.scala
+++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicyTest.scala
@@ -19,11 +19,6 @@ import com.typesafe.scalalogging.Logger
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset
-import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitContext
-import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
-import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
-import io.lenses.streamreactor.connect.cloud.common.sink.commit.FileSize
-import io.lenses.streamreactor.connect.cloud.common.sink.commit.Interval
import org.mockito.MockitoSugar
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@@ -46,12 +41,12 @@ class CommitPolicyTest extends AnyWordSpec with Matchers with MockitoSugar {
val lastFlushTimeAdjusted: Option[Long] = lastFlushTimestampAdjust.fold(Option.empty[Long])(e => Some(nowTime + e))
val tpo = TopicPartitionOffset(Topic("myTopic"), 1, Offset(100))
- policy.shouldFlush(CommitContext(tpo,
- count,
- fileSize,
- creationTimeAdjusted,
- lastFlushTimeAdjusted,
- "my/filename.txt",
+ policy.shouldFlush(CloudCommitContext(tpo,
+ count,
+ fileSize,
+ creationTimeAdjusted,
+ lastFlushTimeAdjusted,
+ "my/filename.txt",
))
}
diff --git a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/common/config/SSLConfigContext.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/common/config/SSLConfigContext.scala
index 756f8361e..69198d29b 100644
--- a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/common/config/SSLConfigContext.scala
+++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/common/config/SSLConfigContext.scala
@@ -15,6 +15,11 @@
*/
package io.lenses.streamreactor.common.config
+import io.circe.Decoder
+import io.circe.Encoder
+import io.circe.generic.semiauto.deriveDecoder
+import io.circe.generic.semiauto.deriveEncoder
+
import java.io.FileInputStream
import java.security.KeyStore
import java.security.SecureRandom
@@ -38,7 +43,7 @@ object SSLConfigContext {
* @param config An SSLConfig containing key and truststore credentials
* @return a SSLContext
*/
- def getSSLContext(config: SSLConfig): SSLContext = {
+ private def getSSLContext(config: SSLConfig): SSLContext = {
val useClientCertAuth = config.useClientCert
//is client certification authentication set
@@ -100,3 +105,10 @@ case class SSLConfig(
keyStoreType: String = "JKS",
trustStoreType: String = "JKS",
)
+
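+// Circe codecs so that SSLConfig can be encoded to and decoded from JSON.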
+object SSLConfig {
+
+ implicit val decoder: Decoder[SSLConfig] = deriveDecoder
+ implicit val encoder: Encoder[SSLConfig] = deriveEncoder
+
+}
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/TopicPartitionOffset.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/TopicPartitionOffset.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/TopicPartitionOffset.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/TopicPartitionOffset.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitContext.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitContext.scala
similarity index 64%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitContext.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitContext.scala
index 9bd6a3584..1b09d12da 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitContext.scala
+++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitContext.scala
@@ -15,20 +15,37 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink.commit
+import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset
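+
+/**
+ * Snapshot of an in-progress sink file (record count, file size, creation and last-flush
+ * timestamps) that a [[CommitPolicy]] evaluates to decide whether the file should be flushed.
+ */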
+trait CommitContext {
+ def count: Long
+ def fileSize: Long
+ def createdTimestamp: Long
+ def lastFlushedTimestamp: Option[Long]
+
+ def lastModified: Long = lastFlushedTimestamp.getOrElse(createdTimestamp)
+
+ def generateLogLine(flushing: Boolean, result: Seq[ConditionCommitResult]): String
+}
+
/**
* @param tpo the [[TopicPartitionOffset]] of the last record written
* @param count the number of records written thus far to the file
 * @param createdTimestamp the time in milliseconds when the file was first created/accessed
*/
-case class CommitContext(
+case class CloudCommitContext(
tpo: TopicPartitionOffset,
count: Long,
fileSize: Long,
createdTimestamp: Long,
lastFlushedTimestamp: Option[Long],
partitionFile: String,
-) {
- def lastModified: Long = lastFlushedTimestamp.getOrElse(createdTimestamp)
+) extends CommitContext
+ with LazyLogging {
+ override def generateLogLine(flushing: Boolean, result: Seq[ConditionCommitResult]): String = {
+ val flushingOrNot = if (flushing) "" else "Not "
+ s"${flushingOrNot}Flushing '$partitionFile' for {topic:'${tpo.topic.value}', partition:${tpo.partition}, offset:${tpo.offset.value}, ${result.flatMap(_.logLine).mkString(", ")}}"
+ }
+
}
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicy.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicy.scala
similarity index 71%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicy.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicy.scala
index fb23de796..ffb49462b 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicy.scala
+++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicy.scala
@@ -17,9 +17,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.commit
import com.typesafe.scalalogging.LazyLogging
import com.typesafe.scalalogging.Logger
-import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushCount
-import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushInterval
-import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushSize
import scala.util.Try
@@ -48,25 +45,15 @@ case class CommitPolicy(logger: Logger, conditions: CommitPolicyCondition*) {
case ConditionCommitResult(true, _) => true
case _ => false
}
- val flushingOrNot = if (flush) "" else "Not "
- if (debugEnabled)
- logger.debug(
- "{}Flushing '{}' for {topic:'{}', partition:{}, offset:{}, {}}",
- flushingOrNot,
- context.partitionFile,
- context.tpo.topic.value,
- context.tpo.partition,
- context.tpo.offset.value,
- res.flatMap(_.logLine).mkString(", "),
- )
+ if (debugEnabled) {
+ logger.debug(context.generateLogLine(flush, res))
+ }
flush
}
}
object CommitPolicy extends LazyLogging {
- val Default: CommitPolicy =
- CommitPolicy(FileSize(defaultFlushSize), Interval(defaultFlushInterval), Count(defaultFlushCount))
def apply(conditions: CommitPolicyCondition*): CommitPolicy =
CommitPolicy(logger, conditions: _*)
}
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicyConditions.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicyConditions.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicyConditions.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/commit/CommitPolicyConditions.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala
diff --git a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala
new file mode 100644
index 000000000..a9c0c82ca
--- /dev/null
+++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.cloud.common.sink.extractors
+
+import cats.implicits.catsSyntaxEitherId
+import com.typesafe.scalalogging.LazyLogging
+import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
+import org.apache.kafka.connect.data.Schema
+import org.apache.kafka.connect.data.Struct
+import org.apache.kafka.connect.errors.ConnectException
+import org.apache.kafka.connect.sink.SinkRecord
+
+import java.util
+import java.lang
+import java.nio.ByteBuffer
+import scala.jdk.CollectionConverters.MapHasAsJava
+
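+/**
+ * Extracts a value from a SinkRecord key or value. Primitives, bytes and decimals are
+ * returned as-is; for Structs, Maps and Lists an optional dot-separated path selects the
+ * nested field to extract.
+ */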
+object KafkaConnectExtractor extends LazyLogging {
+
+ def extractFromKey(sinkRecord: SinkRecord, path: Option[String]): Either[Throwable, AnyRef] =
+ extract(sinkRecord.key(), Option(sinkRecord.keySchema()), path)
+
+ def extractFromValue(sinkRecord: SinkRecord, path: Option[String]): Either[Throwable, AnyRef] =
+ extract(sinkRecord.value(), Option(sinkRecord.valueSchema()), path)
+
+ // TODO: test with all different types
+ private def extract(
+ extractFrom: AnyRef,
+ extractSchema: Option[Schema],
+ maybePath: Option[String],
+ ): Either[Throwable, AnyRef] = {
+
+ val maybePnp: Option[PartitionNamePath] = maybePath.map(path => PartitionNamePath(path.split('.').toIndexedSeq: _*))
+ (extractFrom, maybePnp) match {
+ case (shortVal: lang.Short, _) => shortVal.asRight
+ case (boolVal: lang.Boolean, _) => boolVal.asRight
+ case (stringVal: lang.String, _) => stringVal.asRight
+ case (longVal: lang.Long, _) => longVal.asRight
+ case (intVal: lang.Integer, _) => intVal.asRight
+ case (byteVal: lang.Byte, _) => byteVal.asRight
+ case (doubleVal: lang.Double, _) => doubleVal.asRight
+ case (floatVal: lang.Float, _) => floatVal.asRight
+ case (bytesVal: Array[Byte], _) => bytesVal.asRight
+ case (bytesVal: ByteBuffer, _) => bytesVal.array().asRight
+ case (arrayVal: Array[_], _) => arrayVal.asRight
+ case (decimal: BigDecimal, _) => decimal.asRight
+ case (decimal: java.math.BigDecimal, _) => decimal.asRight
+      case (null, _) => Right(null) // a null key or value extracts to null
+ case (structVal: Struct, Some(pnp)) => StructExtractor.extractPathFromStruct(structVal, pnp)
+ case (mapVal: Map[_, _], Some(pnp)) => MapExtractor.extractPathFromMap(mapVal.asJava, pnp, extractSchema.orNull)
+ case (mapVal: util.Map[_, _], Some(pnp)) => MapExtractor.extractPathFromMap(mapVal, pnp, extractSchema.orNull)
+ case (listVal: util.List[_], Some(pnp)) => ArrayExtractor.extractPathFromArray(listVal, pnp, extractSchema.orNull)
+ case otherVal => new ConnectException("Unknown value type: " + otherVal.getClass.getName).asLeft
+ }
+ }
+
+}
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala
similarity index 62%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala
index 8aabd3b8b..92e27a411 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala
+++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala
@@ -21,6 +21,8 @@ import ExtractorErrorType.UnexpectedType
import PrimitiveExtractor.anyToEither
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import org.apache.kafka.connect.data.Schema.Type._
+import org.apache.kafka.connect.data.Decimal
+import org.apache.kafka.connect.data.Field
import org.apache.kafka.connect.data.Struct
/**
@@ -37,23 +39,29 @@ object StructExtractor extends LazyLogging {
private def extractPrimitive(struct: Struct, fieldName: String): Either[ExtractorError, String] =
Option(struct.schema().field(fieldName))
.fold(ExtractorError(ExtractorErrorType.MissingValue).asLeft[String]) {
- _.schema().`type`() match {
- case INT8 => anyToEither(struct.getInt8(fieldName))
- case INT16 => anyToEither(struct.getInt16(fieldName))
- case INT32 => anyToEither(struct.getInt32(fieldName))
- case INT64 => anyToEither(struct.getInt64(fieldName))
- case FLOAT32 => anyToEither(struct.getFloat32(fieldName))
- case FLOAT64 => anyToEither(struct.getFloat64(fieldName))
- case BOOLEAN => anyToEither(struct.getBoolean(fieldName))
- case STRING => anyToEither(struct.getString(fieldName))
- case BYTES =>
- Option(struct.getBytes(fieldName))
- .fold(ExtractorError(ExtractorErrorType.MissingValue).asLeft[String])(byteVal =>
- new String(byteVal).asRight[ExtractorError],
- )
- case other => logger.error("Non-primitive values not supported: " + other)
- ExtractorError(ExtractorErrorType.UnexpectedType).asLeft[String]
- }
+ f: Field =>
+ (f.schema().`type`(), f.schema().name()) match {
+ case (INT8, _) => anyToEither(struct.getInt8(fieldName))
+ case (INT16, _) => anyToEither(struct.getInt16(fieldName))
+ case (INT32, _) => anyToEither(struct.getInt32(fieldName))
+ case (INT64, _) => anyToEither(struct.getInt64(fieldName))
+ case (FLOAT32, _) => anyToEither(struct.getFloat32(fieldName))
+ case (FLOAT64, _) => anyToEither(struct.getFloat64(fieldName))
+ case (BOOLEAN, _) => anyToEither(struct.getBoolean(fieldName))
+ case (STRING, _) => anyToEither(struct.getString(fieldName))
+ case (BYTES, Decimal.LOGICAL_NAME) =>
+ struct.get(fieldName) match {
+ case bd: java.math.BigDecimal => anyToEither(bd.toPlainString)
+ case _ => ExtractorError(ExtractorErrorType.UnexpectedType).asLeft[String]
+ }
+ case (BYTES, _) =>
+ Option(struct.getBytes(fieldName))
+ .fold(ExtractorError(ExtractorErrorType.MissingValue).asLeft[String])(byteVal =>
+ new String(byteVal).asRight[ExtractorError],
+ )
+ case (other, _) => logger.error("Non-primitive values not supported: " + other)
+ ExtractorError(ExtractorErrorType.UnexpectedType).asLeft[String]
+ }
}
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala
similarity index 100%
rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala
rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala
diff --git a/kafka-connect-http/src/it/resources/logback.xml b/kafka-connect-http/src/it/resources/logback.xml
new file mode 100644
index 000000000..5096e4808
--- /dev/null
+++ b/kafka-connect-http/src/it/resources/logback.xml
@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss} %-5p %-25.25C{1} %15.15M %3.3L] %m%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="stdout"/>
+    </root>
+
+</configuration>
\ No newline at end of file
diff --git a/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala b/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala
new file mode 100644
index 000000000..200532b1a
--- /dev/null
+++ b/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala
@@ -0,0 +1,247 @@
+package io.lenses.streamreactor.connect.http.sink
+
+import cats.effect.IO
+import cats.effect.Resource
+import cats.effect.testing.scalatest.AsyncIOSpec
+import cats.implicits.catsSyntaxOptionId
+import cats.implicits.none
+import com.github.tomakehurst.wiremock.WireMockServer
+import com.github.tomakehurst.wiremock.client.WireMock.{ post => httpPost }
+import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo
+import com.github.tomakehurst.wiremock.client.WireMock.aResponse
+import com.github.tomakehurst.wiremock.client.WireMock.exactly
+import com.github.tomakehurst.wiremock.client.WireMock.urlMatching
+import com.github.tomakehurst.wiremock.client.WireMock.containing
+import com.github.tomakehurst.wiremock.client.WireMock.equalToXml
+import com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
+import io.lenses.streamreactor.connect.http.sink.client.HttpMethod
+import io.lenses.streamreactor.connect.http.sink.config.BatchConfiguration
+import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
+import org.apache.kafka.connect.errors.ConnectException
+import org.apache.kafka.connect.sink.SinkRecord
+import org.scalatest.concurrent.Eventually
+import org.scalatest.funsuite.AsyncFunSuite
+import org.scalatest.time.Minute
+import org.scalatest.time.Span
+import scala.jdk.CollectionConverters.MapHasAsJava
+import scala.jdk.CollectionConverters.SeqHasAsJava
+
+class HttpSinkTaskIT extends AsyncFunSuite with AsyncIOSpec with Eventually {
+ override implicit def patienceConfig: PatienceConfig = PatienceConfig(timeout = Span(1, Minute))
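+  // WireMock server managed as a Resource so it is started and stopped around each test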
+ def wireMockServer: Resource[IO, WireMockServer] =
+ for {
+ server <- Resource.eval(
+ IO.delay(new WireMockServer(wireMockConfig().dynamicPort().bindAddress(Host))),
+ )
+ _ <- Resource.make(IO.delay(server.start()))(_ => IO.delay(server.stop()))
+ } yield server
+
+ def sinkTask(config: String): Resource[IO, HttpSinkTask] =
+ for {
+ task <- Resource.eval(IO.delay(new HttpSinkTask()))
+ _ <- Resource.make(IO.delay(task.start(Map("connect.http.config" -> config).asJava)))(_ => IO.delay(task.stop()))
+ } yield task
+
+ private val Host = "localhost"
+ private val users = SampleData.Employees
+
+ test("data triggers post calls") {
+ val path = "/awesome/endpoint"
+ (for {
+ server <- wireMockServer
+ configuration = HttpSinkConfig(
+ authentication = Option.empty,
+ method = HttpMethod.Post,
+ endpoint = s"http://$Host:${server.port()}/awesome/endpoint",
+ content = "test",
+ headers = Seq(),
+ sslConfig = Option.empty,
+ batch = BatchConfiguration(2L.some, none, none).some,
+ errorThreshold = none,
+ uploadSyncPeriod = none,
+ ).toJson
+ _ = server.stubFor(httpPost(urlEqualTo(path)).willReturn(aResponse().withStatus(200)))
+ sinkTask <- sinkTask(configuration)
+ _ = sinkTask.put(
+ users.zipWithIndex.map {
+ case (struct, i) => new SinkRecord("myTopic", 0, null, null, SampleData.EmployeesSchema, struct, i.toLong)
+ }.asJava,
+ )
+ } yield server).use { server =>
+ IO.delay(eventually(server.verify(exactly(3), postRequestedFor(urlEqualTo(path)))))
+ }
+ }
+
+ test("data triggers post calls to individual templated endpoints for single records") {
+ val path = "/awesome/endpoint/.*"
+ (for {
+ server <- wireMockServer
+ config = HttpSinkConfig(
+ authentication = Option.empty,
+ method = HttpMethod.Post,
+ endpoint = s"http://$Host:${server.port()}/awesome/endpoint/{{value.name}}",
+ content = "{salary: {{value.salary}}}",
+ headers = Seq(),
+ sslConfig = Option.empty,
+ batch = BatchConfiguration(1L.some, none, none).some,
+ errorThreshold = none,
+ uploadSyncPeriod = none,
+ ).toJson
+ _ = server.stubFor(httpPost(urlMatching(path)).willReturn(aResponse().withStatus(200)))
+ sinkTask <- sinkTask(config)
+ _ = sinkTask.put(
+ users.zipWithIndex.map {
+ case (struct, i) => new SinkRecord("myTopic", 0, null, null, SampleData.EmployeesSchema, struct, i.toLong)
+ }.asJava,
+ )
+ } yield server).use { server =>
+ // verify REST calls
+ IO.delay {
+ eventually {
+ server.verify(
+ exactly(1),
+ postRequestedFor(urlEqualTo("/awesome/endpoint/martin")).withRequestBody(containing("{salary: 35896.00}")),
+ )
+ server.verify(exactly(1), postRequestedFor(urlEqualTo("/awesome/endpoint/jackie")))
+ server.verify(exactly(1), postRequestedFor(urlEqualTo("/awesome/endpoint/adam")))
+ server.verify(exactly(1), postRequestedFor(urlEqualTo("/awesome/endpoint/jonny")))
+ server.verify(exactly(1), postRequestedFor(urlEqualTo("/awesome/endpoint/jim")))
+ server.verify(exactly(1), postRequestedFor(urlEqualTo("/awesome/endpoint/wilson")))
+ server.verify(exactly(1), postRequestedFor(urlEqualTo("/awesome/endpoint/milson")))
+ }
+ }
+ }
+ }
+
+  // TODO: this is probably not a valid use case unless records are being aggregated; perhaps it should throw an error instead.
+ test("data batched to single endpoint for multiple records using a simple template uses the first record") {
+ val path = "/awesome/endpoint/.*"
+ (for {
+ server <- wireMockServer
+ configuration = HttpSinkConfig(
+ authentication = Option.empty,
+ method = HttpMethod.Post,
+ endpoint = s"http://$Host:${server.port()}/awesome/endpoint/{{value.name}}",
+ content = "{salary: {{value.salary}}}",
+ headers = Seq(),
+ sslConfig = Option.empty,
+ batch = BatchConfiguration(7L.some, none, none).some,
+ errorThreshold = none,
+ uploadSyncPeriod = none,
+ ).toJson
+ _ = server.stubFor(httpPost(urlMatching(path)).willReturn(aResponse().withStatus(200)))
+ task <- sinkTask(configuration)
+ _ = task.put(
+ users.zipWithIndex.map {
+ case (struct, i) => new SinkRecord("myTopic", 0, null, null, SampleData.EmployeesSchema, struct, i.toLong)
+ }.asJava,
+ )
+ } yield server).use { server =>
+ IO.delay {
+ eventually {
+ server.verify(
+ exactly(1),
+ postRequestedFor(urlEqualTo("/awesome/endpoint/martin")).withRequestBody(containing("{salary: 35896.00}")),
+ )
+ }
+ }
+ }
+
+ }
+
+ test("data batched to single endpoint for multiple records using a loop template") {
+ val path = "/awesome/endpoint/.*"
+ val expected =
+ """
+ |
+ | 35896.00
+ | 60039.00
+ | 65281.00
+ | 66560.00
+ | 63530.00
+ | 23309.00
+ | 10012.00
+ |
+ |""".stripMargin
+
+ (for {
+ server <- wireMockServer
+ configuration = HttpSinkConfig(
+ authentication = Option.empty,
+ method = HttpMethod.Post,
+ endpoint = s"http://$Host:${server.port()}/awesome/endpoint/{{value.name}}",
+ content =
+ s"""
+ |
+ | {{#message}}
+ | {{value.salary}}
+ | {{/message}}
+ | """.stripMargin,
+ headers = Seq(),
+ sslConfig = Option.empty,
+ batch = BatchConfiguration(7L.some, none, none).some,
+ errorThreshold = none,
+ uploadSyncPeriod = none,
+ ).toJson
+ _ = server.stubFor(httpPost(urlMatching(path)).willReturn(aResponse().withStatus(200)))
+ task <- sinkTask(configuration)
+ _ = task.put(
+ users.zipWithIndex.map {
+ case (struct, i) => new SinkRecord("myTopic", 0, null, null, SampleData.EmployeesSchema, struct, i.toLong)
+ }.asJava,
+ )
+ } yield server).use { server =>
+ IO.delay {
+ // verify REST calls
+ eventually {
+ server.verify(
+ exactly(1),
+ postRequestedFor(urlEqualTo("/awesome/endpoint/martin")).withRequestBody(equalToXml(expected)),
+ )
+ }
+ }
+ }
+ }
+
+ test("broken endpoint will return failure and error will be thrown") {
+
+ val path = "/awesome/endpoint"
+ (for {
+ server <- wireMockServer
+ configuration = HttpSinkConfig(
+ Option.empty,
+ HttpMethod.Post,
+ s"http://$Host:${server.port()}/awesome/endpoint",
+ s"""
+ | Ultimately not important for this test""".stripMargin,
+ Seq(),
+ Option.empty,
+ BatchConfiguration(
+ 1L.some,
+ none,
+ none,
+ ).some,
+ none,
+ none,
+ ).toJson
+ _ = server.stubFor(httpPost(urlMatching(path)).willReturn(aResponse().withStatus(404)))
+ task <- sinkTask(configuration)
+ } yield task).use { task =>
+ IO.delay {
+ eventually {
+ // put data
+ assertThrows[ConnectException] {
+ task.put(
+ users.zipWithIndex.map {
+ case (struct, i) =>
+ new SinkRecord("myTopic", 0, null, null, SampleData.EmployeesSchema, struct, i.toLong)
+ }.asJava,
+ )
+ }
+ }
+ }
+ }
+ }
+
+}
diff --git a/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/SampleData.scala b/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/SampleData.scala
new file mode 100644
index 000000000..e097df80b
--- /dev/null
+++ b/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/SampleData.scala
@@ -0,0 +1,51 @@
+package io.lenses.streamreactor.connect.http.sink
+
+import org.apache.kafka.connect.data.Decimal
+import org.apache.kafka.connect.data.Schema
+import org.apache.kafka.connect.data.SchemaBuilder
+import org.apache.kafka.connect.data.Struct
+
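+/**
+ * Employee fixture records used by the HTTP sink integration tests; the salary field uses
+ * Connect's Decimal logical type.
+ */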
+object SampleData {
+
+ val DecimalSchema: Schema = Decimal.builder(18).optional().build()
+ val EmployeesSchema: Schema = SchemaBuilder.struct()
+ .field("name", SchemaBuilder.string().required().build())
+ .field("title", SchemaBuilder.string().optional().build())
+ .field("salary", DecimalSchema)
+ .build()
+
+ def bigDecimal(stringNum: String): java.math.BigDecimal =
+ BigDecimal(stringNum).setScale(2).bigDecimal
+
+ val Employees: List[Struct] = List(
+ new Struct(EmployeesSchema)
+ .put("name", "martin")
+ .put("title", "mr")
+ .put("salary", bigDecimal("35896.00")),
+ new Struct(EmployeesSchema)
+ .put("name", "jackie")
+ .put("title", "mrs")
+ .put("salary", bigDecimal("60039.00")),
+ new Struct(EmployeesSchema)
+ .put("name", "adam")
+ .put("title", "mr")
+ .put("salary", bigDecimal("65281.00")),
+ new Struct(EmployeesSchema)
+ .put("name", "jonny")
+ .put("title", "mr")
+ .put("salary", bigDecimal("66560.00")),
+ new Struct(EmployeesSchema)
+ .put("name", "jim")
+ .put("title", "mr")
+ .put("salary", bigDecimal("63530.00")),
+ new Struct(EmployeesSchema)
+ .put("name", "wilson")
+ .put("title", "dog")
+ .put("salary", bigDecimal("23309.00")),
+ new Struct(EmployeesSchema)
+ .put("name", "milson")
+ .put("title", "dog")
+ .put("salary", bigDecimal("10012.00")),
+ )
+
+}
diff --git a/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSenderIT.scala b/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSenderIT.scala
new file mode 100644
index 000000000..63231ff4f
--- /dev/null
+++ b/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSenderIT.scala
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.client
+
+import cats.effect.IO
+import cats.effect.testing.scalatest.AsyncIOSpec
+import cats.implicits.catsSyntaxOptionId
+import com.github.tomakehurst.wiremock.WireMockServer
+import com.github.tomakehurst.wiremock.client.BasicCredentials
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.client.WireMock._
+import com.github.tomakehurst.wiremock.matching.EqualToPattern
+import io.lenses.streamreactor.connect.http.sink.tpl.ProcessedTemplate
+import org.http4s.EntityDecoder
+import org.http4s.Method
+import org.http4s.Request
+import org.http4s.client.Client
+import org.http4s.jdkhttpclient.JdkHttpClient
+import org.mockito.ArgumentMatchers
+import org.mockito.MockitoSugar
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AsyncFunSuiteLike
+import org.scalatest.matchers.should.Matchers
+
+class HttpRequestSenderIT
+ extends AsyncIOSpec
+ with AsyncFunSuiteLike
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with MockitoSugar
+ with Matchers {
+
+ private val Host = "localhost"
+
+ private val wireMockServer = new WireMockServer()
+ private val expectedUrl = "/some/thing"
+
+ private val sinkName = "mySinkName"
+
+ override protected def beforeAll(): Unit = {
+ wireMockServer.start()
+ WireMock.configureFor(Host, wireMockServer.port())
+ }
+
+ override protected def afterAll(): Unit = wireMockServer.stop()
+
+ override protected def beforeEach(): Unit = wireMockServer.resetRequests()
+
+ test("should send a PUT request by default") {
+
+ stubFor(put(urlEqualTo(expectedUrl))
+ .willReturn(aResponse.withHeader("Content-Type", "text/plain")
+ .withBody("Hello world!")))
+
+ JdkHttpClient.simple[IO].use {
+ client =>
+ val requestSender = new HttpRequestSender(
+ sinkName,
+ Option.empty,
+ Method.PUT,
+ client,
+ )
+ val processedTemplate = ProcessedTemplate(
+ s"${wireMockServer.baseUrl()}$expectedUrl",
+ "mycontent",
+ Seq("X-Awesome-Header" -> "stream-reactor"),
+ )
+ requestSender.sendHttpRequest(processedTemplate).asserting {
+ response =>
+ WireMock.verify(
+ putRequestedFor(urlEqualTo(expectedUrl))
+ .withHeader("X-Awesome-Header", equalTo("stream-reactor"))
+ .withRequestBody(new EqualToPattern("mycontent")),
+ )
+ response should be(())
+ }
+ }
+
+ }
+
+ test("should send a POST request with basic auth") {
+
+ stubFor(
+ post(urlEqualTo(expectedUrl))
+ .withBasicAuth("myUser", "myPassword")
+ .willReturn(aResponse.withHeader("Content-Type", "text/plain")
+ .withBody("Hello world!")),
+ )
+
+ JdkHttpClient.simple[IO].use {
+ client =>
+ val requestSender = new HttpRequestSender(
+ sinkName,
+ BasicAuthentication("myUser", "myPassword").some,
+ Method.POST,
+ client,
+ )
+ val processedTemplate = ProcessedTemplate(
+ s"${wireMockServer.baseUrl()}$expectedUrl",
+ "mycontent",
+ Seq("X-Awesome-Header" -> "stream-reactor"),
+ )
+ requestSender.sendHttpRequest(processedTemplate).asserting {
+ response =>
+ WireMock.verify(
+ postRequestedFor(urlEqualTo(expectedUrl))
+ .withHeader("X-Awesome-Header", equalTo("stream-reactor"))
+ .withBasicAuth(new BasicCredentials("myUser", "myPassword"))
+ .withRequestBody(new EqualToPattern("mycontent")),
+ )
+ response should be(())
+ }
+ }
+
+ }
+
+ test("should error when client is thoroughly broken") {
+
+ val expectedException = new IllegalArgumentException("No fun allowed today")
+ val mockClient = mock[Client[IO]]
+
+ when(mockClient.expect[String](
+ ArgumentMatchers.any[Request[IO]],
+ )(
+ ArgumentMatchers.any[EntityDecoder[IO, String]],
+ )).thenReturn(
+ IO.raiseError(
+ expectedException,
+ ),
+ )
+
+ val requestSender = new HttpRequestSender(
+ sinkName,
+ Option.empty,
+ Method.PUT,
+ mockClient,
+ )
+ val processedTemplate = ProcessedTemplate(
+ s"${wireMockServer.baseUrl()}$expectedUrl",
+ "mycontent",
+ Seq("X-Awesome-Header" -> "stream-reactor"),
+ )
+ requestSender.sendHttpRequest(processedTemplate).assertThrows[IllegalArgumentException]
+ }
+
+}
diff --git a/kafka-connect-http/src/main/resources/http-sink-ascii.txt b/kafka-connect-http/src/main/resources/http-sink-ascii.txt
new file mode 100644
index 000000000..d23310380
--- /dev/null
+++ b/kafka-connect-http/src/main/resources/http-sink-ascii.txt
@@ -0,0 +1,23 @@
+
+ ████████▀▀▀▀▀███████████████████████████████████████████████████████████████████
+ █████▀ ▀████████████████████████████████████████████████████████████████
+ ███▀ ▄█████▄ ▀██████████████████████████████████████████████████████████████
+ ███ ▄███████▄ ██████ █████▌ █▌ ████ ███ ▄▄ ██ ███ ▄▄ ███
+ ███ █████████ ██████ █████▌ ██████▌ ▀██ ██ ██████ ██████ ███████
+ ███ ▀███████▀ ██████ █████▌ ██▌ █▄ █ ███▄▄ ██ ███▄▄ ███
+ ████▄ ▄███████ █████▌ ██████▌ ███ ███████ █ ███████████ ██
+ █████████ ████████████ ▌ █▌ ████▄ ██▄ ▄██ █▄ ▄███
+ █████████ ████████████████████████████████████████████████████████████████████
+ █████████ ▄████████████████████████████████████████████████████████████████████
+ ████████████████████████████████████████████████████████████████████████████████
+
+ #""#####""## #""""""""# #""""""""# ##"""""""`Y# #P""""""`## oo dP
+ # ##### ## #### #### #### #### ## ##### # ## # #####..# 88
+ # `# #### #### #### #### #' .# #. `Y# dP 88d888b. 88 .dP
+ # ##### ## #### #### #### #### ## ######## #######. # 88 88' `88 88888"
+ # ##### ## #### #### #### #### ## ######## #. .###' # 88 88 88 88 `8b.
+ # ##### ## #### #### #### #### ## ######## ## #b. .d# dP dP dP dP `YP
+ ############ ########## ########## ############ ###########
+
+ (Alpha) This is an alpha release. Please report any feedback, bugs or strange
+ behaviour via our Slack community at https://www.launchpass.com/lensesio
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala
new file mode 100644
index 000000000..5b1dddbf8
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink
+
+import cats.implicits.catsSyntaxOptionId
+import com.typesafe.scalalogging.LazyLogging
+import io.lenses.streamreactor.common.utils.JarManifest
+import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.connect.connector.Task
+import org.apache.kafka.connect.sink.SinkConnector
+
+import java.util
+import scala.jdk.CollectionConverters.MapHasAsJava
+import scala.jdk.CollectionConverters.MapHasAsScala
+import scala.jdk.CollectionConverters.SeqHasAsJava
+
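+/**
+ * Kafka Connect entry point for the HTTP sink. Stores the connector properties on start
+ * and hands an identical copy to each [[HttpSinkTask]] via taskConfigs.
+ */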
+class HttpSinkConnector extends SinkConnector with LazyLogging {
+
+ private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
+ private var props: Option[Map[String, String]] = Option.empty
+ private var maybeSinkName: Option[String] = Option.empty
+
+ private def sinkName = maybeSinkName.getOrElse("Lenses.io HTTP Sink")
+ override def version(): String = manifest.version()
+
+ override def taskClass(): Class[_ <: Task] = classOf[HttpSinkTask]
+
+ override def config(): ConfigDef = HttpSinkConfigDef.config
+
+ override def start(props: util.Map[String, String]): Unit = {
+ val propsAsScala = props.asScala.toMap
+ this.props = propsAsScala.some
+ this.maybeSinkName = propsAsScala.get("name")
+
+ logger.info(s"[$sinkName] Creating HTTP sink connector")
+ }
+
+ override def stop(): Unit = ()
+
+ override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
+ logger.info(s"[$sinkName] Creating $maxTasks tasks config")
+ List.fill(maxTasks) {
+ props.map(_.asJava).getOrElse(Map.empty[String, String].asJava)
+ }.asJava
+ }
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala
new file mode 100644
index 000000000..3d2625c03
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink
+
+import cats.effect.Deferred
+import cats.effect.IO
+import cats.effect.Ref
+import cats.effect.unsafe.IORuntime
+import com.typesafe.scalalogging.LazyLogging
+import io.lenses.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
+import io.lenses.streamreactor.common.utils.JarManifest
+import io.lenses.streamreactor.connect.cloud.common.model.Offset
+import io.lenses.streamreactor.connect.cloud.common.model.Topic
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
+import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
+import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig.fromJson
+import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef.configProp
+import io.lenses.streamreactor.connect.http.sink.tpl.RawTemplate
+import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition }
+import org.apache.kafka.connect.errors.ConnectException
+import org.apache.kafka.connect.sink.SinkRecord
+import org.apache.kafka.connect.sink.SinkTask
+import cats.syntax.all._
+import java.util
+import scala.jdk.CollectionConverters.IterableHasAsScala
+import scala.jdk.CollectionConverters.MapHasAsJava
+import scala.jdk.CollectionConverters.MapHasAsScala
+
+class HttpSinkTask extends SinkTask with LazyLogging {
+ private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
+  implicit val runtime: IORuntime = IORuntime.global
+ private var maybeTemplate: Option[TemplateType] = Option.empty
+ private var maybeWriterManager: Option[HttpWriterManager] = Option.empty
+ private var maybeSinkName: Option[String] = Option.empty
+ private def sinkName = maybeSinkName.getOrElse("Lenses.io HTTP Sink")
+ private val deferred: Deferred[IO, Either[Throwable, Unit]] = Deferred.unsafe[IO, Either[Throwable, Unit]]
+
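+  // Errors raised asynchronously by the writers accumulate here and are surfaced on the next put() call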
+ private val errorRef: Ref[IO, List[Throwable]] = Ref.of[IO, List[Throwable]](List.empty).unsafeRunSync()
+
+ override def start(props: util.Map[String, String]): Unit = {
+
+ printAsciiHeader(manifest, "/http-sink-ascii.txt")
+
+ val propsAsScala = props.asScala
+ maybeSinkName = propsAsScala.get("name")
+
+ IO
+ .fromEither(parseConfig(propsAsScala.get(configProp)))
+ .flatMap { config =>
+ val template = RawTemplate(config.endpoint, config.content, config.headers)
+ val writerManager = HttpWriterManager(sinkName, config, template, deferred)
+      val refUpdateCallback: Throwable => Unit =
+        (err: Throwable) => errorRef.update(_ :+ err).unsafeRunSync()
+ writerManager.start(refUpdateCallback)
+ .map { _ =>
+ this.maybeTemplate = Some(template)
+ this.maybeWriterManager = Some(writerManager)
+ }
+ }
+ .recoverWith {
+ case e =>
+ // errors at this point simply need to be thrown
+ IO.raiseError[Unit](new RuntimeException("Unexpected error occurred during sink start", e))
+ }.unsafeRunSync()
+ }
+
+ private def parseConfig(propVal: Option[String]): Either[Throwable, HttpSinkConfig] =
+    propVal.toRight(new IllegalArgumentException(s"No '$configProp' configuration property found"))
+ .flatMap(fromJson)
+
+ override def put(records: util.Collection[SinkRecord]): Unit = {
+
+ logger.debug(s"[$sinkName] put call with ${records.size()} records")
+ val storedErrors = errorRef.get.unsafeRunSync()
+
+ if (storedErrors.nonEmpty) {
+      throw new ConnectException(
+        s"Previous operation failed with error: ${storedErrors.map(_.getMessage).mkString(";")}",
+      )
+
+ } else {
+
+      logger.trace(s"[$sinkName] retrieving template and writer manager")
+ val template = maybeTemplate.getOrElse(throw new IllegalStateException("No template available in put"))
+ val writerManager =
+ maybeWriterManager.getOrElse(throw new IllegalStateException("No writer manager available in put"))
+
+ records
+ .asScala
+ .toSeq
+ .map {
+ rec =>
+ Topic(rec.topic()).withPartition(rec.kafkaPartition()).withOffset(Offset(rec.kafkaOffset())) -> rec
+ }
+ .groupBy {
+ case (tpo, _) => tpo.topic
+ }
+ .foreach {
+          case (topic, recordsForTopic) =>
+            val recs = recordsForTopic.map(_._2)
+ val eitherRendered = template.renderRecords(recs)
+ eitherRendered match {
+              case Left(ex) =>
+                logger.error(s"[$sinkName] Template Rendering Failure", ex)
+                // rendering errors cannot be recovered from; the configuration should be amended
+                throw new ConnectException(s"[$sinkName] template rendering failure", ex)
+
+ case Right(renderedRecs) =>
+              logger.trace(s"[$sinkName] Rendered successfully: $renderedRecs")
+ writerManager
+                .getWriter(topic)
+ .flatMap {
+ writer =>
+ writer.add(renderedRecs)
+ }
+ .unsafeRunSync()
+ }
+
+ }
+ }
+ }
+
+ override def preCommit(
+ currentOffsets: util.Map[KafkaTopicPartition, OffsetAndMetadata],
+ ): util.Map[KafkaTopicPartition, OffsetAndMetadata] = {
+
+ val writerManager =
+      maybeWriterManager.getOrElse(throw new IllegalStateException("No writer manager available in preCommit"))
+
+ def getDebugInfo(in: util.Map[KafkaTopicPartition, OffsetAndMetadata]): String =
+ in.asScala.map {
+ case (k, v) =>
+ k.topic() + "-" + k.partition() + "=" + v.offset()
+ }.mkString(";")
+
+ logger.debug(s"[{}] preCommit with offsets={}",
+ sinkName,
+ getDebugInfo(Option(currentOffsets).getOrElse(new util.HashMap())): Any,
+ )
+
+ val topicPartitionOffsetTransformed: Map[TopicPartition, OffsetAndMetadata] =
+ Option(currentOffsets)
+ .getOrElse(new util.HashMap())
+ .asScala
+ .map {
+ case (tp, offsetAndMetadata) =>
+ Topic(tp.topic()).withPartition(tp.partition()) -> offsetAndMetadata
+ }
+ .toMap
+
+ (for {
+ offsets <- writerManager
+ .preCommit(topicPartitionOffsetTransformed)
+ tpoTransformed = offsets.map {
+ case (topicPartition, offsetAndMetadata) =>
+ (topicPartition.toKafka, offsetAndMetadata)
+ }.asJava
+ _ <- IO(logger.debug(s"[{}] Returning latest written offsets={}", sinkName, getDebugInfo(tpoTransformed)))
+ } yield tpoTransformed).unsafeRunSync()
+
+ }
+
+ override def stop(): Unit =
+ (for {
+ _ <- maybeWriterManager.traverse(_.close)
+ _ <- deferred.complete(().asRight)
+ } yield ()).unsafeRunSync()
+
+ override def version(): String = manifest.version()
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala
new file mode 100644
index 000000000..a34c25ff6
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink
+
+import cats.effect.IO
+import cats.effect.Ref
+import cats.effect.unsafe.implicits.global
+import com.typesafe.scalalogging.LazyLogging
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
+import io.lenses.streamreactor.connect.http.sink.OffsetMergeUtils.createCommitContextForEvaluation
+import io.lenses.streamreactor.connect.http.sink.OffsetMergeUtils.updateCommitContextPostCommit
+import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender
+import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
+import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
+import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
+
+import scala.collection.immutable.Queue
+
+class HttpWriter(
+ sinkName: String,
+ commitPolicy: CommitPolicy,
+ sender: HttpRequestSender,
+ template: TemplateType,
+ recordsQueueRef: Ref[IO, Queue[RenderedRecord]],
+ commitContextRef: Ref[IO, HttpCommitContext],
+ errorThreshold: Int,
+) extends LazyLogging {
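+  // a configured Count condition also caps how many queued records are taken per processing batch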
+ private val maybeBatchSize: Option[Int] = commitPolicy.conditions.collectFirst {
+ case Count(maxCount) => maxCount.toInt
+ }
+
+  // TODO: surface a warning to Kafka Connect if the queue grows too large
+
+ // adds records to the queue. Returns immediately - processing occurs asynchronously.
+ def add(newRecords: Seq[RenderedRecord]): IO[Unit] =
+ recordsQueueRef.modify { currentQueue =>
+ val updatedQueue = currentQueue.enqueueAll(newRecords)
+ (updatedQueue, ())
+ }
+
+  // called periodically by HttpWriterManager; drains up to one batch from the queue and evaluates the commit policy
+ def process(): IO[Unit] = {
+ for {
+      recordQueue <- recordsQueueRef.get
+      _           <- IO(logger.debug(s"[$sinkName] HttpWriter.process, queue size: ${recordQueue.size}"))
+ res <- recordQueue match {
+ case recordsQueue: Queue[RenderedRecord] if recordsQueue.nonEmpty =>
+ for {
+ _ <- IO(logger.debug(s"[$sinkName] Queue is not empty"))
+ takeHowMany = maybeBatchSize.getOrElse(recordsQueue.size)
+ _ <- IO(logger.debug(s"[$sinkName] Required batch size is $takeHowMany"))
+
+ batch: Queue[RenderedRecord] <- IO(recordsQueue.take(takeHowMany))
+ _ <- IO(logger.info(s"[$sinkName] Batch of ${batch.size}"))
+ _ <- modifyCommitContext(batch)
+ refSet <- recordsQueueRef.set(dequeueN(recordsQueue, takeHowMany))
+ } yield refSet
+ case _ =>
+ IO(logger.trace(s"[$sinkName] Empty record queue"))
+ }
+ _ <- resetErrorsInCommitContext()
+ } yield res
+ }.onError {
+ e =>
+ for {
+ uniqueError: Option[Throwable] <- addErrorToCommitContext(e)
+ res <- if (uniqueError.nonEmpty) {
+ IO(logger.error("Error in HttpWriter", e)) *> IO.raiseError(e)
+ } else {
+ IO(logger.error("Error in HttpWriter but not reached threshold so ignoring", e)) *> IO.unit
+ }
+ } yield res
+ }
+
+ def preCommit(
+ initialOffsetAndMetaMap: Map[TopicPartition, OffsetAndMetadata],
+ ): IO[Map[TopicPartition, OffsetAndMetadata]] =
+ commitContextRef.get.map {
+ case HttpCommitContext(_, committedOffsets, _, _, _, _, _) =>
+ committedOffsets.flatMap {
+ case (tp, offset) =>
+ for {
+ initialOffsetAndMeta <- initialOffsetAndMetaMap.get(tp)
+
+ } yield tp -> new OffsetAndMetadata(offset.value,
+ initialOffsetAndMeta.leaderEpoch(),
+ initialOffsetAndMeta.metadata(),
+ )
+ }
+ case _ => initialOffsetAndMetaMap
+ }.orElse(IO(Map.empty[TopicPartition, OffsetAndMetadata]))
+
+ private def addErrorToCommitContext(e: Throwable): IO[Option[Throwable]] = {
+ val updatedCC = commitContextRef.getAndUpdate {
+ commitContext => commitContext.addError(e)
+ }
+ val maxError = updatedCC.map(cc =>
+ cc
+ .errors
+ .maxByOption { case (_, errSeq) => errSeq.size }
+ .filter { case (_, errSeq) => errSeq.size > errorThreshold }
+ .flatMap(_._2.headOption),
+ )
+ maxError
+ }
+
+ private def resetErrorsInCommitContext(): IO[Unit] =
+ commitContextRef.getAndUpdate {
+ commitContext => commitContext.resetErrors
+ } *> IO.unit
+
+ private def updateCommitContextIfFlush(
+ cc: HttpCommitContext,
+ batch: Queue[RenderedRecord],
+ ): IO[(HttpCommitContext, Unit)] =
+ for {
+ flushEvalCommitContext: HttpCommitContext <- IO.pure(createCommitContextForEvaluation(batch, cc))
+ _ <- IO.delay(logger.trace(s"[$sinkName] Updating sink context to: $flushEvalCommitContext"))
+ shouldFlush: Boolean <- IO.pure(commitPolicy.shouldFlush(flushEvalCommitContext))
+ _ <- IO.delay(logger.trace(s"[$sinkName] Should flush? $shouldFlush"))
+      _ <- if (shouldFlush) {
+        IO.delay(logger.trace(s"[$sinkName] Flushing batch")) *> flush(batch)
+      } else {
+        IO.unit
+      }
+ } yield {
+ (
+ if (shouldFlush) {
+ updateCommitContextPostCommit(currentCommitContext = flushEvalCommitContext)
+ } else {
+ cc
+ },
+ (),
+ )
+ }
+
+ private def modifyCommitContext(batch: Queue[RenderedRecord]): IO[Unit] = {
+ logger.trace(s"[$sinkName] modifyCommitContext for batch of ${batch.size}")
+
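+    // the update function runs the flush as a side effect; this is safe only because the commit context Ref
+    // is updated solely from this writer's single processing fiber, so modify will not retry the function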
+ commitContextRef.modify {
+ cc: HttpCommitContext =>
+ updateCommitContextIfFlush(cc, batch).unsafeRunSync()
+ }
+ }
+
+ private def dequeueN[A](rQ: Queue[A], n: Int): Queue[A] =
+ rQ.splitAt(n) match {
+ case (_, remaining) => remaining
+ }
+
+ private def flush(records: Seq[RenderedRecord]): IO[Unit] =
+ for {
+ processed <- IO.fromEither(template.process(records))
+ sent <- sender.sendHttpRequest(processed)
+ } yield sent
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala
new file mode 100644
index 000000000..09b831340
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink
+
+import cats.effect.FiberIO
+import cats.effect.IO
+import cats.effect.Ref
+import cats.effect.kernel.Deferred
+import cats.effect.kernel.Outcome
+import cats.effect.kernel.Temporal
+import cats.effect.unsafe.implicits.global
+import cats.implicits.catsSyntaxOptionId
+import cats.implicits.toTraverseOps
+import com.typesafe.scalalogging.LazyLogging
+import io.lenses.streamreactor.common.config.SSLConfigContext
+import io.lenses.streamreactor.connect.cloud.common.model.Topic
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
+import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender
+import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
+import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitPolicy
+import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
+import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
+import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.http4s.jdkhttpclient.JdkHttpClient
+
+import java.net.http.HttpClient
+import scala.collection.immutable.Queue
+
+object HttpWriterManager {
+
+ val DefaultErrorThreshold = 5
+ val DefaultUploadSyncPeriod = 5
+
+ def apply(
+ sinkName: String,
+ config: HttpSinkConfig,
+ template: TemplateType,
+ terminate: Deferred[IO, Either[Throwable, Unit]],
+ )(
+ implicit
+ t: Temporal[IO],
+ ): HttpWriterManager = {
+
+    // use a customised HTTP client when an SSL context is configured, otherwise the default JDK client
+    val clientCreate = config.sslConfig match {
+      case Some(ssl) =>
+        val sslContext = SSLConfigContext(ssl) // TODO: wrap for error handling
+        val httpClient = HttpClient.newBuilder().sslContext(sslContext).build()
+        JdkHttpClient[IO](httpClient)
+      case None =>
+        JdkHttpClient.simple[IO]
+    }
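+    // allocate the client resource eagerly; its release action is handed to the manager as its close effect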
+ clientCreate.allocated.unsafeRunSync() match {
+ case (cRes, cResRel) =>
+ val requestSender = new HttpRequestSender(
+ sinkName,
+ config.authentication,
+ config.method.toHttp4sMethod,
+ cRes,
+ )
+ new HttpWriterManager(
+ sinkName,
+ template,
+ requestSender,
+ config.batch.map(_.toCommitPolicy).getOrElse(HttpCommitPolicy.Default),
+ cResRel,
+ Ref.unsafe(Map[Topic, HttpWriter]()),
+ terminate,
+ config.errorThreshold.getOrElse(DefaultErrorThreshold),
+ config.uploadSyncPeriod.getOrElse(DefaultUploadSyncPeriod),
+ )
+ }
+
+ }
+}
+class HttpWriterManager(
+ sinkName: String,
+ template: TemplateType,
+ httpRequestSender: HttpRequestSender,
+ commitPolicy: CommitPolicy,
+ val close: IO[Unit],
+ writersRef: Ref[IO, Map[Topic, HttpWriter]],
+ deferred: Deferred[IO, Either[Throwable, Unit]],
+ errorThreshold: Int,
+ uploadSyncPeriod: Int,
+)(
+ implicit
+ t: Temporal[IO],
+) extends LazyLogging {
+
+ private def createNewHttpWriter(): HttpWriter =
+ new HttpWriter(
+ sinkName = sinkName,
+ commitPolicy = commitPolicy,
+ sender = httpRequestSender,
+ template = template,
+ Ref.unsafe[IO, Queue[RenderedRecord]](Queue()),
+ Ref.unsafe[IO, HttpCommitContext](HttpCommitContext.default(sinkName)),
+ errorThreshold,
+ )
+
+ def getWriter(topic: Topic): IO[HttpWriter] = {
+ var foundWriter = Option.empty[HttpWriter]
+ for {
+ _ <- writersRef.getAndUpdate {
+ writers =>
+ foundWriter = writers.get(topic)
+ if (foundWriter.nonEmpty) {
+ writers // no update
+ } else {
+ val newWriter = createNewHttpWriter()
+ foundWriter = newWriter.some
+ writers + (topic -> newWriter)
+ }
+ }
+ o <- IO.fromOption(foundWriter)(new IllegalStateException("No writer found"))
+ } yield o
+ }
+
+ // answers the question: what have you committed?
+ def preCommit(currentOffsets: Map[TopicPartition, OffsetAndMetadata]): IO[Map[TopicPartition, OffsetAndMetadata]] = {
+
+ val currentOffsetsGroupedIO: IO[Map[Topic, Map[TopicPartition, OffsetAndMetadata]]] = IO
+ .pure(currentOffsets)
+ .map(_.groupBy {
+ case (TopicPartition(topic, _), _) => topic
+ })
+
+ for {
+ curr <- currentOffsetsGroupedIO
+ writers <- writersRef.get
+ res <- writers.toList.traverse {
+ case (topic, writer) =>
+ writer.preCommit(curr(topic))
+ }.map(_.flatten.toMap)
+ } yield res
+ }
+
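+  // starts a background fs2 stream that calls process() every uploadSyncPeriod milliseconds until the
+  // deferred completes (on stop), after which close is run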
+ def start(errCallback: Throwable => Unit): IO[Unit] = {
+ import scala.concurrent.duration._
+ for {
+ _ <- IO(logger.info(s"[$sinkName] starting HttpWriterManager"))
+ _ <- fs2
+ .Stream
+ .fixedRate(uploadSyncPeriod.millis)
+ .evalMap(_ => process().flatMap(handleResult(_, errCallback)).void)
+ .interruptWhen(deferred)
+ .onComplete(fs2.Stream.eval(close))
+ .compile
+ .drain
+ .background
+ .allocated
+ } yield ()
+ }
+
+ private def handleResult(
+ writersResult: List[Either[Throwable, _]],
+ errCallback: Throwable => Unit,
+ ): IO[Unit] = IO {
+ // Handle the result of individual writer processes
+ val failures = writersResult.collect {
+ case Left(error: Throwable) => error
+ }
+ if (failures.nonEmpty) {
+ logger.error(s"[$sinkName] Some writer processes failed: $failures")
+ failures.foreach(wr => errCallback(wr))
+ } else {
+ logger.debug(s"[$sinkName] All writer processes completed successfully")
+ }
+ }
+
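+  // runs each writer's process() on its own fiber and returns a per-writer success/failure result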
+ def process(): IO[List[Either[Throwable, Unit]]] = {
+ logger.trace(s"[$sinkName] WriterManager.process()")
+ writersRef.get.flatMap { writersMap =>
+ if (writersMap.isEmpty) {
+ logger.info(s"[$sinkName] HttpWriterManager has no writers. Perhaps no records have been put to the sink yet.")
+ }
+
+ // Create an IO action for each writer to process it in parallel
+ val fiberIOs: List[IO[FiberIO[_]]] = writersMap.map {
+ case (id, writer) =>
+ logger.trace(s"[$sinkName] starting process for writer $id")
+ writer.process().start
+ }.toList
+
+ // Return a list of Fibers
+ fiberIOs.traverse { e =>
+ val f = e.flatMap(_.join.attempt).flatMap {
+ case Left(value: Throwable) => IO.pure(Left(value))
+ case Right(value: Outcome[IO, Throwable, _]) => value match {
+ case Outcome.Succeeded(_) => IO.pure(Right(()))
+ case Outcome.Errored(error) => IO.pure(Left(error))
+ case Outcome.Canceled() => IO.raiseError(new RuntimeException("IO canceled"))
+ }
+ }
+ f
+ }
+
+ }
+ }
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtils.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtils.scala
new file mode 100644
index 000000000..2246c4bea
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtils.scala
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink
+
+import cats.implicits.catsSyntaxOptionId
+import io.lenses.streamreactor.connect.cloud.common.model.Offset
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
+import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
+import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
+
+object OffsetMergeUtils {
+
+ def createCommitContextForEvaluation(
+ batch: Seq[RenderedRecord],
+ currentCommitContext: HttpCommitContext,
+ ): HttpCommitContext = {
+ val count = batch.size.toLong
+ val fileSize = batch.map(_.recordRendered.length).sum.toLong
+ val highestOffsets = maxOffsets(batch)
+ currentCommitContext match {
+ case httpCommitContext @ HttpCommitContext(_, httpCommittedOffsets, _, _, _, _, _) =>
+ httpCommitContext.copy(
+ committedOffsets = mergeOffsets(httpCommittedOffsets, highestOffsets),
+ count = count,
+ fileSize = fileSize,
+ )
+ }
+ }
+
+ def updateCommitContextPostCommit(
+ currentCommitContext: HttpCommitContext,
+ ): HttpCommitContext =
+ currentCommitContext.copy(
+ count = 0L,
+ fileSize = 0L,
+ lastFlushedTimestamp = System.currentTimeMillis().some,
+ )
+
+ private def maxOffsets(batch: Seq[RenderedRecord]): Map[TopicPartition, Offset] =
+ batch
+ .map(_.topicPartitionOffset.toTopicPartitionOffsetTuple)
+ .groupBy { case (partition, _) => partition }
+ .map {
+ case (_, value) => value.maxBy {
+ case (_, offset) => offset
+ }
+ }
+
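+  // keeps the highest offset seen for each topic-partition across both maps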
+ def mergeOffsets(
+ committedOffsets: Map[TopicPartition, Offset],
+ maxOffsets: Map[TopicPartition, Offset],
+ ): Map[TopicPartition, Offset] = {
+ val asSet = committedOffsets.toSet ++ maxOffsets.toSet
+ asSet.groupBy {
+ case (partition, _) => partition
+ }.map {
+ case (_, value) => value.maxBy {
+ case (_, offset) => offset
+ }
+ }
+ }
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala
new file mode 100644
index 000000000..a8f554551
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.client
+
+import io.circe.generic.extras.Configuration
+import io.circe.generic.extras.semiauto._
+import io.circe.Decoder
+import io.circe.Encoder
+
+sealed trait Authentication
+
+object Authentication {
+ implicit val customConfig: Configuration = Configuration.default.withDiscriminator("type")
+
+ implicit val decoder: Decoder[Authentication] = deriveConfiguredDecoder[Authentication]
+ implicit val encoder: Encoder[Authentication] = deriveConfiguredEncoder[Authentication]
+}
+
+case class BasicAuthentication(username: String, password: String) extends Authentication
+
+object BasicAuthentication {
+ implicit val customConfig: Configuration = Configuration.default.withDiscriminator("type")
+
+ implicit val decoder: Decoder[BasicAuthentication] = deriveConfiguredDecoder
+ implicit val encoder: Encoder[BasicAuthentication] = deriveConfiguredEncoder
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala
new file mode 100644
index 000000000..b40e4ba92
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.client
+
+import enumeratum.CirceEnum
+import enumeratum.Enum
+import enumeratum.EnumEntry
+import org.http4s.Method
+
+sealed trait HttpMethod extends EnumEntry {
+ def toHttp4sMethod: Method
+}
+
+case object HttpMethod extends Enum[HttpMethod] with CirceEnum[HttpMethod] {
+
+ val values = findValues
+
+ case object Put extends HttpMethod {
+ override def toHttp4sMethod: Method = Method.PUT
+ }
+
+ case object Post extends HttpMethod {
+ override def toHttp4sMethod: Method = Method.POST
+ }
+
+ case object Patch extends HttpMethod {
+ override def toHttp4sMethod: Method = Method.PATCH
+ }
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala
new file mode 100644
index 000000000..f95b43a5a
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.client
+
+import cats.effect.IO
+import com.typesafe.scalalogging.LazyLogging
+import io.lenses.streamreactor.connect.http.sink.tpl.ProcessedTemplate
+import org.http4s._
+import org.http4s.client.Client
+import org.http4s.headers.Authorization
+import org.http4s.headers.`Content-Type`
+import org.typelevel.ci.CIString
+class HttpRequestSender(
+ sinkName: String,
+ authentication: Option[Authentication], // ssl, basic, oauth2, proxy
+ method: Method,
+ client: Client[IO],
+) extends LazyLogging {
+
+ private def buildHeaders(headers: Seq[(String, String)]): IO[Headers] =
+ IO {
+ Headers(headers.map {
+ case (name, value) =>
+ Header.ToRaw.rawToRaw(new Header.Raw(CIString(name), value))
+ }: _*)
+ }
+
+ def sendHttpRequest(
+ processedTemplate: ProcessedTemplate,
+ ): IO[Unit] =
+ for {
+ tpl: ProcessedTemplate <- IO.pure(processedTemplate)
+
+ uri <- IO.pure(Uri.unsafeFromString(processedTemplate.endpoint))
+ _ <- IO.delay(logger.debug(s"[$sinkName] sending a http request to url $uri"))
+
+ clientHeaders: Headers <- buildHeaders(tpl.headers)
+
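+      // note: the request content type is currently hard-coded to application/xml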
+ request <- IO {
+ Request[IO](
+ method = method,
+ uri = uri,
+ headers = clientHeaders,
+ ).withContentType(`Content-Type`(MediaType.application.xml)).withEntity(processedTemplate.content)
+ }
+ // Add authentication if present
+ authenticatedRequest <- IO {
+ authentication.fold(request) {
+ case BasicAuthentication(username, password) =>
+ request.putHeaders(Authorization(BasicCredentials(username, password)))
+ }
+ }
+ _ <- IO.delay(logger.debug(s"[$sinkName] Auth: $authenticatedRequest"))
+ response <- client.expect[String](authenticatedRequest).onError(e =>
+ IO {
+          logger.error(s"[$sinkName] error writing to HTTP endpoint: ${e.getMessage}", e)
+ } *> IO.raiseError(e),
+ )
+ _ <- IO.delay(logger.trace(s"[$sinkName] Response: $response"))
+ } yield ()
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitContext.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitContext.scala
new file mode 100644
index 000000000..5f3381a29
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitContext.scala
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.commit
+
+import com.typesafe.scalalogging.LazyLogging
+import io.lenses.streamreactor.connect.cloud.common.model.Offset
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitContext
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.ConditionCommitResult
+
+object HttpCommitContext {
+ def default(sinkName: String): HttpCommitContext =
+ HttpCommitContext(
+ sinkName,
+ Map.empty,
+ 0L,
+ 0L,
+ System.currentTimeMillis(),
+ Option.empty,
+ Map.empty,
+ )
+}
+
+/**
+ * @param sinkName             the name of the sink, used in log messages
+ * @param committedOffsets     the highest offset processed for each topic-partition
+ * @param count                the number of records processed since the last flush
+ * @param fileSize             the number of rendered bytes accumulated since the last flush
+ * @param createdTimestamp     the time in milliseconds when this context was first created
+ * @param lastFlushedTimestamp the time in milliseconds of the last flush, if one has occurred
+ * @param errors               errors recorded by the writer, keyed by exception class name
+ */
+case class HttpCommitContext(
+ sinkName: String,
+ committedOffsets: Map[TopicPartition, Offset],
+ count: Long,
+ fileSize: Long,
+ createdTimestamp: Long,
+ lastFlushedTimestamp: Option[Long],
+ errors: Map[String, Seq[Throwable]],
+) extends CommitContext
+ with LazyLogging {
+
+ override def generateLogLine(flushing: Boolean, result: Seq[ConditionCommitResult]): String = {
+ val flushingOrNot = if (flushing) "" else "Not "
+ val committedOffsetGroups = committedOffsets.map(tpo =>
+ s"topic:${tpo._1.topic}, partition:${tpo._1.partition}, offset:${tpo._2.value}",
+ ).mkString(";")
+ val logLine = result.flatMap(_.logLine).mkString(", ")
+ s"[$sinkName] ${flushingOrNot}Flushing for $committedOffsetGroups because $logLine"
+ }
+
+ def resetErrors: HttpCommitContext = copy(errors = Map.empty)
+
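+  // errors are keyed by exception class name, so the error threshold is evaluated per error type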
+ def addError(ex: Throwable): HttpCommitContext =
+ copy(
+ errors = {
+ val mapKey = ex.getClass.getSimpleName
+ val mapValue = errors.get(mapKey).fold(Seq(ex))(_ :+ ex)
+ errors + (mapKey -> mapValue)
+ },
+ )
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitPolicy.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitPolicy.scala
new file mode 100644
index 000000000..67714c2cc
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitPolicy.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.commit
+
+import com.typesafe.scalalogging.LazyLogging
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.FileSize
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.Interval
+
+import java.time.Clock
+import java.time.Duration
+
+object HttpCommitPolicy extends LazyLogging {
+
+ private val defaultFlushSize = 100L
+ private val defaultFlushInterval = Duration.ofSeconds(100)
+ private val defaultFlushCount = 100L
+
+ val Default: CommitPolicy =
+ CommitPolicy(FileSize(defaultFlushSize),
+ Interval(defaultFlushInterval, Clock.systemDefaultZone()),
+ Count(defaultFlushCount),
+ )
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala
new file mode 100644
index 000000000..8a8940d37
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.config
+
+import io.circe.generic.semiauto.deriveDecoder
+import io.circe.generic.semiauto.deriveEncoder
+import io.circe.parser.decode
+import io.circe.syntax.EncoderOps
+import io.circe.Decoder
+import io.circe.Encoder
+import io.circe.Error
+import io.lenses.streamreactor.common.config.SSLConfig
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicyCondition
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.FileSize
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.Interval
+import io.lenses.streamreactor.connect.http.sink.client.Authentication
+import io.lenses.streamreactor.connect.http.sink.client.HttpMethod
+
+import java.time.Clock
+import java.time.Duration
+
+object HttpSinkConfig {
+
+ implicit val decoder: Decoder[HttpSinkConfig] = deriveDecoder
+ implicit val encoder: Encoder[HttpSinkConfig] = deriveEncoder
+
+ def fromJson(json: String): Either[Error, HttpSinkConfig] = decode[HttpSinkConfig](json)
+
+}
+
+case class BatchConfiguration(
+ batchCount: Option[Long],
+ batchSize: Option[Long],
+ timeInterval: Option[Long],
+) {
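+  // builds a commit policy from whichever batch conditions are configured; unset fields contribute no condition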
+ def toCommitPolicy: CommitPolicy = {
+ val conditions: Seq[CommitPolicyCondition] = Seq(
+ batchCount.map(Count),
+ batchSize.map(FileSize),
+ timeInterval.map(inter => Interval(Duration.ofSeconds(inter), Clock.systemDefaultZone())),
+ ).flatten
+
+ CommitPolicy(conditions: _*)
+ }
+}
+
+object BatchConfiguration {
+
+ implicit val decoder: Decoder[BatchConfiguration] = deriveDecoder
+ implicit val encoder: Encoder[BatchConfiguration] = deriveEncoder
+
+}
+
+case class HttpSinkConfig(
+ authentication: Option[Authentication], // basic, oauth2, proxy
+ method: HttpMethod,
+ endpoint: String, // tokenised
+ content: String, // tokenised
+ headers: Seq[(String, String)], // tokenised
+ sslConfig: Option[SSLConfig],
+ batch: Option[BatchConfiguration],
+ errorThreshold: Option[Int],
+ uploadSyncPeriod: Option[Int],
+) {
+  def toJson: String =
+    this
+      .asJson(HttpSinkConfig.encoder)
+      .noSpaces
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala
new file mode 100644
index 000000000..f2ee36c78
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.config
+
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef.Importance
+import org.apache.kafka.common.config.ConfigDef.Type
+
+object HttpSinkConfigDef {
+
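+  // the sink is configured via a single property whose value is the JSON-encoded HttpSinkConfig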
+ val configProp: String = "connect.http.config"
+ val config: ConfigDef =
+ new ConfigDef()
+ .define(
+ configProp,
+ Type.STRING,
+ Importance.HIGH,
+        "JSON-encoded configuration for the HTTP sink (see HttpSinkConfig)",
+ )
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/ProcessedTemplate.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/ProcessedTemplate.scala
new file mode 100644
index 000000000..5d35426d2
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/ProcessedTemplate.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl
+
+case class ProcessedTemplate(
+ endpoint: String,
+ content: String,
+ headers: Seq[(String, String)],
+)
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/RenderedRecord.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/RenderedRecord.scala
new file mode 100644
index 000000000..7a31c8905
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/RenderedRecord.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl
+
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset
+
+case class RenderedRecord(
+ topicPartitionOffset: TopicPartitionOffset,
+ recordRendered: String,
+ headersRendered: Seq[(String, String)],
+  endpointRendered: Option[String], // only rendered for the first record of a batch
+)
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTypes.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTypes.scala
new file mode 100644
index 000000000..c7266cb40
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTypes.scala
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl
+
+import cats.implicits.catsSyntaxEitherId
+import cats.implicits.catsSyntaxOptionId
+import cats.implicits.toTraverseOps
+import io.lenses.streamreactor.connect.http.sink.tpl.renderer.RecordRenderer
+import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionError
+import org.apache.kafka.connect.sink.SinkRecord
+
+object RawTemplate {
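+  // matches the {{#message}}...{{/message}} block marking the per-record section of a batched template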
+ private val innerTemplatePattern = """\{\{#message}}([\s\S]*?)\{\{/message}}""".r
+
+ def apply(endpoint: String, content: String, headers: Seq[(String, String)]): TemplateType =
+ innerTemplatePattern.findFirstMatchIn(content) match {
+ case Some(innerTemplate) =>
+ val start = content.substring(0, innerTemplate.start)
+ val end = content.substring(innerTemplate.end)
+ TemplateWithInnerLoop(endpoint, start, end, innerTemplate.group(1), headers)
+ case None =>
+ SimpleTemplate(endpoint, content, headers)
+ }
+}
+
+trait TemplateType {
+ def endpoint: String
+ def headers: Seq[(String, String)]
+
+ def renderRecords(record: Seq[SinkRecord]): Either[SubstitutionError, Seq[RenderedRecord]]
+
+ def process(records: Seq[RenderedRecord]): Either[SubstitutionError, ProcessedTemplate]
+}
+
+// this template type requires one request per record; the messages can't be batched
+case class SimpleTemplate(
+ endpoint: String,
+ content: String,
+ headers: Seq[(String, String)],
+) extends TemplateType {
+
+ override def renderRecords(records: Seq[SinkRecord]): Either[SubstitutionError, Seq[RenderedRecord]] =
+ RecordRenderer.renderRecords(records, endpoint.some, content, headers)
+
+ override def process(records: Seq[RenderedRecord]): Either[SubstitutionError, ProcessedTemplate] =
+ records.headOption match {
+ case Some(RenderedRecord(_, recordRendered, headersRendered, Some(endpointRendered))) =>
+ ProcessedTemplate(endpointRendered, recordRendered, headersRendered).asRight
+ case _ => SubstitutionError("No record found").asLeft
+ }
+}
+
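+// repeats the inner template once per record between the prefix and suffix, so a batch of records is sent in a single request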
+case class TemplateWithInnerLoop(
+ endpoint: String,
+ prefixContent: String,
+ suffixContent: String,
+ innerTemplate: String,
+ headers: Seq[(String, String)],
+) extends TemplateType {
+
+ override def renderRecords(records: Seq[SinkRecord]): Either[SubstitutionError, Seq[RenderedRecord]] =
+ records.zipWithIndex.map {
+ case (record, i) =>
+ RecordRenderer.renderRecord(
+ record,
+ Option.when(i == 0)(endpoint),
+ innerTemplate,
+ headers,
+ )
+ }.sequence
+
+ override def process(records: Seq[RenderedRecord]): Either[SubstitutionError, ProcessedTemplate] = {
+
+    val renderedContent = records.map(_.recordRendered).mkString
+    val combinedContent = prefixContent + renderedContent + suffixContent
+ val maybeProcessedTpl = for {
+ headRecord <- records.headOption
+ ep <- headRecord.endpointRendered
+ } yield {
+ ProcessedTemplate(
+ endpoint = ep,
+        content = combinedContent,
+ headers = records.flatMap(_.headersRendered).distinct,
+ )
+ }
+ maybeProcessedTpl.toRight(SubstitutionError("No record or endpoint available"))
+ }
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/RecordRenderer.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/RecordRenderer.scala
new file mode 100644
index 000000000..8c3c0c6ad
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/RecordRenderer.scala
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.renderer
+
+import cats.implicits._
+import io.lenses.streamreactor.connect.cloud.common.model.Offset
+import io.lenses.streamreactor.connect.cloud.common.model.Topic
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset
+import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionError
+import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
+import org.apache.kafka.connect.sink.SinkRecord
+
+object RecordRenderer {
+
+ def renderRecords(
+ data: Seq[SinkRecord],
+ endpointTpl: Option[String],
+ contentTpl: String,
+ headers: Seq[(String, String)],
+ ): Either[SubstitutionError, Seq[RenderedRecord]] =
+    data.map(renderRecord(_, endpointTpl, contentTpl, headers)).sequence
+
+  def renderRecord(
+ sinkRecord: SinkRecord,
+ endpointTpl: Option[String],
+ contentTpl: String,
+ headers: Seq[(String, String)],
+ ): Either[SubstitutionError, RenderedRecord] = {
+ val topicPartitionOffset: TopicPartitionOffset =
+ Topic(sinkRecord.topic()).withPartition(sinkRecord.kafkaPartition()).withOffset(Offset(sinkRecord.kafkaOffset()))
+
+ for {
+ recordRend: String <- TemplateRenderer.render(sinkRecord, contentTpl)
+ headersRend: Seq[(String, String)] <- renderHeaders(sinkRecord, headers)
+ endpointRend: Option[String] <- renderEndpoint(sinkRecord, endpointTpl)
+ } yield RenderedRecord(topicPartitionOffset, recordRend, headersRend, endpointRend)
+ }
+
+ private def renderHeader(
+ sinkRecord: SinkRecord,
+ header: (String, String),
+ ): Either[SubstitutionError, (String, String)] =
+ header match {
+ case (hKey, hVal) =>
+ for {
+ k <- TemplateRenderer.render(sinkRecord, hKey)
+ v <- TemplateRenderer.render(sinkRecord, hVal)
+ } yield k -> v
+ }
+
+ private def renderHeaders(
+ sinkRecord: SinkRecord,
+ headers: Seq[(String, String)],
+ ): Either[SubstitutionError, Seq[(String, String)]] =
+ headers.map(h => renderHeader(sinkRecord, h)).sequence
+
+ private def renderEndpoint(
+ sinkRecord: SinkRecord,
+ endpointTpl: Option[String],
+ ): Either[SubstitutionError, Option[String]] =
+ endpointTpl.map(tpl => TemplateRenderer.render(sinkRecord, tpl)).sequence
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/TemplateRenderer.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/TemplateRenderer.scala
new file mode 100644
index 000000000..1fe2baa49
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/TemplateRenderer.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.renderer
+
+import cats.implicits._
+import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionError
+import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionType
+import org.apache.kafka.connect.sink.SinkRecord
+
+import scala.util.matching.Regex
+object TemplateRenderer {
+
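+  // matches template tags of the form {{type}} or {{type.locator}}, e.g. {{topic}} or {{header.myHeader}}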
+ private val templatePattern: Regex = "\\{\\{(.*?)}}".r
+
+  // renders a template against a single record; a SubstitutionError thrown inside replaceAllIn is re-captured by catchOnly
+ def render(data: SinkRecord, tplText: String): Either[SubstitutionError, String] =
+ Either.catchOnly[SubstitutionError](
+ templatePattern
+ .replaceAllIn(
+ tplText,
+ matchTag => {
+ val tag = matchTag.group(1).trim
+ getValue(tag, data)
+ .leftMap(throw _)
+ .merge
+ },
+ ),
+ )
+
+ // Helper method to get the value for a given tag from data
+ private def getValue(tag: String, data: SinkRecord): Either[SubstitutionError, String] = {
+ val locs = tag.split("\\.", 2)
+ (locs(0).toLowerCase, locs.lift(1)) match {
+ case ("#message", _) => "".asRight
+ case ("/message", _) => "".asRight
+ case (key: String, locator: Option[String]) =>
+ SubstitutionType.withNameInsensitiveOption(key) match {
+ case Some(sType) => sType.get(locator, data).map(_.toString)
+ case None => SubstitutionError(s"Couldn't find $key SubstitutionType").asLeft
+ }
+ }
+ }
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Header.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Header.scala
new file mode 100644
index 000000000..64c00f902
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Header.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.substitutions
+
+import cats.implicits.catsSyntaxEitherId
+import org.apache.kafka.connect.sink.SinkRecord
+
+case object Header extends SubstitutionType {
+  def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
+    locator match {
+      case Some(loc) =>
+        // lastWithName returns null when no header with that name exists, so guard against it
+        Option(sinkRecord.headers().lastWithName(loc))
+          .map(_.value().asRight[SubstitutionError])
+          .getOrElse(SubstitutionError(s"No header found with name '$loc'").asLeft)
+      case None => SubstitutionError("No header name specified in the template").asLeft
+    }
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Key.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Key.scala
new file mode 100644
index 000000000..4c170fc9d
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Key.scala
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.substitutions
+
+import cats.implicits.toBifunctorOps
+import io.lenses.streamreactor.connect.cloud.common.sink.extractors.KafkaConnectExtractor
+import org.apache.kafka.connect.sink.SinkRecord
+
+case object Key extends SubstitutionType {
+
+ def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
+ KafkaConnectExtractor.extractFromKey(sinkRecord, locator).leftMap(e =>
+      SubstitutionError(s"unable to extract field $locator for template", e),
+ )
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Offset.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Offset.scala
new file mode 100644
index 000000000..07b27868d
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Offset.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.substitutions
+
+import cats.implicits.catsSyntaxEitherId
+import org.apache.kafka.connect.sink.SinkRecord
+
+case object Offset extends SubstitutionType {
+ override def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
+ Long.box(sinkRecord.kafkaOffset()).asRight
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Partition.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Partition.scala
new file mode 100644
index 000000000..07b36323a
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Partition.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.substitutions
+
+import cats.implicits.catsSyntaxEitherId
+import org.apache.kafka.connect.sink.SinkRecord
+
+case object Partition extends SubstitutionType {
+ override def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
+ sinkRecord.kafkaPartition().asRight
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/SubstitutionError.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/SubstitutionError.scala
new file mode 100644
index 000000000..5c52bbe3c
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/SubstitutionError.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.substitutions
+
+import cats.implicits.catsSyntaxOptionId
+
+object SubstitutionError {
+ def apply(msg: String): SubstitutionError = SubstitutionError(msg, Option.empty)
+ def apply(msg: String, throwable: Throwable): SubstitutionError = SubstitutionError(msg, throwable.some)
+}
+case class SubstitutionError(msg: String, throwable: Option[Throwable]) extends Throwable
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/SubstitutionType.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/SubstitutionType.scala
new file mode 100644
index 000000000..f94a50c63
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/SubstitutionType.scala
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.substitutions
+
+import enumeratum.CirceEnum
+import enumeratum.Enum
+import enumeratum.EnumEntry
+import org.apache.kafka.connect.sink.SinkRecord
+
+trait SubstitutionType extends EnumEntry {
+ def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef]
+}
+
+case object SubstitutionType extends Enum[SubstitutionType] with CirceEnum[SubstitutionType] {
+ override def values: IndexedSeq[SubstitutionType] =
+ IndexedSeq(Header, Key, Offset, Partition, Timestamp, Topic, Value)
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Timestamp.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Timestamp.scala
new file mode 100644
index 000000000..e51309823
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Timestamp.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.substitutions
+
+import cats.implicits.catsSyntaxEitherId
+import org.apache.kafka.connect.sink.SinkRecord
+
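+/** Substitutes the record's timestamp into the template; assumes the record carries one. */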
+case object Timestamp extends SubstitutionType {
+ override def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
+ Long.box(sinkRecord.timestamp()).asRight
+
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Topic.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Topic.scala
new file mode 100644
index 000000000..f95bf5905
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Topic.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.substitutions
+
+import cats.implicits.catsSyntaxEitherId
+import org.apache.kafka.connect.sink.SinkRecord
+
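+/** Substitutes the record's topic name into the template. */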
+case object Topic extends SubstitutionType {
+ override def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
+ sinkRecord.topic().asRight
+}
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Value.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Value.scala
new file mode 100644
index 000000000..b9afbd463
--- /dev/null
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/Value.scala
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl.substitutions
+
+import cats.implicits.toBifunctorOps
+import io.lenses.streamreactor.connect.cloud.common.sink.extractors.KafkaConnectExtractor
+import org.apache.kafka.connect.sink.SinkRecord
+
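+/** Substitutes the record value, or a field within it addressed by the locator, into the template. */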
+case object Value extends SubstitutionType {
+ override def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
+ KafkaConnectExtractor.extractFromValue(sinkRecord, locator).leftMap(e =>
+      SubstitutionError(s"unable to extract field $locator for template", e),
+ )
+}
diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala
new file mode 100644
index 000000000..8919cf65b
--- /dev/null
+++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink
+
+import cats.implicits.none
+import io.lenses.streamreactor.connect.http.sink.client.BasicAuthentication
+import io.lenses.streamreactor.connect.http.sink.client.HttpMethod.Put
+import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
+import org.scalatest.funsuite.AnyFunSuiteLike
+import org.scalatest.matchers.should.Matchers
+
+class HttpSinkConfigTest extends AnyFunSuiteLike with Matchers {
+
+ test("should write config to json") {
+ HttpSinkConfig(
+ Some(BasicAuthentication("user", "pass")),
+ Put,
+ "http://myaddress.example.com",
+ "\nDave\nJason\nHooray for Kafka Connect!\n",
+ Seq("something" -> "somethingelse"),
+ none,
+ none,
+ none,
+ none,
+ ).toJson should be(
+ """{"authentication":{"username":"user","password":"pass","type":"BasicAuthentication"},"method":"Put","endpoint":"http://myaddress.example.com","content":"\nDave\nJason\nHooray for Kafka Connect!\n","headers":[["something","somethingelse"]],"sslConfig":null,"batch":null,"errorThreshold":null,"uploadSyncPeriod":null}""",
+ )
+ }
+
+}
diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala
new file mode 100644
index 000000000..373887919
--- /dev/null
+++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink
+
+import cats.effect.IO
+import cats.effect.Ref
+import cats.effect.testing.scalatest.AsyncIOSpec
+import io.lenses.streamreactor.connect.cloud.common.model.Topic
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
+import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
+import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender
+import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
+import io.lenses.streamreactor.connect.http.sink.tpl.ProcessedTemplate
+import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
+import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType
+import org.mockito.ArgumentMatchers.any
+import org.mockito.MockitoSugar
+import org.scalatest.funsuite.AsyncFunSuiteLike
+import org.scalatest.matchers.should.Matchers
+
+import scala.collection.immutable.Queue
+
+class HttpWriterTest extends AsyncIOSpec with AsyncFunSuiteLike with Matchers with MockitoSugar {
+
+ private val sinkName = "MySinkName"
+
+ private val topicPartition: TopicPartition = Topic("myTopic").withPartition(1)
+ private val defaultContext: HttpCommitContext = HttpCommitContext.default("My Sink")
+
+ test("add method should add records to the queue") {
+ val commitPolicy = CommitPolicy(Count(2L))
+ val senderMock = mock[HttpRequestSender]
+ val templateMock = mock[TemplateType]
+
+ for {
+ recordsQueueRef <- Ref.of[IO, Queue[RenderedRecord]](Queue.empty)
+ commitContextRef <- Ref.of[IO, HttpCommitContext](HttpCommitContext.default("My Sink"))
+ httpWriter = new HttpWriter(sinkName,
+ commitPolicy,
+ senderMock,
+ templateMock,
+ recordsQueueRef,
+ commitContextRef,
+ 5,
+ )
+ recordsToAdd = Seq(
+ RenderedRecord(topicPartition.atOffset(100), "record1", Seq.empty, None),
+ RenderedRecord(topicPartition.atOffset(101), "record2", Seq.empty, None),
+ )
+ _ <- httpWriter.add(recordsToAdd)
+
+ queue <- recordsQueueRef.get
+ } yield {
+ queue shouldBe Queue(recordsToAdd: _*)
+ }
+ }
+
+ test("process method should flush records when the queue is non-empty and commit policy requires flush") {
+ val commitPolicy = CommitPolicy(Count(2L))
+ val senderMock = mock[HttpRequestSender]
+ when(senderMock.sendHttpRequest(any[ProcessedTemplate])).thenReturn(IO.unit)
+
+ val templateMock = mock[TemplateType]
+ when(templateMock.process(any[Seq[RenderedRecord]])).thenReturn(Right(ProcessedTemplate("a", "b", Seq.empty)))
+
+ val recordsToAdd = Seq(
+ RenderedRecord(topicPartition.atOffset(100), "record1", Seq.empty, None),
+ RenderedRecord(topicPartition.atOffset(101), "record2", Seq.empty, None),
+ )
+
+ {
+ for {
+ recordsQueueRef <- Ref.of[IO, Queue[RenderedRecord]](Queue.empty)
+ commitContextRef <- Ref.of[IO, HttpCommitContext](defaultContext)
+
+ httpWriter = new HttpWriter(sinkName,
+ commitPolicy,
+ senderMock,
+ templateMock,
+ recordsQueueRef,
+ commitContextRef,
+ 5,
+ )
+
+ _ <- httpWriter.add(recordsToAdd)
+ _ <- httpWriter.process()
+ updatedContext <- commitContextRef.get
+ updatedQueue <- recordsQueueRef.get
+ } yield {
+ updatedContext should not be defaultContext
+ updatedQueue shouldBe empty
+ }
+ }
+ }
+
+ test("process method should not flush records when the queue is empty") {
+ val commitPolicy = CommitPolicy(Count(2L))
+ val senderMock = mock[HttpRequestSender]
+ when(senderMock.sendHttpRequest(any[ProcessedTemplate])).thenReturn(IO.unit)
+
+ val templateMock = mock[TemplateType]
+
+ for {
+ recordsQueueRef <- Ref.of[IO, Queue[RenderedRecord]](Queue.empty)
+ commitContextRef <- Ref.of[IO, HttpCommitContext](defaultContext)
+
+ httpWriter = new HttpWriter(sinkName,
+ commitPolicy,
+ senderMock,
+ templateMock,
+ recordsQueueRef,
+ commitContextRef,
+ 5,
+ )
+
+ _ <- httpWriter.process()
+ updatedContext <- commitContextRef.get
+ updatedQueue <- recordsQueueRef.get
+ } yield {
+ updatedContext shouldBe defaultContext
+ updatedQueue shouldBe empty
+ }
+ }
+}
diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtilsTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtilsTest.scala
new file mode 100644
index 000000000..044e19bc5
--- /dev/null
+++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtilsTest.scala
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink
+
+import io.lenses.streamreactor.connect.cloud.common.model.Offset
+import io.lenses.streamreactor.connect.cloud.common.model.Topic
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
+import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
+import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.should.Matchers._
+
+class OffsetMergeUtilsTest extends AnyFunSuite {
+
+ private val sinkName = "mySinkName"
+
+ test(
+ "createCommitContextFromBatch should create a new HttpCommitContext with merged offsets when currentCommitContext is provided",
+ ) {
+ val batch = Seq(
+ RenderedRecord(Topic("topic1").withPartition(0).withOffset(Offset(100)), "record1", Seq.empty, None),
+ RenderedRecord(Topic("topic2").withPartition(0).withOffset(Offset(50)), "record2", Seq.empty, None),
+ )
+ val currentCommitContext = HttpCommitContext(
+ sinkName,
+ Map(Topic("topic1").withPartition(0) -> Offset(50)),
+ 1L,
+ 10L,
+ System.currentTimeMillis(),
+ None,
+ Map.empty,
+ )
+
+ val result = OffsetMergeUtils.createCommitContextForEvaluation(batch, currentCommitContext)
+
+ result.committedOffsets shouldBe Map(
+ Topic("topic1").withPartition(0) -> Offset(100),
+ Topic("topic2").withPartition(0) -> Offset(50),
+ )
+ result.count shouldBe 2L
+ result.fileSize shouldBe 14L
+ }
+
+ test("createCommitContextFromBatch should create a new HttpCommitContext when currentCommitContext is None") {
+ val batch = Seq(
+ RenderedRecord(Topic("topic1").withPartition(0).withOffset(Offset(100)), "record1", Seq.empty, None),
+ RenderedRecord(Topic("topic2").withPartition(0).withOffset(Offset(50)), "record2", Seq.empty, None),
+ )
+ val currentCommitContext = HttpCommitContext.default("My Sink")
+
+ val result = OffsetMergeUtils.createCommitContextForEvaluation(batch, currentCommitContext)
+
+ result.committedOffsets shouldBe Map(
+ Topic("topic1").withPartition(0) -> Offset(100),
+ Topic("topic2").withPartition(0) -> Offset(50),
+ )
+ result.count shouldBe 2L
+ result.fileSize shouldBe 14L
+ }
+
+ test("mergeOffsets should merge committed offsets correctly") {
+ val committedOffsets = Map(
+ Topic("topic1").withPartition(0) -> Offset(100),
+ Topic("topic2").withPartition(0) -> Offset(50),
+ )
+ val topTpos = Map(
+ Topic("topic2").withPartition(0) -> Offset(80),
+ Topic("topic3").withPartition(0) -> Offset(70),
+ )
+
+ val result = OffsetMergeUtils.mergeOffsets(committedOffsets, topTpos)
+
+ result shouldBe Map(
+ Topic("topic1").withPartition(0) -> Offset(100),
+ Topic("topic2").withPartition(0) -> Offset(80),
+ Topic("topic3").withPartition(0) -> Offset(70),
+ )
+ }
+
+ test("mergeOffsets should handle empty committed offsets") {
+ val committedOffsets = Map.empty[TopicPartition, Offset]
+ val topTpos = Map(
+ Topic("topic2").withPartition(0) -> Offset(80),
+ Topic("topic3").withPartition(0) -> Offset(70),
+ )
+
+ val result = OffsetMergeUtils.mergeOffsets(committedOffsets, topTpos)
+
+ result shouldBe topTpos
+ }
+}
diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/RawTemplateTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/RawTemplateTest.scala
new file mode 100644
index 000000000..693ce219b
--- /dev/null
+++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/RawTemplateTest.scala
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.kafka.connect.sink.SinkRecord
+import org.mockito.MockitoSugar.mock
+import org.mockito.MockitoSugar.when
+import org.scalatest.EitherValues
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.should.Matchers
+
+class RawTemplateTest extends AnyFunSuite with Matchers with EitherValues with LazyLogging {
+
+ test("parse should process templates") {
+
+ val template = RawTemplate(
+ endpoint = "Endpoint: {{key}}",
+ content = "Content: {{value}}",
+ headers = Seq(
+ ("HeaderKey1", "HeaderValue1"),
+ ("HeaderKey2", "HeaderValue2"),
+ ),
+ )
+
+ val sinkRecord = mock[SinkRecord]
+ when(sinkRecord.key()).thenReturn("KeyData")
+ when(sinkRecord.value()).thenReturn("ValueData")
+
+ val rendered = template.renderRecords(Seq(sinkRecord))
+
+ val processedTemplate = template.process(rendered.value)
+ processedTemplate.value.endpoint should be("Endpoint: KeyData")
+ processedTemplate.value.content should be("Content: ValueData")
+ processedTemplate.value.headers should be(
+ Seq("HeaderKey1" -> "HeaderValue1", "HeaderKey2" -> "HeaderValue2"),
+ )
+ }
+
+ test("parse should correctly execute header templates") {
+ val template = RawTemplate(
+ endpoint = "Endpoint: {{key}}",
+ content = "Content: {{value}}",
+ headers = Seq(
+ ("HeaderKey1-{{topic}}", "HeaderValue1-{{topic}}"),
+ ("HeaderKey2-{{partition}}", "HeaderValue2-{{partition}}"),
+ ),
+ )
+
+ val sinkRecord = mock[SinkRecord]
+ when(sinkRecord.key()).thenReturn("KeyData")
+ when(sinkRecord.value()).thenReturn("ValueData")
+ when(sinkRecord.topic()).thenReturn("myTopic")
+ when(sinkRecord.kafkaPartition()).thenReturn(100)
+
+ val headerResults = Seq(
+ "HeaderKey1-myTopic" -> "HeaderValue1-myTopic",
+ "HeaderKey2-100" -> "HeaderValue2-100",
+ )
+
+ val rendered = template.renderRecords(Seq(sinkRecord))
+
+ val processedTemplate = template.process(rendered.value)
+ processedTemplate.value.headers shouldBe headerResults
+ }
+
+}
diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala
new file mode 100644
index 000000000..726de6e83
--- /dev/null
+++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.http.sink.tpl
+
+import org.apache.kafka.connect.data.Schema
+import org.apache.kafka.connect.data.SchemaBuilder
+import org.apache.kafka.connect.data.Struct
+import org.apache.kafka.connect.sink.SinkRecord
+import org.scalatest.EitherValues
+import org.scalatest.funsuite.AnyFunSuiteLike
+import org.scalatest.matchers.should.Matchers
+
+class TemplateTest extends AnyFunSuiteLike with Matchers with EitherValues {
+
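+  // Mustache-style template: the {{#message}}...{{/message}} section is rendered once per record in the batch.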
+ private val multiTemplate =
+ """
+ |
+ |
+ | {{#message}}
+ |
+ | {{topic}}
+ | {{value.employeeId}}
+ | {{value.orderNo}}
+ | {{value.groupDomain}}
+ |
+ | {{/message}}
+ |
+ |""".stripMargin
+
+ test("template behaviour") {
+
+ val valueSchema = SchemaBuilder
+ .struct()
+ .name("myStruct")
+ .field("groupDomain", Schema.STRING_SCHEMA)
+ .field("orderNo", Schema.INT32_SCHEMA)
+ .field("employeeId", Schema.STRING_SCHEMA)
+ .build()
+
+ val value = new Struct(valueSchema)
+ value.put("groupDomain", "myExampleGroup.uk")
+ value.put("orderNo", 10)
+ value.put("employeeId", "Abcd1234")
+
+ val record = new SinkRecord("myTopic", 0, null, null, valueSchema, value, 9)
+ val processedTemplate = RawTemplate(
+ endpoint = "http://{{value.groupDomain}}.example.com/{{value.orderNo}}/{{value.employeeId}}/{{topic}}",
+ content =
+ """
+ | {{topic}}
+ | {{value.employeeId}}
+ | {{value.orderNo}}
+ | {{value.groupDomain}}
+ |""".stripMargin,
+ Seq(),
+ )
+
+ val rendered = processedTemplate.renderRecords(Seq(record))
+
+ val processed = processedTemplate.process(rendered.value)
+ processed.value.endpoint should be("http://myExampleGroup.uk.example.com/10/Abcd1234/myTopic")
+
+ processed.value.content should be(
+ """
+ | myTopic
+ | Abcd1234
+ | 10
+ | myExampleGroup.uk
+ |""".stripMargin,
+ )
+ }
+
+ test("template behaviour for batching messages") {
+
+ val valueSchema = SchemaBuilder
+ .struct()
+ .name("myStruct")
+ .field("groupDomain", Schema.STRING_SCHEMA)
+ .field("orderNo", Schema.INT32_SCHEMA)
+ .field("employeeId", Schema.STRING_SCHEMA)
+ .build()
+
+ val value1 = new Struct(valueSchema)
+ value1.put("groupDomain", "myExampleGroup.uk")
+ value1.put("orderNo", 10)
+ value1.put("employeeId", "Abcd1234")
+
+ val value2 = new Struct(valueSchema)
+ value2.put("groupDomain", "myExampleGroup.uk")
+ value2.put("orderNo", 11)
+ value2.put("employeeId", "Efgh5678")
+
+ val record1 = new SinkRecord("myTopic", 0, null, null, valueSchema, value1, 9)
+ val record2 = new SinkRecord("myTopic", 0, null, null, valueSchema, value2, 10)
+
+ val records = Seq(record1, record2)
+
+ val processedTemplate = RawTemplate(
+ endpoint = "http://{{value.groupDomain}}.example.com/{{value.orderNo}}/{{value.employeeId}}/{{topic}}",
+ content = multiTemplate,
+ Seq(),
+ )
+
+ val rendered = processedTemplate.renderRecords(records)
+
+ val processed = processedTemplate.process(rendered.value)
+ processed.value.endpoint should be("http://myExampleGroup.uk.example.com/10/Abcd1234/myTopic")
+
+ normalized(processed.value.content) should be(
+ normalized(
+ """
+ |
+ |
+ |
+ | myTopic
+ | Abcd1234
+ | 10
+ | myExampleGroup.uk
+ |
+ |
+ | myTopic
+ | Efgh5678
+ | 11
+ | myExampleGroup.uk
+ |
+ |
+ |""".stripMargin,
+ ),
+ )
+ }
+
+ test("template behaviour for batching messages without whitespace in between") {
+
+ val valueSchema = SchemaBuilder
+ .struct()
+ .name("myStruct")
+ .field("groupDomain", Schema.STRING_SCHEMA)
+ .field("orderNo", Schema.INT32_SCHEMA)
+ .field("employeeId", Schema.STRING_SCHEMA)
+ .build()
+
+ val value1 = new Struct(valueSchema)
+ value1.put("groupDomain", "myExampleGroup.uk")
+ value1.put("orderNo", 10)
+ value1.put("employeeId", "Abcd1234")
+
+ val value2 = new Struct(valueSchema)
+ value2.put("groupDomain", "myExampleGroup.uk")
+ value2.put("orderNo", 11)
+ value2.put("employeeId", "Efgh5678")
+
+ val record1 = new SinkRecord("myTopic", 0, null, null, valueSchema, value1, 9)
+ val record2 = new SinkRecord("myTopic", 0, null, null, valueSchema, value2, 10)
+
+ val records = Seq(record1, record2)
+
+ val processedTemplate = RawTemplate(
+ endpoint = "http://{{value.groupDomain}}.example.com/{{value.orderNo}}/{{value.employeeId}}/{{topic}}",
+ content = normalized(multiTemplate),
+ Seq(),
+ )
+
+ val rendered = processedTemplate.renderRecords(records)
+
+ val processed = processedTemplate.process(rendered.value)
+ processed.value.endpoint should be("http://myExampleGroup.uk.example.com/10/Abcd1234/myTopic")
+
+ normalized(processed.value.content) should be(
+ normalized(
+ """myTopicAbcd123410myExampleGroup.uk
+ |
+ |
+ | myTopic
+ | Efgh5678
+ | 11
+ | myExampleGroup.uk
+ |
+ |
+ |""".stripMargin,
+ ),
+ )
+ }
+
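+  // Collapses whitespace between markup tags and squashes remaining whitespace so rendered output can be compared regardless of layout.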
+ private def normalized(s: String): String =
+ s
+ .replaceAll(">\\s+<", "><")
+ .replaceAll("(?s)\\s+", " ").trim
+
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 57a58bac8..90227f12b 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -50,9 +50,10 @@ object Dependencies {
val enumeratumVersion = "1.7.2"
- val http4sVersion = "1.0.0-M32"
- val avroVersion = "1.11.0"
- val avro4sVersion = "4.1.0"
+ val http4sVersion = "1.0.0-M32"
+ val http4sJdkVersion = "1.0.0-M1"
+ val avroVersion = "1.11.0"
+ val avro4sVersion = "4.1.0"
val catsVersion = "2.9.0"
val catsEffectVersion = "3.4.8"
@@ -60,7 +61,7 @@ object Dependencies {
val urlValidatorVersion = "1.7"
val circeVersion = "0.15.0-M1"
- val circeGenericExtrasVersion = "0.14.1"
+ val circeGenericExtrasVersion = "0.14.3"
val circeJsonSchemaVersion = "0.2.0"
val shapelessVersion = "2.3.10"
@@ -72,8 +73,8 @@ object Dependencies {
val logbackVersion = "1.4.7"
val scalaLoggingVersion = "3.9.5"
- val wiremockJre8Version = "2.35.0"
- val parquetVersion = "1.13.1"
+ val wiremockVersion = "3.3.1"
+ val parquetVersion = "1.13.1"
val jerseyCommonVersion = "3.1.1"
@@ -86,7 +87,7 @@ object Dependencies {
val guavaVersion = "31.0.1-jre"
val javaxBindVersion = "2.3.1"
- val jacksonVersion = "2.14.2"
+ val jacksonVersion = "2.15.3"
val json4sVersion = "4.0.6"
val mockitoScalaVersion = "1.17.12"
val snakeYamlVersion = "2.0"
@@ -132,7 +133,7 @@ object Dependencies {
val mongoDbVersion = "3.12.12"
- val jedisVersion = "4.3.1"
+ val jedisVersion = "4.4.0"
val gsonVersion = "2.10.1"
val nimbusJoseJwtVersion = "9.30.2"
@@ -165,10 +166,13 @@ object Dependencies {
val urlValidator = "commons-validator" % "commons-validator" % urlValidatorVersion
- val circeGeneric = "io.circe" %% "circe-generic" % circeVersion
- val circeParser = "io.circe" %% "circe-parser" % circeVersion
- val circeRefined = "io.circe" %% "circe-refined" % circeVersion
- val circe: Seq[ModuleID] = Seq(circeGeneric, circeParser)
+ val circeGeneric = "io.circe" %% "circe-generic" % circeVersion
+ val circeGenericExtras = "io.circe" %% "circe-generic-extras" % circeGenericExtrasVersion
+ val circeParser = "io.circe" %% "circe-parser" % circeVersion
+ val circeRefined = "io.circe" %% "circe-refined" % circeVersion
+ val circe: Seq[ModuleID] = Seq(circeGeneric, circeParser, circeGenericExtras)
+
+ val betterMonadicFor = addCompilerPlugin("com.olegpy" %% "better-monadic-for" % Versions.betterMonadicForVersion)
// logging
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
@@ -214,12 +218,10 @@ object Dependencies {
val confluentProtobufConverter: ModuleID =
confluentExcludes("io.confluent" % "kafka-connect-protobuf-converter" % confluentVersion)
- val http4sDsl = "org.http4s" %% "http4s-dsl" % http4sVersion
- val http4sAsyncClient = "org.http4s" %% "http4s-async-http-client" % http4sVersion
- val http4sBlazeServer = "org.http4s" %% "http4s-blaze-server" % http4sVersion
- val http4sBlazeClient = "org.http4s" %% "http4s-blaze-client" % http4sVersion
- val http4sCirce = "org.http4s" %% "http4s-circe" % http4sVersion
- val http4s: Seq[ModuleID] = Seq(http4sDsl, http4sAsyncClient, http4sBlazeServer, http4sCirce)
+ val http4sDsl = "org.http4s" %% "http4s-dsl" % http4sVersion
+ val http4sJdkClient = "org.http4s" %% "http4s-jdk-http-client" % http4sJdkVersion
+ val http4sCirce = "org.http4s" %% "http4s-circe" % http4sVersion
+ val http4s: Seq[ModuleID] = Seq(http4sDsl, http4sJdkClient, http4sCirce)
val bouncyProv = "org.bouncycastle" % "bcprov-jdk15on" % bouncyCastleVersion
val bouncyUtil = "org.bouncycastle" % "bcutil-jdk15on" % bouncyCastleVersion
@@ -234,7 +236,7 @@ object Dependencies {
lazy val avro4sJson = "com.sksamuel.avro4s" %% "avro4s-json" % avro4sVersion
lazy val avro4sProtobuf = "com.sksamuel.avro4s" %% "avro4s-protobuf" % avro4sVersion
- val `wiremock-jre8` = "com.github.tomakehurst" % "wiremock-jre8" % wiremockJre8Version
+ val `wiremock` = "org.wiremock" % "wiremock" % wiremockVersion
val jerseyCommon = "org.glassfish.jersey.core" % "jersey-common" % jerseyCommonVersion
@@ -418,7 +420,7 @@ trait Dependencies {
scalatestPlusScalaCheck,
scalaCheck,
`mockito-scala`,
- `wiremock-jre8`,
+ `wiremock`,
jerseyCommon,
avro4s,
kafkaClients,
@@ -484,6 +486,10 @@ trait Dependencies {
val kafkaConnectS3FuncTestDeps: Seq[ModuleID] = baseTestDeps ++ compressionCodecDeps :+ s3Sdk
+ val kafkaConnectHttpDeps: Seq[ModuleID] = Seq()
+
+ val kafkaConnectHttpTestDeps: Seq[ModuleID] = baseTestDeps ++ Seq(
+ )
val kafkaConnectCassandraDeps: Seq[ModuleID] = Seq(
cassandraDriver,
jsonPath,
diff --git a/project/Settings.scala b/project/Settings.scala
index bac5fc2b2..77dd39dcf 100644
--- a/project/Settings.scala
+++ b/project/Settings.scala
@@ -1,3 +1,4 @@
+import Dependencies.betterMonadicFor
import Dependencies.globalExcludeDeps
import Dependencies.googleProtobuf
import Dependencies.googleProtobufJava
@@ -132,6 +133,7 @@ object Settings extends Dependencies {
("StreamReactor-Docs", "https://docs.lenses.io/5.0/integrations/connectors/stream-reactor/"),
),
),
+ betterMonadicFor,
)
val settings: Seq[Setting[_]] = commonSettings ++ Seq(