Skip to content

Commit

Permalink
Added new config admin.client.api.enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
mroiter-larus committed Dec 23, 2021
1 parent a6a14fe commit d056619
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 30 deletions.
5 changes: 5 additions & 0 deletions common/src/main/kotlin/streams/config/StreamsConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
const val POLL_INTERVAL = "streams.sink.poll.interval"
const val INSTANCE_WAIT_TIMEOUT = "streams.wait.timeout"
const val INSTANCE_WAIT_TIMEOUT_VALUE = 120000L
const val KAFKA_ADMIN_CLIENT_API_ENABLED = "kafka.admin.client.api.enabled"
const val KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE = true

private const val DEFAULT_TRIGGER_PERIOD: Int = 10000

Expand Down Expand Up @@ -83,6 +85,8 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
fun getSystemDbWaitTimeout(config: Map<String, Any?>) = config.getOrDefault(SYSTEM_DB_WAIT_TIMEOUT, SYSTEM_DB_WAIT_TIMEOUT_VALUE).toString().toLong()

fun getInstanceWaitTimeout(config: Map<String, Any?>) = config.getOrDefault(INSTANCE_WAIT_TIMEOUT, INSTANCE_WAIT_TIMEOUT_VALUE).toString().toLong()

fun isKafkaAdminClientApiEnabled(config: Map<String, Any?>) = config.getOrDefault(KAFKA_ADMIN_CLIENT_API_ENABLED, KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE).toString().toBoolean()
}

private val configLifecycle: ConfigurationLifecycle
Expand Down Expand Up @@ -221,4 +225,5 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe

fun getInstanceWaitTimeout() = Companion.getInstanceWaitTimeout(getConfiguration())

