Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/utgang pdl fellesregler logging #2

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/hendelselogg-backup/nais/nais-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ spec:
gcp:
sqlInstances:
- type: POSTGRES_15
tier: db-custom-1-3840
databases:
- name: hendelselogg
accessPolicy:
Expand Down
3 changes: 3 additions & 0 deletions apps/utgang-pdl/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ dependencies {
implementation(project(":domain:main-avro-schema"))
implementation(project(":domain:interne-hendelser"))
implementation(project(":domain:arbeidssoekerregisteret-kotlin"))
implementation(project(":domain:arbeidssoeker-regler"))

implementation(project(":lib:kafka-key-generator-client"))
implementation(project(":lib:kafka"))
implementation(project(":lib:kafka-streams"))
implementation(project(":lib:hoplite-config"))

api(arrow.core)

implementation(orgApacheKafka.kafkaStreams)

implementation(ktorServer.bundles.withNettyAndMicrometer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics
import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.health.Health
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.health.initKtor
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.appTopology
Expand Down Expand Up @@ -51,6 +52,7 @@ fun main() {
applicationConfiguration.hendelseloggTopic,
applicationConfiguration.hendelseStateStoreName,
pdlHentForenkletStatus = PdlHentForenkletStatus.create(),
pdlHentPerson = PdlHentPerson.create(),
)
val kafkaStreams = KafkaStreams(
topology,
Expand All @@ -69,4 +71,3 @@ fun main() {
).start(wait = true)
logger.info("Avsluttet")
}

Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,38 @@ import no.nav.paw.kafkakeygenerator.auth.azureAdM2MTokenClient
import no.nav.paw.kafkakeygenerator.auth.currentNaisEnv
import no.nav.paw.pdl.PdlClient
import no.nav.paw.pdl.PdlException
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.HentPersonBolkResult
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.HentPersonBolkResult as ForenkletStatusBolkResult
import no.nav.paw.pdl.graphql.generated.hentpersonbolk.HentPersonBolkResult
import no.nav.paw.pdl.hentForenkletStatusBolk
import no.nav.paw.pdl.hentPersonBolk
import org.slf4j.LoggerFactory

const val BEHANDLINGSNUMMER = "B452"

fun interface PdlHentPerson {
fun hentPerson(ident: List<String>, callId: String, navConsumerId: String): List<HentPersonBolkResult>?

companion object {
val logger = LoggerFactory.getLogger("pdlClient")

fun create(): PdlHentPerson {
val pdlClient = createPdlClient()
return PdlHentPerson { ident, callId, navConsumerId ->
runBlocking {
try {
pdlClient.hentPersonBolk(ident = ident, callId = callId, navConsumerId = navConsumerId, behandlingsnummer = BEHANDLINGSNUMMER)
} catch (e: PdlException) {
logger.error("PDL hentPerson feiler med: $e", e)
null
}
}
}
}
}
}

fun interface PdlHentForenkletStatus {
fun hentForenkletStatus(ident: List<String>, callId: String, navConsumerId: String): List<HentPersonBolkResult>?
fun hentForenkletStatus(ident: List<String>, callId: String, navConsumerId: String): List<ForenkletStatusBolkResult>?

companion object {
val logger = LoggerFactory.getLogger("pdlClient")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
package no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka

import arrow.core.Either
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.ApplicationInfo
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus.Companion.logger
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.serdes.HendelseState
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.metrics.tellPdlAvsluttetHendelser
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.metrics.tellStatusFraPdlHentPersonBolk
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.genererPersonFakta
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.negativeOpplysninger
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.statusToOpplysningMap
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.toAarsak
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.toPerson
import no.nav.paw.arbeidssokerregisteret.application.OK
import no.nav.paw.arbeidssokerregisteret.application.Problem
import no.nav.paw.arbeidssokerregisteret.application.evaluer
import no.nav.paw.arbeidssokerregisteret.application.hendelseOpplysningTilDomeneOpplysninger
import no.nav.paw.arbeidssokerregisteret.application.reglerForInngangIPrioritertRekkefolge
import no.nav.paw.arbeidssokerregisteret.intern.v1.Avsluttet
import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Bruker
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.BrukerType
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Metadata
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Opplysning
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.Folkeregisterpersonstatus
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.HentPersonBolkResult
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.HentPersonBolkResult as ForenkletStatusBolkResult
import no.nav.paw.pdl.graphql.generated.hentpersonbolk.HentPersonBolkResult
import org.apache.kafka.streams.processor.Cancellable
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.ProcessorContext
Expand All @@ -31,6 +45,7 @@ fun scheduleAvsluttPerioder(
hendelseStateStore: KeyValueStore<UUID, HendelseState>,
interval: Duration = Duration.ofDays(1),
pdlHentForenkletStatus: PdlHentForenkletStatus,
pdlHentPersonBolk: PdlHentPerson,
prometheusMeterRegistry: PrometheusMeterRegistry,
): Cancellable = ctx.schedule(interval, PunctuationType.WALL_CLOCK_TIME) {

Expand All @@ -44,6 +59,18 @@ fun scheduleAvsluttPerioder(
.chunked(1000) { chunk ->
val identitetsnummere = chunk.map { it.value.identitetsnummer }

// Versjon 2
val pdlHentPersonResults = hentPersonBolk(identitetsnummere, pdlHentPersonBolk)
if (pdlHentPersonResults == null) {
logger.warn("PDL hentPersonBolk returnerte null")
} else {
pdlHentPersonResults.processResultsV2(
chunk,
logger
)
}

// Versjon 1
val pdlResults = hentForenkletStatus(identitetsnummere, pdlHentForenkletStatus)
if (pdlResults == null) {
logger.error("PDL hentForenkletStatus returnerte null")
Expand All @@ -61,7 +88,8 @@ fun scheduleAvsluttPerioder(
}
}

private fun List<HentPersonBolkResult>.processResults(

private fun List<ForenkletStatusBolkResult>.processResults(
chunk: List<KeyValue<UUID, HendelseState>>,
hendelseStateStore: KeyValueStore<UUID, HendelseState>,
ctx: ProcessorContext<Long, Hendelse>,
Expand All @@ -87,9 +115,14 @@ private fun List<HentPersonBolkResult>.processResults(

val aarsak = folkeregisterpersonstatus
.filterAvsluttPeriodeGrunnlag(hendelseState.opplysninger)
.ifEmpty { return@forEachIndexed }
.ifEmpty {
logger.info("Versjon 1: OK, matchende opplysning fra startet hendelse og forhåndsgodkjent av ansatt")
return@forEachIndexed
}
.toAarsak()

logger.info("Versjon 2: PROBLEM, negative opplysninger fra pdl: ${folkeregisterpersonstatus.filterAvsluttPeriodeGrunnlag(hendelseState.opplysninger)}, generert aarsak: $aarsak")

val avsluttetHendelse = genererAvsluttetHendelseRecord(hendelseState, aarsak)

logger.info("Sender avsluttet hendelse med aarsak: $aarsak")
Expand All @@ -101,8 +134,73 @@ private fun List<HentPersonBolkResult>.processResults(
}
}

private fun List<HentPersonBolkResult>.processResultsV2(
chunk: List<KeyValue<UUID, HendelseState>>,
logger: Logger
) = this.forEach { result ->
if (result.code in pdlErrorResponses) {
logger.error("Versjon 2: Feil ved henting av Person fra PDL: ${result.code}")
return@forEach
}

val person = result.person ?: throw IllegalStateException("Versjon 2: Person mangler")
val hendelseOpplysninger = chunk.find { it.value.identitetsnummer == result.ident }
?.value?.opplysninger ?: throw IllegalStateException("Versjon 2: HendelseState mangler")

val domeneOpplysninger = hendelseOpplysninger
.filterNot { it == Opplysning.FORHAANDSGODKJENT_AV_ANSATT }
.map { hendelseOpplysningTilDomeneOpplysninger(it) as no.nav.paw.arbeidssokerregisteret.application.opplysninger.Opplysning }
.toSet()

val opplysningerEvaluering = reglerForInngangIPrioritertRekkefolge.evaluer(domeneOpplysninger)
val pdlEvaluering = reglerForInngangIPrioritertRekkefolge.evaluer(genererPersonFakta(person.toPerson()))

val erForhaandsgodkjent = hendelseOpplysninger.contains(Opplysning.FORHAANDSGODKJENT_AV_ANSATT)

when {
pdlEvaluering.isLeft() -> handleLeftEvaluation(
pdlEvaluering, opplysningerEvaluering, erForhaandsgodkjent, logger
)
pdlEvaluering.isRight() -> handleRightEvaluation(
opplysningerEvaluering, erForhaandsgodkjent, logger
)
}
}

private fun handleLeftEvaluation(
pdlEvaluering: Either<Problem, OK>,
opplysningerEvaluering: Either<Problem, OK>,
erForhaandsgodkjent: Boolean,
logger: Logger
) {
val pdlEvalueringResultat = pdlEvaluering.leftOrNull()?.opplysning?.toSet()
?: throw IllegalStateException("Versjon 2: PDL evaluering mangler opplysning")
val opplysningerEvalueringResultat = opplysningerEvaluering.leftOrNull()?.opplysning?.toSet()
?: throw IllegalStateException("Versjon 2: Opplysninger evaluering mangler opplysning")

if (pdlEvalueringResultat == opplysningerEvalueringResultat && erForhaandsgodkjent) {
logger.info("Versjon 2: OK, matchende opplysning fra startet hendelse og forhåndsgodkjent av ansatt: $pdlEvalueringResultat")
} else {
val aarsak = pdlEvalueringResultat.filterNot { it in opplysningerEvalueringResultat }.toAarsak()
logger.info("Versjon 2: PROBLEM, negative opplysninger fra pdl: $pdlEvalueringResultat, generert aarsak: $aarsak")
}
}

private fun handleRightEvaluation(
opplysningerEvaluering: Either<Problem, OK>,
erForhaandsgodkjent: Boolean,
logger: Logger
) {
if (opplysningerEvaluering.isLeft() && erForhaandsgodkjent) {
logger.info("Versjon 2: OK, ingen negative opplysninger fra pdl, negative opplysninger fra startet hendelse og forhåndsgodkjent av ansatt funnet")
} else {
logger.info("Versjon 2: OK, ingen negative opplysninger fra pdl")
}
}


private fun hentPersonStatusOgHendelseState(
result: HentPersonBolkResult,
result: ForenkletStatusBolkResult,
chunk: List<KeyValue<UUID, HendelseState>>
): Pair<List<Folkeregisterpersonstatus>, HendelseState>? {
val person = result.person ?: return null
Expand All @@ -123,20 +221,34 @@ private fun oppdaterHendelseStateOpplysninger(
.toSet()
)
hendelseStateStore.put(hendelseState.periodeId, oppdatertHendelseState)
logger.info("Versjon 1: OK, ingen negative opplysninger fra pdl, negative opplysninger fra startet hendelse og forhåndsgodkjent av ansatt funnet")
} else {
logger.info("Versjon 1: OK, ingen negative opplysninger fra pdl")
}
}

private fun hentForenkletStatus(
identitetsnummere: List<String>,
pdlHentForenkletStatus: PdlHentForenkletStatus,
): List<HentPersonBolkResult>? {
): List<ForenkletStatusBolkResult>? {
return pdlHentForenkletStatus.hentForenkletStatus(
identitetsnummere,
UUID.randomUUID().toString(),
"paw-arbeidssoekerregisteret-utgang-pdl",
)
}

private fun hentPersonBolk(
identitetsnummere: List<String>,
pdlHentPersonBolk: PdlHentPerson,
): List<HentPersonBolkResult>? {
return pdlHentPersonBolk.hentPerson(
identitetsnummere,
UUID.randomUUID().toString(),
"paw-arbeidssoekerregisteret-utgang-pdl",
)
}

private fun List<KeyValue<UUID, HendelseState>>.filterValidHendelseStates(): List<KeyValue<UUID, HendelseState>> =
this.filter { entry ->
val hendelseState = entry.value
Expand All @@ -156,31 +268,7 @@ fun List<Folkeregisterpersonstatus>.filterAvsluttPeriodeGrunnlag(
.toSet()
}

private fun Set<Opplysning>.toAarsak(): String =
this.joinToString(separator = ", ") {
when (it) {
Opplysning.DOED -> "Personen er doed"
Opplysning.SAVNET -> "Personen er savnet"
Opplysning.IKKE_BOSATT -> "Personen er ikke bosatt etter folkeregisterloven"
Opplysning.OPPHOERT_IDENTITET -> "Personen har opphoert identitet"
else -> it.name
}
}

private val negativeOpplysninger = setOf(
Opplysning.IKKE_BOSATT,
Opplysning.SAVNET,
Opplysning.DOED,
Opplysning.OPPHOERT_IDENTITET,
)

private val statusToOpplysningMap = mapOf(
"ikkeBosatt" to Opplysning.IKKE_BOSATT,
"forsvunnet" to Opplysning.SAVNET,
"doedIFolkeregisteret" to Opplysning.DOED,
"opphoert" to Opplysning.OPPHOERT_IDENTITET,
"dNummer" to Opplysning.DNUMMER
)

private val List<Folkeregisterpersonstatus>.erBosattEtterFolkeregisterloven
get(): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka

import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.processors.oppdaterHendelseState
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerde
Expand All @@ -18,6 +19,7 @@ fun StreamsBuilder.appTopology(
hendelseLoggTopic: String,
hendelseStateStoreName: String,
pdlHentForenkletStatus: PdlHentForenkletStatus,
pdlHentPerson: PdlHentPerson
): Topology {
stream(hendelseLoggTopic, Consumed.with(Serdes.Long(), HendelseSerde()))
.filter { _, value -> value is Startet }
Expand All @@ -28,7 +30,8 @@ fun StreamsBuilder.appTopology(
.oppdaterHendelseState(
hendelseStateStoreName = hendelseStateStoreName,
prometheusMeterRegistry = prometheusRegistry,
pdlHentForenkletStatus = pdlHentForenkletStatus
pdlHentForenkletStatus = pdlHentForenkletStatus,
pdlHentPerson = pdlHentPerson
)
.to(hendelseLoggTopic, Produced.with(Serdes.Long(), HendelseSerde()))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.processors

import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.scheduleAvsluttPerioder
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.serdes.HendelseState
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
Expand All @@ -18,13 +19,15 @@ import java.util.UUID
fun KStream<Long, Periode>.oppdaterHendelseState(
hendelseStateStoreName: String,
prometheusMeterRegistry: PrometheusMeterRegistry,
pdlHentForenkletStatus: PdlHentForenkletStatus
pdlHentForenkletStatus: PdlHentForenkletStatus,
pdlHentPerson: PdlHentPerson
): KStream<Long, Hendelse> {
val processor = {
PeriodeProcessor(
hendelseStateStoreName,
prometheusMeterRegistry,
pdlHentForenkletStatus
pdlHentForenkletStatus,
pdlHentPerson
)
}
return process(processor, Named.`as`("periodeProsessor"), hendelseStateStoreName)
Expand All @@ -33,7 +36,8 @@ fun KStream<Long, Periode>.oppdaterHendelseState(
class PeriodeProcessor(
private val hendelseStateStoreName: String,
private val prometheusMeterRegistry: PrometheusMeterRegistry,
private val pdlHentForenkletStatus: PdlHentForenkletStatus
private val pdlHentForenkletStatus: PdlHentForenkletStatus,
private val pdlHentPersonBolk: PdlHentPerson,
) : Processor<Long, Periode, Long, Hendelse> {
private var hendelseStateStore: KeyValueStore<UUID, HendelseState>? = null
private var context: ProcessorContext<Long, Hendelse>? = null
Expand All @@ -47,6 +51,7 @@ class PeriodeProcessor(
requireNotNull(hendelseStateStore),
Duration.ofDays(1),
pdlHentForenkletStatus,
pdlHentPersonBolk,
prometheusMeterRegistry
)
}
Expand Down
Loading
Loading