Skip to content

Commit

Permalink
Use new topic for brukernotifikasjoner (#470)
Browse files Browse the repository at this point in the history
* Use new topic for brukernotifikasjoner
  • Loading branch information
AudunSorheim authored Apr 10, 2024
1 parent 3b6921c commit 6ac1f51
Show file tree
Hide file tree
Showing 21 changed files with 279 additions and 330 deletions.
10 changes: 3 additions & 7 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ val vaultJdbcVersion = "1.3.9"
val jacksonVersion = "2.17.0"
val postgresEmbeddedVersion = "1.0.3"
val kafkaVersion = "3.6.0"
val avroVersion = "1.11.3"
val confluentVersion = "7.6.0"
val brukernotifikasjonerSchemaVersion = "2.5.1"
val brukernotifikasjonerBuilderVersion = "1.0.2"
val kotlinVersion = "1.9.23"

val githubUser: String by project
Expand All @@ -49,8 +47,8 @@ allOpen {

repositories {
mavenCentral()
maven(url = "https://github-package-registry-mirror.gc.nav.no/cached/maven-release")
maven(url = "https://jitpack.io")
maven(url = "https://packages.confluent.io/maven/")
maven(url = "https://repo.adeo.no/repository/maven-releases/")
maven(url = "https://github.com/navikt/vault-jdbc")
}
Expand Down Expand Up @@ -118,9 +116,7 @@ dependencies {
implementation("org.apache.kafka:kafka_2.13:$kafkaVersion") {
exclude(group = "log4j")
}
implementation("io.confluent:kafka-avro-serializer:$confluentVersion")
implementation("org.apache.avro:avro:$avroVersion")
implementation("com.github.navikt:brukernotifikasjon-schemas:$brukernotifikasjonerSchemaVersion")
implementation("no.nav.tms.varsel:kotlin-builder:$brukernotifikasjonerBuilderVersion")

// Test
testImplementation(kotlin("test"))
Expand Down
95 changes: 28 additions & 67 deletions src/main/kotlin/no/nav/syfo/kafka/common/KafkaCommon.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,30 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig.USER_INFO_CONFIG
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import no.nav.syfo.ApplicationState
import no.nav.syfo.Environment
import org.apache.kafka.clients.CommonClientConfigs.GROUP_ID_CONFIG
import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
import org.apache.kafka.clients.consumer.ConsumerConfig.*
import org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
import org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
import org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
import org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
import org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG
import org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG
import org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM
import org.apache.kafka.common.config.SslConfigs.*
import org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_KEY_PASSWORD_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
import java.time.Duration
import java.util.*

const val topicBrukernotifikasjonBeskjed = "min-side.aapen-brukernotifikasjon-beskjed-v1"
const val topicBrukernotifikasjonOppgave = "min-side.aapen-brukernotifikasjon-oppgave-v1"
const val topicBrukernotifikasjonDone = "min-side.aapen-brukernotifikasjon-done-v1"
const val topicDineSykmeldteHendelse = "teamsykmelding.dinesykmeldte-hendelser-v2"
const val topicVarselBus = "team-esyfo.varselbus"
const val topicSykepengedagerInfotrygd = "aap.sykepengedager.infotrygd.v1"
Expand All @@ -37,90 +40,48 @@ const val topicTestdataReset = "teamsykefravr.testdata-reset"
const val JAVA_KEYSTORE = "JKS"
const val PKCS12 = "PKCS12"
const val SSL = "SSL"
const val USER_INFO = "USER_INFO"

val pollDurationInMillis = Duration.ofMillis(1000L)

interface KafkaListener {
suspend fun listen(applicationState: ApplicationState)
}

fun aivenConsumerProperties(env: Environment): Properties {
val sslConfig = env.kafkaEnv.sslConfig

return consumerProperties(env).apply {
fun commonProperties(env: Environment): Properties {
return Properties().apply {
put(SECURITY_PROTOCOL_CONFIG, SSL)
put(BOOTSTRAP_SERVERS_CONFIG, env.kafkaEnv.aivenBroker)
put(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "") // Disable server host name verification
put(SSL_TRUSTSTORE_TYPE_CONFIG, JAVA_KEYSTORE)
put(SSL_KEYSTORE_TYPE_CONFIG, PKCS12)
put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.truststoreLocation)
put(SSL_TRUSTSTORE_PASSWORD_CONFIG, sslConfig.credstorePassword)
put(SSL_KEYSTORE_LOCATION_CONFIG, sslConfig.keystoreLocation)
put(SSL_KEYSTORE_PASSWORD_CONFIG, sslConfig.credstorePassword)
put(SSL_KEY_PASSWORD_CONFIG, sslConfig.credstorePassword)
put(BOOTSTRAP_SERVERS_CONFIG, env.kafkaEnv.aivenBroker)
remove(SASL_MECHANISM)
remove(SASL_JAAS_CONFIG)
remove(SASL_MECHANISM)
put(SSL_TRUSTSTORE_LOCATION_CONFIG, env.kafkaEnv.sslConfig.truststoreLocation)
put(SSL_TRUSTSTORE_PASSWORD_CONFIG, env.kafkaEnv.sslConfig.credstorePassword)
put(SSL_KEYSTORE_LOCATION_CONFIG, env.kafkaEnv.sslConfig.keystoreLocation)
put(SSL_KEYSTORE_PASSWORD_CONFIG, env.kafkaEnv.sslConfig.credstorePassword)
put(SSL_KEY_PASSWORD_CONFIG, env.kafkaEnv.sslConfig.credstorePassword)
if (!env.appEnv.remote) {
remove(SECURITY_PROTOCOL_CONFIG)
}
}
}

fun consumerProperties(env: Environment): Properties {
val properties = HashMap<String, String>().apply {
return commonProperties(env).apply {
put(GROUP_ID_CONFIG, "esyfovarsel-group-v04-gcp-v03")
put(AUTO_OFFSET_RESET_CONFIG, "earliest")
put(MAX_POLL_RECORDS_CONFIG, "1")
put(ENABLE_AUTO_COMMIT_CONFIG, "false")
put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL")
put(SASL_MECHANISM, "PLAIN")
put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
put(
SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${env.authEnv.serviceuserUsername}\" password=\"${env.authEnv.serviceuserPassword}\";"
)
put(BOOTSTRAP_SERVERS_CONFIG, env.kafkaEnv.bootstrapServersUrl)
}.toProperties()
if (!env.appEnv.remote) {
properties.remove(SECURITY_PROTOCOL_CONFIG)
properties.remove(SASL_MECHANISM)
}
return properties
}

fun producerProperties(env: Environment): Properties {
val sslConfig = env.kafkaEnv.sslConfig
val schemaRegistryConfig = env.kafkaEnv.schemaRegistry
val userinfoConfig = "${schemaRegistryConfig.username}:${schemaRegistryConfig.password}"

val properties = HashMap<String, String>().apply {
return commonProperties(env).apply {
put(ACKS_CONFIG, "all")
put(SECURITY_PROTOCOL_CONFIG, SSL)
put(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "") // Disable server host name verification
put(SSL_TRUSTSTORE_TYPE_CONFIG, JAVA_KEYSTORE)
put(SSL_KEYSTORE_TYPE_CONFIG, PKCS12)
put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.truststoreLocation)
put(SSL_TRUSTSTORE_PASSWORD_CONFIG, sslConfig.credstorePassword)
put(SSL_KEYSTORE_LOCATION_CONFIG, sslConfig.keystoreLocation)
put(SSL_KEYSTORE_PASSWORD_CONFIG, sslConfig.credstorePassword)
put(SSL_KEY_PASSWORD_CONFIG, sslConfig.credstorePassword)
put(BOOTSTRAP_SERVERS_CONFIG, env.kafkaEnv.aivenBroker)

put(KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
put(VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryConfig.url)
put(BASIC_AUTH_CREDENTIALS_SOURCE, USER_INFO)
put(USER_INFO_CONFIG, userinfoConfig)
put(BOOTSTRAP_SERVERS_CONFIG, env.kafkaEnv.aivenBroker)

remove(SASL_MECHANISM)
remove(SASL_JAAS_CONFIG)
remove(SASL_MECHANISM)
}.toProperties()
if (!env.appEnv.remote) {
properties.remove(SECURITY_PROTOCOL_CONFIG)
put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
put(VALUE_SERIALIZER_CLASS_CONFIG, JacksonKafkaSerializer::class.java)
}
return properties
}

fun createObjectMapper() = ObjectMapper().apply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class InfotrygdKafkaConsumer(
}

fun infotrygdConsumerProperties(env: Environment): Properties {
val commonConsumerProperties = aivenConsumerProperties(env)
val commonConsumerProperties = consumerProperties(env)
return commonConsumerProperties.apply {
remove(CommonClientConfigs.GROUP_ID_CONFIG)
put(CommonClientConfigs.GROUP_ID_CONFIG, "esyfovarsel-group-infotrygd-01")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class TestdataResetConsumer(
}

fun testdataResetProperties(env: Environment): Properties {
val commonConsumerProperties = aivenConsumerProperties(env)
val commonConsumerProperties = consumerProperties(env)
return commonConsumerProperties.apply {
remove(CommonClientConfigs.GROUP_ID_CONFIG)
put(CommonClientConfigs.GROUP_ID_CONFIG, "esyfovarsel-group-testdata-reset-01")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class UtbetalingKafkaConsumer(
}

private fun utbetalingSpleisConsumerProperties(env: Environment): Properties {
val commonConsumerProperties = aivenConsumerProperties(env)
val commonConsumerProperties = consumerProperties(env)
return commonConsumerProperties.apply {
remove(GROUP_ID_CONFIG)
put(GROUP_ID_CONFIG, "esyfovarsel-group-utbetaling-spleis-01")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class VarselBusKafkaConsumer(
private val objectMapper = createObjectMapper()

init {
val kafkaConfig = aivenConsumerProperties(env)
val kafkaConfig = consumerProperties(env)
kafkaListener = KafkaConsumer(kafkaConfig)
kafkaListener.subscribe(listOf(topicVarselBus))
}
Expand Down
Loading

0 comments on commit 6ac1f51

Please sign in to comment.