fun isKafkaAdminClientApiEnabled() = Companion.isKafkaAdminClientApiEnabled(getConfiguration())
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092
val streamsSinkConfiguration: StreamsSinkConfiguration = StreamsSinkConfiguration(),
val enableAutoCommit: Boolean = true,
val streamsAsyncCommit: Boolean = false,
val extraProperties: Map<String, String> = emptyMap()) {
val extraProperties: Map<String, String> = emptyMap(),
val adminClientApiEnabled: Boolean = StreamsConfig.KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE) {

companion object {

fun from(cfg: Map<String, String>, dbName: String, isDefaultDb: Boolean): KafkaSinkConfiguration {
val kafkaCfg = create(cfg, dbName, isDefaultDb)
validate(kafkaCfg)
val invalidTopics = getInvalidTopics(kafkaCfg.asProperties(), kafkaCfg.streamsSinkConfiguration.topics.allTopics())
val invalidTopics = if (kafkaCfg.adminClientApiEnabled) getInvalidTopics(kafkaCfg.asProperties(), kafkaCfg.streamsSinkConfiguration.topics.allTopics()) else emptyList()
return if (invalidTopics.isNotEmpty()) {
kafkaCfg.copy(streamsSinkConfiguration = StreamsSinkConfiguration.from(cfg, dbName, invalidTopics, isDefaultDb))
} else {
Expand All @@ -65,7 +66,6 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092

val streamsSinkConfiguration = StreamsSinkConfiguration.from(configMap = cfg, dbName = dbName, isDefaultDb = isDefaultDb)


return default.copy(keyDeserializer = config.getOrDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, default.keyDeserializer),
valueDeserializer = config.getOrDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, default.valueDeserializer),
bootstrapServers = config.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, default.bootstrapServers),
Expand All @@ -74,7 +74,8 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092
enableAutoCommit = config.getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, default.enableAutoCommit).toString().toBoolean(),
streamsAsyncCommit = config.getOrDefault("streams.async.commit", default.streamsAsyncCommit).toString().toBoolean(),
streamsSinkConfiguration = streamsSinkConfiguration,
extraProperties = extraProperties // for what we don't provide a default configuration
extraProperties = extraProperties,
adminClientApiEnabled = config.getOrDefault("admin.client.api.enabled", default.adminClientApiEnabled).toString().toBoolean()// for what we don't provide a default configuration
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import streams.StreamsSinkConfigurationTest
import streams.config.StreamsConfig
import streams.service.TopicValidationException
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue

class KafkaSinkConfigurationTest {
Expand All @@ -37,6 +38,7 @@ class KafkaSinkConfigurationTest {
assertEquals(true, default.enableAutoCommit)
assertEquals(false, default.streamsAsyncCommit)
assertEquals(emptyMap(), default.extraProperties)
assertTrue { default.adminClientApiEnabled }
}

@Test
Expand All @@ -48,28 +50,32 @@ class KafkaSinkConfigurationTest {
val group = "foo"
val autoOffsetReset = "latest"
val autoCommit = "false"
val kafkaAdminClientApiEnabled = "false"
val config = mapOf(topicKey to topicValue,
"kafka.bootstrap.servers" to bootstrap,
"kafka.auto.offset.reset" to autoOffsetReset,
"kafka.enable.auto.commit" to autoCommit,
"kafka.group.id" to group,
"kafka.streams.async.commit" to "true",
"kafka.key.deserializer" to ByteArrayDeserializer::class.java.name,
"kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name)
"kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name,
"kafka.admin.client.api.enabled" to kafkaAdminClientApiEnabled)
val expectedMap = mapOf("bootstrap.servers" to bootstrap,
"auto.offset.reset" to autoOffsetReset, "enable.auto.commit" to autoCommit, "group.id" to group,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
"streams.async.commit" to "true",
"key.deserializer" to ByteArrayDeserializer::class.java.name,
"value.deserializer" to KafkaAvroDeserializer::class.java.name)
"value.deserializer" to KafkaAvroDeserializer::class.java.name,
"admin.client.api.enabled" to kafkaAdminClientApiEnabled)

val kafkaSinkConfiguration = KafkaSinkConfiguration.create(config, defaultDbName, isDefaultDb = true)
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, topic, topicValue)
assertEquals(emptyMap(), kafkaSinkConfiguration.extraProperties)
assertEquals(bootstrap, kafkaSinkConfiguration.bootstrapServers)
assertEquals(autoOffsetReset, kafkaSinkConfiguration.autoOffsetReset)
assertEquals(group, kafkaSinkConfiguration.groupId)
assertFalse { kafkaSinkConfiguration.adminClientApiEnabled }
val resultMap = kafkaSinkConfiguration
.asProperties()
.map { it.key.toString() to it.value.toString() }
Expand All @@ -94,6 +100,7 @@ class KafkaSinkConfigurationTest {
val autoOffsetReset = "latest"
val autoCommit = "false"
val asyncCommit = "true"
val kafkaAdminClientApiEnabled = "false"
val config = mapOf(topicKey to topicValue,
topicKeyFoo to topicValueFoo,
"kafka.bootstrap.servers" to bootstrap,
Expand All @@ -102,14 +109,16 @@ class KafkaSinkConfigurationTest {
"kafka.group.id" to group,
"kafka.streams.async.commit" to asyncCommit,
"kafka.key.deserializer" to ByteArrayDeserializer::class.java.name,
"kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name)
"kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name,
"kafka.admin.client.api.enabled" to kafkaAdminClientApiEnabled)
val expectedMap = mapOf("bootstrap.servers" to bootstrap,
"auto.offset.reset" to autoOffsetReset, "enable.auto.commit" to autoCommit, "group.id" to "$group-$dbName",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
"key.deserializer" to ByteArrayDeserializer::class.java.name,
"streams.async.commit" to asyncCommit,
"value.deserializer" to KafkaAvroDeserializer::class.java.name)
"value.deserializer" to KafkaAvroDeserializer::class.java.name,
"admin.client.api.enabled" to kafkaAdminClientApiEnabled)

val kafkaSinkConfiguration = KafkaSinkConfiguration.create(config, dbName, isDefaultDb = false)
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, topic, topicValueFoo)
Expand Down Expand Up @@ -179,5 +188,4 @@ class KafkaSinkConfigurationTest {
throw e
}
}

}
12 changes: 12 additions & 0 deletions producer/src/main/kotlin/streams/kafka/AdminService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package streams.kafka

/**
 * Lifecycle + topic-validation contract used by the Kafka event router to decide
 * whether sink/source topics exist before publishing.
 *
 * Two implementations are provided: [KafkaAdminService], which queries the broker
 * via an AdminClient, and [DefaultAdminService], a no-op used when
 * `kafka.admin.client.api.enabled` is false (every topic is then considered valid).
 */
interface AdminService {

    /** Starts the service (e.g. background topic discovery); may be a no-op. */
    fun start()

    /** Stops the service and releases any background resources; may be a no-op. */
    fun stop()

