Skip to content

Commit

Permalink
move and create defaults for configs
Browse files Browse the repository at this point in the history
  • Loading branch information
rtc11 committed Mar 13, 2024
1 parent cf3685f commit abda0ef
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package no.nav.aap.kafka.streams.v2.config

import java.util.*

private fun getEnvVar(envar: String) = System.getenv(envar) ?: error("missing envvar $envar")

data class SchemaRegistryConfig(
private val url: String = getEnvVar("KAFKA_SCHEMA_REGISTRY"),
private val user: String = getEnvVar("KAFKA_SCHEMA_REGISTRY_USER"),
private val password: String = getEnvVar("KAFKA_SCHEMA_REGISTRY_PASSWORD"),
) {
fun properties() = Properties().apply {
this["schema.registry.url"] = url
this["basic.auth.credentials.source"] = "USER_INFO"
this["basic.auth.user.info"] = "$user:$password"
}
}
18 changes: 5 additions & 13 deletions kafka-2/main/no/nav/aap/kafka/streams/v2/config/SslConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SslConfigs
import java.util.*

private fun getEnvVar(envar: String) = System.getenv(envar) ?: error("missing envvar $envar")

data class SslConfig(
private val truststorePath: String,
private val keystorePath: String,
private val credstorePsw: String,
private val truststorePath: String = getEnvVar("KAFKA_TRUSTSTORE_PATH"),
private val keystorePath: String = getEnvVar("KAFKA_KEYSTORE_PATH"),
private val credstorePsw: String = getEnvVar("KAFKA_CREDSTORE_PASSWORD"),
) {
fun properties() = Properties().apply {
this[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SSL"
Expand All @@ -20,14 +22,4 @@ data class SslConfig(
this[SslConfigs.SSL_KEY_PASSWORD_CONFIG] = credstorePsw
this[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = ""
}

companion object {
val DEFAULT: SslConfig by lazy {
SslConfig(
truststorePath = System.getenv("KAFKA_TRUSTSTORE_PATH"),
keystorePath = System.getenv("KAFKA_KEYSTORE_PATH"),
credstorePsw = System.getenv("KAFKA_CREDSTORE_PASSWORD")
)
}
}
}
16 changes: 9 additions & 7 deletions kafka-2/main/no/nav/aap/kafka/streams/v2/config/StreamsConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.streams.StreamsConfig
import java.util.*

private fun getEnvVar(envar: String) = System.getenv(envar) ?: error("missing envvar $envar")

data class StreamsConfig(
internal val applicationId: String,
internal val brokers: String,
internal val compressionType: String = "snappy",
internal val ssl: SslConfig? = null,
internal val schemaRegistry: Properties = Properties(),
internal val additionalProperties: Properties = Properties(),
val applicationId: String = getEnvVar("KAFKA_STREAMS_APPLICATION_ID"),
val brokers: String = getEnvVar("KAFKA_BROKERS"),
val ssl: SslConfig? = SslConfig(),
val schemaRegistry: SchemaRegistryConfig = SchemaRegistryConfig(),
val compressionType: String = "snappy",
val additionalProperties: Properties = Properties(),
) {
fun streamsProperties(): Properties = Properties().apply {
this[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
this[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = brokers

ssl?.let { putAll(it.properties()) }
putAll(schemaRegistry)
putAll(schemaRegistry.properties())
putAll(additionalProperties)

/* Exception handler when leaving the stream, e.g. serialization */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ class ConsumerConfig private constructor(

constructor(streamsConfig: StreamsConfig) : this(
brokers = streamsConfig.brokers,

ssl = streamsConfig.ssl,
schemaRegistry = streamsConfig.schemaRegistry,
schemaRegistry = streamsConfig.schemaRegistry.properties(),
)

internal fun toProperties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ProducerConfig private constructor(
constructor(streamsConfig: StreamsConfig) : this(
brokers = streamsConfig.brokers,
ssl = streamsConfig.ssl,
schemaRegistry = streamsConfig.schemaRegistry,
schemaRegistry = streamsConfig.schemaRegistry.properties(),
compressionType = streamsConfig.compressionType
)

Expand Down
3 changes: 2 additions & 1 deletion kafka-2/test/no/nav/aap/kafka/streams/v2/TestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import no.nav.aap.kafka.serde.json.Migratable
import no.nav.aap.kafka.streams.concurrency.Bufferable
import no.nav.aap.kafka.streams.v2.config.SchemaRegistryConfig
import no.nav.aap.kafka.streams.v2.config.StreamsConfig
import no.nav.aap.kafka.streams.v2.processor.Processor
import no.nav.aap.kafka.streams.v2.processor.ProcessorMetadata
Expand Down Expand Up @@ -60,7 +61,7 @@ internal class StreamsMock : Streams {
StreamsMock().apply {
connect(
topology = Topology().apply(topology),
config = StreamsConfig("", ""),
config = StreamsConfig("", "", null, SchemaRegistryConfig("", "", "")),
registry = SimpleMeterRegistry()
)
}
Expand Down

This file was deleted.

6 changes: 3 additions & 3 deletions kafka-avroserde/main/no/nav/aap/kafka/serde/avro/AvroSerde.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package no.nav.aap.kafka.serde.avro

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import no.nav.aap.kafka.schemaregistry.SchemaRegistryConfig
import no.nav.aap.kafka.streams.v2.config.SchemaRegistryConfig
import no.nav.aap.kafka.streams.v2.config.SslConfig
import org.apache.avro.specific.SpecificRecord

object AvroSerde {
fun <T : SpecificRecord> specific(
schema: SchemaRegistryConfig = SchemaRegistryConfig.DEFAULT,
ssl: SslConfig = SslConfig.DEFAULT,
schema: SchemaRegistryConfig = SchemaRegistryConfig(),
ssl: SslConfig = SslConfig(),
): SpecificAvroSerde<T> = SpecificAvroSerde<T>().apply {
val properties = schema.properties() + ssl.properties()
val serdeConfig = properties.mapKeys { it.key.toString() }
Expand Down
20 changes: 2 additions & 18 deletions kafka-avroserde/test/no/nav/aap/kafka/ConfigTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,20 @@ package no.nav.aap.kafka

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import no.nav.aap.kafka.schemaregistry.SchemaRegistryConfig
import no.nav.aap.kafka.streams.v2.config.SchemaRegistryConfig
import no.nav.aap.kafka.streams.v2.config.SslConfig
import no.nav.aap.kafka.streams.v2.config.StreamsConfig
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
import kotlin.test.assertNull

internal class ConfigTest {

@Test
fun `schemaRegistry is empty without schema registry url`() {
val config = StreamsConfig(
applicationId = "app",
brokers = "localhost:9092",
ssl = SslConfig("", "", "")
)

config.streamsProperties().apply {
assertNull(this[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG])
assertNull(this[SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE])
assertNull(this[SchemaRegistryClientConfig.USER_INFO_CONFIG])
}
}

@Test
fun `schema registry config is configured when present`() {
val config = StreamsConfig(
applicationId = "app",
brokers = "localhost:9092",
schemaRegistry = schemaConfig.properties(),
schemaRegistry = schemaConfig,
ssl = SslConfig("", "", "")
)

Expand Down
2 changes: 2 additions & 0 deletions kafka-test-2/test/no/nav/aap/kafka/streams/v2/test/Test.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package no.nav.aap.kafka.streams.v2.test
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import no.nav.aap.kafka.streams.v2.Table
import no.nav.aap.kafka.streams.v2.Topic
import no.nav.aap.kafka.streams.v2.config.SchemaRegistryConfig
import no.nav.aap.kafka.streams.v2.config.SslConfig
import no.nav.aap.kafka.streams.v2.config.StreamsConfig
import no.nav.aap.kafka.streams.v2.serde.StringSerde
Expand All @@ -28,6 +29,7 @@ internal class Test {
applicationId = "app",
brokers = "mock://kafka",
ssl = SslConfig("", "", ""),
schemaRegistry = SchemaRegistryConfig("", "", ""),
)

kafka.connect(topology, config, registry)
Expand Down

0 comments on commit abda0ef

Please sign in to comment.