Skip to content

Commit

Permalink
HTTP Sink WTD
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed Jan 4, 2024
1 parent fe05718 commit 518e78a
Show file tree
Hide file tree
Showing 39 changed files with 998 additions and 24 deletions.
29 changes: 26 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Dependencies.globalExcludeDeps
import Dependencies.gson

import Settings._
import Dependencies.mustache
import Settings.*
import sbt.Keys.libraryDependencies
import sbt._
import sbt.*
import sbt.Project.projectToLocalProject

import java.io.File
Expand All @@ -22,6 +22,7 @@ lazy val subProjects: Seq[Project] = Seq(
elastic7,
ftp,
`gcp-storage`,
http,
influxdb,
jms,
mongodb,
Expand Down Expand Up @@ -246,6 +247,28 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
.configureFunctionalTests()
.enablePlugins(PackPlugin)

lazy val http = (project in file("kafka-connect-http"))
.dependsOn(common)
//.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-http",
description := "Kafka Connect compatible connectors to move data between Kafka and http",
libraryDependencies ++= baseDeps ++ Seq(mustache),
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly(false)
.configureTests(baseTestDeps)
//.configureIntegrationTests()
//.configureFunctionalTests(kafkaConnectS3FuncTestDeps)
.enablePlugins(PackPlugin, ProtocPlugin)

lazy val influxdb = (project in file("kafka-connect-influxdb"))
.dependsOn(common)
.settings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer._
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionDisplay.KeysAndValues
import io.lenses.streamreactor.connect.cloud.common.sink.config.DatePartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.config.HeaderPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.KeyPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionSelection
import io.lenses.streamreactor.connect.cloud.common.sink.config.TopicPartitionField
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import cats.implicits.catsSyntaxEitherId
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord

import java.util
import java.lang
import java.nio.ByteBuffer
import scala.jdk.CollectionConverters.MapHasAsJava

object KafkaConnectExtractor extends LazyLogging {

def extractFromKey(sinkRecord: SinkRecord, path: Option[String]): Either[Throwable, AnyRef] =
extract(sinkRecord.key(), Option(sinkRecord.keySchema()), path)

def extractFromValue(sinkRecord: SinkRecord, path: Option[String]): Either[Throwable, AnyRef] =
extract(sinkRecord.value(), Option(sinkRecord.valueSchema()), path)

// TODO: test with all different types
private def extract(
extractFrom: AnyRef,
extractSchema: Option[Schema],
maybePath: Option[String],
): Either[Throwable, AnyRef] = {

val maybePnp: Option[PartitionNamePath] = maybePath.map(path => PartitionNamePath(path.split('.').toIndexedSeq: _*))
(extractFrom, maybePnp) match {
case (shortVal: lang.Short, _) => shortVal.asRight
case (boolVal: lang.Boolean, _) => boolVal.asRight
case (stringVal: lang.String, _) => stringVal.asRight
case (longVal: lang.Long, _) => longVal.asRight
case (intVal: lang.Integer, _) => intVal.asRight
case (byteVal: lang.Byte, _) => byteVal.asRight
case (doubleVal: lang.Double, _) => doubleVal.asRight
case (floatVal: lang.Float, _) => floatVal.asRight
case (bytesVal: Array[Byte], _) => bytesVal.asRight
case (bytesVal: ByteBuffer, _) => bytesVal.array().asRight
case (arrayVal: Array[_], _) => arrayVal.asRight
case (decimal: BigDecimal, _) => decimal.asRight
case (decimal: java.math.BigDecimal, _) => decimal.asRight
case null => null
case (structVal: Struct, Some(pnp)) => StructExtractor.extractPathFromStruct(structVal, pnp)
case (mapVal: Map[_, _], Some(pnp)) => MapExtractor.extractPathFromMap(mapVal.asJava, pnp, extractSchema.orNull)
case (mapVal: util.Map[_, _], Some(pnp)) => MapExtractor.extractPathFromMap(mapVal, pnp, extractSchema.orNull)
case (listVal: util.List[_], Some(pnp)) => ArrayExtractor.extractPathFromArray(listVal, pnp, extractSchema.orNull)
case otherVal => new ConnectException("Unknown value type: " + otherVal.getClass.getName).asLeft
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink

import cats.implicits.catsSyntaxOptionId
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.sink.SinkConnector

import java.util
import scala.jdk.CollectionConverters.MapHasAsJava
import scala.jdk.CollectionConverters.MapHasAsScala
import scala.jdk.CollectionConverters.SeqHasAsJava

class HttpSinkConnector extends SinkConnector with LazyLogging {

private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
private var props: Option[Map[String, String]] = Option.empty

override def version(): String = manifest.version()

override def taskClass(): Class[_ <: Task] = classOf[HttpSinkTask]

override def config(): ConfigDef = HttpSinkConfigDef.config

override def start(props: util.Map[String, String]): Unit = {
logger.info(s"Creating S3 sink connector")
this.props = props.asScala.toMap.some
}

override def stop(): Unit = ()

override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
logger.info(s"Creating $maxTasks tasks config")
List.fill(maxTasks) {
props.map(_.asJava).getOrElse(Map.empty[String, String].asJava)
}.asJava
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.implicits._
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig.fromJson
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef.configProp
import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
import io.lenses.streamreactor.connect.http.sink.tpl.templates.ProcessedTemplate
import io.lenses.streamreactor.connect.http.sink.tpl.templates.RawTemplate
import io.lenses.streamreactor.connect.http.sink.tpl.templates.Template
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask
import org.http4s.jdkhttpclient.JdkHttpClient

import java.util
import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.jdk.CollectionConverters.MapHasAsScala

class HttpSinkTask extends SinkTask {
private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)

private var maybeConfig: Option[HttpSinkConfig] = Option.empty

private var maybeTemplate: Option[Template] = Option.empty
override def start(props: util.Map[String, String]): Unit = {
{
for {
propVal <- props.asScala.get(configProp).toRight(new IllegalArgumentException("No prop found"))
config <- fromJson(propVal)
template = RawTemplate(config.endpoint, config.content, config.headers)
.parse()
} yield {
this.maybeConfig = config.some
this.maybeTemplate = template.some
}
}.leftMap(throw _)
()

}

override def put(records: util.Collection[SinkRecord]): Unit = {
val sinkRecords = records.asScala
(maybeConfig, maybeTemplate) match {
case (Some(config), Some(template)) =>
val sender = new HttpRequestSender(config)
val clientResource = JdkHttpClient.simple[IO]

clientResource.use {
client =>
sinkRecords.map {
record =>
val processed: ProcessedTemplate = template.process(record)
sender.sendHttpRequest(
client,
processed,
)
}.toSeq.sequence *> IO.unit
}.unsafeRunSync()
case _ => throw new IllegalArgumentException("Config or tmeplate not set")
}

}

override def stop(): Unit = ???

override def version(): String = manifest.version()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.client

import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto._
import io.circe.Decoder
import io.circe.Encoder

sealed trait Authentication

object Authentication {
implicit val customConfig: Configuration = Configuration.default.withDiscriminator("type")

implicit val decoder: Decoder[Authentication] = deriveConfiguredDecoder[Authentication]
implicit val encoder: Encoder[Authentication] = deriveConfiguredEncoder[Authentication]
}

case class BasicAuthentication(username: String, password: String) extends Authentication

object BasicAuthentication {
implicit val customConfig: Configuration = Configuration.default.withDiscriminator("type")

implicit val decoder: Decoder[BasicAuthentication] = deriveConfiguredDecoder
implicit val encoder: Encoder[BasicAuthentication] = deriveConfiguredEncoder
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.client

import enumeratum.CirceEnum
import enumeratum.Enum
import enumeratum.EnumEntry

sealed trait HttpMethod extends EnumEntry

case object HttpMethod extends Enum[HttpMethod] with CirceEnum[HttpMethod] {

val values = findValues

case object Put extends HttpMethod

case object Post extends HttpMethod

case object Patch extends HttpMethod

}
Loading

0 comments on commit 518e78a

Please sign in to comment.