    /** Returns true if [topic] is considered valid (exists, or validation is disabled). */
    fun isValidTopic(topic: String): Boolean

    /** Returns the configured topics that are not valid; empty when validation is disabled. */
    fun getInvalidTopics(): List<String>
}
13 changes: 13 additions & 0 deletions producer/src/main/kotlin/streams/kafka/AdminServiceFactory.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package streams.kafka

import org.neo4j.logging.Log

/**
 * Factory that picks the [AdminService] implementation according to the
 * `kafka.admin.client.api.enabled` configuration flag.
 *
 * @param props     Kafka configuration forwarded to [KafkaAdminService]
 * @param allTopics all configured topics to validate against the broker
 * @param log       logger shared with the created service
 */
class AdminServiceFactory(private val props: KafkaConfiguration, private val allTopics: List<String>, private val log: Log) {

    /**
     * Returns the admin service for the given flag: a broker-backed
     * [KafkaAdminService] when the admin client API is enabled, otherwise a
     * no-op [DefaultAdminService].
     *
     * Both branches always produce an instance, so the return type is
     * non-nullable (the previous `AdminService?` forced callers into `!!`).
     */
    fun getAdminService(adminClientApiEnabled: Boolean): AdminService =
            if (adminClientApiEnabled) KafkaAdminService(props, allTopics, log)
            else DefaultAdminService(log)
}
17 changes: 17 additions & 0 deletions producer/src/main/kotlin/streams/kafka/DefaultAdminService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package streams.kafka

import org.neo4j.logging.Log

/**
 * No-op [AdminService] used when `kafka.admin.client.api.enabled` is false:
 * no AdminClient is created and every topic is reported as valid.
 *
 * @param log logger used to announce that topic validation is skipped
 */
class DefaultAdminService(private val log: Log) : AdminService {

    // Only announces that validation is disabled; nothing is started.
    override fun start() =
            log.info("No need to start the AdminService to check the topic list. We'll consider the topic's auto creation enabled")

    // No background resources were created, so there is nothing to release.
    override fun stop() = Unit

    // Validation is disabled: every topic passes.
    override fun isValidTopic(topic: String) = true

    // Validation is disabled: no topic is ever reported as invalid.
    override fun getInvalidTopics() = emptyList<String>()
}
10 changes: 5 additions & 5 deletions producer/src/main/kotlin/streams/kafka/KafkaAdminService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import streams.utils.StreamsUtils
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

