Skip to content

Commit

Permalink
Gjør endring for å switche om fra v1 til v2 for utmelding av arbeidss…
Browse files Browse the repository at this point in the history
…økere
  • Loading branch information
robertkittilsen committed Sep 13, 2024
1 parent ac8534f commit 9d3f66c
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 508 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics
import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.health.Health
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.health.initKtor
Expand Down Expand Up @@ -51,7 +50,6 @@ fun main() {
applicationConfiguration.periodeTopic,
applicationConfiguration.hendelseloggTopic,
applicationConfiguration.hendelseStateStoreName,
pdlHentForenkletStatus = PdlHentForenkletStatus.create(),
pdlHentPerson = PdlHentPerson.create(),
)
val kafkaStreams = KafkaStreams(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka

import arrow.core.Either
import arrow.core.NonEmptyList
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.ApplicationInfo
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.serdes.HendelseState
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.metrics.tellPdlAvsluttetHendelser
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.metrics.tellStatusFraPdlHentPersonBolk
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.utils.*
import no.nav.paw.arbeidssokerregisteret.application.*
import no.nav.paw.arbeidssokerregisteret.application.opplysninger.DomeneOpplysning
Expand All @@ -18,7 +14,6 @@ import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Bruker
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.BrukerType
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Metadata
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Opplysning
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.Folkeregisterpersonstatus
import no.nav.paw.pdl.graphql.generated.hentpersonbolk.HentPersonBolkResult
import no.nav.paw.pdl.graphql.generated.hentpersonbolk.Person
import org.apache.kafka.streams.KeyValue
Expand All @@ -32,22 +27,19 @@ import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
import java.util.*
import no.nav.paw.pdl.graphql.generated.hentforenkletstatusbolk.HentPersonBolkResult as ForenkletStatusBolkResult

data class EvalueringResultat(
val grunnlagV1: List<Folkeregisterpersonstatus>? = null,
val grunnlagV2: Set<RegelId>? = null,
val hendelseState: HendelseState,
val grunnlag: Set<RegelId>,
val detaljer: Set<no.nav.paw.arbeidssokerregisteret.application.opplysninger.Opplysning>,
val avsluttPeriode: Boolean,
val slettForhaandsGodkjenning: Boolean,
val detaljer: Set<no.nav.paw.arbeidssokerregisteret.application.opplysninger.Opplysning> = emptySet()
)

fun scheduleAvsluttPerioder(
ctx: ProcessorContext<Long, Hendelse>,
hendelseStateStore: KeyValueStore<UUID, HendelseState>,
interval: Duration,
pdlHentForenkletStatus: PdlHentForenkletStatus,
pdlHentPersonBolk: PdlHentPerson,
prometheusMeterRegistry: PrometheusMeterRegistry,
regler: Regler
Expand All @@ -63,9 +55,8 @@ fun scheduleAvsluttPerioder(
.chunked(1000) { chunk ->
val identitetsnummere = chunk.map { it.value.identitetsnummer }

// Versjon 2
val pdlHentPersonResults = hentPersonBolk(identitetsnummere, pdlHentPersonBolk)
val resultaterV2: List<EvalueringResultat> = if (pdlHentPersonResults == null) {
val resultater: List<EvalueringResultat> = if (pdlHentPersonResults == null) {
logger.warn("PDL hentPersonBolk returnerte null")
emptyList()
} else {
Expand All @@ -76,35 +67,18 @@ fun scheduleAvsluttPerioder(
)
}

// Versjon 1
val pdlHentForenkletStatusResults = hentForenkletStatus(identitetsnummere, pdlHentForenkletStatus)
val resultaterV1: List<EvalueringResultat> = if (pdlHentForenkletStatusResults == null) {
logger.warn("PDL hentForenkletStatus returnerte null")
emptyList()
} else {
pdlHentForenkletStatusResults.processResults(
chunk,
prometheusMeterRegistry,
logger
)
}

resultaterV1.sortedByDescending { it.hendelseState.startetTidspunkt }
.compareResults(resultaterV2.sortedByDescending { it.hendelseState.startetTidspunkt }, logger)
resultaterV1.onEach { resultat ->
val folkeregisterpersonstatus = resultat.grunnlagV1
resultater.forEach { resultat ->
val hendelseState = resultat.hendelseState
if (resultat.avsluttPeriode && folkeregisterpersonstatus != null) {
if (resultat.avsluttPeriode) {
sendAvsluttetHendelse(
folkeregisterpersonstatus,
resultat.grunnlag,
resultat.detaljer,
hendelseState,
hendelseStateStore,
ctx,
prometheusMeterRegistry
)
}
}.forEach { resultat ->
val hendelseState = resultat.hendelseState
if (resultat.slettForhaandsGodkjenning) {
slettForhaandsGodkjenning(hendelseState, hendelseStateStore)
}
Expand All @@ -113,33 +87,6 @@ fun scheduleAvsluttPerioder(
}
}

fun List<ForenkletStatusBolkResult>.processResults(
chunk: List<KeyValue<UUID, HendelseState>>,
prometheusMeterRegistry: PrometheusMeterRegistry,
logger: Logger
): List<EvalueringResultat> =
this.filter { result ->
prometheusMeterRegistry.tellStatusFraPdlHentPersonBolk(result.code)
if (result.code in pdlErrorResponses) {
logger.error("Feil ved henting av forenklet status fra PDL: ${result.code}")
false
} else true
}.mapNotNull { result ->
hentFolkeregisterpersonstatusOgHendelseState(result, chunk)
}.map { (folkeregisterpersonstatus, hendelseState) ->

val avsluttPeriode = !folkeregisterpersonstatus.erBosattEtterFolkeregisterloven &&
folkeregisterpersonstatus.filterAvsluttPeriodeGrunnlag(hendelseState.opplysninger).isNotEmpty()
val skalSletteForhaandsGodkjenning = skalSletteForhaandsGodkjenning(hendelseState, avsluttPeriode)

EvalueringResultat(
grunnlagV1 = folkeregisterpersonstatus,
hendelseState = hendelseState,
avsluttPeriode = avsluttPeriode,
slettForhaandsGodkjenning = skalSletteForhaandsGodkjenning
)
}

fun isPdlResultOK(code: String, logger: Logger): Boolean =
if (code in pdlErrorResponses) {
logger.error("Feil ved henting av Person fra PDL: $code")
Expand All @@ -166,28 +113,6 @@ fun Set<Opplysning>.toDomeneOpplysninger() = this
.mapNotNull { hendelseOpplysningTilDomeneOpplysninger(it) }
.toSet()

fun Set<Opplysning>.erForhaandsGodkjent() = Opplysning.FORHAANDSGODKJENT_AV_ANSATT in this

fun skalAvsluttePeriode(
pdlEvaluering: Either<NonEmptyList<Problem>, GrunnlagForGodkjenning>,
opplysningerEvaluering: Either<NonEmptyList<Problem>, GrunnlagForGodkjenning>,
erForhaandsgodkjent: Boolean
) = pdlEvaluering.fold(
{ pdlEvalueringLeft ->
when (opplysningerEvaluering) {
is Either.Left -> {
!(erForhaandsgodkjent && opplysningerEvaluering.value.map { it.regel.id }
.containsAll(pdlEvalueringLeft.map { it.regel.id }))
}

is Either.Right -> {
true
}
}
},
{ false }
)

fun List<HentPersonBolkResult>.processPdlResultsV2(
regler: Regler,
chunk: List<KeyValue<UUID, HendelseState>>,
Expand All @@ -205,46 +130,27 @@ fun List<HentPersonBolkResult>.processPdlResultsV2(
gjeldeneOpplysninger = gjeldeneOpplysninger
)
EvalueringResultat(
grunnlagV2 = resultat.grunnlag,
hendelseState = hendelseState,
grunnlag = resultat.grunnlag,
detaljer = gjeldeneOpplysninger.toSet(),
avsluttPeriode = resultat.periodeSkalAvsluttes,
slettForhaandsGodkjenning = resultat.forhaandsgodkjenningSkalSlettes,
detaljer = gjeldeneOpplysninger.toSet()
)
}

fun Either<NonEmptyList<Problem>, GrunnlagForGodkjenning>.toAarsak(): String =
this.fold(
{ problems ->
problems.joinToString(", ") { problem -> problem.regel.id.beskrivelse }
},
{ "Ingen årsak" }
)

fun skalSletteForhaandsGodkjenning(
hendelseState: HendelseState,
avsluttPeriode: Boolean
) = Opplysning.FORHAANDSGODKJENT_AV_ANSATT in hendelseState.opplysninger
&& hendelseState.opplysninger.any { it in negativeOpplysninger } && !avsluttPeriode

fun sendAvsluttetHendelse(
folkeregisterpersonstatus: List<Folkeregisterpersonstatus>,
grunnlag: Set<RegelId>,
detaljer: Set<no.nav.paw.arbeidssokerregisteret.application.opplysninger.Opplysning>,
hendelseState: HendelseState,
hendelseStateStore: KeyValueStore<UUID, HendelseState>,
ctx: ProcessorContext<Long, Hendelse>,
prometheusMeterRegistry: PrometheusMeterRegistry,
) {
val aarsak = folkeregisterpersonstatus
.filterAvsluttPeriodeGrunnlag(hendelseState.opplysninger)
.ifEmpty {
return
}
.toAarsak()

val avsluttetHendelse = genererAvsluttetHendelseRecord(hendelseState, aarsak)
val avsluttetHendelse = genererAvsluttetHendelseRecord(hendelseState, grunnlag, detaljer)
ctx.forward(avsluttetHendelse)
.also {
prometheusMeterRegistry.tellPdlAvsluttetHendelser(aarsak)
prometheusMeterRegistry.tellPdlAvsluttetHendelser(grunnlag.joinToString { it.beskrivelse })
hendelseStateStore.delete(hendelseState.periodeId)
}
}
Expand All @@ -258,72 +164,6 @@ fun slettForhaandsGodkjenning(hendelseState: HendelseState, hendelseStateStore:
hendelseStateStore.put(hendelseState.periodeId, oppdatertHendelseState)
}

fun List<EvalueringResultat>.compareResults(
other: List<EvalueringResultat>,
logger: Logger
) {
val evalueringResultaterV1 = this.associateBy { it.hendelseState.periodeId }
val evalueringResultaterV2 = other.associateBy { it.hendelseState.periodeId }

val matchingPeriodeIder = evalueringResultaterV1.keys.intersect(evalueringResultaterV2.keys)

matchingPeriodeIder.forEach { periodeId ->
val resultatV1 = evalueringResultaterV1[periodeId]
val resultatV2 = evalueringResultaterV2[periodeId]

if (resultatV1 == null || resultatV2 == null) {
logger.error("Missing result for periodeId: $periodeId in either v1 or v2")
return@forEach
}

if (resultatV1.avsluttPeriode != resultatV2.avsluttPeriode) {
logger.warn(
"AvsluttPeriode mismatch for periodeId: $periodeId, med startet tidspunkt: ${resultatV1.hendelseState.startetTidspunkt}" +
"v1: ${resultatV1.avsluttPeriode}, aarsak: ${
resultatV1.grunnlagV1?.filterAvsluttPeriodeGrunnlag(
resultatV1.hendelseState.opplysninger
)?.toAarsak()
}, " +
"v2: ${resultatV2.avsluttPeriode}, aarsak: ${resultatV2.grunnlagV2}, detaljer: ${resultatV2.detaljer}"
)
}

if (resultatV1.slettForhaandsGodkjenning != resultatV2.slettForhaandsGodkjenning &&
resultatV2.grunnlagV2 != setOf(EuEoesStatsborgerOver18Aar)
) {
logger.warn(
"SlettForhaandsGodkjenning mismatch for periodeId: $periodeId, " +
"v1: ${resultatV1.slettForhaandsGodkjenning}, " +
"v2: ${resultatV2.slettForhaandsGodkjenning}, " +
"v2 grunnlag: ${resultatV2.grunnlagV2}"
)
}
}
logger.info("Finished comparing results")
}

fun hentFolkeregisterpersonstatusOgHendelseState(
result: ForenkletStatusBolkResult,
chunk: List<KeyValue<UUID, HendelseState>>
): Pair<List<Folkeregisterpersonstatus>, HendelseState>? {
val person = result.person ?: return null
val hendelseState = chunk.find { it.value.identitetsnummer == result.ident }
?.value ?: return null

return Pair(person.folkeregisterpersonstatus, hendelseState)
}

fun hentForenkletStatus(
identitetsnummere: List<String>,
pdlHentForenkletStatus: PdlHentForenkletStatus,
): List<ForenkletStatusBolkResult>? {
return pdlHentForenkletStatus.hentForenkletStatus(
identitetsnummere,
UUID.randomUUID().toString(),
"paw-arbeidssoekerregisteret-utgang-pdl",
)
}

fun hentPersonBolk(
identitetsnummere: List<String>,
pdlHentPersonBolk: PdlHentPerson,
Expand All @@ -341,32 +181,15 @@ fun List<KeyValue<UUID, HendelseState>>.filterValidHendelseStates(): List<KeyVal
hendelseState.harTilhoerendePeriode && hendelseState.brukerId != null
}

fun List<Folkeregisterpersonstatus>.filterAvsluttPeriodeGrunnlag(
opplysningerFraStartetHendelse: Set<Opplysning>
): Set<Opplysning> {
val isForhaandsGodkjent = Opplysning.FORHAANDSGODKJENT_AV_ANSATT in opplysningerFraStartetHendelse

return this.asSequence()
.mapNotNull { status -> statusToOpplysningMap[status.forenkletStatus] }
.filter { it in negativeOpplysninger }
// Opplysning FORHAANDSGODKJENT_AV_ANSATT + DOED/SAVNET/IKKE_BOSATT/OPPHOERT_IDENTITET skal overstyre tilsvarende forenkletStatus fra PDL
.filterNot { it in opplysningerFraStartetHendelse && isForhaandsGodkjent }
.toSet()
}


val List<Folkeregisterpersonstatus>.erBosattEtterFolkeregisterloven
get(): Boolean =
this.any { it.forenkletStatus == "bosattEtterFolkeregisterloven" }

val pdlErrorResponses = setOf(
"bad_request",
"not_found"
)

fun genererAvsluttetHendelseRecord(
hendelseState: HendelseState,
aarsak: String,
grunnlag: Set<RegelId>,
detaljer: Set<no.nav.paw.arbeidssokerregisteret.application.opplysninger.Opplysning>
): Record<Long, Avsluttet> = Record(
hendelseState.recordKey,
Avsluttet(
Expand All @@ -375,13 +198,14 @@ fun genererAvsluttetHendelseRecord(
identitetsnummer = hendelseState.identitetsnummer,
metadata = Metadata(
tidspunkt = Instant.now(),
aarsak = aarsak,
aarsak = grunnlag.joinToString { it.beskrivelse },
kilde = "paw-arbeidssoekerregisteret-utgang-pdl",
utfoertAv = Bruker(
type = BrukerType.SYSTEM,
id = ApplicationInfo.id
)
)
),
opplysninger = detaljer.filterIsInstance<DomeneOpplysning>().map(::domeneOpplysningTilHendelseOpplysning).toSet()
),
Instant.now().toEpochMilli()
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka

import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentForenkletStatus
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.clients.pdl.PdlHentPerson
import no.nav.paw.arbeidssoekerregisteret.utgang.pdl.kafka.processors.oppdaterHendelseState
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
Expand All @@ -18,7 +17,6 @@ fun StreamsBuilder.appTopology(
periodeTopic: String,
hendelseLoggTopic: String,
hendelseStateStoreName: String,
pdlHentForenkletStatus: PdlHentForenkletStatus,
pdlHentPerson: PdlHentPerson
): Topology {
stream(hendelseLoggTopic, Consumed.with(Serdes.Long(), HendelseSerde()))
Expand All @@ -30,7 +28,6 @@ fun StreamsBuilder.appTopology(
.oppdaterHendelseState(
hendelseStateStoreName = hendelseStateStoreName,
prometheusMeterRegistry = prometheusRegistry,
pdlHentForenkletStatus = pdlHentForenkletStatus,
pdlHentPerson = pdlHentPerson
)
.to(hendelseLoggTopic, Produced.with(Serdes.Long(), HendelseSerde()))
Expand Down
Loading

0 comments on commit 9d3f66c

Please sign in to comment.