HTTP Sink MVP #1013

Merged
merged 6 commits on Feb 13, 2024
28 changes: 25 additions & 3 deletions build.sbt
@@ -1,9 +1,8 @@
import Dependencies.globalExcludeDeps
import Dependencies.gson

import Settings._
import Settings.*
import sbt.Keys.libraryDependencies
import sbt._
import sbt.*
import sbt.Project.projectToLocalProject

import java.io.File
@@ -22,6 +21,7 @@ lazy val subProjects: Seq[Project] = Seq(
elastic7,
ftp,
`gcp-storage`,
http,
influxdb,
jms,
mongodb,
@@ -246,6 +246,28 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
.configureFunctionalTests()
.enablePlugins(PackPlugin)

lazy val http = (project in file("kafka-connect-http"))
.dependsOn(common)
//.dependsOn(`test-common` % "fun->compile")
Contributor: I guess the functional tests are to be added later? Do we need that line then?

.settings(
settings ++
Seq(
name := "kafka-connect-http",
description := "Kafka Connect compatible connectors to move data between Kafka and http",
libraryDependencies ++= baseDeps ++ kafkaConnectHttpDeps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly(false)
.configureTests(baseTestDeps ++ kafkaConnectHttpTestDeps)
.configureIntegrationTests(baseTestDeps ++ kafkaConnectHttpTestDeps)
//.configureFunctionalTests(kafkaConnectS3FuncTestDeps)
Contributor: I guess the functional tests are to be added later? Do we need that line then?

.enablePlugins(PackPlugin, ProtocPlugin)

lazy val influxdb = (project in file("kafka-connect-influxdb"))
.dependsOn(common)
.settings(
@@ -0,0 +1,26 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.commit

import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushCount
import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushInterval
import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushSize

object CloudCommitPolicy {
val Default: CommitPolicy =
CommitPolicy(FileSize(defaultFlushSize), Interval(defaultFlushInterval), Count(defaultFlushCount))

}
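
A minimal usage sketch of the relocated default (not part of this PR): it evaluates the same three flush conditions against the CloudCommitContext introduced later in this diff. The topic name, sizes and timestamps below are illustrative only.

```scala
import io.lenses.streamreactor.connect.cloud.common.model.{Offset, Topic, TopicPartitionOffset}
import io.lenses.streamreactor.connect.cloud.common.sink.commit.{CloudCommitContext, CloudCommitPolicy}

object CloudCommitPolicyUsage extends App {

  // Illustrative state: 60,000 records written, ~600 MiB staged, file created a minute ago.
  val context = CloudCommitContext(
    tpo                  = TopicPartitionOffset(Topic("payments"), 1, Offset(100)),
    count                = 60000L,
    fileSize             = 600L * 1024 * 1024,
    createdTimestamp     = System.currentTimeMillis() - 60000L,
    lastFlushedTimestamp = None,
    partitionFile        = "payments/1/tmp-file.avro"
  )

  // The default flushes once any of file size, flush interval or record count is exceeded.
  println(s"should flush: ${CloudCommitPolicy.Default.shouldFlush(context)}")
}
```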
@@ -24,6 +24,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.config.FormatSelection
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CloudCommitPolicy
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
import io.lenses.streamreactor.connect.cloud.common.sink.config.kcqlprops.CloudSinkProps
@@ -114,7 +115,7 @@ case class CloudSinkBucketOptions(
formatSelection: FormatSelection,
keyNamer: KeyNamer,
partitionSelection: PartitionSelection,
commitPolicy: CommitPolicy = CommitPolicy.Default,
commitPolicy: CommitPolicy = CloudCommitPolicy.Default,
localStagingArea: LocalStagingArea,
dataStorage: DataStorageSettings,
) extends WithTransformableDataStorage
@@ -17,12 +17,7 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer._
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath

/**
@@ -30,10 +30,10 @@ import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionDisplay.KeysAndValues
import io.lenses.streamreactor.connect.cloud.common.sink.config.DatePartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.config.HeaderPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.KeyPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionSelection
import io.lenses.streamreactor.connect.cloud.common.sink.config.TopicPartitionField
@@ -18,23 +18,19 @@ package io.lenses.streamreactor.connect.cloud.common.sink.writer
import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.FormatWriter
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.NonFatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitContext
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CloudCommitContext
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
import io.lenses.streamreactor.connect.cloud.common.storage.NonExistingFileError
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.cloud.common.storage.UploadFailedError
import io.lenses.streamreactor.connect.cloud.common.storage.ZeroByteFileError
import io.lenses.streamreactor.connect.cloud.common.storage._
import org.apache.kafka.connect.data.Schema

import java.io.File
@@ -155,7 +151,7 @@ class Writer[SM <: FileMetadata](
def shouldFlush: Boolean =
writeState match {
case Writing(commitState, _, file, uncommittedOffset) => commitPolicy.shouldFlush(
CommitContext(
CloudCommitContext(
topicPartition.withOffset(uncommittedOffset),
commitState.recordCount,
commitState.lastKnownFileSize,
@@ -19,11 +19,6 @@ import com.typesafe.scalalogging.Logger
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitContext
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
import io.lenses.streamreactor.connect.cloud.common.sink.commit.FileSize
import io.lenses.streamreactor.connect.cloud.common.sink.commit.Interval
import org.mockito.MockitoSugar
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@@ -46,12 +41,12 @@ class CommitPolicyTest extends AnyWordSpec with Matchers with MockitoSugar {
val lastFlushTimeAdjusted: Option[Long] = lastFlushTimestampAdjust.fold(Option.empty[Long])(e => Some(nowTime + e))
val tpo = TopicPartitionOffset(Topic("myTopic"), 1, Offset(100))

policy.shouldFlush(CommitContext(tpo,
count,
fileSize,
creationTimeAdjusted,
lastFlushTimeAdjusted,
"my/filename.txt",
policy.shouldFlush(CloudCommitContext(tpo,
count,
fileSize,
creationTimeAdjusted,
lastFlushTimeAdjusted,
"my/filename.txt",
))
}

@@ -15,6 +15,11 @@
*/
package io.lenses.streamreactor.common.config

import io.circe.Decoder
import io.circe.Encoder
import io.circe.generic.semiauto.deriveDecoder
import io.circe.generic.semiauto.deriveEncoder

import java.io.FileInputStream
import java.security.KeyStore
import java.security.SecureRandom
@@ -38,7 +43,7 @@ object SSLConfigContext {
* @param config An SSLConfig containing key and truststore credentials
* @return a SSLContext
*/
def getSSLContext(config: SSLConfig): SSLContext = {
private def getSSLContext(config: SSLConfig): SSLContext = {
val useClientCertAuth = config.useClientCert

//is client certification authentication set
@@ -100,3 +105,10 @@ case class SSLConfig(
keyStoreType: String = "JKS",
trustStoreType: String = "JKS",
)

object SSLConfig {

implicit val decoder: Decoder[SSLConfig] = deriveDecoder
implicit val encoder: Encoder[SSLConfig] = deriveEncoder

}
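
The codecs above come from circe's semiauto derivation; below is a self-contained sketch of the JSON round-trip this enables, using a hypothetical TlsSettings stand-in because SSLConfig's full field list is not visible in this hunk (assumes circe-generic and circe-parser on the classpath).

```scala
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.parser.decode
import io.circe.syntax._
import io.circe.{Decoder, Encoder}

// Hypothetical stand-in for SSLConfig, with the two fields visible in this hunk.
final case class TlsSettings(keyStoreType: String = "JKS", trustStoreType: String = "JKS")

object TlsSettings {
  implicit val decoder: Decoder[TlsSettings] = deriveDecoder
  implicit val encoder: Encoder[TlsSettings] = deriveEncoder
}

object TlsSettingsJsonDemo extends App {
  val json = TlsSettings().asJson.noSpaces // {"keyStoreType":"JKS","trustStoreType":"JKS"}
  val back = decode[TlsSettings](json)     // Right(TlsSettings(JKS,JKS))
  println(json)
  println(back)
}
```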
@@ -15,20 +15,37 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink.commit

import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset

trait CommitContext {
def count: Long
def fileSize: Long
def createdTimestamp: Long
def lastFlushedTimestamp: Option[Long]

def lastModified: Long = lastFlushedTimestamp.getOrElse(createdTimestamp)

def generateLogLine(flushing: Boolean, result: Seq[ConditionCommitResult]): String
}

/**
* @param tpo the [[TopicPartitionOffset]] of the last record written
* @param count the number of records written thus far to the file
* @param createdTimestamp the time in milliseconds when the file was first created/accessed
*/
case class CommitContext(
case class CloudCommitContext(
tpo: TopicPartitionOffset,
count: Long,
fileSize: Long,
createdTimestamp: Long,
lastFlushedTimestamp: Option[Long],
partitionFile: String,
) {
def lastModified: Long = lastFlushedTimestamp.getOrElse(createdTimestamp)
) extends CommitContext
with LazyLogging {
override def generateLogLine(flushing: Boolean, result: Seq[ConditionCommitResult]): String = {
val flushingOrNot = if (flushing) "" else "Not "
s"${flushingOrNot}Flushing '$partitionFile' for {topic:'${tpo.topic.value}', partition:${tpo.partition}, offset:${tpo.offset.value}, ${result.flatMap(_.logLine).mkString(", ")}}"
}

}
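
A quick sketch of the line generateLogLine produces (values illustrative; the two-argument ConditionCommitResult shape is inferred from the pattern match in CommitPolicy further down this diff):

```scala
import io.lenses.streamreactor.connect.cloud.common.model.{Offset, Topic, TopicPartitionOffset}
import io.lenses.streamreactor.connect.cloud.common.sink.commit.{CloudCommitContext, ConditionCommitResult}

object CommitLogLineDemo extends App {
  val ctx = CloudCommitContext(
    tpo                  = TopicPartitionOffset(Topic("orders"), 0, Offset(42)),
    count                = 500L,
    fileSize             = 1024L,
    createdTimestamp     = 1700000000000L,
    lastFlushedTimestamp = None,
    partitionFile        = "orders/0/part-000.json"
  )

  // One triggered condition with its log fragment.
  val results = Seq(ConditionCommitResult(true, Some("count: 500/100")))

  // Prints something like:
  // Flushing 'orders/0/part-000.json' for {topic:'orders', partition:0, offset:42, count: 500/100}
  println(ctx.generateLogLine(flushing = true, results))
}
```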
@@ -17,9 +17,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.commit

import com.typesafe.scalalogging.LazyLogging
import com.typesafe.scalalogging.Logger
import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushCount
import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushInterval
import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushSize

import scala.util.Try

@@ -48,25 +45,15 @@ case class CommitPolicy(logger: Logger, conditions: CommitPolicyCondition*) {
case ConditionCommitResult(true, _) => true
case _ => false
}
val flushingOrNot = if (flush) "" else "Not "

if (debugEnabled)
logger.debug(
"{}Flushing '{}' for {topic:'{}', partition:{}, offset:{}, {}}",
flushingOrNot,
context.partitionFile,
context.tpo.topic.value,
context.tpo.partition,
context.tpo.offset.value,
res.flatMap(_.logLine).mkString(", "),
)
if (debugEnabled) {
logger.debug(context.generateLogLine(flush, res))
}
flush
}
}

object CommitPolicy extends LazyLogging {
val Default: CommitPolicy =
CommitPolicy(FileSize(defaultFlushSize), Interval(defaultFlushInterval), Count(defaultFlushCount))
def apply(conditions: CommitPolicyCondition*): CommitPolicy =
CommitPolicy(logger, conditions: _*)
}
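
With the Default value moved out to CloudCommitPolicy, the companion's only remaining job is to inject its LazyLogging logger through the new vararg apply. Rebuilding the old default by hand uses the same expression as CloudCommitPolicy.Default; the wrapper object below is just a sketch, not part of the PR.

```scala
import io.lenses.streamreactor.connect.cloud.common.sink.commit.{CommitPolicy, Count, FileSize, Interval}
import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.{defaultFlushCount, defaultFlushInterval, defaultFlushSize}

object RebuiltDefaultPolicy {
  // Equivalent to CloudCommitPolicy.Default; the companion apply supplies the logger.
  val policy: CommitPolicy =
    CommitPolicy(FileSize(defaultFlushSize), Interval(defaultFlushInterval), Count(defaultFlushCount))
}
```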
@@ -0,0 +1,70 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.extractors

import cats.implicits.catsSyntaxEitherId
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionNamePath
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord

import java.util
import java.lang
import java.nio.ByteBuffer
import scala.jdk.CollectionConverters.MapHasAsJava

object KafkaConnectExtractor extends LazyLogging {

def extractFromKey(sinkRecord: SinkRecord, path: Option[String]): Either[Throwable, AnyRef] =
extract(sinkRecord.key(), Option(sinkRecord.keySchema()), path)

def extractFromValue(sinkRecord: SinkRecord, path: Option[String]): Either[Throwable, AnyRef] =
extract(sinkRecord.value(), Option(sinkRecord.valueSchema()), path)

// TODO: test with all different types
private def extract(
extractFrom: AnyRef,
extractSchema: Option[Schema],
maybePath: Option[String],
): Either[Throwable, AnyRef] = {

val maybePnp: Option[PartitionNamePath] = maybePath.map(path => PartitionNamePath(path.split('.').toIndexedSeq: _*))
(extractFrom, maybePnp) match {
case (shortVal: lang.Short, _) => shortVal.asRight
case (boolVal: lang.Boolean, _) => boolVal.asRight
case (stringVal: lang.String, _) => stringVal.asRight
case (longVal: lang.Long, _) => longVal.asRight
case (intVal: lang.Integer, _) => intVal.asRight
case (byteVal: lang.Byte, _) => byteVal.asRight
case (doubleVal: lang.Double, _) => doubleVal.asRight
case (floatVal: lang.Float, _) => floatVal.asRight
case (bytesVal: Array[Byte], _) => bytesVal.asRight
case (bytesVal: ByteBuffer, _) => bytesVal.array().asRight
case (arrayVal: Array[_], _) => arrayVal.asRight
case (decimal: BigDecimal, _) => decimal.asRight
case (decimal: java.math.BigDecimal, _) => decimal.asRight
case null => null
case (structVal: Struct, Some(pnp)) => StructExtractor.extractPathFromStruct(structVal, pnp)
case (mapVal: Map[_, _], Some(pnp)) => MapExtractor.extractPathFromMap(mapVal.asJava, pnp, extractSchema.orNull)
case (mapVal: util.Map[_, _], Some(pnp)) => MapExtractor.extractPathFromMap(mapVal, pnp, extractSchema.orNull)
case (listVal: util.List[_], Some(pnp)) => ArrayExtractor.extractPathFromArray(listVal, pnp, extractSchema.orNull)
case otherVal => new ConnectException("Unknown value type: " + otherVal.getClass.getName).asLeft
}
}

}
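
A small sketch of calling the new extractor (not part of the PR): the schema, topic and the "address.city" path are made up, and nested-field extraction is delegated to the existing StructExtractor, as the (structVal, Some(pnp)) case above shows.

```scala
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.KafkaConnectExtractor
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
import org.apache.kafka.connect.sink.SinkRecord

object KafkaConnectExtractorDemo extends App {

  // Hypothetical value schema with a nested struct.
  val addressSchema: Schema = SchemaBuilder.struct().field("city", Schema.STRING_SCHEMA).build()
  val valueSchema:   Schema = SchemaBuilder.struct().field("address", addressSchema).build()

  val value = new Struct(valueSchema)
    .put("address", new Struct(addressSchema).put("city", "Lisbon"))

  val record = new SinkRecord("orders", 0, Schema.STRING_SCHEMA, "key-1", valueSchema, value, 100L)

  // Primitive key, no path needed: Right(key-1)
  println(KafkaConnectExtractor.extractFromKey(record, None))

  // Struct value with a dotted path, handled by StructExtractor.
  println(KafkaConnectExtractor.extractFromValue(record, Some("address.city")))
}
```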