class KafkaAdminService(private val props: KafkaConfiguration, private val allTopics: List<String>, private val log: Log) {
class KafkaAdminService(private val props: KafkaConfiguration, private val allTopics: List<String>, private val log: Log) : AdminService {
private val client = AdminClient.create(props.asProperties())
private val kafkaTopics: MutableSet<String> = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val isAutoCreateTopicsEnabled = KafkaValidationUtils.isAutoCreateTopicsEnabled(client)
private lateinit var job: Job

fun start() {
override fun start() {
if (!isAutoCreateTopicsEnabled) {
job = GlobalScope.launch(Dispatchers.IO) {
while (isActive) {
Expand All @@ -39,18 +39,18 @@ class KafkaAdminService(private val props: KafkaConfiguration, private val allTo
}
}

fun stop() {
override fun stop() {
StreamsUtils.ignoreExceptions({
runBlocking {
job.cancelAndJoin()
}
}, UninitializedPropertyAccessException::class.java)
}

fun isValidTopic(topic: String) = when (isAutoCreateTopicsEnabled) {
override fun isValidTopic(topic: String) = when (isAutoCreateTopicsEnabled) {
true -> true
else -> kafkaTopics.contains(topic)
}

fun getInvalidTopics() = KafkaValidationUtils.getInvalidTopics(client, allTopics)
override fun getInvalidTopics() = KafkaValidationUtils.getInvalidTopics(client, allTopics)
}
10 changes: 8 additions & 2 deletions producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.neo4j.logging.Log
import streams.config.StreamsConfig
import streams.extensions.getInt
import streams.extensions.toPointCase
import streams.utils.JSONUtils
Expand All @@ -30,7 +31,8 @@ data class KafkaConfiguration(val bootstrapServers: String = "localhost:9092",
val lingerMs: Int = 1,
val topicDiscoveryPollingInterval: Long = TimeUnit.MINUTES.toMillis(5),
val streamsLogCompactionStrategy: String = LogStrategy.delete.toString(),
val extraProperties: Map<String, String> = emptyMap()) {
val extraProperties: Map<String, String> = emptyMap(),
val adminClientApiEnabled: Boolean = StreamsConfig.KAFKA_ADMIN_CLIENT_API_ENABLED_VALUE) {

companion object {
// Visible for testing
Expand All @@ -56,7 +58,11 @@ data class KafkaConfiguration(val bootstrapServers: String = "localhost:9092",
topicDiscoveryPollingInterval = config.getOrDefault("topic.discovery.polling.interval",
default.topicDiscoveryPollingInterval).toString().toLong(),
streamsLogCompactionStrategy = config.getOrDefault("streams.log.compaction.strategy", default.streamsLogCompactionStrategy),
extraProperties = extraProperties // for what we don't provide a default configuration
extraProperties = extraProperties,
adminClientApiEnabled = config.getOrDefault("admin.client.api.enabled",
default.adminClientApiEnabled)
.toString()
.toBoolean() // for what we don't provide a default configuration
)
}

Expand Down
24 changes: 11 additions & 13 deletions producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,20 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException
import org.apache.kafka.common.errors.ProducerFencedException
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.logging.Log
import org.neo4j.logging.internal.LogService
import streams.StreamsEventRouterConfiguration
import streams.asSourceRecordKey
import streams.asSourceRecordValue
import streams.toMap
import streams.StreamsEventRouter
import streams.config.StreamsConfig
import streams.*
import streams.events.StreamsEvent
import streams.events.StreamsPluginStatus
import streams.events.StreamsTransactionEvent
import streams.extensions.isDefaultDb
import streams.utils.JSONUtils
import streams.utils.KafkaValidationUtils.getInvalidTopicsError
import streams.utils.StreamsUtils
import java.util.Properties
import java.util.UUID
import java.util.*


class KafkaEventRouter(private val config: Map<String, String>,
private val db: GraphDatabaseService,
private val log: Log): StreamsEventRouter(config, db, log) {
abstract class KafkaEventRouter(private val config: Map<String, String>,
private val db: GraphDatabaseService,
private val log: Log): StreamsEventRouter(config, db, log) {

override val eventRouterConfiguration: StreamsEventRouterConfiguration = StreamsEventRouterConfiguration
.from(config, db.databaseName(), db.isDefaultDb(), log)
Expand All @@ -41,7 +34,8 @@ class KafkaEventRouter(private val config: Map<String, String>,

private var producer: Neo4jKafkaProducer<ByteArray, ByteArray>? = null
private val kafkaConfig by lazy { KafkaConfiguration.from(config, log) }
private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig, eventRouterConfiguration.allTopics(), log) }
//private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig, eventRouterConfiguration.allTopics(), log) }
private lateinit var kafkaAdminService: AdminService

override fun printInvalidTopics() {
val invalidTopics = kafkaAdminService.getInvalidTopics()
Expand All @@ -56,6 +50,10 @@ class KafkaEventRouter(private val config: Map<String, String>,
}

override fun start() = runBlocking {

val adminServiceFactory = AdminServiceFactory(kafkaConfig, eventRouterConfiguration.allTopics(), log)
kafkaAdminService = adminServiceFactory.getAdminService(kafkaConfig.adminClientApiEnabled)!!

mutex.withLock(producer) {
if (status(producer) == StreamsPluginStatus.RUNNING) {
return@runBlocking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class KafkaConfigurationTest {
"kafka.linger.ms" to 10,
"kafka.fetch.min.bytes" to 1234,
"kafka.topic.discovery.polling.interval" to 0L,
"kafka.streams.log.compaction.strategy" to "delete")
"kafka.streams.log.compaction.strategy" to "delete",
"kafka.admin.client.api.enabled" to false)

val kafkaConfig = KafkaConfiguration.create(map.mapValues { it.value.toString() })

Expand All @@ -46,5 +47,6 @@ class KafkaConfigurationTest {
assertEquals(map["kafka.fetch.min.bytes"].toString(), properties["fetch.min.bytes"])
assertEquals(map["kafka.topic.discovery.polling.interval"], properties["topic.discovery.polling.interval"])
assertEquals(map["kafka.streams.log.compaction.strategy"], properties["streams.log.compaction.strategy"])
assertEquals(map["kafka.admin.client.api.enabled"], properties["admin.client.api.enabled"])
}
}

0 comments on commit d056619

Please sign in to comment.