diff --git a/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisCacheTest.scala b/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisCacheTest.scala index 97398ee6a..1d0aa30fc 100644 --- a/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisCacheTest.scala +++ b/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisCacheTest.scala @@ -22,6 +22,7 @@ import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import com.dimafeng.testcontainers.ForAllTestContainer import com.dimafeng.testcontainers.GenericContainer import com.google.gson.Gson +import io.lenses.streamreactor.connect.redis.sink.JedisClientBuilder import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.data.SchemaBuilder import org.apache.kafka.connect.data.Struct @@ -66,8 +67,7 @@ class RedisCacheTest val props = (baseProps + (RedisConfigConstants.KCQL_CONFIG -> QUERY_ALL)).asJava val config = RedisConfig(props) val settings = RedisSinkSettings(config) - val writer = new RedisCache(settings) - writer.createClient(settings) + val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings)) val json = """ @@ -99,8 +99,7 @@ class RedisCacheTest val props = (baseProps + (RedisConfigConstants.KCQL_CONFIG -> QUERY_ALL)).asJava val config = RedisConfig(props) val settings = RedisSinkSettings(config) - val writer = new RedisCache(settings) - writer.createClient(settings) + val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings)) val childSchema = SchemaBuilder.struct().name("com.example.Child") .field("firstName", Schema.STRING_SCHEMA) @@ -154,8 +153,7 @@ class RedisCacheTest val props = (baseProps + (RedisConfigConstants.KCQL_CONFIG -> QUERY_ALL)).asJava val config = RedisConfig(props) val settings = RedisSinkSettings(config) - val writer = new RedisCache(settings) - writer.createClient(settings) + val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings)) val schema = SchemaBuilder.struct().name("com.example.Person") .field("firstName", Schema.STRING_SCHEMA) @@ -190,8 +188,7 @@ class RedisCacheTest val config = RedisConfig(props) val settings = RedisSinkSettings(config) - val writer = new RedisCache(settings) - writer.createClient(settings) + val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings)) val schema = SchemaBuilder.struct().name("com.example.Person") .field("firstName", Schema.STRING_SCHEMA) @@ -261,9 +258,8 @@ class RedisCacheTest val props = base_Props.asJava val config = RedisConfig(props) val settings = RedisSinkSettings(config) - val writer = new RedisCache(settings) + val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings)) - writer.createClient(settings) writer.write(Seq(nickRecord)) val key = nick.get("firstName").toString + RedisConfigConstants.REDIS_PK_DELIMITER_DEFAULT_VALUE + nickJr.get( @@ -280,8 +276,7 @@ class RedisCacheTest val props = (base_Props + (RedisConfigConstants.REDIS_PK_DELIMITER -> delimiter)).asJava val config = RedisConfig(props) val settings = RedisSinkSettings(config) - val writer = new RedisCache(settings) - writer.createClient(settings) + val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings)) writer.write(Seq(nickRecord)) @@ -297,8 +292,7 @@ class RedisCacheTest val props = base_Props.asJava val config = RedisConfig(props) val settings = RedisSinkSettings(config) - val writer = new RedisCache(settings) - writer.createClient(settings) + val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings)) writer.write(Seq(nickRecord)) diff --git a/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisGeoAddTest.scala b/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisGeoAddTest.scala index 7beae1de5..184064e20 100644 --- a/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisGeoAddTest.scala +++ b/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisGeoAddTest.scala @@ -6,6 +6,7 @@ import io.lenses.streamreactor.connect.redis.sink.config.RedisConnectionInfo import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import com.dimafeng.testcontainers.ForAllTestContainer import com.dimafeng.testcontainers.GenericContainer +import io.lenses.streamreactor.connect.redis.sink.JedisClientBuilder import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.data.SchemaBuilder import org.apache.kafka.connect.data.Struct @@ -42,8 +43,7 @@ class RedisGeoAddTest extends AnyWordSpec with Matchers with MockitoSugar with F val config = RedisConfig(props) val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None) val settings = RedisSinkSettings(config) - val writer = new RedisGeoAdd(settings) - writer.createClient(settings) + val writer = new RedisGeoAdd(settings, JedisClientBuilder.createClient(settings)) val schema = SchemaBuilder.struct().name("com.example.Cpu") .field("longitude", Schema.STRING_SCHEMA) @@ -86,8 +86,7 @@ class RedisGeoAddTest extends AnyWordSpec with Matchers with MockitoSugar with F val config = RedisConfig(props) val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None) val settings = RedisSinkSettings(config) - val writer = new RedisGeoAdd(settings) - writer.createClient(settings) + val writer = new RedisGeoAdd(settings, JedisClientBuilder.createClient(settings)) val schema = SchemaBuilder.struct().name("com.example.Cpu") .field("longitude", Schema.STRING_SCHEMA) @@ -132,8 +131,7 @@ class RedisGeoAddTest extends AnyWordSpec with Matchers with MockitoSugar with F val config = RedisConfig(props) val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None) val settings = RedisSinkSettings(config) - val writer = new RedisGeoAdd(settings) - writer.createClient(settings) + val writer = new RedisGeoAdd(settings, JedisClientBuilder.createClient(settings)) val schema = SchemaBuilder.struct().name("com.example.Cpu") .field("lng", Schema.STRING_SCHEMA) diff --git a/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisInsertSortedSetTest.scala b/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisInsertSortedSetTest.scala index f238e773f..bc27696ff 100644 --- a/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisInsertSortedSetTest.scala +++ b/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisInsertSortedSetTest.scala @@ -22,6 +22,7 @@ import io.lenses.streamreactor.connect.redis.sink.config.RedisConnectionInfo import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import com.dimafeng.testcontainers import com.dimafeng.testcontainers.ForAllTestContainer +import io.lenses.streamreactor.connect.redis.sink.JedisClientBuilder import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.data.SchemaBuilder import org.apache.kafka.connect.data.Struct @@ -57,8 +58,7 @@ class RedisInsertSortedSetTest extends AnyWordSpec with Matchers with MockitoSug val config = RedisConfig(props) val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None) val settings = RedisSinkSettings(config) - val writer = new RedisInsertSortedSet(settings) - writer.createClient(settings) + val writer = new RedisInsertSortedSet(settings, JedisClientBuilder.createClient(settings)) val schema = SchemaBuilder.struct().name("com.example.Cpu") .field("type", Schema.STRING_SCHEMA) diff --git a/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisMultipleSortedSetsTest.scala b/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisMultipleSortedSetsTest.scala index 6f1c83d3a..0b311c1bc 100644 --- a/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisMultipleSortedSetsTest.scala +++ b/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisMultipleSortedSetsTest.scala @@ -16,6 +16,7 @@ package io.lenses.streamreactor.connect.redis.sink.writer +import io.lenses.streamreactor.connect.redis.sink.JedisClientBuilder import io.lenses.streamreactor.connect.redis.sink.RedisSinkTask import io.lenses.streamreactor.connect.redis.sink.config.RedisConfig import io.lenses.streamreactor.connect.redis.sink.config.RedisConfigConstants @@ -61,8 +62,7 @@ class RedisMultipleSortedSetsTest extends AnyWordSpec with Matchers with Mockito val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None) val settings = RedisSinkSettings(config) - val writer = new RedisMultipleSortedSets(settings) - writer.createClient(settings) + val writer = new RedisMultipleSortedSets(settings, JedisClientBuilder.createClient(settings)) val schema = SchemaBuilder.struct().name("com.example.device") .field("sensorID", Schema.STRING_SCHEMA) diff --git a/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisPubSubTest.scala b/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisPubSubTest.scala index f26025447..b09911f74 100644 --- a/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisPubSubTest.scala +++ b/kafka-connect-redis/src/it/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisPubSubTest.scala @@ -23,6 +23,7 @@ import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import com.dimafeng.testcontainers.ForAllTestContainer import com.dimafeng.testcontainers.GenericContainer import com.typesafe.scalalogging.LazyLogging +import io.lenses.streamreactor.connect.redis.sink.JedisClientBuilder import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.data.SchemaBuilder import org.apache.kafka.connect.data.Struct @@ -60,8 +61,7 @@ class RedisPubSubTest extends AnyWordSpec with Matchers with MockitoSugar with L val config = RedisConfig(props) val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None) val settings = RedisSinkSettings(config) - val writer = new RedisPubSub(settings) - writer.createClient(settings) + val writer = new RedisPubSub(settings, JedisClientBuilder.createClient(settings)) val schema = SchemaBuilder.struct().name("com.example.Cpu") .field("type", Schema.STRING_SCHEMA) diff --git a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/JedisClientBuilder.scala b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/JedisClientBuilder.scala new file mode 100644 index 000000000..c81800225 --- /dev/null +++ b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/JedisClientBuilder.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.redis.sink + +import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings +import redis.clients.jedis.Jedis + +import java.io.File +import java.io.FileNotFoundException + +object JedisClientBuilder { + def createClient(sinkSettings: RedisSinkSettings): Jedis = { + val connection = sinkSettings.connectionInfo + + if (connection.isSslConnection) { + connection.keyStoreFilepath match { + case Some(path) => + if (!new File(path).exists) { + throw new FileNotFoundException(s"Keystore not found in: [$path]") + } + + System.setProperty("javax.net.ssl.keyStorePassword", connection.keyStorePassword.getOrElse("")) + System.setProperty("javax.net.ssl.keyStore", path) + System.setProperty("javax.net.ssl.keyStoreType", connection.keyStoreType.getOrElse("jceks")) + + case None => + } + + connection.trustStoreFilepath match { + case Some(path) => + if (!new File(path).exists) { + throw new FileNotFoundException(s"Truststore not found in: $path") + } + + System.setProperty("javax.net.ssl.trustStorePassword", connection.trustStorePassword.getOrElse("")) + System.setProperty("javax.net.ssl.trustStore", path) + System.setProperty("javax.net.ssl.trustStoreType", connection.trustStoreType.getOrElse("jceks")) + + case None => + } + } + + val jedis = new Jedis(connection.host, connection.port, connection.isSslConnection) + connection.password.foreach(p => jedis.auth(p)) + jedis + } +} diff --git a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/RedisSinkTask.scala b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/RedisSinkTask.scala index f341e5a30..59eeddc21 100644 --- a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/RedisSinkTask.scala +++ b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/RedisSinkTask.scala @@ -24,6 +24,7 @@ import io.lenses.streamreactor.connect.redis.sink.config.RedisConfigConstants import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import io.lenses.streamreactor.connect.redis.sink.writer._ import com.typesafe.scalalogging.StrictLogging +import io.lenses.streamreactor.common.sink.DbWriter import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import org.apache.kafka.connect.sink.SinkRecord @@ -40,7 +41,7 @@ import scala.jdk.CollectionConverters.ListHasAsScala * target sink */ class RedisSinkTask extends SinkTask with StrictLogging { - var writer: List[RedisWriter] = List[RedisWriter]() + var writer: List[DbWriter] = List[DbWriter]() private val progressCounter = new ProgressCounter private var enableProgress: Boolean = false private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation) @@ -83,35 +84,30 @@ class RedisSinkTask extends SinkTask with StrictLogging { val mode_GEOADD = filterGeoAddMode(settings) val mode_STREAM = filterStream(settings) - + val jedis = JedisClientBuilder.createClient(settings) //-- Start as many writers as required writer = (modeCache.kcqlSettings.headOption.map { _ => logger.info(s"Starting [${modeCache.kcqlSettings.size}] KCQLs with Redis Cache mode") - val writer = new RedisCache(modeCache) - writer.createClient(settings) + val writer = new RedisCache(modeCache, jedis) List(writer) } ++ mode_INSERT_SS.kcqlSettings.headOption.map { _ => logger.info(s"Starting ${mode_INSERT_SS.kcqlSettings.size}] KCQLs with Redis Insert Sorted Set mode") - val writer = new RedisInsertSortedSet(mode_INSERT_SS) - writer.createClient(settings) + val writer = new RedisInsertSortedSet(mode_INSERT_SS, jedis) List(writer) } ++ mode_PUBSUB.kcqlSettings.headOption.map { _ => logger.info(s"Starting [${mode_PUBSUB.kcqlSettings.size}] KCQLs with Redis PubSub mode") - val writer = new RedisPubSub(mode_PUBSUB) - writer.createClient(settings) + val writer = new RedisPubSub(mode_PUBSUB, jedis) List(writer) } ++ mode_PK_SS.kcqlSettings.headOption.map { _ => logger.info(s"Starting [${mode_PK_SS.kcqlSettings.size}] KCQLs with Redis Multiple Sorted Sets mode") - val writer = new RedisMultipleSortedSets(mode_PK_SS) - writer.createClient(settings) + val writer = new RedisMultipleSortedSets(mode_PK_SS, jedis) List(writer) } ++ mode_GEOADD.kcqlSettings.headOption.map { _ => logger.info(s"Starting [${mode_GEOADD.kcqlSettings.size}] KCQLs with Redis Geo Add mode") - List(new RedisGeoAdd(mode_GEOADD)) + List(new RedisGeoAdd(mode_GEOADD, jedis)) } ++ mode_STREAM.kcqlSettings.headOption.map { _ => logger.info(s"Starting [${mode_STREAM.kcqlSettings.size}] KCQLs with Redis Stream mode") - val writer = new RedisStreams(mode_STREAM) - writer.createClient(settings) + val writer = new RedisStreams(mode_STREAM, jedis) List(writer) }).flatten.toList diff --git a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisCache.scala b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisCache.scala index c66e47569..ef2e3d97c 100644 --- a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisCache.scala +++ b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisCache.scala @@ -15,15 +15,19 @@ */ package io.lenses.streamreactor.connect.redis.sink.writer +import com.typesafe.scalalogging.StrictLogging import io.lenses.kcql.Kcql import io.lenses.streamreactor.common.config.base.settings.Projections +import io.lenses.streamreactor.common.errors.ErrorHandler import io.lenses.streamreactor.common.schemas.SinkRecordConverterHelper.SinkRecordExtension import io.lenses.streamreactor.common.schemas.StructHelper +import io.lenses.streamreactor.common.sink.DbWriter import io.lenses.streamreactor.connect.json.SimpleJsonConverter import io.lenses.streamreactor.connect.redis.sink.config.RedisKCQLSetting import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import org.apache.kafka.connect.errors.ConnectException import org.apache.kafka.connect.sink.SinkRecord +import redis.clients.jedis.Jedis import scala.jdk.CollectionConverters.ListHasAsScala import scala.util.Failure @@ -40,7 +44,8 @@ import scala.util.Try * INSERT INTO FX- SELECT price from yahoo-fx PK symbol * SELECT price from yahoo-fx PK symbol WITHEXTRACT */ -class RedisCache(sinkSettings: RedisSinkSettings) extends RedisWriter { +class RedisCache(sinkSettings: RedisSinkSettings, jedis: Jedis) extends DbWriter with StrictLogging with ErrorHandler { + initialize(sinkSettings.taskRetries, sinkSettings.errorPolicy) private lazy val simpleJsonConverter = new SimpleJsonConverter() val configs: Set[Kcql] = sinkSettings.kcqlSettings.map(_.kcqlConfig) @@ -133,4 +138,5 @@ class RedisCache(sinkSettings: RedisSinkSettings) extends RedisWriter { } logger.debug(s"Wrote [${sinkRecords.size}] rows for topic [$topic]") } + override def close(): Unit = jedis.close() } diff --git a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisGeoAdd.scala b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisGeoAdd.scala index 7a9ee914f..cd64ae4d1 100644 --- a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisGeoAdd.scala +++ b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisGeoAdd.scala @@ -15,15 +15,19 @@ */ package io.lenses.streamreactor.connect.redis.sink.writer +import com.typesafe.scalalogging.StrictLogging import io.lenses.kcql.Kcql import io.lenses.streamreactor.common.config.base.settings.Projections +import io.lenses.streamreactor.common.errors.ErrorHandler import io.lenses.streamreactor.common.schemas.SinkRecordConverterHelper.SinkRecordExtension import io.lenses.streamreactor.common.schemas.StructHelper +import io.lenses.streamreactor.common.sink.DbWriter import io.lenses.streamreactor.connect.json.SimpleJsonConverter import io.lenses.streamreactor.connect.redis.sink.config.RedisKCQLSetting import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import org.apache.kafka.connect.errors.ConnectException import org.apache.kafka.connect.sink.SinkRecord +import redis.clients.jedis.Jedis import scala.jdk.CollectionConverters.ListHasAsScala import scala.util.control.Exception.allCatch @@ -31,7 +35,12 @@ import scala.util.Failure import scala.util.Success import scala.util.Try -class RedisGeoAdd(sinkSettings: RedisSinkSettings) extends RedisWriter with GeoAddSupport { +class RedisGeoAdd(sinkSettings: RedisSinkSettings, jedis: Jedis) + extends DbWriter + with StrictLogging + with ErrorHandler + with GeoAddSupport { + initialize(sinkSettings.taskRetries, sinkSettings.errorPolicy) private lazy val simpleJsonConverter = new SimpleJsonConverter() @@ -135,4 +144,6 @@ class RedisGeoAdd(sinkSettings: RedisSinkSettings) extends RedisWriter with GeoA } def isDoubleNumber(s: String): Boolean = (allCatch opt s.toDouble).isDefined + + override def close(): Unit = jedis.close() } diff --git a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisInsertSortedSet.scala b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisInsertSortedSet.scala index d55d1f937..28f32e69d 100644 --- a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisInsertSortedSet.scala +++ b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisInsertSortedSet.scala @@ -15,14 +15,18 @@ */ package io.lenses.streamreactor.connect.redis.sink.writer +import com.typesafe.scalalogging.StrictLogging import io.lenses.kcql.Kcql import io.lenses.streamreactor.common.config.base.settings.Projections +import io.lenses.streamreactor.common.errors.ErrorHandler import io.lenses.streamreactor.common.rowkeys.StringStructFieldsStringKeyBuilder import io.lenses.streamreactor.common.schemas.SinkRecordConverterHelper.SinkRecordExtension +import io.lenses.streamreactor.common.sink.DbWriter import io.lenses.streamreactor.connect.json.SimpleJsonConverter import io.lenses.streamreactor.connect.redis.sink.config.RedisKCQLSetting import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import org.apache.kafka.connect.sink.SinkRecord +import redis.clients.jedis.Jedis import scala.jdk.CollectionConverters.ListHasAsScala import scala.util.Try @@ -40,7 +44,12 @@ import scala.util.Try * INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS SortedSet * INSERT INTO cpu_stats_SS SELECT * from cpuTopic STOREAS SortedSet (score=ts) */ -class RedisInsertSortedSet(sinkSettings: RedisSinkSettings) extends RedisWriter with SortedSetSupport { +class RedisInsertSortedSet(sinkSettings: RedisSinkSettings, jedis: Jedis) + extends DbWriter + with StrictLogging + with ErrorHandler + with SortedSetSupport { + initialize(sinkSettings.taskRetries, sinkSettings.errorPolicy) val configs: Set[Kcql] = sinkSettings.kcqlSettings.map(_.kcqlConfig) configs.foreach { c => @@ -124,4 +133,5 @@ class RedisInsertSortedSet(sinkSettings: RedisSinkSettings) extends RedisWriter }, ) + override def close(): Unit = jedis.close() } diff --git a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisMultipleSortedSets.scala b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisMultipleSortedSets.scala index b0bbf6b5e..459826866 100644 --- a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisMultipleSortedSets.scala +++ b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisMultipleSortedSets.scala @@ -15,14 +15,18 @@ */ package io.lenses.streamreactor.connect.redis.sink.writer +import com.typesafe.scalalogging.StrictLogging import io.lenses.kcql.Kcql +import io.lenses.streamreactor.common.errors.ErrorHandler import io.lenses.streamreactor.common.schemas.SinkRecordConverterHelper.SinkRecordExtension import io.lenses.streamreactor.common.schemas.StructHelper +import io.lenses.streamreactor.common.sink.DbWriter import io.lenses.streamreactor.connect.json.SimpleJsonConverter import io.lenses.streamreactor.connect.redis.sink.config.RedisKCQLSetting import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import org.apache.kafka.connect.errors.ConnectException import org.apache.kafka.connect.sink.SinkRecord +import redis.clients.jedis.Jedis import scala.jdk.CollectionConverters.ListHasAsScala import scala.util.Failure @@ -36,7 +40,12 @@ import scala.util.Try * * .. PK .. STOREAS SortedSet */ -class RedisMultipleSortedSets(sinkSettings: RedisSinkSettings) extends RedisWriter with SortedSetSupport { +class RedisMultipleSortedSets(sinkSettings: RedisSinkSettings, jedis: Jedis) + extends DbWriter + with StrictLogging + with ErrorHandler + with SortedSetSupport { + initialize(sinkSettings.taskRetries, sinkSettings.errorPolicy) private lazy val simpleJsonConverter = new SimpleJsonConverter() @@ -151,4 +160,5 @@ class RedisMultipleSortedSets(sinkSettings: RedisSinkSettings) extends RedisWrit logger.debug(s"Wrote [${sinkRecords.size}] rows for topic [$topic]") } + override def close(): Unit = jedis.close() } diff --git a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisPubSub.scala b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisPubSub.scala index eaa3ea369..c4591fb7b 100644 --- a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisPubSub.scala +++ b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisPubSub.scala @@ -15,14 +15,18 @@ */ package io.lenses.streamreactor.connect.redis.sink.writer +import com.typesafe.scalalogging.StrictLogging import io.lenses.kcql.Kcql import io.lenses.streamreactor.common.config.base.settings.Projections +import io.lenses.streamreactor.common.errors.ErrorHandler import io.lenses.streamreactor.common.rowkeys.StringStructFieldsStringKeyBuilder import io.lenses.streamreactor.common.schemas.SinkRecordConverterHelper.SinkRecordExtension +import io.lenses.streamreactor.common.sink.DbWriter import io.lenses.streamreactor.connect.json.SimpleJsonConverter import io.lenses.streamreactor.connect.redis.sink.config.RedisKCQLSetting import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import org.apache.kafka.connect.sink.SinkRecord +import redis.clients.jedis.Jedis import scala.jdk.CollectionConverters.ListHasAsScala import scala.util.Try @@ -40,8 +44,12 @@ import scala.util.Try * SELECT * from cpuTopic STOREAS PubSub * SELECT * from cpuTopic STOREAS PubSub (channel=channel) */ -class RedisPubSub(sinkSettings: RedisSinkSettings) extends RedisWriter with PubSubSupport { - +class RedisPubSub(sinkSettings: RedisSinkSettings, jedis: Jedis) + extends DbWriter + with StrictLogging + with ErrorHandler + with PubSubSupport { + initialize(sinkSettings.taskRetries, sinkSettings.errorPolicy) val configs: Set[Kcql] = sinkSettings.kcqlSettings.map(_.kcqlConfig) configs.foreach { c => // assert(c.getTarget.length > 0, "Add to your KCQL syntax : INSERT INTO REDIS_KEY_NAME ") @@ -104,5 +112,5 @@ class RedisPubSub(sinkSettings: RedisSinkSettings) extends RedisWriter with PubS logger.debug(s"Published [${sinkRecords.size}] messages for topic [$topic]") }, ) - + override def close(): Unit = jedis.close() } diff --git a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisStreams.scala b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisStreams.scala index bc0ab5d9b..c248f488b 100644 --- a/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisStreams.scala +++ b/kafka-connect-redis/src/main/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisStreams.scala @@ -22,8 +22,12 @@ import io.lenses.streamreactor.connect.json.SimpleJsonConverter import io.lenses.streamreactor.connect.redis.sink.config.RedisKCQLSetting import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings import com.fasterxml.jackson.databind.ObjectMapper +import com.typesafe.scalalogging.StrictLogging +import io.lenses.streamreactor.common.errors.ErrorHandler +import io.lenses.streamreactor.common.sink.DbWriter import org.apache.kafka.connect.errors.ConnectException import org.apache.kafka.connect.sink.SinkRecord +import redis.clients.jedis.Jedis import redis.clients.jedis.params.XAddParams import scala.jdk.CollectionConverters.MapHasAsJava @@ -41,7 +45,12 @@ import scala.util.Try * * INSERT INTO stream1 SELECT * from cpuTopic STOREAS stream */ -class RedisStreams(sinkSettings: RedisSinkSettings) extends RedisWriter with PubSubSupport { +class RedisStreams(sinkSettings: RedisSinkSettings, jedis: Jedis) + extends DbWriter + with StrictLogging + with ErrorHandler + with PubSubSupport { + initialize(sinkSettings.taskRetries, sinkSettings.errorPolicy) val configs: Set[Kcql] = sinkSettings.kcqlSettings.map(_.kcqlConfig) configs.foreach { c => @@ -105,4 +114,5 @@ class RedisStreams(sinkSettings: RedisSinkSettings) extends RedisWriter with Pub logger.debug(s"Published [${sinkRecords.size}] messages for topic [$topic]") }, ) + override def close(): Unit = jedis.close() } diff --git a/kafka-connect-redis/src/test/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisSslTest.scala b/kafka-connect-redis/src/test/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisSslTest.scala deleted file mode 100644 index 4e5b88d52..000000000 --- a/kafka-connect-redis/src/test/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisSslTest.scala +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Copyright 2017-2023 Lenses.io Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.lenses.streamreactor.connect.redis.sink.writer - -import io.lenses.streamreactor.connect.redis.sink.config.RedisConfig -import io.lenses.streamreactor.connect.redis.sink.config.RedisConfigConstants -import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings -import com.google.gson.Gson -import org.apache.kafka.common.config.SslConfigs -import org.apache.kafka.connect.data.Schema -import org.apache.kafka.connect.data.SchemaBuilder -import org.apache.kafka.connect.data.Struct -import org.apache.kafka.connect.sink.SinkRecord -import org.mockito.MockitoSugar -import org.scalatest.BeforeAndAfterAll -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec -import redis.clients.jedis.Jedis - -import java.net.URI -import scala.jdk.CollectionConverters.MapHasAsJava -import scala.jdk.CollectionConverters.MapHasAsScala - -/* -README BEFORE THE TEST - -Since Redis natively doesn't support ssl connections -we use tunneling via port 6390 and the Jedis client https://github.com/xetorthio/jedis - -The test requires to: -1) Start the server by executing `make` on https://github.com/xetorthio/jedis/blob/master/Makefile -2) set the truststoreFilePath below with the location of truststore.jceks file -3) set the runTests to true - */ - -class RedisSslTest extends AnyWordSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - - val runTests = false - - val truststoreFilePath = "src/test/resources/truststore.jceks" - - val gson = new Gson() - - val TOPIC = "topic" - val baseProps = Map( - RedisConfigConstants.REDIS_HOST -> "localhost", - RedisConfigConstants.REDIS_PORT -> "6390", - RedisConfigConstants.REDIS_PASSWORD -> "foobared", - RedisConfigConstants.REDIS_SSL_ENABLED -> "true", - SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> truststoreFilePath, - ) - -// def setupTrustStore(): Unit = { -// setJvmTrustStore(truststoreFilePath, "jceks") -// } - -// private def setJvmTrustStore(trustStoreFilePath: String, trustStoreType: String): Unit = { -// new File(trustStoreFilePath).exists shouldBe true -// System.setProperty("javax.net.ssl.trustStore", trustStoreFilePath) -// System.setProperty("javax.net.ssl.trustStoreType", trustStoreType) -// } - -// override def beforeAll() = { -// if (runTests) { -// setupTrustStore() -// } -// } -// -// override def afterAll() = { -// } - - "JedisSslClient" should { - - "establish ssl connection" in { - - if (!runTests) cancel("runTests is disabled") - - val truststoreFilePath = getClass.getResource("/truststore.jks").getPath - val keystoreFilePath = getClass.getResource("/keystore.jks").getPath - - val map = Map( - RedisConfigConstants.REDIS_HOST -> "rediss://0.0.0.0", - RedisConfigConstants.REDIS_PORT -> "8453", - RedisConfigConstants.KCQL_CONFIG -> "SELECT * FROM topicA PK firstName, child.firstName", - RedisConfigConstants.ERROR_POLICY -> "THROW", - RedisConfigConstants.REDIS_SSL_ENABLED -> "true", - SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> truststoreFilePath, - SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> "truststore-password", - SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> keystoreFilePath, - SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> "keystore-password", - ) - - val config = RedisConfig(map.asJava) - val settings = RedisSinkSettings(config) - - val writer = new RedisCache(settings) - writer.createClient(settings) - - val props = System.getProperties - props.containsKey("javax.net.ssl.keyStorePassword") shouldBe true - props.get("javax.net.ssl.keyStorePassword") shouldBe "keystore-password" - props.containsKey("javax.net.ssl.keyStore") shouldBe true - props.get("javax.net.ssl.keyStore") shouldBe keystoreFilePath - props.containsKey("javax.net.ssl.keyStoreType") shouldBe true - props.get("javax.net.ssl.keyStoreType") shouldBe "JKS" - - props.containsKey("javax.net.ssl.trustStorePassword") shouldBe true - props.get("javax.net.ssl.trustStorePassword") shouldBe "truststore-password" - props.containsKey("javax.net.ssl.trustStore") shouldBe true - props.get("javax.net.ssl.trustStore") shouldBe truststoreFilePath - props.containsKey("javax.net.ssl.trustStoreType") shouldBe true - props.get("javax.net.ssl.trustStoreType") shouldBe "JKS" - -// if (runTests) { -// -// val jedis = new Jedis(URI.create(s"rediss://${baseProps(RedisConfigConstants.REDIS_HOST)}:${baseProps(RedisConfigConstants.REDIS_PORT)}")) -// jedis.auth(baseProps(RedisConfigConstants.REDIS_PASSWORD)) -// jedis.ping() shouldBe "PONG" -// -// } - } - } - - "RedisDbWriter" should { - - "write Kafka records to Redis using CACHE mode and ssl connection" in { - - if (runTests) { - - val jedis = new Jedis(URI.create( - s"rediss://${baseProps(RedisConfigConstants.REDIS_HOST)}:${baseProps(RedisConfigConstants.REDIS_PORT)}", - )) - jedis.auth(baseProps(RedisConfigConstants.REDIS_PASSWORD)) - jedis.ping() shouldBe "PONG" - - val QUERY_ALL = s"SELECT * FROM $TOPIC PK firstName, child.firstName" - val props = (baseProps + (RedisConfigConstants.KCQL_CONFIG -> QUERY_ALL)).asJava - val config = RedisConfig(props) - val settings = RedisSinkSettings(config) - val writer = new RedisCache(settings) - - val childSchema = SchemaBuilder.struct().name("com.example.Child") - .field("firstName", Schema.STRING_SCHEMA) - .build() - - val schema = SchemaBuilder.struct().name("com.example.Person") - .field("firstName", Schema.STRING_SCHEMA) - .field("age", Schema.INT32_SCHEMA) - .field("threshold", Schema.OPTIONAL_FLOAT64_SCHEMA) - .field("child", childSchema) - .build() - - val alexJr = new Struct(childSchema) - .put("firstName", "Alex_Junior") - val alex = new Struct(schema) - .put("firstName", "Alex") - .put("age", 30) - .put("child", alexJr) - val maraJr = new Struct(childSchema) - .put("firstName", "Mara_Junior") - val mara = new Struct(schema).put("firstName", "Mara") - .put("age", 22) - .put("threshold", 12.4) - .put("child", maraJr) - - val alexRecord = new SinkRecord(TOPIC, 1, null, null, schema, alex, 0) - val maraRecord = new SinkRecord(TOPIC, 1, null, null, schema, mara, 1) - - writer.write(Seq(alexRecord, maraRecord)) - - val alexValue = jedis.get("Alex.Alex_Junior") - alexValue should not be null - - val alexMap = gson.fromJson(alexValue, classOf[java.util.Map[String, AnyRef]]).asScala - alexMap("firstName").toString shouldBe "Alex" - alexMap("age").toString shouldBe "30.0" //it gets back a java double!? - alexMap("child").asInstanceOf[java.util.Map[String, AnyRef]].get("firstName") shouldBe "Alex_Junior" - - val maraValue = jedis.get("Mara.Mara_Junior") - maraValue should not be null - - val maraMap = gson.fromJson(maraValue, classOf[java.util.Map[String, AnyRef]]).asScala - maraMap("firstName") shouldBe "Mara" - maraMap("age").toString shouldBe "22.0" - maraMap("threshold").toString shouldBe "12.4" - maraMap("child").asInstanceOf[java.util.Map[String, AnyRef]].get("firstName") shouldBe "Mara_Junior" - } - } - - } -} diff --git a/kafka-connect-redis/src/test/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisStreamTest.scala b/kafka-connect-redis/src/test/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisStreamTest.scala index c3622be10..a59dfbc32 100644 --- a/kafka-connect-redis/src/test/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisStreamTest.scala +++ b/kafka-connect-redis/src/test/scala/io/lenses/streamreactor/connect/redis/sink/writer/RedisStreamTest.scala @@ -56,12 +56,6 @@ class RedisStreamTest with BeforeAndAfterAll with MockitoSugar with ArgumentMatchersSugar { -// -// val redisServer = new RedisServer(6379) -// -// override def beforeAll() = redisServer.start() -// -// override def afterAll() = redisServer.stop() "Redis Stream writer" should { @@ -78,7 +72,8 @@ class RedisStreamTest val config = RedisConfig(props) val settings = RedisSinkSettings(config) - val writer = new RedisStreams(settings) + val jedis = mock[Jedis] + val writer = new RedisStreams(settings, jedis) val schema = SchemaBuilder.struct().name("com.example.Cpu") .field("type", Schema.STRING_SCHEMA) @@ -91,9 +86,6 @@ class RedisStreamTest val sinkRecord1 = new SinkRecord(TOPIC, 0, null, null, schema, struct1, 1) - val jedis = mock[Jedis] - writer.jedis = jedis - val map = new util.HashMap[String, String]() map.put("type", "Xeon") map.put("temperature", "60.4")