Skip to content

Commit

Permalink
Opprette et separat løp for utmelding basert på fellesregler i arbeid…
Browse files Browse the repository at this point in the history
…soker-regler. Logger evaluering av begge løp til sammenligning.
  • Loading branch information
robertkittilsen authored and naviktthomas committed Aug 20, 2024
1 parent 5ef3fa7 commit 885679a
Show file tree
Hide file tree
Showing 13 changed files with 462 additions and 40 deletions.
3 changes: 3 additions & 0 deletions apps/utgang-pdl/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ dependencies {
implementation(project(":domain:main-avro-schema"))
implementation(project(":domain:interne-hendelser"))
implementation(project(":domain:arbeidssoekerregisteret-kotlin"))
implementation(project(":domain:arbeidssoeker-regler"))

implementation(project(":lib:kafka-key-generator-client"))
implementation(project(":lib:kafka"))
implementation(project(":lib:kafka-streams"))
implementation(project(":lib:hoplite-config"))

api(arrow.core)

implementation(orgApacheKafka.kafkaStreams)

implementation(ktorServer.bundles.withNettyAndMicrometer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics
import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.health.Health
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.health.initKtor
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.appTopology
Expand Down Expand Up @@ -51,6 +52,7 @@ fun main() {
applicationConfiguration.hendelseloggTopic,
applicationConfiguration.hendelseStateStoreName,
pdlHentForenkletStatus = PdlHentForenkletStatus.create(),
pdlHentPerson = PdlHentPerson.create(),
)
val kafkaStreams = KafkaStreams(
topology,
Expand All @@ -69,4 +71,3 @@ fun main() {
).start(wait = true)
logger.info("Avsluttet")
}

Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,38 @@ import no.nav.paw.kafkakeygenerator.auth.azureAdM2MTokenClient
import no.nav.paw.kafkakeygenerator.auth.currentNaisEnv
import no.nav.paw.pdl.PdlClient
import no.nav.paw.pdl.PdlException
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.HentPersonBolkResult
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.HentPersonBolkResult as ForenkletStatusBolkResult
import no.nav.paw.pdl.graphql.generated.hentpersonbolk.HentPersonBolkResult
import no.nav.paw.pdl.hentForenkletStatusBolk
import no.nav.paw.pdl.hentPersonBolk
import org.slf4j.LoggerFactory

const val BEHANDLINGSNUMMER = "B452"

fun interface PdlHentPerson {
fun hentPerson(ident: List<String>, callId: String, navConsumerId: String): List<HentPersonBolkResult>?

companion object {
val logger = LoggerFactory.getLogger("pdlClient")

fun create(): PdlHentPerson {
val pdlClient = createPdlClient()
return PdlHentPerson { ident, callId, navConsumerId ->
runBlocking {
try {
pdlClient.hentPersonBolk(ident = ident, callId = callId, navConsumerId = navConsumerId, behandlingsnummer = BEHANDLINGSNUMMER)
} catch (e: PdlException) {
logger.error("PDL hentPerson feiler med: $e", e)
null
}
}
}
}
}
}

fun interface PdlHentForenkletStatus {
fun hentForenkletStatus(ident: List<String>, callId: String, navConsumerId: String): List<HentPersonBolkResult>?
fun hentForenkletStatus(ident: List<String>, callId: String, navConsumerId: String): List<ForenkletStatusBolkResult>?

companion object {
val logger = LoggerFactory.getLogger("pdlClient")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
package no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka

import arrow.core.Either
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.ApplicationInfo
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus.Companion.logger
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.serdes.HendelseState
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.metrics.tellPdlAvsluttetHendelser
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.metrics.tellStatusFraPdlHentPersonBolk
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.genererPersonFakta
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.negativeOpplysninger
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.statusToOpplysningMap
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.toAarsak
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.toPerson
import no.nav.paw.arbeidssokerregisteret.application.OK
import no.nav.paw.arbeidssokerregisteret.application.Problem
import no.nav.paw.arbeidssokerregisteret.application.evaluer
import no.nav.paw.arbeidssokerregisteret.application.hendelseOpplysningTilDomeneOpplysninger
import no.nav.paw.arbeidssokerregisteret.application.reglerForInngangIPrioritertRekkefolge
import no.nav.paw.arbeidssokerregisteret.intern.v1.Avsluttet
import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Bruker
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.BrukerType
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Metadata
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Opplysning
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.Folkeregisterpersonstatus
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.HentPersonBolkResult
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.HentPersonBolkResult as ForenkletStatusBolkResult
import no.nav.paw.pdl.graphql.generated.hentpersonbolk.HentPersonBolkResult
import org.apache.kafka.streams.processor.Cancellable
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.ProcessorContext
Expand All @@ -31,6 +45,7 @@ fun scheduleAvsluttPerioder(
hendelseStateStore: KeyValueStore<UUID, HendelseState>,
interval: Duration = Duration.ofDays(1),
pdlHentForenkletStatus: PdlHentForenkletStatus,
pdlHentPersonBolk: PdlHentPerson,
prometheusMeterRegistry: PrometheusMeterRegistry,
): Cancellable = ctx.schedule(interval, PunctuationType.WALL_CLOCK_TIME) {

Expand All @@ -44,6 +59,18 @@ fun scheduleAvsluttPerioder(
.chunked(1000) { chunk ->
val identitetsnummere = chunk.map { it.value.identitetsnummer }

// Versjon 2
val pdlHentPersonResults = hentPersonBolk(identitetsnummere, pdlHentPersonBolk)
if (pdlHentPersonResults == null) {
logger.warn("PDL hentPersonBolk returnerte null")
} else {
pdlHentPersonResults.processResultsV2(
chunk,
logger
)
}

// Versjon 1
val pdlResults = hentForenkletStatus(identitetsnummere, pdlHentForenkletStatus)
if (pdlResults == null) {
logger.error("PDL hentForenkletStatus returnerte null")
Expand All @@ -61,7 +88,8 @@ fun scheduleAvsluttPerioder(
}
}

private fun List<HentPersonBolkResult>.processResults(

private fun List<ForenkletStatusBolkResult>.processResults(
chunk: List<KeyValue<UUID, HendelseState>>,
hendelseStateStore: KeyValueStore<UUID, HendelseState>,
ctx: ProcessorContext<Long, Hendelse>,
Expand All @@ -87,9 +115,14 @@ private fun List<HentPersonBolkResult>.processResults(

val aarsak = folkeregisterpersonstatus
.filterAvsluttPeriodeGrunnlag(hendelseState.opplysninger)
.ifEmpty { return@forEachIndexed }
.ifEmpty {
logger.info("Versjon 1: OK, matchende opplysning fra startet hendelse og forhåndsgodkjent av ansatt")
return@forEachIndexed
}
.toAarsak()

logger.info("Versjon 2: PROBLEM, negative opplysninger fra pdl: ${folkeregisterpersonstatus.filterAvsluttPeriodeGrunnlag(hendelseState.opplysninger)}, generert aarsak: $aarsak")

val avsluttetHendelse = genererAvsluttetHendelseRecord(hendelseState, aarsak)

logger.info("Sender avsluttet hendelse med aarsak: $aarsak")
Expand All @@ -101,8 +134,73 @@ private fun List<HentPersonBolkResult>.processResults(
}
}

private fun List<HentPersonBolkResult>.processResultsV2(
chunk: List<KeyValue<UUID, HendelseState>>,
logger: Logger
) = this.forEach { result ->
if (result.code in pdlErrorResponses) {
logger.error("Versjon 2: Feil ved henting av Person fra PDL: ${result.code}")
return@forEach
}

val person = result.person ?: throw IllegalStateException("Versjon 2: Person mangler")
val hendelseOpplysninger = chunk.find { it.value.identitetsnummer == result.ident }
?.value?.opplysninger ?: throw IllegalStateException("Versjon 2: HendelseState mangler")

val domeneOpplysninger = hendelseOpplysninger
.filterNot { it == Opplysning.FORHAANDSGODKJENT_AV_ANSATT }
.map { hendelseOpplysningTilDomeneOpplysninger(it) as no.nav.paw.arbeidssokerregisteret.application.opplysninger.Opplysning }
.toSet()

val opplysningerEvaluering = reglerForInngangIPrioritertRekkefolge.evaluer(domeneOpplysninger)
val pdlEvaluering = reglerForInngangIPrioritertRekkefolge.evaluer(genererPersonFakta(person.toPerson()))

val erForhaandsgodkjent = hendelseOpplysninger.contains(Opplysning.FORHAANDSGODKJENT_AV_ANSATT)

when {
pdlEvaluering.isLeft() -> handleLeftEvaluation(
pdlEvaluering, opplysningerEvaluering, erForhaandsgodkjent, logger
)
pdlEvaluering.isRight() -> handleRightEvaluation(
opplysningerEvaluering, erForhaandsgodkjent, logger
)
}
}

private fun handleLeftEvaluation(
pdlEvaluering: Either<Problem, OK>,
opplysningerEvaluering: Either<Problem, OK>,
erForhaandsgodkjent: Boolean,
logger: Logger
) {
val pdlEvalueringResultat = pdlEvaluering.leftOrNull()?.opplysning?.toSet()
?: throw IllegalStateException("Versjon 2: PDL evaluering mangler opplysning")
val opplysningerEvalueringResultat = opplysningerEvaluering.leftOrNull()?.opplysning?.toSet()
?: throw IllegalStateException("Versjon 2: Opplysninger evaluering mangler opplysning")

if (pdlEvalueringResultat == opplysningerEvalueringResultat && erForhaandsgodkjent) {
logger.info("Versjon 2: OK, matchende opplysning fra startet hendelse og forhåndsgodkjent av ansatt: $pdlEvalueringResultat")
} else {
val aarsak = pdlEvalueringResultat.filterNot { it in opplysningerEvalueringResultat }.toAarsak()
logger.info("Versjon 2: PROBLEM, negative opplysninger fra pdl: $pdlEvalueringResultat, generert aarsak: $aarsak")
}
}

private fun handleRightEvaluation(
opplysningerEvaluering: Either<Problem, OK>,
erForhaandsgodkjent: Boolean,
logger: Logger
) {
if (opplysningerEvaluering.isLeft() && erForhaandsgodkjent) {
logger.info("Versjon 2: OK, ingen negative opplysninger fra pdl, negative opplysninger fra startet hendelse og forhåndsgodkjent av ansatt funnet")
} else {
logger.info("Versjon 2: OK, ingen negative opplysninger fra pdl")
}
}


private fun hentPersonStatusOgHendelseState(
result: HentPersonBolkResult,
result: ForenkletStatusBolkResult,
chunk: List<KeyValue<UUID, HendelseState>>
): Pair<List<Folkeregisterpersonstatus>, HendelseState>? {
val person = result.person ?: return null
Expand All @@ -123,20 +221,34 @@ private fun oppdaterHendelseStateOpplysninger(
.toSet()
)
hendelseStateStore.put(hendelseState.periodeId, oppdatertHendelseState)
logger.info("Versjon 1: OK, ingen negative opplysninger fra pdl, negative opplysninger fra startet hendelse og forhåndsgodkjent av ansatt funnet")
} else {
logger.info("Versjon 1: OK, ingen negative opplysninger fra pdl")
}
}

private fun hentForenkletStatus(
identitetsnummere: List<String>,
pdlHentForenkletStatus: PdlHentForenkletStatus,
): List<HentPersonBolkResult>? {
): List<ForenkletStatusBolkResult>? {
return pdlHentForenkletStatus.hentForenkletStatus(
identitetsnummere,
UUID.randomUUID().toString(),
"paw-arbeidssoekerregisteret-utgang-pdl",
)
}

private fun hentPersonBolk(
identitetsnummere: List<String>,
pdlHentPersonBolk: PdlHentPerson,
): List<HentPersonBolkResult>? {
return pdlHentPersonBolk.hentPerson(
identitetsnummere,
UUID.randomUUID().toString(),
"paw-arbeidssoekerregisteret-utgang-pdl",
)
}

private fun List<KeyValue<UUID, HendelseState>>.filterValidHendelseStates(): List<KeyValue<UUID, HendelseState>> =
this.filter { entry ->
val hendelseState = entry.value
Expand All @@ -156,31 +268,7 @@ fun List<Folkeregisterpersonstatus>.filterAvsluttPeriodeGrunnlag(
.toSet()
}

private fun Set<Opplysning>.toAarsak(): String =
this.joinToString(separator = ", ") {
when (it) {
Opplysning.DOED -> "Personen er doed"
Opplysning.SAVNET -> "Personen er savnet"
Opplysning.IKKE_BOSATT -> "Personen er ikke bosatt etter folkeregisterloven"
Opplysning.OPPHOERT_IDENTITET -> "Personen har opphoert identitet"
else -> it.name
}
}

private val negativeOpplysninger = setOf(
Opplysning.IKKE_BOSATT,
Opplysning.SAVNET,
Opplysning.DOED,
Opplysning.OPPHOERT_IDENTITET,
)

private val statusToOpplysningMap = mapOf(
"ikkeBosatt" to Opplysning.IKKE_BOSATT,
"forsvunnet" to Opplysning.SAVNET,
"doedIFolkeregisteret" to Opplysning.DOED,
"opphoert" to Opplysning.OPPHOERT_IDENTITET,
"dNummer" to Opplysning.DNUMMER
)

private val List<Folkeregisterpersonstatus>.erBosattEtterFolkeregisterloven
get(): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka

import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.processors.oppdaterHendelseState
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerde
Expand All @@ -18,6 +19,7 @@ fun StreamsBuilder.appTopology(
hendelseLoggTopic: String,
hendelseStateStoreName: String,
pdlHentForenkletStatus: PdlHentForenkletStatus,
pdlHentPerson: PdlHentPerson
): Topology {
stream(hendelseLoggTopic, Consumed.with(Serdes.Long(), HendelseSerde()))
.filter { _, value -> value is Startet }
Expand All @@ -28,7 +30,8 @@ fun StreamsBuilder.appTopology(
.oppdaterHendelseState(
hendelseStateStoreName = hendelseStateStoreName,
prometheusMeterRegistry = prometheusRegistry,
pdlHentForenkletStatus = pdlHentForenkletStatus
pdlHentForenkletStatus = pdlHentForenkletStatus,
pdlHentPerson = pdlHentPerson
)
.to(hendelseLoggTopic, Produced.with(Serdes.Long(), HendelseSerde()))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.processors

import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.scheduleAvsluttPerioder
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.serdes.HendelseState
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
Expand All @@ -18,13 +19,15 @@ import java.util.UUID
fun KStream<Long, Periode>.oppdaterHendelseState(
hendelseStateStoreName: String,
prometheusMeterRegistry: PrometheusMeterRegistry,
pdlHentForenkletStatus: PdlHentForenkletStatus
pdlHentForenkletStatus: PdlHentForenkletStatus,
pdlHentPerson: PdlHentPerson
): KStream<Long, Hendelse> {
val processor = {
PeriodeProcessor(
hendelseStateStoreName,
prometheusMeterRegistry,
pdlHentForenkletStatus
pdlHentForenkletStatus,
pdlHentPerson
)
}
return process(processor, Named.`as`("periodeProsessor"), hendelseStateStoreName)
Expand All @@ -33,7 +36,8 @@ fun KStream<Long, Periode>.oppdaterHendelseState(
class PeriodeProcessor(
private val hendelseStateStoreName: String,
private val prometheusMeterRegistry: PrometheusMeterRegistry,
private val pdlHentForenkletStatus: PdlHentForenkletStatus
private val pdlHentForenkletStatus: PdlHentForenkletStatus,
private val pdlHentPersonBolk: PdlHentPerson,
) : Processor<Long, Periode, Long, Hendelse> {
private var hendelseStateStore: KeyValueStore<UUID, HendelseState>? = null
private var context: ProcessorContext<Long, Hendelse>? = null
Expand All @@ -47,6 +51,7 @@ class PeriodeProcessor(
requireNotNull(hendelseStateStore),
Duration.ofDays(1),
pdlHentForenkletStatus,
pdlHentPersonBolk,
prometheusMeterRegistry
)
}
Expand Down
Loading

0 comments on commit 885679a

Please sign in to comment.