Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new config admin.client.api.enabled #1

Open
wants to merge 1 commit into
base: 4.1
Choose a base branch
from

Conversation

mroiter-larus
Copy link
Member

Added new config kafka.admin.client.api.enabled, which is true by default, in order to disable the Kafka AdminClient API usage.

@mroiter-larus mroiter-larus force-pushed the isp_change_request branch 3 times, most recently from 171634e to d056619 Compare December 23, 2021 18:09
import streams.toMap
import streams.StreamsEventRouter
import streams.config.StreamsConfig
import streams.*

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no rimettilo come prima

import streams.events.StreamsEvent
import streams.events.StreamsPluginStatus
import streams.events.StreamsTransactionEvent
import streams.extensions.isDefaultDb
import streams.utils.JSONUtils
import streams.utils.KafkaValidationUtils.getInvalidTopicsError
import streams.utils.StreamsUtils
import java.util.Properties
import java.util.UUID
import java.util.*

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no rimettilo come prima

Comment on lines 54 to 55
val adminServiceFactory = AdminServiceFactory(kafkaConfig, eventRouterConfiguration.allTopics(), log)
kafkaAdminService = adminServiceFactory.getAdminService(kafkaConfig.adminClientApiEnabled)!!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no rimuovi

Comment on lines 5 to 13
class AdminServiceFactory(private val props: KafkaConfiguration, private val allTopics: List<String>, private val log: Log) {

fun getAdminService(adminClientApiEnabled: Boolean): AdminService? {
return when(adminClientApiEnabled) {
false -> DefaultAdminService(log)
true -> KafkaAdminService(props, allTopics, log)
}
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no immaginavo qualcosa del tipo

Suggested change
class AdminServiceFactory(private val props: KafkaConfiguration, private val allTopics: List<String>, private val log: Log) {
fun getAdminService(adminClientApiEnabled: Boolean): AdminService? {
return when(adminClientApiEnabled) {
false -> DefaultAdminService(log)
true -> KafkaAdminService(props, allTopics, log)
}
}
}
object AdminServiceFactory {
fun <AdminService> getInstance(props: KafkaConfiguration, allTopics: List<String>, log: Log): AdminService = when (props.adminClientApiEnabled) {
true -> KafkaAdminService(props, allTopics, log)
else -> DefaultAdminService(log)
}
}

@@ -41,7 +34,8 @@ class KafkaEventRouter(private val config: Map<String, String>,

private var producer: Neo4jKafkaProducer<ByteArray, ByteArray>? = null
private val kafkaConfig by lazy { KafkaConfiguration.from(config, log) }
private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig, eventRouterConfiguration.allTopics(), log) }
//private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig, eventRouterConfiguration.allTopics(), log) }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig, eventRouterConfiguration.allTopics(), log) }
private val kafkaAdminService by lazy {
AdminServiceFactory.getInstance(kafkaConfig, eventRouterConfiguration.allTopics(), log)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants