Skip to content

Commit

Permalink
Endret impl av Kafka plugin i bekreftelse-api
Browse files Browse the repository at this point in the history
  • Loading branch information
naviktthomas committed Dec 4, 2024
1 parent 7396be8 commit 84d71db
Show file tree
Hide file tree
Showing 20 changed files with 209 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fun main() {
with(applicationContext.serverConfig) {
logger.info("Starter $appName med hostname $host og port $port")

embeddedServer(Netty, port = port) {
embeddedServer(factory = Netty, port = port) {
module(applicationContext)
}.apply {
addShutdownHook {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ class KafkaConsumerExceptionHandler(
errorLogger.error("Kafka Consumer opplevde en uhåndterbar feil", throwable)
livenessIndicator.setUnhealthy()
readinessIndicator.setUnhealthy()
throw throwable
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package no.nav.paw.bekreftelse.api.listener

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

class NoopConsumerRebalanceListener : ConsumerRebalanceListener {
override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>?) {}
override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>?) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ package no.nav.paw.bekreftelse.api.plugins
import io.ktor.server.application.Application
import io.ktor.server.application.install
import no.nav.paw.bekreftelse.api.context.ApplicationContext
import no.nav.paw.bekreftelse.api.plugins.custom.KafkaConsumerPlugin
import no.nav.paw.bekreftelse.api.plugins.custom.KafkaProducerPlugin
import no.nav.paw.bekreftelse.api.plugins.custom.kafkaConsumerPlugin
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse

fun Application.configureKafka(applicationContext: ApplicationContext) {
with(applicationContext) {
install(KafkaProducerPlugin) {
kafkaProducers = listOf(bekreftelseKafkaProducer)
}
install(KafkaConsumerPlugin) {
consumeFunction = bekreftelseService::processBekreftelseHendelse
errorFunction = kafkaConsumerExceptionHandler::handleException
consumer = bekreftelseKafkaConsumer
topic = applicationConfig.kafkaTopology.bekreftelseHendelsesloggTopic
install(kafkaConsumerPlugin<Long, BekreftelseHendelse>()) {
this.consumeFunction = bekreftelseService::processBekreftelseHendelser
this.errorFunction = kafkaConsumerExceptionHandler::handleException
this.kafkaConsumer = bekreftelseKafkaConsumer
this.kafkaTopics = listOf(applicationConfig.kafkaTopology.bekreftelseHendelsesloggTopic)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,77 +12,91 @@ import io.ktor.util.KtorDsl
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.api.listener.NoopConsumerRebalanceListener
import no.nav.paw.bekreftelse.api.utils.buildApplicationLogger
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean

private val logger = buildApplicationLogger
val KafkaConsumerReady: EventDefinition<Application> = EventDefinition()

@KtorDsl
class KafkaConsumerPluginConfig {
var consumeFunction: ((ConsumerRecord<Long, BekreftelseHendelse>) -> Unit)? = null
class KafkaConsumerPluginConfig<K, V, R> {
var consumeFunction: ((ConsumerRecords<K, V>) -> Unit)? = null
var successFunction: ((ConsumerRecords<K, V>) -> Unit)? = null
var errorFunction: ((throwable: Throwable) -> Unit)? = null
var consumer: KafkaConsumer<Long, BekreftelseHendelse>? = null
var topic: String? = null
var pollTimeout: Duration = Duration.ofMillis(100)
var shutDownTimeout: Duration = Duration.ofMillis(500)
var kafkaConsumer: KafkaConsumer<K, V>? = null
var kafkaTopics: Collection<String>? = null
var pollTimeout: Duration? = null
var closeTimeout: Duration? = null
var rebalanceListener: ConsumerRebalanceListener? = null
val pollingFlag = AtomicBoolean(true)
val shutdownFlag = AtomicBoolean(false)

companion object {
const val PLUGIN_NAME = "KafkaConsumerPlugin"
}
}

val KafkaConsumerPlugin: ApplicationPlugin<KafkaConsumerPluginConfig> =
private fun <K, V> KafkaConsumer<K, V>.defaultSuccessFunction(records: ConsumerRecords<K, V>) {
if (!records.isEmpty) {
logger.debug("Kafka Consumer success. {} records processed", records.count())
this.commitSync()
}
}

private fun defaultErrorFunction(throwable: Throwable) {
logger.error("Kafka Consumer failed", throwable)
throw throwable
}

fun <K, V> kafkaConsumerPlugin(): ApplicationPlugin<KafkaConsumerPluginConfig<K, V, Unit>> =
createApplicationPlugin(KafkaConsumerPluginConfig.PLUGIN_NAME, ::KafkaConsumerPluginConfig) {
application.log.info("Oppretter {}", KafkaConsumerPluginConfig.PLUGIN_NAME)
val kafkaTopics = requireNotNull(pluginConfig.kafkaTopics) { "KafkaTopics er null" }
val kafkaConsumer = requireNotNull(pluginConfig.kafkaConsumer) { "KafkaConsumer er null" }
val consumeFunction = requireNotNull(pluginConfig.consumeFunction) { "ConsumeFunction er null" }
val errorFunction = pluginConfig.errorFunction ?: { }
val consumer = requireNotNull(pluginConfig.consumer) { "KafkaConsumes er null" }
val topic = requireNotNull(pluginConfig.topic) { "Topic er null" }
val pollTimeout = requireNotNull(pluginConfig.pollTimeout) { "PollTimeout er null" }
val shutDownTimeout = requireNotNull(pluginConfig.shutDownTimeout) { "ShutDownTimeout er null" }
val rebalanceListener = pluginConfig.rebalanceListener
val pollingFlag = pluginConfig.pollingFlag
val successFunction = pluginConfig.successFunction ?: kafkaConsumer::defaultSuccessFunction
val errorFunction = pluginConfig.errorFunction ?: ::defaultErrorFunction
val pollTimeout = pluginConfig.pollTimeout ?: Duration.ofMillis(100)
val closeTimeout = pluginConfig.closeTimeout ?: Duration.ofMillis(500)
val rebalanceListener = pluginConfig.rebalanceListener ?: NoopConsumerRebalanceListener()
val shutdownFlag = pluginConfig.shutdownFlag
var consumeJob: Job? = null

on(MonitoringEvent(ApplicationStarted)) { application ->
application.log.info("Starter Kafka Consumer")
if (rebalanceListener == null) {
consumer.subscribe(listOf(topic))
} else {
consumer.subscribe(listOf(topic), rebalanceListener)
}
logger.info("Kafka Consumer klargjøres")
kafkaConsumer.subscribe(kafkaTopics, rebalanceListener)
application.environment.monitor.raise(KafkaConsumerReady, application)
}

on(MonitoringEvent(ApplicationStopping)) { application ->
application.log.info("Stopper Kafka Consumer")
pollingFlag.set(false)
on(MonitoringEvent(ApplicationStopping)) { _ ->
logger.info("Kafka Consumer stopper")
kafkaConsumer.unsubscribe()
kafkaConsumer.close(closeTimeout)
shutdownFlag.set(true)
consumeJob?.cancel()
consumer.unsubscribe()
consumer.close(shutDownTimeout)
}

on(MonitoringEvent(KafkaConsumerReady)) { application ->
consumeJob = application.launch(Dispatchers.IO) {
try {
application.log.info("Starter Kafka Consumer polling")
while (pollingFlag.get()) {
application.log.trace("Polling Kafka Consumer for records")
val records = consumer.poll(pollTimeout)
records.forEach { consumeFunction(it) }
consumer.commitSync()
logger.info("Kafka Consumer starter")
while (!shutdownFlag.get()) {
try {
val records = kafkaConsumer.poll(pollTimeout)
consumeFunction(records)
successFunction(records)
} catch (throwable: Throwable) {
kafkaConsumer.unsubscribe()
kafkaConsumer.close(closeTimeout)
shutdownFlag.set(true)
errorFunction(throwable)
}
application.log.info("Kafka Consumer polling avsluttet")
} catch (e: Exception) {
application.log.error("Kafka Consumer polling avbrutt med feil", e)
errorFunction(e)
}
logger.info("Kafka Consumer avsluttet")
consumeJob?.cancel()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient
import no.nav.paw.security.authentication.model.Bruker
import no.nav.paw.security.authentication.model.Identitetsnummer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.jetbrains.exposed.sql.transactions.transaction

class BekreftelseService(
Expand Down Expand Up @@ -98,31 +99,34 @@ class BekreftelseService(
}
}

@WithSpan(value = "processBekreftelseHendelse")
fun processBekreftelseHendelse(record: ConsumerRecord<Long, BekreftelseHendelse>) {
@WithSpan(value = "processBekreftelseHendelser")
fun processBekreftelseHendelser(records: ConsumerRecords<Long, BekreftelseHendelse>) {
transaction {
val hendelse = record.value()

meterRegistry.receiveBekreftelseHendelseCounter(hendelse.hendelseType)
records.forEach(::processBekreftelseHendelse)
}
}

logger.debug("Mottok hendelse av type {}", hendelse.hendelseType)
@WithSpan(value = "processBekreftelseHendelse")
fun processBekreftelseHendelse(record: ConsumerRecord<Long, BekreftelseHendelse>) {
val hendelse = record.value()
meterRegistry.receiveBekreftelseHendelseCounter(hendelse.hendelseType)
logger.debug("Mottok hendelse av type {}", hendelse.hendelseType)

when (hendelse) {
is BekreftelseTilgjengelig -> {
processBekreftelseTilgjengelig(record.partition(), record.offset(), record.key(), hendelse)
}
when (hendelse) {
is BekreftelseTilgjengelig -> {
processBekreftelseTilgjengelig(record.partition(), record.offset(), record.key(), hendelse)
}

is BekreftelseMeldingMottatt -> {
processBekreftelseMeldingMottatt(hendelse)
}
is BekreftelseMeldingMottatt -> {
processBekreftelseMeldingMottatt(hendelse)
}

is PeriodeAvsluttet -> {
processPeriodeAvsluttet(hendelse)
}
is PeriodeAvsluttet -> {
processPeriodeAvsluttet(hendelse)
}

else -> {
processAnnenHendelse(hendelse)
}
else -> {
processAnnenHendelse(hendelse)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
url = "http://localhost:8090/kafka-keys"
urlLokalInfo = "http://localhost:8090/lokal-info"
url = "http://localhost:8090/api/v2/hentEllerOpprett"
urlLokalInfo = "http://localhost:8090/api/v2/lokalInfo"
scope = "api://test.test.kafka-keys/.default"
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,15 @@ fun main() {
gjelderTil = Instant.now().plus(Duration.ofDays(14)),
)

sendHendelse(kafkaProducer, topic, key, value)
kafkaProducer.sendBlocking(topic, key, value)
}

fun sendHendelse(
producer: Producer<Long, BekreftelseHendelse>,
fun Producer<Long, BekreftelseHendelse>.sendBlocking(
topic: String,
key: Long,
value: BekreftelseHendelse
) =
runBlocking {
launch {
producer.sendDeferred(ProducerRecord(topic, key, value)).await()
}
}
) = runBlocking {
launch {
sendDeferred(ProducerRecord(topic, key, value)).await()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"request": {
"method": "POST",
"urlPathPattern": "/api/v2/hentEllerOpprett",
"bodyPatterns": [
{
"matchesJsonPath": "$[?(@.ident == '01017012345')]"
}
]
},
"response": {
"status": 200,
"jsonBody": {
"id": 10001,
"key": -10001
},
"headers": {
"Content-Type": "application/json"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"request": {
"method": "POST",
"urlPathPattern": "/api/v2/hentEllerOpprett",
"bodyPatterns": [
{
"matchesJsonPath": "$[?(@.ident == '02017012345')]"
}
]
},
"response": {
"status": 200,
"jsonBody": {
"id": 10002,
"key": -10002
},
"headers": {
"Content-Type": "application/json"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"request": {
"method": "POST",
"urlPathPattern": "/api/v2/hentEllerOpprett",
"bodyPatterns": [
{
"matchesJsonPath": "$[?(@.ident == '03017012345')]"
}
]
},
"response": {
"status": 200,
"jsonBody": {
"id": 10003,
"key": -10003
},
"headers": {
"Content-Type": "application/json"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"request": {
"method": "POST",
"urlPathPattern": "/api/v2/hentEllerOpprett",
"bodyPatterns": [
{
"matchesJsonPath": "$[?(@.ident == '04017012345')]"
}
]
},
"response": {
"status": 200,
"jsonBody": {
"id": 10004,
"key": -10004
},
"headers": {
"Content-Type": "application/json"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"request": {
"method": "POST",
"urlPathPattern": "/api/v2/hentEllerOpprett",
"bodyPatterns": [
{
"matchesJsonPath": "$[?(@.ident == '05017012345')]"
}
]
},
"response": {
"status": 200,
"jsonBody": {
"id": 10004,
"key": -10004
},
"headers": {
"Content-Type": "application/json"
}
}
}
Loading

0 comments on commit 84d71db

Please sign in to comment.