Elasticsearch 8, Opensearch (Incomplete) #992

Draft · wants to merge 2 commits into master
53 changes: 39 additions & 14 deletions build.sbt
@@ -1,6 +1,8 @@
import Dependencies.Versions
import Dependencies.globalExcludeDeps
import Dependencies.gson
import Dependencies.bouncyCastle

import Settings.*
import sbt.Keys.libraryDependencies
import sbt.*
@@ -19,8 +21,9 @@ lazy val subProjects: Seq[Project] = Seq(
`azure-documentdb`,
`azure-datalake`,
cassandra,
elastic6,
elastic7,
`elastic-common`,
opensearch,
elastic8,
ftp,
`gcp-storage`,
http,
@@ -220,18 +223,17 @@ lazy val cassandra = (project in file("kafka-connect-cassandra"))
.configureFunctionalTests()
.enablePlugins(PackPlugin)

lazy val elastic6 = (project in file("kafka-connect-elastic6"))
lazy val `elastic-common` = (project in file("kafka-connect-elastic-common"))
.dependsOn(common)
.dependsOn(`sql-common`)
.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-elastic6",
name := "kafka-connect-elastic-common",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectElastic6Deps,
libraryDependencies ++= baseDeps ++ kafkaConnectElasticBaseDeps,
publish / skip := true,
FunctionalTest / baseDirectory := (LocalRootProject / baseDirectory).value,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
@@ -240,20 +242,20 @@ lazy val elastic6 = (project in file("kafka-connect-elastic6"))
)
.configureAssembly(true)
.configureTests(baseTestDeps)
.configureIntegrationTests(kafkaConnectElastic6TestDeps)
.configureIntegrationTests(kafkaConnectElastic8TestDeps)
.configureFunctionalTests()
.enablePlugins(PackPlugin)
.disablePlugins(PackPlugin)

lazy val elastic7 = (project in file("kafka-connect-elastic7"))
lazy val elastic8 = (project in file("kafka-connect-elastic8"))
.dependsOn(common)
.dependsOn(`sql-common`)
.dependsOn(`test-common` % "fun->compile")
.dependsOn(`elastic-common`)
.dependsOn(`test-common` % "fun->compile;it->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-elastic7",
name := "kafka-connect-elastic8",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectElastic7Deps,
libraryDependencies ++= baseDeps ++ kafkaConnectElastic8Deps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
@@ -263,10 +265,33 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
)
.configureAssembly(true)
.configureTests(baseTestDeps)
.configureIntegrationTests(kafkaConnectElastic7TestDeps)
.configureIntegrationTests(kafkaConnectElastic8TestDeps)
.configureFunctionalTests()
.enablePlugins(PackPlugin)

lazy val opensearch = (project in file("kafka-connect-opensearch"))
.dependsOn(common)
.dependsOn(`elastic-common`)
.dependsOn(`test-common` % "fun->compile;it->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-opensearch",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectOpenSearchDeps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly(false)
.configureTests(baseTestDeps)
//.configureIntegrationTests(kafkaConnectOpenSearchTestDeps)
.configureFunctionalTests(bouncyCastle)
.enablePlugins(PackPlugin)

lazy val http = (project in file("kafka-connect-http"))
.dependsOn(common)
.dependsOn(`test-common` % "fun->compile")
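Note: the reworked modules reference dependency lists (kafkaConnectElasticBaseDeps, kafkaConnectElastic8Deps, kafkaConnectOpenSearchDeps) and a bouncyCastle value that are not shown in this diff; they would live in project/Dependencies.scala. A minimal sketch of what those declarations could look like follows — the list names come from build.sbt above, but the artifacts and versions are illustrative assumptions, not the actual contents of this PR:

import sbt._

// Illustrative sketch only: names match build.sbt, artifacts/versions are assumed.
object Dependencies {
  private val elastic4sVersion  = "8.9.2"  // assumed placeholder
  private val openSearchVersion = "2.6.0"  // assumed placeholder

  // Shared Elasticsearch client pieces used by `elastic-common`
  val kafkaConnectElasticBaseDeps: Seq[ModuleID] = Seq(
    "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion,
  )

  // Elasticsearch 8 specific client, used by `elastic8`
  val kafkaConnectElastic8Deps: Seq[ModuleID] = Seq(
    "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % elastic4sVersion,
  )

  // OpenSearch client, used by `opensearch`
  val kafkaConnectOpenSearchDeps: Seq[ModuleID] = Seq(
    "org.opensearch.client" % "opensearch-java" % openSearchVersion,
  )

  // Used by the opensearch functional tests via configureFunctionalTests(bouncyCastle)
  val bouncyCastle: Seq[ModuleID] = Seq(
    "org.bouncycastle" % "bcprov-jdk18on" % "1.77", // assumed
  )
}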
@@ -15,6 +15,7 @@
*/
package io.lenses.streamreactor.connect.azure.documentdb.sink

import cats.implicits.toBifunctorOps
import io.lenses.streamreactor.common.config.Helpers
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.connect.azure.documentdb.DocumentClientProvider
@@ -100,7 +101,7 @@ class DocumentDbSinkConnector private[sink] (builder: DocumentDbSinkSettings =>
configProps = props

//check input topics
Helpers.checkInputTopics(DocumentDbConfigConstants.KCQL_CONFIG, props.asScala.toMap)
Helpers.checkInputTopics(DocumentDbConfigConstants.KCQL_CONFIG, props.asScala.toMap).leftMap(throw _)

val settings = DocumentDbSinkSettings(config)

@@ -15,9 +15,9 @@
*/
package io.lenses.streamreactor.connect.cassandra

import io.lenses.streamreactor.common.config.SSLConfig
import io.lenses.streamreactor.common.config.SSLConfigContext
import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigConstants
import io.lenses.streamreactor.connect.cassandra.config.SSLConfig
import io.lenses.streamreactor.connect.cassandra.config.SSLConfigContext
import io.lenses.streamreactor.connect.cassandra.config.LoadBalancingPolicy
import com.datastax.driver.core.Cluster.Builder
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy
@@ -0,0 +1,98 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cassandra.config

import java.io.FileInputStream
import java.security.KeyStore
import java.security.SecureRandom
import javax.net.ssl._

/**
* Created by [email protected] on 14/04/16.
* stream-reactor
*/
object SSLConfigContext {
def apply(config: SSLConfig): SSLContext =
getSSLContext(config)

/**
* Get a SSL Connect for a given set of credentials
*
* @param config An SSLConfig containing key and truststore credentials
* @return a SSLContext
*/
def getSSLContext(config: SSLConfig): SSLContext = {
val useClientCertAuth = config.useClientCert

//is client certification authentication set
val keyManagers: Array[KeyManager] = if (useClientCertAuth) {
getKeyManagers(config)
} else {
Array[KeyManager]()
}

val ctx: SSLContext = SSLContext.getInstance("SSL")
val trustManagers = getTrustManagers(config)
ctx.init(keyManagers, trustManagers, new SecureRandom())
ctx
}

/**
* Get an array of Trust Managers
*
* @param config An SSLConfig containing key and truststore credentials
* @return An Array of TrustManagers
*/
def getTrustManagers(config: SSLConfig): Array[TrustManager] = {
val tsf = new FileInputStream(config.trustStorePath)
val ts = KeyStore.getInstance(config.trustStoreType)
ts.load(tsf, config.trustStorePass.toCharArray)
val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
tmf.init(ts)
tmf.getTrustManagers
}

/**
* Get an array of Key Managers
*
* @param config An SSLConfig containing key and truststore credentials
* @return An Array of KeyManagers
*/
def getKeyManagers(config: SSLConfig): Array[KeyManager] = {
require(config.keyStorePath.nonEmpty, "Key store path is not set!")
require(config.keyStorePass.nonEmpty, "Key store password is not set!")
val ksf = new FileInputStream(config.keyStorePath.get)
val ks = KeyStore.getInstance(config.keyStoreType)
ks.load(ksf, config.keyStorePass.get.toCharArray)
val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
kmf.init(ks, config.keyStorePass.get.toCharArray)
kmf.getKeyManagers
}

}

/**
* Class for holding key and truststore settings
*/
case class SSLConfig(
trustStorePath: String,
trustStorePass: String,
keyStorePath: Option[String],
keyStorePass: Option[String],
useClientCert: Boolean = false,
keyStoreType: String = "JKS",
trustStoreType: String = "JKS",
)
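A brief usage sketch of the SSLConfigContext added above. The store paths and passwords are placeholders, SslExample is just a scratch object for the sketch, and the wiring into the Cassandra cluster builder is only indicated in a comment:

import javax.net.ssl.SSLContext
import io.lenses.streamreactor.connect.cassandra.config.{ SSLConfig, SSLConfigContext }

object SslExample {
  def main(args: Array[String]): Unit = {
    // Placeholder store locations and passwords for illustration only.
    val sslConfig = SSLConfig(
      trustStorePath = "/etc/connect/truststore.jks",
      trustStorePass = "changeit",
      keyStorePath   = Some("/etc/connect/keystore.jks"),
      keyStorePass   = Some("changeit"),
      useClientCert  = true, // also loads key managers for client (mutual TLS) auth
    )

    // Builds an SSLContext from the configured trust store and, because
    // useClientCert is true, from the key store as well.
    val sslContext: SSLContext = SSLConfigContext(sslConfig)

    // The context can then be handed to the Cassandra driver, e.g. via
    // JdkSSLOptions.builder().withSSLContext(sslContext).build()
    println(sslContext.getProtocol) // "SSL", matching SSLContext.getInstance("SSL")
  }
}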
@@ -15,18 +15,18 @@
*/
package io.lenses.streamreactor.connect.cassandra.sink

import cats.implicits.toBifunctorOps
import com.typesafe.scalalogging.StrictLogging
import io.lenses.streamreactor.common.config.Helpers
import io.lenses.streamreactor.common.utils.JarManifest

import java.util
import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigConstants
import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigSink
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkConnector

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala
import scala.jdk.CollectionConverters.SeqHasAsJava
import scala.util.Failure
@@ -66,7 +66,7 @@ class CassandraSinkConnector extends SinkConnector with StrictLogging {
*/
override def start(props: util.Map[String, String]): Unit = {
//check input topics
Helpers.checkInputTopics(CassandraConfigConstants.KCQL, props.asScala.toMap)
Helpers.checkInputTopics(CassandraConfigConstants.KCQL, props.asScala.toMap).leftMap(throw _)
configProps = props
Try(new CassandraConfigSink(props.asScala.toMap)) match {
case Failure(f) =>
@@ -0,0 +1,64 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cassandra.config

import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import javax.net.ssl.KeyManager
import javax.net.ssl.SSLContext
import javax.net.ssl.TrustManager

/**
* Created by [email protected] on 19/04/16.
* stream-reactor
*/
class TestSSLConfigContext extends AnyWordSpec with Matchers with BeforeAndAfter {
var sslConfig: SSLConfig = null
var sslConfigNoClient: SSLConfig = null

before {
val trustStorePath = getClass.getResource("/stc_truststore.jks").getPath
val keystorePath = getClass.getResource("/stc_keystore.jks").getPath
val trustStorePassword = "erZHDS9Eo0CcNo"
val keystorePassword = "8yJQLUnGkwZxOw"
sslConfig = SSLConfig(trustStorePath, trustStorePassword, Some(keystorePath), Some(keystorePassword), true)
sslConfigNoClient = SSLConfig(trustStorePath, trustStorePassword, Some(keystorePath), Some(keystorePassword), false)
}

"SSLConfigContext" should {
"should return an Array of KeyManagers" in {
val keyManagers = SSLConfigContext.getKeyManagers(sslConfig)
keyManagers.length shouldBe 1
val entry = keyManagers.head
entry shouldBe a[KeyManager]
}

"should return an Array of TrustManagers" in {
val trustManager = SSLConfigContext.getTrustManagers(sslConfig)
trustManager.length shouldBe 1
val entry = trustManager.head
entry shouldBe a[TrustManager]
}

"should return a SSLContext" in {
val context = SSLConfigContext(sslConfig)
context.getProtocol shouldBe "SSL"
context shouldBe a[SSLContext]
}
}
}
@@ -15,6 +15,7 @@
*/
package io.lenses.streamreactor.common.config

import cats.implicits.catsSyntaxEitherId
import io.lenses.kcql.Kcql
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.common.config.ConfigException
@@ -26,32 +27,32 @@ import org.apache.kafka.common.config.ConfigException

object Helpers extends StrictLogging {

def checkInputTopics(kcqlConstant: String, props: Map[String, String]): Boolean = {
def checkInputTopics(kcqlConstant: String, props: Map[String, String]): Either[Throwable, Unit] = {
val topics = props("topics").split(",").map(t => t.trim).toSet
val raw = props(kcqlConstant)
if (raw.isEmpty) {
throw new ConfigException(s"Missing $kcqlConstant")
return new ConfigException(s"Missing $kcqlConstant").asLeft
}
val kcql = raw.split(";").map(r => Kcql.parse(r)).toSet
val sources = kcql.map(k => k.getSource)
val res = topics.subsetOf(sources)

if (!res) {
val missing = topics.diff(sources)
throw new ConfigException(
return new ConfigException(
s"Mandatory `topics` configuration contains topics not set in $kcqlConstant: ${missing}, kcql contains $sources",
)
).asLeft
}

val res1 = sources.subsetOf(topics)

if (!res1) {
val missing = topics.diff(sources)
throw new ConfigException(
return new ConfigException(
s"$kcqlConstant configuration contains topics not set in mandatory `topic` configuration: ${missing}, kcql contains $sources",
)
).asLeft
}

true
().asRight
}
}
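With checkInputTopics now returning Either[Throwable, Unit] instead of throwing, each call site decides how to surface a failure. The connectors in this PR rethrow on the Left side via leftMap(throw _), as seen in the Cassandra and DocumentDb changes above. A small sketch of that call-site pattern — validateOrThrow and isValid are hypothetical helper names, not part of the PR:

import cats.implicits.toBifunctorOps
import io.lenses.streamreactor.common.config.Helpers

object KcqlValidation {
  // Rethrows the ConfigException carried on the Left side, preserving the
  // previous fail-fast behaviour at connector start-up.
  def validateOrThrow(kcqlConstant: String, props: Map[String, String]): Unit = {
    Helpers.checkInputTopics(kcqlConstant, props).leftMap(throw _)
    ()
  }

  // A non-throwing alternative: fold the Either into a Boolean, reporting the error.
  def isValid(kcqlConstant: String, props: Map[String, String]): Boolean =
    Helpers.checkInputTopics(kcqlConstant, props).fold(
      err => { println(s"Invalid KCQL/topics configuration: ${err.getMessage}"); false },
      _ => true,
    )
}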