From fe9465ccf7769a30e04f3827cd58fe8d5b4ccfdb Mon Sep 17 00:00:00 2001 From: mroiter-larus Date: Fri, 17 Dec 2021 18:07:44 +0100 Subject: [PATCH] Added new config admin.client.api.enabled --- .../kotlin/streams/config/StreamsConfig.kt | 5 +++++ .../streams/kafka/KafkaSinkConfiguration.kt | 9 ++++---- .../kafka/KafkaSinkConfigurationTest.kt | 18 +++++++++++----- .../main/kotlin/streams/kafka/AdminService.kt | 21 +++++++++++++++++++ .../streams/kafka/DefaultAdminService.kt | 17 +++++++++++++++ .../kotlin/streams/kafka/KafkaAdminService.kt | 10 ++++----- .../streams/kafka/KafkaConfiguration.kt | 10 +++++++-- .../kotlin/streams/kafka/KafkaEventRouter.kt | 20 +++++++++--------- .../streams/kafka/KafkaConfigurationTest.kt | 4 +++- 9 files changed, 87 insertions(+), 27 deletions(-) create mode 100644 producer/src/main/kotlin/streams/kafka/AdminService.kt create mode 100644 producer/src/main/kotlin/streams/kafka/DefaultAdminService.kt diff --git a/common/src/main/kotlin/streams/config/StreamsConfig.kt b/common/src/main/kotlin/streams/config/StreamsConfig.kt index 2603bb73..6b1c1b58 100644 --- a/common/src/main/kotlin/streams/config/StreamsConfig.kt +++ b/common/src/main/kotlin/streams/config/StreamsConfig.kt @@ -41,6 +41,8 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe const val POLL_INTERVAL = "streams.sink.poll.interval" const val INSTANCE_WAIT_TIMEOUT = "streams.wait.timeout" const val INSTANCE_WAIT_TIMEOUT_VALUE = 120000L + const val KAFKA_ADMIN_CLIENT_API_ENABLED = "kafka.admin.client.api.enabled" + const val KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE = true private const val DEFAULT_TRIGGER_PERIOD: Int = 10000 @@ -83,6 +85,8 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe fun getSystemDbWaitTimeout(config: Map) = config.getOrDefault(SYSTEM_DB_WAIT_TIMEOUT, SYSTEM_DB_WAIT_TIMEOUT_VALUE).toString().toLong() fun getInstanceWaitTimeout(config: Map) = config.getOrDefault(INSTANCE_WAIT_TIMEOUT, INSTANCE_WAIT_TIMEOUT_VALUE).toString().toLong() + + fun isKafkaAdminClientApiEnabled(config: Map) = config.getOrDefault(KAFKA_ADMIN_CLIENT_API_ENABLED, KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE).toString().toBoolean() } private val configLifecycle: ConfigurationLifecycle @@ -221,4 +225,5 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe fun getInstanceWaitTimeout() = Companion.getInstanceWaitTimeout(getConfiguration()) + fun isKafkaAdminClientApiEnabled() = Companion.isKafkaAdminClientApiEnabled(getConfiguration()) } \ No newline at end of file diff --git a/consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt b/consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt index 0a747f0d..9252ff4d 100644 --- a/consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt +++ b/consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt @@ -38,14 +38,15 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092 val streamsSinkConfiguration: StreamsSinkConfiguration = StreamsSinkConfiguration(), val enableAutoCommit: Boolean = true, val streamsAsyncCommit: Boolean = false, - val extraProperties: Map = emptyMap()) { + val extraProperties: Map = emptyMap(), + val adminClientApiEnabled: Boolean = StreamsConfig.KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE) { companion object { fun from(cfg: Map, dbName: String, isDefaultDb: Boolean): KafkaSinkConfiguration { val kafkaCfg = create(cfg, dbName, isDefaultDb) validate(kafkaCfg) - val invalidTopics = getInvalidTopics(kafkaCfg.asProperties(), kafkaCfg.streamsSinkConfiguration.topics.allTopics()) + val invalidTopics = if (kafkaCfg.adminClientApiEnabled) getInvalidTopics(kafkaCfg.asProperties(), kafkaCfg.streamsSinkConfiguration.topics.allTopics()) else emptyList() return if (invalidTopics.isNotEmpty()) { kafkaCfg.copy(streamsSinkConfiguration = StreamsSinkConfiguration.from(cfg, dbName, invalidTopics, isDefaultDb)) } else { @@ -65,7 +66,6 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092 val streamsSinkConfiguration = StreamsSinkConfiguration.from(configMap = cfg, dbName = dbName, isDefaultDb = isDefaultDb) - return default.copy(keyDeserializer = config.getOrDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, default.keyDeserializer), valueDeserializer = config.getOrDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, default.valueDeserializer), bootstrapServers = config.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, default.bootstrapServers), @@ -74,7 +74,8 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092 enableAutoCommit = config.getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, default.enableAutoCommit).toString().toBoolean(), streamsAsyncCommit = config.getOrDefault("streams.async.commit", default.streamsAsyncCommit).toString().toBoolean(), streamsSinkConfiguration = streamsSinkConfiguration, - extraProperties = extraProperties // for what we don't provide a default configuration + extraProperties = extraProperties, + adminClientApiEnabled = config.getOrDefault("admin.client.api.enabled", default.adminClientApiEnabled).toString().toBoolean()// for what we don't provide a default configuration ) } diff --git a/consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt b/consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt index b5508012..4e6277b2 100644 --- a/consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt +++ b/consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt @@ -18,6 +18,7 @@ import streams.StreamsSinkConfigurationTest import streams.config.StreamsConfig import streams.service.TopicValidationException import kotlin.test.assertEquals +import kotlin.test.assertFalse import kotlin.test.assertTrue class KafkaSinkConfigurationTest { @@ -37,6 +38,7 @@ class KafkaSinkConfigurationTest { assertEquals(true, default.enableAutoCommit) assertEquals(false, default.streamsAsyncCommit) assertEquals(emptyMap(), default.extraProperties) + assertTrue { default.adminClientApiEnabled } } @Test @@ -48,6 +50,7 @@ class KafkaSinkConfigurationTest { val group = "foo" val autoOffsetReset = "latest" val autoCommit = "false" + val kafkaAdminClientApiEnabled = "false" val config = mapOf(topicKey to topicValue, "kafka.bootstrap.servers" to bootstrap, "kafka.auto.offset.reset" to autoOffsetReset, @@ -55,14 +58,16 @@ class KafkaSinkConfigurationTest { "kafka.group.id" to group, "kafka.streams.async.commit" to "true", "kafka.key.deserializer" to ByteArrayDeserializer::class.java.name, - "kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name) + "kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name, + "kafka.admin.client.api.enabled" to kafkaAdminClientApiEnabled) val expectedMap = mapOf("bootstrap.servers" to bootstrap, "auto.offset.reset" to autoOffsetReset, "enable.auto.commit" to autoCommit, "group.id" to group, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(), ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(), "streams.async.commit" to "true", "key.deserializer" to ByteArrayDeserializer::class.java.name, - "value.deserializer" to KafkaAvroDeserializer::class.java.name) + "value.deserializer" to KafkaAvroDeserializer::class.java.name, + "admin.client.api.enabled" to kafkaAdminClientApiEnabled) val kafkaSinkConfiguration = KafkaSinkConfiguration.create(config, defaultDbName, isDefaultDb = true) StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, topic, topicValue) @@ -70,6 +75,7 @@ class KafkaSinkConfigurationTest { assertEquals(bootstrap, kafkaSinkConfiguration.bootstrapServers) assertEquals(autoOffsetReset, kafkaSinkConfiguration.autoOffsetReset) assertEquals(group, kafkaSinkConfiguration.groupId) + assertFalse { kafkaSinkConfiguration.adminClientApiEnabled } val resultMap = kafkaSinkConfiguration .asProperties() .map { it.key.toString() to it.value.toString() } @@ -94,6 +100,7 @@ class KafkaSinkConfigurationTest { val autoOffsetReset = "latest" val autoCommit = "false" val asyncCommit = "true" + val kafkaAdminClientApiEnabled = "false" val config = mapOf(topicKey to topicValue, topicKeyFoo to topicValueFoo, "kafka.bootstrap.servers" to bootstrap, @@ -102,14 +109,16 @@ class KafkaSinkConfigurationTest { "kafka.group.id" to group, "kafka.streams.async.commit" to asyncCommit, "kafka.key.deserializer" to ByteArrayDeserializer::class.java.name, - "kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name) + "kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name, + "kafka.admin.client.api.enabled" to kafkaAdminClientApiEnabled) val expectedMap = mapOf("bootstrap.servers" to bootstrap, "auto.offset.reset" to autoOffsetReset, "enable.auto.commit" to autoCommit, "group.id" to "$group-$dbName", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(), ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(), "key.deserializer" to ByteArrayDeserializer::class.java.name, "streams.async.commit" to asyncCommit, - "value.deserializer" to KafkaAvroDeserializer::class.java.name) + "value.deserializer" to KafkaAvroDeserializer::class.java.name, + "admin.client.api.enabled" to kafkaAdminClientApiEnabled) val kafkaSinkConfiguration = KafkaSinkConfiguration.create(config, dbName, isDefaultDb = false) StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, topic, topicValueFoo) @@ -179,5 +188,4 @@ class KafkaSinkConfigurationTest { throw e } } - } \ No newline at end of file diff --git a/producer/src/main/kotlin/streams/kafka/AdminService.kt b/producer/src/main/kotlin/streams/kafka/AdminService.kt new file mode 100644 index 00000000..d01f0c6a --- /dev/null +++ b/producer/src/main/kotlin/streams/kafka/AdminService.kt @@ -0,0 +1,21 @@ +package streams.kafka + +import org.neo4j.logging.Log + +interface AdminService { + + fun start() + + fun stop() + + fun isValidTopic(topic: String): Boolean + + fun getInvalidTopics(): List + + companion object{ + fun getInstance(props: KafkaConfiguration, allTopics: List, log: Log): AdminService = when (props.adminClientApiEnabled) { + true -> KafkaAdminService(props, allTopics, log) + else -> DefaultAdminService(log) + } + } +} \ No newline at end of file diff --git a/producer/src/main/kotlin/streams/kafka/DefaultAdminService.kt b/producer/src/main/kotlin/streams/kafka/DefaultAdminService.kt new file mode 100644 index 00000000..9cd85eb3 --- /dev/null +++ b/producer/src/main/kotlin/streams/kafka/DefaultAdminService.kt @@ -0,0 +1,17 @@ +package streams.kafka + +import org.neo4j.logging.Log + +class DefaultAdminService(private val log: Log) : AdminService { + + override fun start() { + log.info("No need to start the AdminService to check the topic list. We'll consider the topic's auto creation enabled") + } + + override fun stop() {} // Do nothing + + override fun isValidTopic(topic: String): Boolean = true + + override fun getInvalidTopics(): List = emptyList() + +} \ No newline at end of file diff --git a/producer/src/main/kotlin/streams/kafka/KafkaAdminService.kt b/producer/src/main/kotlin/streams/kafka/KafkaAdminService.kt index f1bad2cd..12986f0e 100644 --- a/producer/src/main/kotlin/streams/kafka/KafkaAdminService.kt +++ b/producer/src/main/kotlin/streams/kafka/KafkaAdminService.kt @@ -15,13 +15,13 @@ import streams.utils.StreamsUtils import java.util.Collections import java.util.concurrent.ConcurrentHashMap -class KafkaAdminService(private val props: KafkaConfiguration, private val allTopics: List, private val log: Log) { +class KafkaAdminService(private val props: KafkaConfiguration, private val allTopics: List, private val log: Log) : AdminService { private val client = AdminClient.create(props.asProperties()) private val kafkaTopics: MutableSet = Collections.newSetFromMap(ConcurrentHashMap()) private val isAutoCreateTopicsEnabled = KafkaValidationUtils.isAutoCreateTopicsEnabled(client) private lateinit var job: Job - fun start() { + override fun start() { if (!isAutoCreateTopicsEnabled) { job = GlobalScope.launch(Dispatchers.IO) { while (isActive) { @@ -39,7 +39,7 @@ class KafkaAdminService(private val props: KafkaConfiguration, private val allTo } } - fun stop() { + override fun stop() { StreamsUtils.ignoreExceptions({ runBlocking { job.cancelAndJoin() @@ -47,10 +47,10 @@ class KafkaAdminService(private val props: KafkaConfiguration, private val allTo }, UninitializedPropertyAccessException::class.java) } - fun isValidTopic(topic: String) = when (isAutoCreateTopicsEnabled) { + override fun isValidTopic(topic: String) = when (isAutoCreateTopicsEnabled) { true -> true else -> kafkaTopics.contains(topic) } - fun getInvalidTopics() = KafkaValidationUtils.getInvalidTopics(client, allTopics) + override fun getInvalidTopics() = KafkaValidationUtils.getInvalidTopics(client, allTopics) } \ No newline at end of file diff --git a/producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt b/producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt index 682bbb23..dad50d9c 100644 --- a/producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt +++ b/producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt @@ -6,6 +6,7 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.serialization.ByteArraySerializer import org.neo4j.logging.Log +import streams.config.StreamsConfig import streams.extensions.getInt import streams.extensions.toPointCase import streams.utils.JSONUtils @@ -30,7 +31,8 @@ data class KafkaConfiguration(val bootstrapServers: String = "localhost:9092", val lingerMs: Int = 1, val topicDiscoveryPollingInterval: Long = TimeUnit.MINUTES.toMillis(5), val streamsLogCompactionStrategy: String = LogStrategy.delete.toString(), - val extraProperties: Map = emptyMap()) { + val extraProperties: Map = emptyMap(), + val adminClientApiEnabled: Boolean = StreamsConfig.KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE) { companion object { // Visible for testing @@ -56,7 +58,11 @@ data class KafkaConfiguration(val bootstrapServers: String = "localhost:9092", topicDiscoveryPollingInterval = config.getOrDefault("topic.discovery.polling.interval", default.topicDiscoveryPollingInterval).toString().toLong(), streamsLogCompactionStrategy = config.getOrDefault("streams.log.compaction.strategy", default.streamsLogCompactionStrategy), - extraProperties = extraProperties // for what we don't provide a default configuration + extraProperties = extraProperties, + adminClientApiEnabled = config.getOrDefault("admin.client.api.enabled", + default.adminClientApiEnabled) + .toString() + .toBoolean() // for what we don't provide a default configuration ) } diff --git a/producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt b/producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt index 5ddd55f1..178ce30b 100644 --- a/producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt +++ b/producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt @@ -11,17 +11,13 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException import org.apache.kafka.common.errors.ProducerFencedException import org.neo4j.graphdb.GraphDatabaseService import org.neo4j.logging.Log -import org.neo4j.logging.internal.LogService -import streams.StreamsEventRouterConfiguration -import streams.asSourceRecordKey -import streams.asSourceRecordValue -import streams.toMap import streams.StreamsEventRouter -import streams.config.StreamsConfig +import streams.StreamsEventRouterConfiguration import streams.events.StreamsEvent import streams.events.StreamsPluginStatus import streams.events.StreamsTransactionEvent import streams.extensions.isDefaultDb +import streams.toMap import streams.utils.JSONUtils import streams.utils.KafkaValidationUtils.getInvalidTopicsError import streams.utils.StreamsUtils @@ -29,9 +25,9 @@ import java.util.Properties import java.util.UUID -class KafkaEventRouter(private val config: Map, - private val db: GraphDatabaseService, - private val log: Log): StreamsEventRouter(config, db, log) { +abstract class KafkaEventRouter(private val config: Map, + private val db: GraphDatabaseService, + private val log: Log): StreamsEventRouter(config, db, log) { override val eventRouterConfiguration: StreamsEventRouterConfiguration = StreamsEventRouterConfiguration .from(config, db.databaseName(), db.isDefaultDb(), log) @@ -41,7 +37,7 @@ class KafkaEventRouter(private val config: Map, private var producer: Neo4jKafkaProducer? = null private val kafkaConfig by lazy { KafkaConfiguration.from(config, log) } - private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig, eventRouterConfiguration.allTopics(), log) } + private val kafkaAdminService by lazy { AdminService.getInstance(kafkaConfig, eventRouterConfiguration.allTopics(), log) } override fun printInvalidTopics() { val invalidTopics = kafkaAdminService.getInvalidTopics() @@ -56,6 +52,10 @@ class KafkaEventRouter(private val config: Map, } override fun start() = runBlocking { + + //val adminServiceFactory = AdminServiceFactory(kafkaConfig, eventRouterConfiguration.allTopics(), log) + //kafkaAdminService = adminServiceFactory.getAdminService(kafkaConfig.adminClientApiEnabled)!! + mutex.withLock(producer) { if (status(producer) == StreamsPluginStatus.RUNNING) { return@runBlocking diff --git a/producer/src/test/kotlin/streams/kafka/KafkaConfigurationTest.kt b/producer/src/test/kotlin/streams/kafka/KafkaConfigurationTest.kt index 20773e39..a95130f4 100644 --- a/producer/src/test/kotlin/streams/kafka/KafkaConfigurationTest.kt +++ b/producer/src/test/kotlin/streams/kafka/KafkaConfigurationTest.kt @@ -22,7 +22,8 @@ class KafkaConfigurationTest { "kafka.linger.ms" to 10, "kafka.fetch.min.bytes" to 1234, "kafka.topic.discovery.polling.interval" to 0L, - "kafka.streams.log.compaction.strategy" to "delete") + "kafka.streams.log.compaction.strategy" to "delete", + "kafka.admin.client.api.enabled" to false) val kafkaConfig = KafkaConfiguration.create(map.mapValues { it.value.toString() }) @@ -46,5 +47,6 @@ class KafkaConfigurationTest { assertEquals(map["kafka.fetch.min.bytes"].toString(), properties["fetch.min.bytes"]) assertEquals(map["kafka.topic.discovery.polling.interval"], properties["topic.discovery.polling.interval"]) assertEquals(map["kafka.streams.log.compaction.strategy"], properties["streams.log.compaction.strategy"]) + assertEquals(map["kafka.admin.client.api.enabled"], properties["admin.client.api.enabled"]) } } \ No newline at end of file