From 0647dc624cc4154ffb58ceac09aded0cf1996089 Mon Sep 17 00:00:00 2001 From: David Sloan Date: Thu, 14 Dec 2023 16:09:00 +0000 Subject: [PATCH] HTTP Sink WTD --- build.sbt | 29 ++++++- .../formats/writer/CsvFormatWriter.scala | 2 +- .../common/sink/config/PartitionField.scala | 1 + .../sink/extractors/SinkDataExtractor.scala | 8 +- .../common/sink/naming/CloudKeyNamer.scala | 2 +- .../common/model/PartitionFieldTest.scala | 1 + .../sink/extractors/MapExtractorTest.scala | 1 - .../extractors/SinkDataExtractorTest.scala | 1 - .../sink/extractors/ArrayExtractor.scala | 1 - .../sink/extractors/ArrayIndexUtil.scala | 0 .../extractors/ComplexTypeExtractor.scala | 1 - .../sink/extractors/ExtractorErrorType.scala | 0 .../extractors/KafkaConnectExtractor.scala | 69 +++++++++++++++ .../common/sink/extractors/MapExtractor.scala | 1 - .../sink/extractors}/PartitionNamePath.scala | 2 +- .../sink/extractors/PrimitiveExtractor.scala | 0 .../sink/extractors/StructExtractor.scala | 1 - .../extractors/WrappedArrayExtractor.scala | 1 - .../WrappedComplexTypeExtractor.scala | 1 - .../sink/extractors/WrappedMapExtractor.scala | 1 - .../WrappedPrimitiveExtractor.scala | 0 .../connect/http/sink/HttpSinkConnector.scala | 55 ++++++++++++ .../connect/http/sink/HttpSinkTask.scala | 85 +++++++++++++++++++ .../http/sink/client/Authentication.scala | 39 +++++++++ .../connect/http/sink/client/HttpMethod.scala | 34 ++++++++ .../http/sink/client/HttpRequestSender.scala | 58 +++++++++++++ .../http/sink/config/HttpSinkConfig.scala | 47 ++++++++++ .../http/sink/config/HttpSinkConfigDef.scala | 34 ++++++++ .../tpl/binding/KafkaConnectBaseBinding.scala | 33 +++++++ .../binding/KafkaConnectHeaderBinding.scala | 23 +++++ .../tpl/binding/KafkaConnectKeyBinding.scala | 27 ++++++ .../binding/KafkaConnectObjectHandler.scala | 48 +++++++++++ .../binding/KafkaConnectOffsetBinding.scala | 23 +++++ .../KafkaConnectPartitionBinding.scala | 23 +++++ .../KafkaConnectTimestampBinding.scala | 21 +++++ .../binding/KafkaConnectTopicBinding.scala | 23 +++++ .../binding/KafkaConnectValueBinding.scala | 28 ++++++ .../tpl/templates/ProcessedTemplate.scala | 22 +++++ .../http/sink/tpl/templates/RawTemplate.scala | 38 +++++++++ .../sink/tpl/templates/SubstitutionType.scala | 66 ++++++++++++++ .../http/sink/tpl/templates/Template.scala | 46 ++++++++++ .../http/sink/HttpSinkConfigTest.scala | 36 ++++++++ .../connect/http/sink/tpl/TemplateTest.scala | 65 ++++++++++++++ project/Dependencies.scala | 30 ++++--- 44 files changed, 991 insertions(+), 36 deletions(-) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala (96%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala (94%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala (100%) create mode 100644 kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala (95%) rename {kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config => kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors}/PartitionNamePath.scala (94%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala (100%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala (97%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala (94%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala (95%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala (95%) rename {kafka-connect-cloud-common => kafka-connect-common}/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala (100%) create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectBaseBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectHeaderBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectKeyBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectObjectHandler.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectOffsetBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectPartitionBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTimestampBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTopicBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectValueBinding.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/ProcessedTemplate.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionType.scala create mode 100644 kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala create mode 100644 kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala create mode 100644 kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala diff --git a/build.sbt b/build.sbt index 78614a30cb..16b51b34f3 100644 --- a/build.sbt +++ b/build.sbt @@ -1,9 +1,9 @@ import Dependencies.globalExcludeDeps import Dependencies.gson - -import Settings._ +import Dependencies.mustache +import Settings.* import sbt.Keys.libraryDependencies -import sbt._ +import sbt.* import sbt.Project.projectToLocalProject import java.io.File @@ -22,6 +22,7 @@ lazy val subProjects: Seq[Project] = Seq( elastic7, ftp, `gcp-storage`, + http, influxdb, jms, mongodb, @@ -246,6 +247,28 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7")) .configureFunctionalTests() .enablePlugins(PackPlugin) +lazy val http = (project in file("kafka-connect-http")) + .dependsOn(common) + //.dependsOn(`test-common` % "fun->compile") + .settings( + settings ++ + Seq( + name := "kafka-connect-http", + description := "Kafka Connect compatible connectors to move data between Kafka and http", + libraryDependencies ++= baseDeps ++ Seq(mustache), + publish / skip := true, + packExcludeJars := Seq( + "scala-.*\\.jar", + "zookeeper-.*\\.jar", + ), + ), + ) + .configureAssembly(false) + .configureTests(baseTestDeps) + //.configureIntegrationTests() + //.configureFunctionalTests(kafkaConnectS3FuncTestDeps) + .enablePlugins(PackPlugin, ProtocPlugin) + lazy val influxdb = (project in file("kafka-connect-influxdb")) .dependsOn(common) .settings( diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/CsvFormatWriter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/CsvFormatWriter.scala index 353a900988..63980d0930 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/CsvFormatWriter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/CsvFormatWriter.scala @@ -18,8 +18,8 @@ package io.lenses.streamreactor.connect.cloud.common.formats.writer import com.opencsv.CSVWriter import com.typesafe.scalalogging.LazyLogging import io.lenses.streamreactor.connect.cloud.common.sink.SinkError -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import io.lenses.streamreactor.connect.cloud.common.sink.extractors.ExtractorErrorAdaptor.adaptErrorResponse +import io.lenses.streamreactor.connect.cloud.common.sink.extractors.PartitionNamePath import io.lenses.streamreactor.connect.cloud.common.sink.extractors.SinkDataExtractor import io.lenses.streamreactor.connect.cloud.common.stream.CloudOutputStream import org.apache.kafka.connect.data.Schema diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionField.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionField.scala index be0eca415d..3a004dc687 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionField.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionField.scala @@ -16,6 +16,7 @@ package io.lenses.streamreactor.connect.cloud.common.sink.config import io.lenses.kcql.Kcql +import io.lenses.streamreactor.connect.cloud.common.sink.extractors.PartitionNamePath import java.time.format.DateTimeFormatter import java.util.TimeZone diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala index c3f416748a..b69195227e 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractor.scala @@ -17,13 +17,7 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors import cats.implicits._ import com.typesafe.scalalogging.LazyLogging -import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData -import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData -import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData -import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSinkData -import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData -import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath +import io.lenses.streamreactor.connect.cloud.common.formats.writer._ /** * Extracts values from a SinkData wrapper type diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala index 589a4fbe09..87ca76abec 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala @@ -33,13 +33,13 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.DatePartitionFie import io.lenses.streamreactor.connect.cloud.common.sink.config.HeaderPartitionField import io.lenses.streamreactor.connect.cloud.common.sink.config.KeyPartitionField import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionField -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionPartitionField import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionSelection import io.lenses.streamreactor.connect.cloud.common.sink.config.TopicPartitionField import io.lenses.streamreactor.connect.cloud.common.sink.config.ValuePartitionField import io.lenses.streamreactor.connect.cloud.common.sink.config.WholeKeyPartitionField import io.lenses.streamreactor.connect.cloud.common.sink.extractors.ExtractorErrorAdaptor.adaptErrorResponse +import io.lenses.streamreactor.connect.cloud.common.sink.extractors.PartitionNamePath import io.lenses.streamreactor.connect.cloud.common.sink.extractors.SinkDataExtractor import java.io.File diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/model/PartitionFieldTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/model/PartitionFieldTest.scala index 282fa78d6c..de7fc112e4 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/model/PartitionFieldTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/model/PartitionFieldTest.scala @@ -17,6 +17,7 @@ package io.lenses.streamreactor.connect.cloud.common.model import io.lenses.kcql.Kcql import io.lenses.streamreactor.connect.cloud.common.sink.config._ +import io.lenses.streamreactor.connect.cloud.common.sink.extractors.PartitionNamePath import org.mockito.MockitoSugar import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractorTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractorTest.scala index ac2604aadd..f0f7b1a239 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractorTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractorTest.scala @@ -15,7 +15,6 @@ */ package io.lenses.streamreactor.connect.cloud.common.sink.extractors -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import org.apache.kafka.connect.data.SchemaBuilder import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractorTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractorTest.scala index 1a86d62d0e..f807832401 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractorTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/SinkDataExtractorTest.scala @@ -18,7 +18,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import io.lenses.streamreactor.connect.cloud.common.sink.extractors.ExtractorErrorType.MissingValue import io.lenses.streamreactor.connect.cloud.common.sink.extractors.ExtractorErrorType.UnexpectedType import io.lenses.streamreactor.connect.cloud.common.sink.extractors.ExtractorError diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala similarity index 96% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala index 461d6c5523..e71a634efa 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayExtractor.scala @@ -18,7 +18,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors import cats.implicits._ import com.typesafe.scalalogging.LazyLogging import ArrayIndexUtil.getArrayIndex -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import org.apache.kafka.connect.data.Schema import java.util diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ArrayIndexUtil.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala similarity index 94% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala index 2193663300..b39fc0f173 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ComplexTypeExtractor.scala @@ -17,7 +17,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors import cats.implicits._ import com.typesafe.scalalogging.LazyLogging -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.data.Struct diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/ExtractorErrorType.scala diff --git a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala new file mode 100644 index 0000000000..8978ded62d --- /dev/null +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala @@ -0,0 +1,69 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.sink.extractors + +import cats.implicits.catsSyntaxEitherId +import com.typesafe.scalalogging.LazyLogging +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.sink.SinkRecord + +import java.util +import java.lang +import java.nio.ByteBuffer +import scala.jdk.CollectionConverters.MapHasAsJava + +object KafkaConnectExtractor extends LazyLogging { + + def extractFromKey(sinkRecord: SinkRecord, path: String): Either[Throwable, AnyRef] = + extract(sinkRecord.key(), Option(sinkRecord.keySchema()), path) + + def extractFromValue(sinkRecord: SinkRecord, path: String): Either[Throwable, AnyRef] = + extract(sinkRecord.value(), Option(sinkRecord.valueSchema()), path) + + // TODO: test with all different types + private def extract( + extractFrom: AnyRef, + extractSchema: Option[Schema], + path: String, + ): Either[Throwable, AnyRef] = { + + val pnp = PartitionNamePath(path.split('.').toIndexedSeq: _*) + extractFrom match { + case shortVal: lang.Short => shortVal.asRight + case boolVal: lang.Boolean => boolVal.asRight + case stringVal: lang.String => stringVal.asRight + case longVal: lang.Long => longVal.asRight + case intVal: lang.Integer => intVal.asRight + case byteVal: lang.Byte => byteVal.asRight + case doubleVal: lang.Double => doubleVal.asRight + case floatVal: lang.Float => floatVal.asRight + case bytesVal: Array[Byte] => bytesVal.asRight + case bytesVal: ByteBuffer => bytesVal.array().asRight + case arrayVal: Array[_] => arrayVal.asRight + case decimal: BigDecimal => decimal.asRight + case decimal: java.math.BigDecimal => decimal.asRight + case null => null + case structVal: Struct => StructExtractor.extractPathFromStruct(structVal, pnp) + case mapVal: Map[_, _] => MapExtractor.extractPathFromMap(mapVal.asJava, pnp, extractSchema.orNull) + case mapVal: util.Map[_, _] => MapExtractor.extractPathFromMap(mapVal, pnp, extractSchema.orNull) + case listVal: util.List[_] => ArrayExtractor.extractPathFromArray(listVal, pnp, extractSchema.orNull) + case otherVal => new ConnectException("Unknown value type: " + otherVal.getClass.getName).asLeft + } + } + +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala similarity index 95% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala index 98e1b60c6d..4358af616e 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/MapExtractor.scala @@ -17,7 +17,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors import cats.implicits._ import com.typesafe.scalalogging.LazyLogging -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import org.apache.kafka.connect.data.Schema import java.util diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PartitionNamePath.scala similarity index 94% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PartitionNamePath.scala index 6886ef8fb8..bc8032ccba 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/PartitionNamePath.scala +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PartitionNamePath.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.lenses.streamreactor.connect.cloud.common.sink.config +package io.lenses.streamreactor.connect.cloud.common.sink.extractors case class PartitionNamePath(path: String*) { override def toString: String = path.mkString(".") diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/PrimitiveExtractor.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala similarity index 97% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala index 08c177c20d..ff98d8038c 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/StructExtractor.scala @@ -19,7 +19,6 @@ import cats.implicits._ import com.typesafe.scalalogging.LazyLogging import ExtractorErrorType.UnexpectedType import PrimitiveExtractor.anyToEither -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import org.apache.kafka.connect.data.Schema.Type._ import org.apache.kafka.connect.data.Struct diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala similarity index 94% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala index 72fb667d39..841c1307dc 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedArrayExtractor.scala @@ -17,7 +17,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors import cats.implicits._ import ArrayIndexUtil.getArrayIndex -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath object WrappedArrayExtractor { diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala similarity index 95% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala index 97a9e414eb..f38a6c2500 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedComplexTypeExtractor.scala @@ -18,7 +18,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors import cats.implicits._ import com.typesafe.scalalogging.LazyLogging import WrappedMapExtractor.extractPathFromMap -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath import org.apache.kafka.connect.data.Struct object WrappedComplexTypeExtractor extends LazyLogging { diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala similarity index 95% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala index 58440204d7..4a812295f3 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedMapExtractor.scala @@ -16,7 +16,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors import cats.implicits._ -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath /** * Extracts values from a Map. diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala similarity index 100% rename from kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala rename to kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/WrappedPrimitiveExtractor.scala diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala new file mode 100644 index 0000000000..aad600922e --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink + +import cats.implicits.catsSyntaxOptionId +import com.typesafe.scalalogging.LazyLogging +import io.lenses.streamreactor.common.utils.JarManifest +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef +import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.connect.connector.Task +import org.apache.kafka.connect.sink.SinkConnector + +import java.util +import scala.jdk.CollectionConverters.MapHasAsJava +import scala.jdk.CollectionConverters.MapHasAsScala +import scala.jdk.CollectionConverters.SeqHasAsJava + +class HttpSinkConnector extends SinkConnector with LazyLogging { + + private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation) + private var props: Option[Map[String, String]] = Option.empty + + override def version(): String = manifest.version() + + override def taskClass(): Class[_ <: Task] = classOf[HttpSinkTask] + + override def config(): ConfigDef = HttpSinkConfigDef.config + + override def start(props: util.Map[String, String]): Unit = { + logger.info(s"Creating S3 sink connector") + this.props = props.asScala.toMap.some + } + + override def stop(): Unit = () + + override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = { + logger.info(s"Creating $maxTasks tasks config") + List.fill(maxTasks) { + props.map(_.asJava).getOrElse(Map.empty[String, String].asJava) + }.asJava + } +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala new file mode 100644 index 0000000000..c67ed2d887 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import cats.implicits._ +import io.lenses.streamreactor.common.utils.JarManifest +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig.fromJson +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef.configProp +import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig +import io.lenses.streamreactor.connect.http.sink.tpl.templates.ProcessedTemplate +import io.lenses.streamreactor.connect.http.sink.tpl.templates.RawTemplate +import io.lenses.streamreactor.connect.http.sink.tpl.templates.Template +import org.apache.kafka.connect.sink.SinkRecord +import org.apache.kafka.connect.sink.SinkTask +import org.http4s.jdkhttpclient.JdkHttpClient + +import java.util +import scala.jdk.CollectionConverters.IterableHasAsScala +import scala.jdk.CollectionConverters.MapHasAsScala + +class HttpSinkTask extends SinkTask { + private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation) + + private var maybeConfig: Option[HttpSinkConfig] = Option.empty + + private var maybeTemplate: Option[Template] = Option.empty + override def start(props: util.Map[String, String]): Unit = { + { + for { + propVal <- props.asScala.get(configProp).toRight(new IllegalArgumentException("No prop found")) + config <- fromJson(propVal) + template = RawTemplate(config.endpoint, config.content, config.headers) + .parse() + } yield { + this.maybeConfig = config.some + this.maybeTemplate = template.some + } + }.leftMap(throw _) + () + + } + + override def put(records: util.Collection[SinkRecord]): Unit = { + val sinkRecords = records.asScala + (maybeConfig, maybeTemplate) match { + case (Some(config), Some(template)) => + val sender = new HttpRequestSender(config) + val clientResource = JdkHttpClient.simple[IO] + + clientResource.use { + client => + sinkRecords.map { + record => + val processed: ProcessedTemplate = template.process(record) + sender.sendHttpRequest( + client, + processed, + ) + }.toSeq.sequence *> IO.unit + }.unsafeRunSync() + case _ => throw new IllegalArgumentException("Config or tmeplate not set") + } + + } + + override def stop(): Unit = ??? + + override def version(): String = manifest.version() +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala new file mode 100644 index 0000000000..f6ead99ec1 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/Authentication.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.client + +import io.circe.generic.extras.Configuration +import io.circe.generic.extras.semiauto._ +import io.circe.Decoder +import io.circe.Encoder + +sealed trait Authentication + +object Authentication { + implicit val customConfig: Configuration = Configuration.default.withDiscriminator("type") + + implicit val decoder: Decoder[Authentication] = deriveConfiguredDecoder[Authentication] + implicit val encoder: Encoder[Authentication] = deriveConfiguredEncoder[Authentication] +} + +case class BasicAuthentication(username: String, password: String) extends Authentication + +object BasicAuthentication { + implicit val customConfig: Configuration = Configuration.default.withDiscriminator("type") + + implicit val decoder: Decoder[BasicAuthentication] = deriveConfiguredDecoder + implicit val encoder: Encoder[BasicAuthentication] = deriveConfiguredEncoder +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala new file mode 100644 index 0000000000..f74bf0ab80 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpMethod.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.client + +import enumeratum.CirceEnum +import enumeratum.Enum +import enumeratum.EnumEntry + +sealed trait HttpMethod extends EnumEntry + +case object HttpMethod extends Enum[HttpMethod] with CirceEnum[HttpMethod] { + + val values = findValues + + case object Put extends HttpMethod + + case object Post extends HttpMethod + + case object Patch extends HttpMethod + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala new file mode 100644 index 0000000000..a26479a117 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/client/HttpRequestSender.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.client + +import cats.effect.IO +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig +import io.lenses.streamreactor.connect.http.sink.tpl.templates.ProcessedTemplate +import org.http4s._ +import org.http4s.client.Client +import org.http4s.headers.Authorization +import org.typelevel.ci.CIString +class HttpRequestSender(config: HttpSinkConfig) { + + def sendHttpRequest( + client: Client[IO], + processedTemplate: ProcessedTemplate, + ): IO[Unit] = { + + val uri = Uri.unsafeFromString(processedTemplate.endpoint) + + val clientHeaders: Headers = Headers(processedTemplate.headers.map { + case (name, value) => + Header.ToRaw.rawToRaw(new Header.Raw(CIString(name), value)) + }: _*) + + val request = Request[IO]( + method = Method.fromString(config.method.entryName).getOrElse(Method.GET), + uri = uri, + headers = clientHeaders, + ).withEntity(config.content) + + // Add authentication if present + val authenticatedRequest = config.authentication.fold(request) { + case BasicAuthentication(username, password) => + request.putHeaders(Authorization(BasicCredentials(username, password))) + } + + for { + response <- client.expect[String](authenticatedRequest) + _ <- IO(println(s"Response: $response")) + } yield () + + } + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala new file mode 100644 index 0000000000..f1c12970b3 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.config + +import io.circe.generic.semiauto.deriveDecoder +import io.circe.generic.semiauto.deriveEncoder +import io.circe.parser.decode +import io.circe.syntax.EncoderOps +import io.circe.Decoder +import io.circe.Encoder +import io.circe.Error +import io.lenses.streamreactor.connect.http.sink.client.Authentication +import io.lenses.streamreactor.connect.http.sink.client.HttpMethod +object HttpSinkConfig { + + implicit val decoder: Decoder[HttpSinkConfig] = deriveDecoder + implicit val encoder: Encoder[HttpSinkConfig] = deriveEncoder + + def fromJson(json: String): Either[Error, HttpSinkConfig] = decode[HttpSinkConfig](json) + +} +case class HttpSinkConfig( + authentication: Option[Authentication], // ssl, basic, oauth2, proxy + method: HttpMethod, + endpoint: String, // tokenised + content: String, // tokenised + headers: Seq[(String, String)], // tokenised +) { + def toJson: String = { + val decoded: HttpSinkConfig = this + decoded.asJson(HttpSinkConfig.encoder).noSpaces + } + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala new file mode 100644 index 0000000000..77f1a8a84a --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.config + +import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.common.config.ConfigDef.Importance +import org.apache.kafka.common.config.ConfigDef.Type + +object HttpSinkConfigDef { + + val configProp: String = "connect.http.config" + val config: ConfigDef = + new ConfigDef() + .define( + configProp, + Type.STRING, + Importance.HIGH, + "Configuration string", + ) + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectBaseBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectBaseBinding.scala new file mode 100644 index 0000000000..ec89be0dde --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectBaseBinding.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import com.github.mustachejava.Binding +import org.apache.kafka.connect.sink.SinkRecord + +import java.util + +abstract class KafkaConnectBaseBinding() extends Binding { + + override def get(scopes: util.List[AnyRef]): AnyRef = { + + val sinkRecord = scopes.get(0).asInstanceOf[SinkRecord] + get(sinkRecord) + } + + def get(sinkRecord: SinkRecord): AnyRef + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectHeaderBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectHeaderBinding.scala new file mode 100644 index 0000000000..6a453cd7ab --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectHeaderBinding.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectHeaderBinding(name: String) extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + sinkRecord.headers().lastWithName(name).value() +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectKeyBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectKeyBinding.scala new file mode 100644 index 0000000000..f515d5f9f8 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectKeyBinding.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import cats.implicits.toBifunctorOps +import io.lenses.streamreactor.connect.cloud.common.sink.extractors.KafkaConnectExtractor +import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectKeyBinding(name: String) extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = KafkaConnectExtractor.extractFromKey(sinkRecord, name).leftMap(e => + throw new ConnectException(s"unable to extract field $name for template, ", e), + ).merge +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectObjectHandler.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectObjectHandler.scala new file mode 100644 index 0000000000..bc3d52bf78 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectObjectHandler.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import com.github.mustachejava.reflect.BaseObjectHandler +import com.github.mustachejava.util.Wrapper +import com.github.mustachejava.Binding +import com.github.mustachejava.Code +import com.github.mustachejava.TemplateContext +import io.lenses.streamreactor.connect.http.sink.tpl.templates.SubstitutionType + +import java.util + +class KafkaConnectObjectHandler extends BaseObjectHandler { + + override def createBinding(nameCouldBeNull: String, tc: TemplateContext, code: Code): Binding = + splitOutSubstitution(nameCouldBeNull).map { + case (substitutionType: SubstitutionType, locator: String) => substitutionType.toBinding(locator) + }.orNull + + private def splitOutSubstitution(nameCouldBeNull: String): Option[(SubstitutionType, String)] = + Option(nameCouldBeNull) + .flatMap { + name => + val split = name.split("\\.", 2) + for { + subsType <- SubstitutionType.withNameInsensitiveOption(split.head) + } yield { + subsType -> split(1) + } + } + + override def find(name: String, scopes: util.List[AnyRef]): Wrapper = + null +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectOffsetBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectOffsetBinding.scala new file mode 100644 index 0000000000..5e6951ed4c --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectOffsetBinding.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectOffsetBinding() extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + Long.box(sinkRecord.kafkaOffset()) +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectPartitionBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectPartitionBinding.scala new file mode 100644 index 0000000000..485896e338 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectPartitionBinding.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectPartitionBinding() extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + sinkRecord.kafkaPartition() +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTimestampBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTimestampBinding.scala new file mode 100644 index 0000000000..aee494c059 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTimestampBinding.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +class KafkaConnectTimestampBinding() extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + Long.box(sinkRecord.timestamp()) +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTopicBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTopicBinding.scala new file mode 100644 index 0000000000..056419645a --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectTopicBinding.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectTopicBinding() extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + sinkRecord.topic() +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectValueBinding.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectValueBinding.scala new file mode 100644 index 0000000000..a52250254e --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/binding/KafkaConnectValueBinding.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.binding + +import cats.implicits.toBifunctorOps +import io.lenses.streamreactor.connect.cloud.common.sink.extractors.KafkaConnectExtractor +import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.sink.SinkRecord + +class KafkaConnectValueBinding(name: String) extends KafkaConnectBaseBinding() { + override def get(sinkRecord: SinkRecord): AnyRef = + KafkaConnectExtractor.extractFromValue(sinkRecord, name).leftMap(e => + throw new ConnectException(s"unable to extract field $name for template, ", e), + ).merge +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/ProcessedTemplate.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/ProcessedTemplate.scala new file mode 100644 index 0000000000..e7f4066a54 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/ProcessedTemplate.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +case class ProcessedTemplate( + endpoint: String, + content: String, + headers: Seq[(String, String)], +) diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala new file mode 100644 index 0000000000..17b6173266 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +import com.github.mustachejava.DefaultMustacheFactory +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectObjectHandler + +import java.io.StringReader + +case class RawTemplate( + endpoint: String, + content: String, + headers: Seq[(String, String)], +) { + def parse(): Template = { + val mf = new DefaultMustacheFactory() + mf.setObjectHandler(new KafkaConnectObjectHandler()) + + val endpointTemplate = mf.compile(new StringReader(endpoint), "endpoint") + val contentTemplate = mf.compile(new StringReader(content), "content") + //val headerTemplates = template.headers.map() + // TODO + Template(endpointTemplate, contentTemplate) + } +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionType.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionType.scala new file mode 100644 index 0000000000..75fabea324 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionType.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +import enumeratum.CirceEnum +import enumeratum.Enum +import enumeratum.EnumEntry +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectBaseBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectHeaderBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectKeyBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectOffsetBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectPartitionBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectTopicBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectValueBinding + +sealed trait SubstitutionType extends EnumEntry { + def toBinding(locator: String): KafkaConnectBaseBinding +} + +case object SubstitutionType extends Enum[SubstitutionType] with CirceEnum[SubstitutionType] { + + val values = findValues + + case object Key extends SubstitutionType { + override def toBinding(locator: String): KafkaConnectBaseBinding = new KafkaConnectKeyBinding(locator) + } + + case object Value extends SubstitutionType { + override def toBinding(locator: String): KafkaConnectBaseBinding = new KafkaConnectValueBinding(locator) + + } + + case object Header extends SubstitutionType { + override def toBinding(locator: String): KafkaConnectBaseBinding = new KafkaConnectHeaderBinding(locator) + + } + + case object Topic extends SubstitutionType { + override def toBinding(locator: String): KafkaConnectBaseBinding = new KafkaConnectTopicBinding() + + } + + case object Partition extends SubstitutionType { + override def toBinding(locator: String): KafkaConnectBaseBinding = new KafkaConnectPartitionBinding() + + } + + case object Offset extends SubstitutionType { + override def toBinding(locator: String): KafkaConnectBaseBinding = new KafkaConnectOffsetBinding() + + } + +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala new file mode 100644 index 0000000000..f9091f7eb7 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +import com.github.mustachejava.Mustache +import org.apache.kafka.connect.sink.SinkRecord + +import java.io.StringWriter + +case class Template( + endpointTemplate: Mustache, + contentTemplate: Mustache, +) { + def process( + sinkRecord: SinkRecord, + ): ProcessedTemplate = + ProcessedTemplate( + executeTemplate(endpointTemplate, sinkRecord), + executeTemplate(contentTemplate, sinkRecord), + Seq(), + ) + + private def executeTemplate( + template: Mustache, + sinkRecord: SinkRecord, + ): String = { + val stringWriter = new StringWriter() + template.execute(stringWriter, sinkRecord) + stringWriter.flush() + stringWriter.toString + } + +} diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala new file mode 100644 index 0000000000..c1ca5cb2c4 --- /dev/null +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink + +import io.lenses.streamreactor.connect.http.sink.client.BasicAuthentication +import io.lenses.streamreactor.connect.http.sink.client.HttpMethod.Put +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig +import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.matchers.should.Matchers + +class HttpSinkConfigTest extends AnyFunSuiteLike with Matchers { + + test("should write config to json") { + HttpSinkConfig( + Some(BasicAuthentication("user", "pass")), + Put, + "http://fabulous.paulabean.com", + "\nTove\nJani\nReminder\nDon't forget me this weekend!\n", + Seq("something" -> "somethingelse"), + ).toJson should be("") + } + +} diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala new file mode 100644 index 0000000000..1fd861bb53 --- /dev/null +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala @@ -0,0 +1,65 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl + +import io.lenses.streamreactor.connect.http.sink.tpl.templates.RawTemplate +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.sink.SinkRecord +import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.matchers.should.Matchers + +class TemplateTest extends AnyFunSuiteLike with Matchers { + + test("template behaviour") { + + val valueSchema = SchemaBuilder + .struct() + .name("myStruct") + .field("groupDomain", Schema.STRING_SCHEMA) + .field("orderNo", Schema.INT32_SCHEMA) + .field("employeeId", Schema.STRING_SCHEMA) + .build() + + val value = new Struct(valueSchema) + value.put("groupDomain", "myExampleGroup.uk") + value.put("orderNo", 10) + value.put("employeeId", "Abcd1234") + + val record = new SinkRecord("myTopic", 0, null, null, valueSchema, value, 9) + val processedTemplate = RawTemplate( + endpoint = "http://{{value.groupDomain}}.example.com/{{value.orderNo}}/{{value.employeeId}}", + content = + """ + |{{value.employeeId}} + |{{value.orderNo}} + |{{value.groupDomain}} + |""".stripMargin, + Seq(), + ).parse().process(record) + processedTemplate.endpoint should be("http://myExampleGroup.uk.example.com/10/Abcd1234") + + processedTemplate.content should be( + """ + |Abcd1234 + |10 + |myExampleGroup.uk + |""".stripMargin, + ) + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 24f9dfa5cb..5b7297b1fd 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -51,9 +51,10 @@ object Dependencies { val enumeratumVersion = "1.7.2" - val http4sVersion = "1.0.0-M32" - val avroVersion = "1.11.0" - val avro4sVersion = "4.1.0" + val http4sVersion = "1.0.0-M32" + val http4sJdkVersion = "1.0.0-M1" + val avroVersion = "1.11.0" + val avro4sVersion = "4.1.0" val catsVersion = "2.9.0" val catsEffectVersion = "3.4.8" @@ -61,7 +62,7 @@ object Dependencies { val urlValidatorVersion = "1.7" val circeVersion = "0.15.0-M1" - val circeGenericExtrasVersion = "0.14.1" + val circeGenericExtrasVersion = "0.14.3" val circeJsonSchemaVersion = "0.2.0" val shapelessVersion = "2.3.10" @@ -167,10 +168,13 @@ object Dependencies { val urlValidator = "commons-validator" % "commons-validator" % urlValidatorVersion - val circeGeneric = "io.circe" %% "circe-generic" % circeVersion - val circeParser = "io.circe" %% "circe-parser" % circeVersion - val circeRefined = "io.circe" %% "circe-refined" % circeVersion - val circe: Seq[ModuleID] = Seq(circeGeneric, circeParser) + val circeGeneric = "io.circe" %% "circe-generic" % circeVersion + val circeGenericExtras = "io.circe" %% "circe-generic-extras" % circeGenericExtrasVersion + val circeParser = "io.circe" %% "circe-parser" % circeVersion + val circeRefined = "io.circe" %% "circe-refined" % circeVersion + val circe: Seq[ModuleID] = Seq(circeGeneric, circeParser, circeGenericExtras) + + val mustache = "com.github.spullara.mustache.java" % "compiler" % "0.9.10" // logging val logback = "ch.qos.logback" % "logback-classic" % logbackVersion @@ -216,12 +220,10 @@ object Dependencies { val confluentProtobufConverter: ModuleID = confluentExcludes("io.confluent" % "kafka-connect-protobuf-converter" % confluentVersion) - val http4sDsl = "org.http4s" %% "http4s-dsl" % http4sVersion - val http4sAsyncClient = "org.http4s" %% "http4s-async-http-client" % http4sVersion - val http4sBlazeServer = "org.http4s" %% "http4s-blaze-server" % http4sVersion - val http4sBlazeClient = "org.http4s" %% "http4s-blaze-client" % http4sVersion - val http4sCirce = "org.http4s" %% "http4s-circe" % http4sVersion - val http4s: Seq[ModuleID] = Seq(http4sDsl, http4sAsyncClient, http4sBlazeServer, http4sCirce) + val http4sDsl = "org.http4s" %% "http4s-dsl" % http4sVersion + val http4sJdkClient = "org.http4s" %% "http4s-jdk-http-client" % http4sJdkVersion + val http4sCirce = "org.http4s" %% "http4s-circe" % http4sVersion + val http4s: Seq[ModuleID] = Seq(http4sDsl, http4sJdkClient, http4sCirce) val bouncyProv = "org.bouncycastle" % "bcprov-jdk15on" % bouncyCastleVersion val bouncyUtil = "org.bouncycastle" % "bcutil-jdk15on" % bouncyCastleVersion