Skip to content

Commit

Permalink
HTTP Sink WTD
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed Dec 14, 2023
1 parent 65d3aff commit 0647dc6
Show file tree
Hide file tree
Showing 44 changed files with 991 additions and 36 deletions.
29 changes: 26 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Dependencies.globalExcludeDeps
import Dependencies.gson

import Settings._
import Dependencies.mustache
import Settings.*
import sbt.Keys.libraryDependencies
import sbt._
import sbt.*
import sbt.Project.projectToLocalProject

import java.io.File
Expand All @@ -22,6 +22,7 @@ lazy val subProjects: Seq[Project] = Seq(
elastic7,
ftp,
`gcp-storage`,
http,
influxdb,
jms,
mongodb,
Expand Down Expand Up @@ -246,6 +247,28 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
.configureFunctionalTests()
.enablePlugins(PackPlugin)

// HTTP sink connector module: a Kafka Connect sink that pushes records to HTTP endpoints.
lazy val http = (project in file("kafka-connect-http"))
.dependsOn(common)
//.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-http",
description := "Kafka Connect compatible connectors to move data between Kafka and http",
// mustache is pulled in for request templating (see RawTemplate/Template in the sink)
libraryDependencies ++= baseDeps ++ Seq(mustache),
// not published as a standalone artifact; distributed via the pack/assembly bundles
publish / skip := true,
// exclude jars that are provided by the Connect runtime / would clash at runtime
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly(false)
.configureTests(baseTestDeps)
//.configureIntegrationTests()
//.configureFunctionalTests(kafkaConnectS3FuncTestDeps)
.enablePlugins(PackPlugin, ProtocPlugin)

lazy val influxdb = (project in file("kafka-connect-influxdb"))
.dependsOn(common)
.settings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package io.lenses.streamreactor.connect.cloud.common.formats.writer
import com.opencsv.CSVWriter
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.ExtractorErrorAdaptor.adaptErrorResponse
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.SinkDataExtractor
import io.lenses.streamreactor.connect.cloud.common.stream.CloudOutputStream
import org.apache.kafka.connect.data.Schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.lenses.streamreactor.connect.cloud.common.sink.config

import io.lenses.kcql.Kcql
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.PartitionNamePath

import java.time.format.DateTimeFormatter
import java.util.TimeZone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,7 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.formats.writer._

/**
* Extracts values from a SinkData wrapper type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.DatePartitionFie
import io.lenses.streamreactor.connect.cloud.common.sink.config.HeaderPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.KeyPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionSelection
import io.lenses.streamreactor.connect.cloud.common.sink.config.TopicPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.ValuePartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.WholeKeyPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.ExtractorErrorAdaptor.adaptErrorResponse
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.SinkDataExtractor

import java.io.File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.lenses.streamreactor.connect.cloud.common.model

import io.lenses.kcql.Kcql
import io.lenses.streamreactor.connect.cloud.common.sink.config._
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.PartitionNamePath
import org.mockito.MockitoSugar
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import org.apache.kafka.connect.data.SchemaBuilder
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.ExtractorErrorType.MissingValue
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.ExtractorErrorType.UnexpectedType
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.ExtractorError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors
import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import ArrayIndexUtil.getArrayIndex
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import org.apache.kafka.connect.data.Schema

import java.util
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import cats.implicits.catsSyntaxEitherId
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord

import java.util
import java.lang
import java.nio.ByteBuffer
import scala.jdk.CollectionConverters.MapHasAsJava

object KafkaConnectExtractor extends LazyLogging {

  /** Extracts the value at dot-separated `path` from the record's key. */
  def extractFromKey(sinkRecord: SinkRecord, path: String): Either[Throwable, AnyRef] =
    extract(sinkRecord.key(), Option(sinkRecord.keySchema()), path)

  /** Extracts the value at dot-separated `path` from the record's value. */
  def extractFromValue(sinkRecord: SinkRecord, path: String): Either[Throwable, AnyRef] =
    extract(sinkRecord.value(), Option(sinkRecord.valueSchema()), path)

  // TODO: test with all different types
  /**
    * Resolves `path` against a Connect key/value.
    *
    * Primitives are returned as-is (the path is irrelevant for them); structured
    * types (Struct / Map / List) delegate to the dedicated extractors using the
    * parsed [[PartitionNamePath]]. Anything unrecognised yields a Left.
    */
  private def extract(
    extractFrom:   AnyRef,
    extractSchema: Option[Schema],
    path:          String,
  ): Either[Throwable, AnyRef] = {

    val pnp = PartitionNamePath(path.split('.').toIndexedSeq: _*)
    extractFrom match {
      // fix: was `case null => null`, returning a bare null where an Either is
      // expected — any caller chaining .map/.leftMap would NPE. Null keys/values
      // are legitimate in Kafka (e.g. tombstones), so surface them as Right(null).
      case null                          => Right(null)
      case shortVal:  lang.Short         => shortVal.asRight
      case boolVal:   lang.Boolean       => boolVal.asRight
      case stringVal: lang.String        => stringVal.asRight
      case longVal:   lang.Long          => longVal.asRight
      case intVal:    lang.Integer       => intVal.asRight
      case byteVal:   lang.Byte          => byteVal.asRight
      case doubleVal: lang.Double        => doubleVal.asRight
      case floatVal:  lang.Float         => floatVal.asRight
      case bytesVal:  Array[Byte]        => bytesVal.asRight
      case bytesVal:  ByteBuffer         => bytesVal.array().asRight
      case arrayVal:  Array[_]           => arrayVal.asRight
      case decimal:   BigDecimal         => decimal.asRight
      case decimal:   java.math.BigDecimal => decimal.asRight
      case structVal: Struct             => StructExtractor.extractPathFromStruct(structVal, pnp)
      // NOTE: the Map/List element types are erased at runtime; the extractors
      // re-validate against the (possibly null) schema.
      case mapVal: Map[_, _]             => MapExtractor.extractPathFromMap(mapVal.asJava, pnp, extractSchema.orNull)
      case mapVal: util.Map[_, _]        => MapExtractor.extractPathFromMap(mapVal, pnp, extractSchema.orNull)
      case listVal: util.List[_]         => ArrayExtractor.extractPathFromArray(listVal, pnp, extractSchema.orNull)
      case otherVal => new ConnectException("Unknown value type: " + otherVal.getClass.getName).asLeft
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import org.apache.kafka.connect.data.Schema

import java.util
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.config
package io.lenses.streamreactor.connect.cloud.common.sink.extractors

case class PartitionNamePath(path: String*) {
override def toString: String = path.mkString(".")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import ExtractorErrorType.UnexpectedType
import PrimitiveExtractor.anyToEither
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import org.apache.kafka.connect.data.Schema.Type._
import org.apache.kafka.connect.data.Struct

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import cats.implicits._
import ArrayIndexUtil.getArrayIndex
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath

object WrappedArrayExtractor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors
import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import WrappedMapExtractor.extractPathFromMap
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import org.apache.kafka.connect.data.Struct

object WrappedComplexTypeExtractor extends LazyLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import cats.implicits._
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath

/**
* Extracts values from a Map.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink

import cats.implicits.catsSyntaxOptionId
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.sink.SinkConnector

import java.util
import scala.jdk.CollectionConverters.MapHasAsJava
import scala.jdk.CollectionConverters.MapHasAsScala
import scala.jdk.CollectionConverters.SeqHasAsJava

class HttpSinkConnector extends SinkConnector with LazyLogging {

  private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
  // Raw connector config captured in start(); replicated to every task in taskConfigs().
  private var props: Option[Map[String, String]] = Option.empty

  override def version(): String = manifest.version()

  override def taskClass(): Class[_ <: Task] = classOf[HttpSinkTask]

  override def config(): ConfigDef = HttpSinkConfigDef.config

  override def start(props: util.Map[String, String]): Unit = {
    // fix: log line previously said "Creating S3 sink connector" — copy/paste from the S3 module
    logger.info(s"Creating HTTP sink connector")
    this.props = props.asScala.toMap.some
  }

  override def stop(): Unit = ()

  /** Every task receives an identical copy of the connector's configuration. */
  override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
    logger.info(s"Creating $maxTasks tasks config")
    List.fill(maxTasks) {
      props.map(_.asJava).getOrElse(Map.empty[String, String].asJava)
    }.asJava
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.implicits._
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig.fromJson
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef.configProp
import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
import io.lenses.streamreactor.connect.http.sink.tpl.templates.ProcessedTemplate
import io.lenses.streamreactor.connect.http.sink.tpl.templates.RawTemplate
import io.lenses.streamreactor.connect.http.sink.tpl.templates.Template
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask
import org.http4s.jdkhttpclient.JdkHttpClient

import java.util
import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.jdk.CollectionConverters.MapHasAsScala

class HttpSinkTask extends SinkTask {
  private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)

  // Both are populated in start(); put() refuses to run until they are present.
  private var maybeConfig:   Option[HttpSinkConfig] = Option.empty
  private var maybeTemplate: Option[Template]       = Option.empty

  /**
    * Parses the JSON config from the task props and pre-parses the endpoint /
    * content / header templates. Throws (failing the task) if the config
    * property is absent or invalid.
    */
  override def start(props: util.Map[String, String]): Unit = {
    val parsed = for {
      propVal <- props.asScala.get(configProp).toRight(new IllegalArgumentException("No prop found"))
      config  <- fromJson(propVal)
      template = RawTemplate(config.endpoint, config.content, config.headers).parse()
    } yield (config, template)

    // fix: was `.leftMap(throw _)` — throwing from inside a combinator is
    // misleading; surface the failure with an explicit match instead.
    parsed match {
      case Left(error) => throw error
      case Right((config, template)) =>
        this.maybeConfig   = config.some
        this.maybeTemplate = template.some
    }
  }

  /** Renders each record through the template and sends one HTTP request per record. */
  override def put(records: util.Collection[SinkRecord]): Unit =
    (maybeConfig, maybeTemplate) match {
      case (Some(config), Some(template)) =>
        val sender = new HttpRequestSender(config)
        JdkHttpClient.simple[IO].use { client =>
          // traverse_ sends sequentially and discards the results, equivalent to
          // the previous map/sequence *> IO.unit
          records.asScala.toList.traverse_ { record =>
            val processed: ProcessedTemplate = template.process(record)
            sender.sendHttpRequest(client, processed)
          }
        }.unsafeRunSync()
      case _ =>
        // fix: message typo "tmeplate" -> "template"
        throw new IllegalArgumentException("Config or template not set")
    }

  // fix: was `???`, which throws NotImplementedError on every normal shutdown.
  // No resources are held across calls (the HTTP client is scoped to put()).
  override def stop(): Unit = ()

  override def version(): String = manifest.version()
}
Loading

0 comments on commit 0647dc6

Please sign in to comment.