Skip to content

Commit

Permalink
La til mer logging og metrikker
Browse files Browse the repository at this point in the history
  • Loading branch information
naviktthomas committed Nov 19, 2024
1 parent 193c012 commit b07f07c
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ fun startApplication(
)
val pdlService = PdlService(pdlClient)
val kafkaKeysService = KafkaKeysService(
prometheusMeterRegistry,
kafkaKeysRepository,
pdlService
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ import no.nav.paw.kafkakeygenerator.repository.IdentitetRepository
import no.nav.paw.kafkakeygenerator.repository.KafkaKeysAuditRepository
import no.nav.paw.kafkakeygenerator.utils.buildErrorLogger
import no.nav.paw.kafkakeygenerator.utils.buildLogger
import no.nav.paw.kafkakeygenerator.utils.countIgnoredEvents
import no.nav.paw.kafkakeygenerator.utils.countProcessedEvents
import no.nav.paw.kafkakeygenerator.utils.countReceivedEvents
import no.nav.paw.kafkakeygenerator.utils.countKafkaFailed
import no.nav.paw.kafkakeygenerator.utils.countKafkaIgnored
import no.nav.paw.kafkakeygenerator.utils.countKafkaInserted
import no.nav.paw.kafkakeygenerator.utils.countKafkaProcessed
import no.nav.paw.kafkakeygenerator.utils.countKafkaReceived
import no.nav.paw.kafkakeygenerator.utils.countKafkaUpdated
import no.nav.paw.kafkakeygenerator.utils.countKafkaVerified
import no.nav.paw.kafkakeygenerator.vo.ArbeidssoekerId
import no.nav.paw.kafkakeygenerator.vo.Audit
import no.nav.paw.kafkakeygenerator.vo.IdentitetStatus
Expand Down Expand Up @@ -45,16 +49,18 @@ class KafkaConsumerService(
records
.map { it.value() }
.onEach {
meterRegistry.countReceivedEvents()
meterRegistry.countKafkaReceived()
if (it is IdentitetsnummerSammenslaatt) {
meterRegistry.countProcessedEvents()
logger.debug("Prosesserer hendelse av type {}", it.hendelseType)
meterRegistry.countKafkaProcessed()
} else {
meterRegistry.countIgnoredEvents()
logger.debug("Ignorerer hendelse av type {}", it.hendelseType)
meterRegistry.countKafkaIgnored()
}
}
.filterIsInstance<IdentitetsnummerSammenslaatt>()
.forEach { hendelse ->
logger.info("Mottok hendelse om sammenslåing av identitetsnummer")
logger.info("Mottok hendelse om sammenslåing av Identitetsnummer")
val identitetsnummer = hendelse.alleIdentitetsnummer
.map { Identitetsnummer(it) } + Identitetsnummer(hendelse.identitetsnummer)
val fraArbeidssoekerId = ArbeidssoekerId(hendelse.id)
Expand All @@ -70,12 +76,17 @@ class KafkaConsumerService(
tilArbeidssoekerId: ArbeidssoekerId
) {
transaction(database) {
// TODO Dette vil stoppe Kafka Consumer og føre til unhealthy helsestatus for appen. Vil vi det?
identitetRepository.find(fraArbeidssoekerId).let {
if (it == null) throw IllegalStateException("ArbeidssøkerId ikke funnet")
if (it == null) {
meterRegistry.countKafkaFailed()
throw IllegalStateException("ArbeidssøkerId ikke funnet")
}
}
identitetRepository.find(tilArbeidssoekerId).let {
if (it == null) throw IllegalStateException("ArbeidssøkerId ikke funnet")
if (it == null) {
meterRegistry.countKafkaFailed()
throw IllegalStateException("ArbeidssøkerId ikke funnet")
}
}

identitetsnummerSet.forEach { identitetsnummer ->
Expand All @@ -97,9 +108,13 @@ class KafkaConsumerService(
eksisterendeArbeidssoekerId: ArbeidssoekerId
) {
if (eksisterendeArbeidssoekerId == tilArbeidssoekerId) {
logger.info("Identitetsnummer er allerede linket til korrekt ArbeidsøkerId")
meterRegistry.countKafkaVerified()
val audit = Audit(identitetsnummer, IdentitetStatus.VERIFISERT, "Ingen endringer")
kafkaKeysAuditRepository.insert(audit)
} else {
logger.info("Identitetsnummer oppdateres med annen ArbeidsøkerId")
meterRegistry.countKafkaUpdated()
val count = identitetRepository.update(identitetsnummer, tilArbeidssoekerId)
if (count != 0) {
val audit = Audit(
Expand All @@ -119,6 +134,8 @@ class KafkaConsumerService(
identitetsnummer: Identitetsnummer,
tilArbeidssoekerId: ArbeidssoekerId
) {
logger.info("Identitetsnummer opprettes med eksisterende ArbeidsøkerId")
meterRegistry.countKafkaInserted()
val count = identitetRepository.insert(identitetsnummer, tilArbeidssoekerId)
if (count != 0) {
val audit = Audit(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,45 @@
package no.nav.paw.kafkakeygenerator.service

import io.micrometer.core.instrument.MeterRegistry
import io.opentelemetry.instrumentation.annotations.WithSpan
import no.nav.paw.kafkakeygenerator.vo.FailureCode.CONFLICT
import no.nav.paw.kafkakeygenerator.vo.FailureCode.DB_NOT_FOUND
import no.nav.paw.kafkakeygenerator.api.v2.*
import no.nav.paw.kafkakeygenerator.api.v2.Alias
import no.nav.paw.kafkakeygenerator.api.v2.InfoResponse
import no.nav.paw.kafkakeygenerator.api.v2.LokaleAlias
import no.nav.paw.kafkakeygenerator.api.v2.publicTopicKeyFunction
import no.nav.paw.kafkakeygenerator.mergedetector.findMerge
import no.nav.paw.kafkakeygenerator.mergedetector.hentLagretData
import no.nav.paw.kafkakeygenerator.mergedetector.vo.MergeDetected
import no.nav.paw.kafkakeygenerator.repository.KafkaKeysRepository
import no.nav.paw.kafkakeygenerator.vo.*
import no.nav.paw.kafkakeygenerator.utils.buildLogger
import no.nav.paw.kafkakeygenerator.utils.countRestApiFailed
import no.nav.paw.kafkakeygenerator.utils.countRestApiFetch
import no.nav.paw.kafkakeygenerator.utils.countRestApiInserted
import no.nav.paw.kafkakeygenerator.utils.countRestApiReceived
import no.nav.paw.kafkakeygenerator.vo.ArbeidssoekerId
import no.nav.paw.kafkakeygenerator.vo.CallId
import no.nav.paw.kafkakeygenerator.vo.Either
import no.nav.paw.kafkakeygenerator.vo.Failure
import no.nav.paw.kafkakeygenerator.vo.FailureCode.CONFLICT
import no.nav.paw.kafkakeygenerator.vo.FailureCode.DB_NOT_FOUND
import no.nav.paw.kafkakeygenerator.vo.Identitetsnummer
import no.nav.paw.kafkakeygenerator.vo.Info
import no.nav.paw.kafkakeygenerator.vo.LokalIdData
import no.nav.paw.kafkakeygenerator.vo.PdlData
import no.nav.paw.kafkakeygenerator.vo.PdlId
import no.nav.paw.kafkakeygenerator.vo.flatMap
import no.nav.paw.kafkakeygenerator.vo.left
import no.nav.paw.kafkakeygenerator.vo.recover
import no.nav.paw.kafkakeygenerator.vo.right
import no.nav.paw.kafkakeygenerator.vo.suspendingRecover
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner.partitionForKey
import org.apache.kafka.common.serialization.Serdes

class KafkaKeysService(
private val meterRegistry: MeterRegistry,
private val kafkaKeysRepository: KafkaKeysRepository,
private val pdlService: PdlService
) {
private val logger = buildLogger
private val keySerializer = Serdes.Long().serializer()

@WithSpan
Expand Down Expand Up @@ -107,6 +131,8 @@ class KafkaKeysService(

@WithSpan
suspend fun hent(callId: CallId, identitet: Identitetsnummer): Either<Failure, ArbeidssoekerId> {
logger.debug("Henter identer fra database")
meterRegistry.countRestApiFetch()
return kafkaKeysRepository.hent(identitet)
.suspendingRecover(DB_NOT_FOUND) {
sjekkMotAliaser(callId, identitet)
Expand All @@ -115,16 +141,21 @@ class KafkaKeysService(

@WithSpan
suspend fun hentEllerOpprett(callId: CallId, identitet: Identitetsnummer): Either<Failure, ArbeidssoekerId> {
meterRegistry.countRestApiReceived()
return hent(callId, identitet)
.suspendingRecover(DB_NOT_FOUND) {
logger.debug("Oppretter identer i database")
meterRegistry.countRestApiInserted()
kafkaKeysRepository.opprett(identitet)
}.recover(CONFLICT) {
meterRegistry.countRestApiFailed()
kafkaKeysRepository.hent(identitet)
}
}

@WithSpan
private suspend fun sjekkMotAliaser(callId: CallId, identitet: Identitetsnummer): Either<Failure, ArbeidssoekerId> {
logger.debug("Sjekker identer mot PDL")
return pdlService.hentIdentiter(
callId = callId,
identitet = identitet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import io.micrometer.core.instrument.Tags
private const val METRIC_PREFIX = "paw_kafka_key_generator"

fun MeterRegistry.genericCounter(
suffix: String,
source: String,
target: String,
action: String
) {
counter(
"${METRIC_PREFIX}_antall_${suffix}",
"${METRIC_PREFIX}_antall_operasjoner",
Tags.of(
Tag.of("source", source),
Tag.of("target", target),
Expand All @@ -22,14 +21,46 @@ fun MeterRegistry.genericCounter(
).increment()
}

fun MeterRegistry.countReceivedEvents() {
genericCounter("hendelser", "kafka", "database", "received")
fun MeterRegistry.countRestApiReceived() {
genericCounter("rest_api", "database", "received")
}

fun MeterRegistry.countProcessedEvents() {
genericCounter("hendelser", "kafka", "database", "processed")
fun MeterRegistry.countRestApiFetch() {
genericCounter("rest_api", "database", "fetch")
}

fun MeterRegistry.countIgnoredEvents() {
genericCounter("hendelser", "kafka", "database", "ignored")
fun MeterRegistry.countRestApiInserted() {
genericCounter("rest_api", "database", "inserted")
}

fun MeterRegistry.countRestApiFailed() {
genericCounter("rest_api", "database", "failed")
}

fun MeterRegistry.countKafkaReceived() {
genericCounter("kafka", "database", "received")
}

fun MeterRegistry.countKafkaProcessed() {
genericCounter("kafka", "database", "processed")
}

fun MeterRegistry.countKafkaIgnored() {
genericCounter("kafka", "database", "ignored")
}

fun MeterRegistry.countKafkaInserted() {
genericCounter("kafka", "database", "inserted")
}

fun MeterRegistry.countKafkaUpdated() {
genericCounter("kafka", "database", "updated")
}

fun MeterRegistry.countKafkaVerified() {
genericCounter("kafka", "database", "verified")
}

fun MeterRegistry.countKafkaFailed() {
genericCounter("kafka", "database", "failed")
}
2 changes: 1 addition & 1 deletion apps/kafka-key-generator/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
<appender-ref ref="STDOUT_JSON"/>
</root>
<logger name="no.nav.paw.pdl.PdlClient" level="WARN"/>
<logger name="MergeDetector" level="INFO"/>
</then>
<else>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN" />
<logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN" />
<logger name="no.nav.paw.kafkakeygenerator" level="DEBUG" />
</else>
</if>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.kotest.matchers.shouldNotBe
import io.kotest.matchers.types.shouldBeInstanceOf
import io.ktor.client.HttpClient
import io.ktor.client.engine.mock.MockEngine
import io.micrometer.core.instrument.logging.LoggingMeterRegistry
import kotlinx.coroutines.runBlocking
import no.nav.paw.kafkakeygenerator.plugin.custom.flywayMigrate
import no.nav.paw.kafkakeygenerator.repository.KafkaKeysRepository
Expand Down Expand Up @@ -42,6 +43,7 @@ class KafkaKeysServiceTest : StringSpec({
})
) { "fake token" }
val kafkaKeysService = KafkaKeysService(
meterRegistry = LoggingMeterRegistry(),
kafkaKeysRepository = KafkaKeysRepository(Database.connect(dataSource)),
pdlService = PdlService(pdlKlient)
)
Expand Down

0 comments on commit b07f07c

Please sign in to comment.