Skip to content

Commit

Permalink
config overridable
Browse files Browse the repository at this point in the history
  • Loading branch information
rtc11 committed Mar 13, 2024
1 parent f4c6127 commit cf3685f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 20 deletions.
10 changes: 10 additions & 0 deletions kafka-2/main/no/nav/aap/kafka/streams/v2/config/SslConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,14 @@ data class SslConfig(
this[SslConfigs.SSL_KEY_PASSWORD_CONFIG] = credstorePsw
this[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = ""
}

companion object {
val DEFAULT: SslConfig by lazy {
SslConfig(
truststorePath = System.getenv("KAFKA_TRUSTSTORE_PATH"),
keystorePath = System.getenv("KAFKA_KEYSTORE_PATH"),
credstorePsw = System.getenv("KAFKA_CREDSTORE_PASSWORD")
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,14 @@ data class SchemaRegistryConfig(
this[SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE] = "USER_INFO"
this[SchemaRegistryClientConfig.USER_INFO_CONFIG] = "$user:$password"
}

companion object {
val DEFAULT: SchemaRegistryConfig by lazy {
SchemaRegistryConfig(
url = System.getenv("KAFKA_SCHEMA_REGISTRY"),
user = System.getenv("KAFKA_SCHEMA_REGISTRY_USER"),
password = System.getenv("KAFKA_SCHEMA_REGISTRY_PASSWORD"),
)
}
}
}
27 changes: 7 additions & 20 deletions kafka-avroserde/main/no/nav/aap/kafka/serde/avro/AvroSerde.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,15 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import no.nav.aap.kafka.schemaregistry.SchemaRegistryConfig
import no.nav.aap.kafka.streams.v2.config.SslConfig
import org.apache.avro.specific.SpecificRecord
import java.util.*

object AvroSerde {
fun <T : SpecificRecord> specific(): SpecificAvroSerde<T> = SpecificAvroSerde<T>().apply {

// configureres før appen starter og kan ikke hentes fra KStreamsConfig eller KafkaConfig.
val schemaRegistry: Properties = SchemaRegistryConfig(
url = System.getenv("KAFKA_SCHEMA_REGISTRY"),
user = System.getenv("KAFKA_SCHEMA_REGISTRY_USER"),
password = System.getenv("KAFKA_SCHEMA_REGISTRY_PASSWORD"),
).properties()

// configureres før appen starter og kan ikke hentes fra KStreamsConfig eller KafkaConfig.
val ssl: Properties = SslConfig(
truststorePath = System.getenv("KAFKA_TRUSTSTORE_PATH"),
keystorePath = System.getenv("KAFKA_KEYSTORE_PATH"),
credstorePsw = System.getenv("KAFKA_CREDSTORE_PASSWORD")
).properties()

val avroProperties = schemaRegistry + ssl
val avroConfig = avroProperties.map { it.key.toString() to it.value.toString() }
configure(avroConfig.toMap(), false)
fun <T : SpecificRecord> specific(
schema: SchemaRegistryConfig = SchemaRegistryConfig.DEFAULT,
ssl: SslConfig = SslConfig.DEFAULT,
): SpecificAvroSerde<T> = SpecificAvroSerde<T>().apply {
val properties = schema.properties() + ssl.properties()
val serdeConfig = properties.mapKeys { it.key.toString() }
configure(serdeConfig, false)
}

fun generic() = GenericAvroSerde()
Expand Down

0 comments on commit cf3685f

Please sign in to comment.