diff --git a/apps/kafka-key-maintenance/build.gradle.kts b/apps/kafka-key-maintenance/build.gradle.kts index 475e6cfe..e60367b3 100644 --- a/apps/kafka-key-maintenance/build.gradle.kts +++ b/apps/kafka-key-maintenance/build.gradle.kts @@ -14,6 +14,7 @@ val image: String? by project dependencies { implementation(project(":domain:interne-hendelser")) implementation(project(":domain:pdl-aktoer-schema")) + implementation(project(":domain:main-avro-schema")) implementation(project(":lib:kafka")) implementation(project(":lib:hoplite-config")) implementation(project(":lib:kafka-key-generator-client")) diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/PdlConsumer.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/PdlConsumer.kt new file mode 100644 index 00000000..c538de36 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/PdlConsumer.kt @@ -0,0 +1,50 @@ +package no.nav.paw.kafkakeymaintenance + +import kotlinx.coroutines.runBlocking +import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient +import no.nav.paw.kafkakeygenerator.client.LokaleAlias +import no.nav.paw.kafkakeymaintenance.functions.genererIdOppdatering +import no.nav.paw.kafkakeymaintenance.functions.harAvvik +import no.nav.paw.kafkakeymaintenance.functions.hentData +import no.nav.paw.kafkakeymaintenance.functions.hentPerioder +import no.nav.paw.kafkakeymaintenance.kafka.Topic +import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext +import no.nav.paw.kafkakeymaintenance.kafka.updateHwm +import no.nav.paw.kafkakeymaintenance.vo.IdOppdatering +import no.nav.paw.kafkakeymaintenance.vo.avviksMelding +import no.nav.person.pdl.aktor.v2.Aktor +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.jetbrains.exposed.sql.Transaction +import org.jetbrains.exposed.sql.transactions.transaction +import java.time.Instant + +const val ANTALL_PARTISJONER = 6 + +fun KafkaKeysClient.hentAlias(identiteter: List): List = runBlocking { + getAlias(ANTALL_PARTISJONER, identiteter).alias +} + +fun process( + txContextFactory: Transaction.() -> TransactionContext, + hentAlias: (List) -> List, + record: ConsumerRecord +): IdOppdatering? = + transaction { + val txContext = txContextFactory() + val valid = txContext.updateHwm( + topic = Topic(record.topic()), + partition = record.partition(), + offset = record.offset(), + time = Instant.ofEpochMilli(record.timestamp()), + lastUpdated = Instant.now() + ) + if (valid) { + hentData(hentAlias, record) + .takeIf(::harAvvik) + ?.let(::avviksMelding) + ?.let(txContext::hentPerioder) + ?.let(::genererIdOppdatering) + } else { + null + } + } diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererAutomatiskIdOppdatering.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererAutomatiskIdOppdatering.kt new file mode 100644 index 00000000..b1748fd4 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererAutomatiskIdOppdatering.kt @@ -0,0 +1,48 @@ +package no.nav.paw.kafkakeymaintenance.functions + +import no.nav.paw.kafkakeymaintenance.vo.AutomatiskIdOppdatering +import no.nav.paw.kafkakeymaintenance.vo.IdOppdatering +import no.nav.paw.kafkakeymaintenance.perioder.PeriodeRad +import no.nav.paw.kafkakeymaintenance.vo.AvviksMelding +import no.nav.paw.kafkakeymaintenance.vo.IdMap + +fun genererAutomatiskIdOppdatering(avvik: AvviksMelding, periodeRad: PeriodeRad): IdOppdatering { + requireNotNull(avvik.lokaleAlias.find { it.identitetsnummer == periodeRad.identitetsnummer }) { + "Intern logiskfeil, lokal data for identietetsnummer mangler" + }.let { alias -> + return AutomatiskIdOppdatering( + oppdatertData = IdMap( + gjeldeneIdentitetsnummer = avvik.gjeldeneIdentitetsnummer, + arbeidsoekerId = alias.arbeidsoekerId, + recordKey = alias.recordKey, + partisjon = alias.partition, + identiteter = avvik.lokaleAliasSomSkalPekePaaPdlPerson() + ), + frieIdentiteter = avvik.lokaleAliasSomIkkeSkalPekePaaPdlPerson() + ) + } + +} + +fun genererAutomatiskIdOppdatering(avvik: AvviksMelding): IdOppdatering { + val frieIdentiteter = avvik.lokaleAliasSomIkkeSkalPekePaaPdlPerson() + val identerSomSkalPekePaaPdlPerson = avvik.lokaleAliasSomSkalPekePaaPdlPerson() + return if (identerSomSkalPekePaaPdlPerson.isEmpty()) { + AutomatiskIdOppdatering( + oppdatertData = null, + frieIdentiteter = frieIdentiteter + ) + } else { + val data = identerSomSkalPekePaaPdlPerson.maxBy { it.arbeidsoekerId } + AutomatiskIdOppdatering( + oppdatertData = IdMap( + gjeldeneIdentitetsnummer = avvik.gjeldeneIdentitetsnummer, + arbeidsoekerId = data.arbeidsoekerId, + recordKey = data.recordKey, + partisjon = data.partition, + identiteter = identerSomSkalPekePaaPdlPerson + ), + frieIdentiteter = frieIdentiteter + ) + } +} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererIdOppdatering.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererIdOppdatering.kt new file mode 100644 index 00000000..04c608e5 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererIdOppdatering.kt @@ -0,0 +1,35 @@ +package no.nav.paw.kafkakeymaintenance.functions + +import arrow.core.NonEmptyList +import arrow.core.nonEmptyListOf +import arrow.core.tail +import no.nav.paw.kafkakeygenerator.client.Alias +import no.nav.paw.kafkakeymaintenance.vo.IdOppdatering +import no.nav.paw.kafkakeymaintenance.vo.ManuellIdOppdatering +import no.nav.paw.kafkakeymaintenance.perioder.PeriodeRad +import no.nav.paw.kafkakeymaintenance.vo.AvviksMelding +import no.nav.paw.kafkakeymaintenance.vo.AvvvikOgPerioder + +fun genererIdOppdatering(avvikOgPerioder: AvvvikOgPerioder): IdOppdatering { + val (avvik, perioder) = avvikOgPerioder + val periode = perioder.firstOrNull() + return if (periode == null) { + genererAutomatiskIdOppdatering(avvik) + } else { + genererIdOppdatering(avvik, nonEmptyListOf(periode, *perioder.tail().toTypedArray())) + } +} + +fun genererIdOppdatering(avvik: AvviksMelding, perioder: NonEmptyList): IdOppdatering { + val aktivPerioder = perioder.filter(PeriodeRad::erAktiv) + return when (aktivPerioder.size) { + 0 -> genererAutomatiskIdOppdatering(avvik, perioder.maxBy(PeriodeRad::fra)) + 1 -> genererAutomatiskIdOppdatering(avvik, aktivPerioder.first()) + else -> ManuellIdOppdatering( + gjeldeneIdentitetsnummer = avvik.gjeldeneIdentitetsnummer, + pdlIdentitetsnummer = avvik.lokaleAliasSomSkalPekePaaPdlPerson().map(Alias::identitetsnummer), + lokaleAlias = avvik.lokaleAlias, + perioder = perioder + ) + } +} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HarAvvik.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HarAvvik.kt new file mode 100644 index 00000000..55e1e0fb --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HarAvvik.kt @@ -0,0 +1,9 @@ +package no.nav.paw.kafkakeymaintenance.functions + +import no.nav.paw.kafkakeymaintenance.vo.Data + +fun harAvvik(data: Data): Boolean = + data.alias + .flatMap { it.kobliner } + .map { it.arbeidsoekerId } + .distinct().size > 1 \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentData.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentData.kt new file mode 100644 index 00000000..c31d3565 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentData.kt @@ -0,0 +1,17 @@ +package no.nav.paw.kafkakeymaintenance.functions + +import no.nav.paw.kafkakeygenerator.client.LokaleAlias +import no.nav.paw.kafkakeymaintenance.vo.Data +import no.nav.person.pdl.aktor.v2.Aktor +import no.nav.person.pdl.aktor.v2.Type +import org.apache.kafka.clients.consumer.ConsumerRecord + +fun hentData( + hentAlias: (List) -> List, + record: ConsumerRecord, +): Data = + record.value().identifikatorer + .filter { it.type == Type.FOLKEREGISTERIDENT } + .map { it.idnummer } + .let(hentAlias) + .let { Data(record, it) } \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentPerioder.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentPerioder.kt new file mode 100644 index 00000000..b86ae2e7 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentPerioder.kt @@ -0,0 +1,15 @@ +package no.nav.paw.kafkakeymaintenance.functions + +import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext +import no.nav.paw.kafkakeymaintenance.perioder.periodeRad +import no.nav.paw.kafkakeymaintenance.vo.AvviksMelding +import no.nav.paw.kafkakeymaintenance.vo.AvvvikOgPerioder + +fun TransactionContext.hentPerioder(avviksMelding: AvviksMelding): AvvvikOgPerioder { + val perioder = avviksMelding + .pdlIdentitetsnummer + .plus(avviksMelding.lokaleAlias.map { it.identitetsnummer }) + .distinct() + .mapNotNull(::periodeRad) + return AvvvikOgPerioder(avviksMelding, perioder) +} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PeriodeRad.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PeriodeRad.kt new file mode 100644 index 00000000..2b73d40c --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PeriodeRad.kt @@ -0,0 +1,17 @@ +package no.nav.paw.kafkakeymaintenance.perioder + +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import java.time.Instant +import java.util.* + +data class PeriodeRad( + val periodeId: UUID, + val identitetsnummer: String, + val fra: Instant, + val til: Instant? +) { + val erAktiv: Boolean = til == null +} + +fun periodeRad(periodeId: UUID, identitetsnummer: String, fra: Instant, til: Instant) = PeriodeRad(periodeId, identitetsnummer, fra, til) +fun periodeRad(periode: Periode) = PeriodeRad(periode.id, periode.identitetsnummer, periode.startet.tidspunkt, periode.avsluttet?.tidspunkt) \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PerioderFunctions.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PerioderFunctions.kt new file mode 100644 index 00000000..7b94dd4f --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PerioderFunctions.kt @@ -0,0 +1,45 @@ +package no.nav.paw.kafkakeymaintenance.perioder + +import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext +import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.insert +import org.jetbrains.exposed.sql.selectAll +import org.jetbrains.exposed.sql.update + +fun TransactionContext.insertOrUpdate(periode: PeriodeRad) { + periodeRad(periode.identitetsnummer) + .let { lagretDate -> + if (lagretDate == null) { + PerioderTable.insert { + it[version] = appContext.consumerVersion + it[periodeId] = periode.periodeId + it[identitetsnummer] = periode.identitetsnummer + it[fra] = periode.fra + it[til] = periode.til + } + } else { + PerioderTable.update( + where = { (PerioderTable.version eq appContext.consumerVersion) and (PerioderTable.periodeId eq periode.periodeId) } + ) { + it[identitetsnummer] = periode.identitetsnummer + it[fra] = periode.fra + it[til] = periode.til + } + } + } +} + +fun TransactionContext.periodeRad(identitetsnummer: String): PeriodeRad? = + PerioderTable + .selectAll() + .where { + (PerioderTable.version eq appContext.consumerVersion) and + (PerioderTable.identitetsnummer eq identitetsnummer) + }.firstOrNull()?.let { + PeriodeRad( + periodeId = it[PerioderTable.periodeId], + identitetsnummer = it[PerioderTable.identitetsnummer], + fra = it[PerioderTable.fra], + til = it[PerioderTable.til] + ) + } diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PerioderTable.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PerioderTable.kt new file mode 100644 index 00000000..67f2a07c --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PerioderTable.kt @@ -0,0 +1,14 @@ +package no.nav.paw.kafkakeymaintenance.perioder + +import org.jetbrains.exposed.sql.Table +import org.jetbrains.exposed.sql.javatime.timestamp + +object PerioderTable: Table("perioder") { + val version = integer("version") + val periodeId = uuid("periode_id") + val identitetsnummer = varchar("identitetsnummer", 11) + val fra = timestamp("fra") + val til = timestamp("til").nullable() + + override val primaryKey = PrimaryKey(version, periodeId) +} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/AvviksMelding.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/AvviksMelding.kt new file mode 100644 index 00000000..6dc4f753 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/AvviksMelding.kt @@ -0,0 +1,28 @@ +package no.nav.paw.kafkakeymaintenance.vo + +import no.nav.paw.kafkakeygenerator.client.Alias +import no.nav.person.pdl.aktor.v2.Type + +fun avviksMelding(data: Data): AvviksMelding { + val fregIder = data.record.value() + .identifikatorer + .filter { it.type == Type.FOLKEREGISTERIDENT } + return AvviksMelding( + gjeldeneIdentitetsnummer = fregIder + .filter { it.gjeldende } + .map { it.idnummer } + .firstOrNull(), + pdlIdentitetsnummer = fregIder.map { it.idnummer }, + lokaleAlias = data.alias.flatMap { it.kobliner } + ) +} + +data class AvviksMelding( + val gjeldeneIdentitetsnummer: String?, + val pdlIdentitetsnummer: List, + val lokaleAlias: List +) { + fun lokaleAliasSomSkalPekePaaPdlPerson() = lokaleAlias.filter { it.identitetsnummer in pdlIdentitetsnummer } + + fun lokaleAliasSomIkkeSkalPekePaaPdlPerson() = lokaleAlias.filter { it.identitetsnummer !in pdlIdentitetsnummer } +} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/AvvvikOgPerioder.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/AvvvikOgPerioder.kt new file mode 100644 index 00000000..23a78479 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/AvvvikOgPerioder.kt @@ -0,0 +1,8 @@ +package no.nav.paw.kafkakeymaintenance.vo + +import no.nav.paw.kafkakeymaintenance.perioder.PeriodeRad + +data class AvvvikOgPerioder( + val avviksMelding: AvviksMelding, + val perioder: List +) \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/Data.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/Data.kt new file mode 100644 index 00000000..af8fb74c --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/Data.kt @@ -0,0 +1,10 @@ +package no.nav.paw.kafkakeymaintenance.vo + +import no.nav.paw.kafkakeygenerator.client.LokaleAlias +import no.nav.person.pdl.aktor.v2.Aktor +import org.apache.kafka.clients.consumer.ConsumerRecord + +data class Data( + val record: ConsumerRecord, + val alias: List +) \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/IdMap.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/IdMap.kt new file mode 100644 index 00000000..bd441a7a --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/IdMap.kt @@ -0,0 +1,11 @@ +package no.nav.paw.kafkakeymaintenance.vo + +import no.nav.paw.kafkakeygenerator.client.Alias + +data class IdMap( + val gjeldeneIdentitetsnummer: String?, + val arbeidsoekerId: Long, + val recordKey: Long, + val partisjon: Int, + val identiteter: List +) \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/IdOppdatering.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/IdOppdatering.kt new file mode 100644 index 00000000..51cf385f --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/vo/IdOppdatering.kt @@ -0,0 +1,17 @@ +package no.nav.paw.kafkakeymaintenance.vo + +import no.nav.paw.kafkakeygenerator.client.Alias +import no.nav.paw.kafkakeymaintenance.perioder.PeriodeRad + +interface IdOppdatering {} +data class ManuellIdOppdatering( + val gjeldeneIdentitetsnummer: String?, + val pdlIdentitetsnummer: List, + val lokaleAlias: List, + val perioder: List +) : IdOppdatering + +data class AutomatiskIdOppdatering( + val oppdatertData: IdMap?, + val frieIdentiteter: List +) : IdOppdatering \ No newline at end of file