From 518e78a88b14eba32d1cf7489454bcf0678608f0 Mon Sep 17 00:00:00 2001 From: David Sloan Date: Thu, 14 Dec 2023 16:09:00 +0000 Subject: [PATCH] HTTP Sink WTD --- build.sbt | 29 ++++++- .../sink/extractors/SinkDataExtractor.scala | 7 +- .../common/sink/naming/CloudKeyNamer.scala | 2 +- .../sink/config/PartitionNamePath.scala | 0 .../sink/extractors/ArrayExtractor.scala | 0 .../sink/extractors/ArrayIndexUtil.scala | 0 .../extractors/ComplexTypeExtractor.scala | 0 .../sink/extractors/ExtractorErrorType.scala | 0 .../extractors/KafkaConnectExtractor.scala | 70 +++++++++++++++ .../common/sink/extractors/MapExtractor.scala | 0 .../sink/extractors/PrimitiveExtractor.scala | 0 .../sink/extractors/StructExtractor.scala | 0 .../extractors/WrappedArrayExtractor.scala | 0 .../WrappedComplexTypeExtractor.scala | 0 .../sink/extractors/WrappedMapExtractor.scala | 0 .../WrappedPrimitiveExtractor.scala | 0 .../connect/http/sink/HttpSinkConnector.scala | 55 ++++++++++++ .../connect/http/sink/HttpSinkTask.scala | 85 +++++++++++++++++++ .../http/sink/client/Authentication.scala | 39 +++++++++ .../connect/http/sink/client/HttpMethod.scala | 34 ++++++++ .../http/sink/client/HttpRequestSender.scala | 58 +++++++++++++ .../http/sink/config/HttpSinkConfig.scala | 47 ++++++++++ .../http/sink/config/HttpSinkConfigDef.scala | 34 ++++++++ .../tpl/binding/KafkaConnectBaseBinding.scala | 33 +++++++ .../binding/KafkaConnectHeaderBinding.scala | 23 +++++ .../tpl/binding/KafkaConnectKeyBinding.scala | 27 ++++++ .../binding/KafkaConnectObjectHandler.scala | 49 +++++++++++ .../binding/KafkaConnectOffsetBinding.scala | 23 +++++ .../KafkaConnectPartitionBinding.scala | 23 +++++ .../KafkaConnectTimestampBinding.scala | 23 +++++ .../binding/KafkaConnectTopicBinding.scala | 23 +++++ .../binding/KafkaConnectValueBinding.scala | 28 ++++++ .../tpl/templates/ProcessedTemplate.scala | 22 +++++ .../http/sink/tpl/templates/RawTemplate.scala | 38 +++++++++ .../sink/tpl/templates/SubstitutionType.scala | 69 +++++++++++++++ .../http/sink/tpl/templates/Template.scala | 46 ++++++++++ .../http/sink/HttpSinkConfigTest.scala | 38 +++++++++ .../connect/http/sink/tpl/TemplateTest.scala | 67 +++++++++++++++ project/Dependencies.scala | 30 ++++--- 39 files changed, 998 insertions(+), 24 deletions(-) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala (100%) create mode 100644 kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala (100%) create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectBaseBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectHeaderBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectKeyBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectObjectHandler.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectOffsetBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectPartitionBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTimestampBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTopicBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectValueBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/ProcessedTemplate.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionType.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala create mode 100644 kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala create mode 100644 kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala diff --git a/build.sbt b/build.sbt index ccc0db5982..28d582a817 100644 --- a/build.sbt +++ b/build.sbt @@ -1,9 +1,9 @@ import Dependencies.globalExcludeDeps import Dependencies.gson - -import Settings._ +import Dependencies.mustache +import Settings.* import sbt.Keys.libraryDependencies -import sbt._ +import sbt.* import sbt.Project.projectToLocalProject import java.io.File @@ -22,6 +22,7 @@ lazy val subProjects: Seq[Project] = Seq( elastic7, ftp, `gcp-storage`, + http, influxdb, jms, mongodb, @@ -246,6 +247,28 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7")) .configureFunctionalTests() .enablePlugins(PackPlugin) +lazy val http = (project in file("kafka-connect-http")) + .dependsOn(common) + //.dependsOn(`test-common` % "fun->compile") + .settings( + settings ++ + Seq( + name := "kafka-connect-http", + description := "Kafka Connect compatible connectors to move data between Kafka and http", + libraryDependencies ++= baseDeps ++ Seq(mustache), + publish / skip := true, + packExcludeJars := Seq( + "scala-.*\\.jar", + "zookeeper-.*\\.jar", + ), + ), + ) + .configureAssembly(false) + .configureTests(baseTestDeps) + //.configureIntegrationTests() + //.configureFunctionalTests(kafkaConnectS3FuncTestDeps) + .enablePlugins(PackPlugin, ProtocPlugin) + lazy val influxdb = (project in file("kafka-connect-influxdb")) .dependsOn(common) .settings( diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala index 4f514169b8..1b02026832 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala @@ -17,12 +17,7 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors import cats.implicits._ import com.typesafe.scalalogging.LazyLogging -import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData -import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData -import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData -import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSinkData -import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData -import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData +import io.lenses.streamreactor.connect.cloud.common.formats.writer._ import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath /** diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala index 994d6c183f..e82198c2b8 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala @@ -30,10 +30,10 @@ import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError import io.lenses.streamreactor.connect.cloud.common.sink.SinkError import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionDisplay.KeysAndValues import io.lenses.streamreactor.connect.cloud.common.sink.config.DatePartitionField +import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import io.lenses.streamreactor.connect.cloud.common.sink.config.HeaderPartitionField import io.lenses.streamreactor.connect.cloud.common.sink.config.KeyPartitionField import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionField -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionPartitionField import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionSelection import io.lenses.streamreactor.connect.cloud.common.sink.config.TopicPartitionField diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala diff --git a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala new file mode 100644 index 0000000000..60d99cc910 --- /dev/null +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.sink.extractors + +import cats.implicits.catsSyntaxEitherId +import com.typesafe.scalalogging.LazyLogging +import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.sink.SinkRecord + +import java.util +import java.lang +import java.nio.ByteBuffer +import scala.jdk.CollectionConverters.MapHasAsJava + +object KafkaConnectExtractor extends LazyLogging { + + def extractFromKey(sinkRecord: SinkRecord, path: Option[String]): Either[Throwable, AnyRef] = + extract(sinkRecord.key(), Option(sinkRecord.keySchema()), path) + + def extractFromValue(sinkRecord: SinkRecord, path: Option[String]): Either[Throwable, AnyRef] = + extract(sinkRecord.value(), Option(sinkRecord.valueSchema()), path) + + // TODO: test with all different types + private def extract( + extractFrom: AnyRef, + extractSchema: Option[Schema], + maybePath: Option[String], + ): Either[Throwable, AnyRef] = { + + val maybePnp: Option[PartitionNamePath] = maybePath.map(path => PartitionNamePath(path.split('.').toIndexedSeq: _*)) + (extractFrom, maybePnp) match { + case (shortVal: lang.Short, _) => shortVal.asRight + case (boolVal: lang.Boolean, _) => boolVal.asRight + case (stringVal: lang.String, _) => stringVal.asRight + case (longVal: lang.Long, _) => longVal.asRight + case (intVal: lang.Integer, _) => intVal.asRight + case (byteVal: lang.Byte, _) => byteVal.asRight + case (doubleVal: lang.Double, _) => doubleVal.asRight + case (floatVal: lang.Float, _) => floatVal.asRight + case (bytesVal: Array[Byte], _) => bytesVal.asRight + case (bytesVal: ByteBuffer, _) => bytesVal.array().asRight + case (arrayVal: Array[_], _) => arrayVal.asRight + case (decimal: BigDecimal, _) => decimal.asRight + case (decimal: java.math.BigDecimal, _) => decimal.asRight + case null => null + case (structVal: Struct, Some(pnp)) => StructExtractor.extractPathFromStruct(structVal, pnp) + case (mapVal: Map[_, _], Some(pnp)) => MapExtractor.extractPathFromMap(mapVal.asJava, pnp, extractSchema.orNull) + case (mapVal: util.Map[_, _], Some(pnp)) => MapExtractor.extractPathFromMap(mapVal, pnp, extractSchema.orNull) + case (listVal: util.List[_], Some(pnp)) => ArrayExtractor.extractPathFromArray(listVal, pnp, extractSchema.orNull) + case otherVal => new ConnectException("Unknown value type: " + otherVal.getClass.getName).asLeft + } + } + +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala new file mode 100644 index 0000000000..aad600922e --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink + +import cats.implicits.catsSyntaxOptionId +import com.typesafe.scalalogging.LazyLogging +import io.lenses.streamreactor.common.utils.JarManifest +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef +import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.connect.connector.Task +import org.apache.kafka.connect.sink.SinkConnector + +import java.util +import scala.jdk.CollectionConverters.MapHasAsJava +import scala.jdk.CollectionConverters.MapHasAsScala +import scala.jdk.CollectionConverters.SeqHasAsJava + +class HttpSinkConnector extends SinkConnector with LazyLogging { + + private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation) + private var props: Option[Map[String, String]] = Option.empty + + override def version(): String = manifest.version() + + override def taskClass(): Class[_ <: Task] = classOf[HttpSinkTask] + + override def config(): ConfigDef = HttpSinkConfigDef.config + + override def start(props: util.Map[String, String]): Unit = { + logger.info(s"Creating S3 sink connector") + this.props = props.asScala.toMap.some + } + + override def stop(): Unit = () + + override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = { + logger.info(s"Creating $maxTasks tasks config") + List.fill(maxTasks) { + props.map(_.asJava).getOrElse(Map.empty[String, String].asJava) + }.asJava + } +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala new file mode 100644 index 0000000000..c67ed2d887 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import cats.implicits._ +import io.lenses.streamreactor.common.utils.JarManifest +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig.fromJson +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef.configProp +import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig +import io.lenses.streamreactor.connect.http.sink.tpl.templates.ProcessedTemplate +import io.lenses.streamreactor.connect.http.sink.tpl.templates.RawTemplate +import io.lenses.streamreactor.connect.http.sink.tpl.templates.Template +import org.apache.kafka.connect.sink.SinkRecord +import org.apache.kafka.connect.sink.SinkTask +import org.http4s.jdkhttpclient.JdkHttpClient + +import java.util +import scala.jdk.CollectionConverters.IterableHasAsScala +import scala.jdk.CollectionConverters.MapHasAsScala + +class HttpSinkTask extends SinkTask { + private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation) + + private var maybeConfig: Option[HttpSinkConfig] = Option.empty + + private var maybeTemplate: Option[Template] = Option.empty + override def start(props: util.Map[String, String]): Unit = { + { + for { + propVal <- props.asScala.get(configProp).toRight(new IllegalArgumentException("No prop found")) + config <- fromJson(propVal) + template = RawTemplate(config.endpoint, config.content, config.headers) + .parse() + } yield { + this.maybeConfig = config.some + this.maybeTemplate = template.some + } + }.leftMap(throw _) + () + + } + + override def put(records: util.Collection[SinkRecord]): Unit = { + val sinkRecords = records.asScala + (maybeConfig, maybeTemplate) match { + case (Some(config), Some(template)) => + val sender = new HttpRequestSender(config) + val clientResource = JdkHttpClient.simple[IO] + + clientResource.use { + client => + sinkRecords.map { + record => + val processed: ProcessedTemplate = template.process(record) + sender.sendHttpRequest( + client, + processed, + ) + }.toSeq.sequence *> IO.unit + }.unsafeRunSync() + case _ => throw new IllegalArgumentException("Config or tmeplate not set") + } + + } + + override def stop(): Unit = ??? + + override def version(): String = manifest.version() +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala new file mode 100644 index 0000000000..f6ead99ec1 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.client + +import io.circe.generic.extras.Configuration +import io.circe.generic.extras.semiauto._ +import io.circe.Decoder +import io.circe.Encoder + +sealed trait Authentication + +object Authentication { + implicit val customConfig: Configuration = Configuration.default.withDiscriminator("type") + + implicit val decoder: Decoder[Authentication] = deriveConfiguredDecoder[Authentication] + implicit val encoder: Encoder[Authentication] = deriveConfiguredEncoder[Authentication] +} + +case class BasicAuthentication(username: String, password: String) extends Authentication + +object BasicAuthentication { + implicit val customConfig: Configuration = Configuration.default.withDiscriminator("type") + + implicit val decoder: Decoder[BasicAuthentication] = deriveConfiguredDecoder + implicit val encoder: Encoder[BasicAuthentication] = deriveConfiguredEncoder +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala new file mode 100644 index 0000000000..f74bf0ab80 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.client + +import enumeratum.CirceEnum +import enumeratum.Enum +import enumeratum.EnumEntry + +sealed trait HttpMethod extends EnumEntry + +case object HttpMethod extends Enum[HttpMethod] with CirceEnum[HttpMethod] { + + val values = findValues + + case object Put extends HttpMethod + + case object Post extends HttpMethod + + case object Patch extends HttpMethod + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala new file mode 100644 index 0000000000..a26479a117 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.client + +import cats.effect.IO +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig +import io.lenses.streamreactor.connect.http.sink.tpl.templates.ProcessedTemplate +import org.http4s._ +import org.http4s.client.Client +import org.http4s.headers.Authorization +import org.typelevel.ci.CIString +class HttpRequestSender(config: HttpSinkConfig) { + + def sendHttpRequest( + client: Client[IO], + processedTemplate: ProcessedTemplate, + ): IO[Unit] = { + + val uri = Uri.unsafeFromString(processedTemplate.endpoint) + + val clientHeaders: Headers = Headers(processedTemplate.headers.map { + case (name, value) => + Header.ToRaw.rawToRaw(new Header.Raw(CIString(name), value)) + }: _*) + + val request = Request[IO]( + method = Method.fromString(config.method.entryName).getOrElse(Method.GET), + uri = uri, + headers = clientHeaders, + ).withEntity(config.content) + + // Add authentication if present + val authenticatedRequest = config.authentication.fold(request) { + case BasicAuthentication(username, password) => + request.putHeaders(Authorization(BasicCredentials(username, password))) + } + + for { + response <- client.expect[String](authenticatedRequest) + _ <- IO(println(s"Response: $response")) + } yield () + + } + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala new file mode 100644 index 0000000000..f1c12970b3 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.config + +import io.circe.generic.semiauto.deriveDecoder +import io.circe.generic.semiauto.deriveEncoder +import io.circe.parser.decode +import io.circe.syntax.EncoderOps +import io.circe.Decoder +import io.circe.Encoder +import io.circe.Error +import io.lenses.streamreactor.connect.http.sink.client.Authentication +import io.lenses.streamreactor.connect.http.sink.client.HttpMethod +object HttpSinkConfig { + + implicit val decoder: Decoder[HttpSinkConfig] = deriveDecoder + implicit val encoder: Encoder[HttpSinkConfig] = deriveEncoder + + def fromJson(json: String): Either[Error, HttpSinkConfig] = decode[HttpSinkConfig](json) + +} +case class HttpSinkConfig( + authentication: Option[Authentication], // ssl, basic, oauth2, proxy + method: HttpMethod, + endpoint: String, // tokenised + content: String, // tokenised + headers: Seq[(String, String)], // tokenised +) { + def toJson: String = { + val decoded: HttpSinkConfig = this + decoded.asJson(HttpSinkConfig.encoder).noSpaces + } + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala new file mode 100644 index 0000000000..77f1a8a84a --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.config + +import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.common.config.ConfigDef.Importance +import org.apache.kafka.common.config.ConfigDef.Type + +object HttpSinkConfigDef { + + val configProp: String = "connect.http.config" + val config: ConfigDef = + new ConfigDef() + .define( + configProp, + Type.STRING, + Importance.HIGH, + "Configuration string", + ) + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectBaseBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectBaseBinding.scala new file mode 100644 index 0000000000..49301f79ab --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectBaseBinding.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import com.github.mustachejava.Binding +import org.apache.kafka.connect.sink.SinkRecord + +import java.util + +abstract class KafkaConnectBaseBinding extends Binding { + + override def get(scopes: util.List[AnyRef]): AnyRef = { + + val sinkRecord = scopes.get(0).asInstanceOf[SinkRecord] + get(sinkRecord) + } + + def get(sinkRecord: SinkRecord): AnyRef + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectHeaderBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectHeaderBinding.scala new file mode 100644 index 0000000000..6a453cd7ab --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectHeaderBinding.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectHeaderBinding(name: String) extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + sinkRecord.headers().lastWithName(name).value() +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectKeyBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectKeyBinding.scala new file mode 100644 index 0000000000..3693e14011 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectKeyBinding.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import cats.implicits.toBifunctorOps +import io.lenses.streamreactor.connect.cloud.common.sink.extractors.KafkaConnectExtractor +import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectKeyBinding(name: Option[String]) extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = KafkaConnectExtractor.extractFromKey(sinkRecord, name).leftMap(e => + throw new ConnectException(s"unable to extract field $name for template, ", e), + ).merge +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectObjectHandler.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectObjectHandler.scala new file mode 100644 index 0000000000..b675360960 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectObjectHandler.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import com.github.mustachejava.reflect.BaseObjectHandler +import com.github.mustachejava.util.Wrapper +import com.github.mustachejava.Binding +import com.github.mustachejava.Code +import com.github.mustachejava.TemplateContext +import io.lenses.streamreactor.connect.http.sink.tpl.templates.SubstitutionType + +import java.util +import scala.util.Try + +class KafkaConnectObjectHandler extends BaseObjectHandler { + + override def createBinding(nameCouldBeNull: String, tc: TemplateContext, code: Code): Binding = + splitOutSubstitution(nameCouldBeNull).map { + case (substitutionType: SubstitutionType, locator: Option[String]) => substitutionType.toBinding(locator) + }.orNull + + private def splitOutSubstitution(nameCouldBeNull: String): Option[(SubstitutionType, Option[String])] = + Option(nameCouldBeNull) + .flatMap { + name => + val split = name.split("\\.", 2) + for { + subsType <- SubstitutionType.withNameInsensitiveOption(split.head) + } yield { + subsType -> Try(split(1)).toOption + } + } + + override def find(name: String, scopes: util.List[AnyRef]): Wrapper = + null +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectOffsetBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectOffsetBinding.scala new file mode 100644 index 0000000000..5e6951ed4c --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectOffsetBinding.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectOffsetBinding() extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + Long.box(sinkRecord.kafkaOffset()) +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectPartitionBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectPartitionBinding.scala new file mode 100644 index 0000000000..485896e338 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectPartitionBinding.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectPartitionBinding() extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + sinkRecord.kafkaPartition() +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTimestampBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTimestampBinding.scala new file mode 100644 index 0000000000..c1edd3a131 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTimestampBinding.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectTimestampBinding() extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + Long.box(sinkRecord.timestamp()) +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTopicBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTopicBinding.scala new file mode 100644 index 0000000000..056419645a --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTopicBinding.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectTopicBinding() extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + sinkRecord.topic() +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectValueBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectValueBinding.scala new file mode 100644 index 0000000000..744f52266e --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectValueBinding.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import cats.implicits.toBifunctorOps +import io.lenses.streamreactor.connect.cloud.common.sink.extractors.KafkaConnectExtractor +import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectValueBinding(name: Option[String]) extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + KafkaConnectExtractor.extractFromValue(sinkRecord, name).leftMap(e => + throw new ConnectException(s"unable to extract field $name for template, ", e), + ).merge +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/ProcessedTemplate.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/ProcessedTemplate.scala new file mode 100644 index 0000000000..e7f4066a54 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/ProcessedTemplate.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +case class ProcessedTemplate( + endpoint: String, + content: String, + headers: Seq[(String, String)], +) diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala new file mode 100644 index 0000000000..17b6173266 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +import com.github.mustachejava.DefaultMustacheFactory +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectObjectHandler + +import java.io.StringReader + +case class RawTemplate( + endpoint: String, + content: String, + headers: Seq[(String, String)], +) { + def parse(): Template = { + val mf = new DefaultMustacheFactory() + mf.setObjectHandler(new KafkaConnectObjectHandler()) + + val endpointTemplate = mf.compile(new StringReader(endpoint), "endpoint") + val contentTemplate = mf.compile(new StringReader(content), "content") + //val headerTemplates = template.headers.map() + // TODO + Template(endpointTemplate, contentTemplate) + } +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionType.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionType.scala new file mode 100644 index 0000000000..f2e7068c75 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionType.scala @@ -0,0 +1,69 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +import enumeratum.CirceEnum +import enumeratum.Enum +import enumeratum.EnumEntry +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectBaseBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectHeaderBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectKeyBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectOffsetBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectPartitionBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectTopicBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectValueBinding +import org.apache.kafka.connect.errors.ConnectException + +sealed trait SubstitutionType extends EnumEntry { + def toBinding(locator: Option[String]): KafkaConnectBaseBinding +} + +case object SubstitutionType extends Enum[SubstitutionType] with CirceEnum[SubstitutionType] { + + val values = findValues + + case object Key extends SubstitutionType { + override def toBinding(locator: Option[String]): KafkaConnectBaseBinding = new KafkaConnectKeyBinding(locator) + } + + case object Value extends SubstitutionType { + override def toBinding(locator: Option[String]): KafkaConnectBaseBinding = new KafkaConnectValueBinding(locator) + + } + + case object Header extends SubstitutionType { + // TODO: Better error handling + override def toBinding(locator: Option[String]): KafkaConnectBaseBinding = + new KafkaConnectHeaderBinding(locator.getOrElse(throw new ConnectException("Invalid locator for path"))) + + } + + case object Topic extends SubstitutionType { + override def toBinding(locator: Option[String]): KafkaConnectBaseBinding = new KafkaConnectTopicBinding() + + } + + case object Partition extends SubstitutionType { + override def toBinding(locator: Option[String]): KafkaConnectBaseBinding = new KafkaConnectPartitionBinding() + + } + + case object Offset extends SubstitutionType { + override def toBinding(locator: Option[String]): KafkaConnectBaseBinding = new KafkaConnectOffsetBinding() + + } + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala new file mode 100644 index 0000000000..f9091f7eb7 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +import com.github.mustachejava.Mustache +import org.apache.kafka.connect.sink.SinkRecord + +import java.io.StringWriter + +case class Template( + endpointTemplate: Mustache, + contentTemplate: Mustache, +) { + def process( + sinkRecord: SinkRecord, + ): ProcessedTemplate = + ProcessedTemplate( + executeTemplate(endpointTemplate, sinkRecord), + executeTemplate(contentTemplate, sinkRecord), + Seq(), + ) + + private def executeTemplate( + template: Mustache, + sinkRecord: SinkRecord, + ): String = { + val stringWriter = new StringWriter() + template.execute(stringWriter, sinkRecord) + stringWriter.flush() + stringWriter.toString + } + +} diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala new file mode 100644 index 0000000000..44298d7887 --- /dev/null +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink + +import io.lenses.streamreactor.connect.http.sink.client.BasicAuthentication +import io.lenses.streamreactor.connect.http.sink.client.HttpMethod.Put +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig +import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.matchers.should.Matchers + +class HttpSinkConfigTest extends AnyFunSuiteLike with Matchers { + + test("should write config to json") { + HttpSinkConfig( + Some(BasicAuthentication("user", "pass")), + Put, + "http://myaddress.example.com", + "\nDave\nJason\nHooray for Kafka Connect!\n", + Seq("something" -> "somethingelse"), + ).toJson should be( + """{"authentication":{"username":"user","password":"pass","type":"BasicAuthentication"},"method":"Put","endpoint":"http://myaddress.example.com","content":"\nDave\nJason\nHooray for Kafka Connect!\n","headers":[["something","somethingelse"]]}""", + ) + } + +} diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala new file mode 100644 index 0000000000..258e163831 --- /dev/null +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala @@ -0,0 +1,67 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl + +import io.lenses.streamreactor.connect.http.sink.tpl.templates.RawTemplate +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.sink.SinkRecord +import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.matchers.should.Matchers + +class TemplateTest extends AnyFunSuiteLike with Matchers { + + test("template behaviour") { + + val valueSchema = SchemaBuilder + .struct() + .name("myStruct") + .field("groupDomain", Schema.STRING_SCHEMA) + .field("orderNo", Schema.INT32_SCHEMA) + .field("employeeId", Schema.STRING_SCHEMA) + .build() + + val value = new Struct(valueSchema) + value.put("groupDomain", "myExampleGroup.uk") + value.put("orderNo", 10) + value.put("employeeId", "Abcd1234") + + val record = new SinkRecord("myTopic", 0, null, null, valueSchema, value, 9) + val processedTemplate = RawTemplate( + endpoint = "http://{{value.groupDomain}}.example.com/{{value.orderNo}}/{{value.employeeId}}/{{topic}}", + content = + """ + |{{topic}} + |{{value.employeeId}} + |{{value.orderNo}} + |{{value.groupDomain}} + |""".stripMargin, + Seq(), + ).parse().process(record) + processedTemplate.endpoint should be("http://myExampleGroup.uk.example.com/10/Abcd1234/myTopic") + + processedTemplate.content should be( + """ + |myTopic + |Abcd1234 + |10 + |myExampleGroup.uk + |""".stripMargin, + ) + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 57a58bac85..557731e6e0 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -50,9 +50,10 @@ object Dependencies { val enumeratumVersion = "1.7.2" - val http4sVersion = "1.0.0-M32" - val avroVersion = "1.11.0" - val avro4sVersion = "4.1.0" + val http4sVersion = "1.0.0-M32" + val http4sJdkVersion = "1.0.0-M1" + val avroVersion = "1.11.0" + val avro4sVersion = "4.1.0" val catsVersion = "2.9.0" val catsEffectVersion = "3.4.8" @@ -60,7 +61,7 @@ object Dependencies { val urlValidatorVersion = "1.7" val circeVersion = "0.15.0-M1" - val circeGenericExtrasVersion = "0.14.1" + val circeGenericExtrasVersion = "0.14.3" val circeJsonSchemaVersion = "0.2.0" val shapelessVersion = "2.3.10" @@ -165,10 +166,13 @@ object Dependencies { val urlValidator = "commons-validator" % "commons-validator" % urlValidatorVersion - val circeGeneric = "io.circe" %% "circe-generic" % circeVersion - val circeParser = "io.circe" %% "circe-parser" % circeVersion - val circeRefined = "io.circe" %% "circe-refined" % circeVersion - val circe: Seq[ModuleID] = Seq(circeGeneric, circeParser) + val circeGeneric = "io.circe" %% "circe-generic" % circeVersion + val circeGenericExtras = "io.circe" %% "circe-generic-extras" % circeGenericExtrasVersion + val circeParser = "io.circe" %% "circe-parser" % circeVersion + val circeRefined = "io.circe" %% "circe-refined" % circeVersion + val circe: Seq[ModuleID] = Seq(circeGeneric, circeParser, circeGenericExtras) + + val mustache = "com.github.spullara.mustache.java" % "compiler" % "0.9.10" // logging val logback = "ch.qos.logback" % "logback-classic" % logbackVersion @@ -214,12 +218,10 @@ object Dependencies { val confluentProtobufConverter: ModuleID = confluentExcludes("io.confluent" % "kafka-connect-protobuf-converter" % confluentVersion) - val http4sDsl = "org.http4s" %% "http4s-dsl" % http4sVersion - val http4sAsyncClient = "org.http4s" %% "http4s-async-http-client" % http4sVersion - val http4sBlazeServer = "org.http4s" %% "http4s-blaze-server" % http4sVersion - val http4sBlazeClient = "org.http4s" %% "http4s-blaze-client" % http4sVersion - val http4sCirce = "org.http4s" %% "http4s-circe" % http4sVersion - val http4s: Seq[ModuleID] = Seq(http4sDsl, http4sAsyncClient, http4sBlazeServer, http4sCirce) + val http4sDsl = "org.http4s" %% "http4s-dsl" % http4sVersion + val http4sJdkClient = "org.http4s" %% "http4s-jdk-http-client" % http4sJdkVersion + val http4sCirce = "org.http4s" %% "http4s-circe" % http4sVersion + val http4s: Seq[ModuleID] = Seq(http4sDsl, http4sJdkClient, http4sCirce) val bouncyProv = "org.bouncycastle" % "bcprov-jdk15on" % bouncyCastleVersion val bouncyUtil = "org.bouncycastle" % "bcutil-jdk15on" % bouncyCastleVersion