Skip to content

Commit

Permalink
La til basis funksjoner for id håndtering
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Nov 4, 2024
1 parent 447c742 commit 2030527
Show file tree
Hide file tree
Showing 15 changed files with 325 additions and 0 deletions.
1 change: 1 addition & 0 deletions apps/kafka-key-maintenance/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ val image: String? by project
dependencies {
implementation(project(":domain:interne-hendelser"))
implementation(project(":domain:pdl-aktoer-schema"))
implementation(project(":domain:main-avro-schema"))
implementation(project(":lib:kafka"))
implementation(project(":lib:hoplite-config"))
implementation(project(":lib:kafka-key-generator-client"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package no.nav.paw.kafkakeymaintenance

import kotlinx.coroutines.runBlocking
import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient
import no.nav.paw.kafkakeygenerator.client.LokaleAlias
import no.nav.paw.kafkakeymaintenance.functions.genererIdOppdatering
import no.nav.paw.kafkakeymaintenance.functions.harAvvik
import no.nav.paw.kafkakeymaintenance.functions.hentData
import no.nav.paw.kafkakeymaintenance.functions.hentPerioder
import no.nav.paw.kafkakeymaintenance.kafka.Topic
import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext
import no.nav.paw.kafkakeymaintenance.kafka.updateHwm
import no.nav.paw.kafkakeymaintenance.vo.IdOppdatering
import no.nav.paw.kafkakeymaintenance.vo.avviksMelding
import no.nav.person.pdl.aktor.v2.Aktor
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.transaction
import java.time.Instant

const val ANTALL_PARTISJONER = 6

fun KafkaKeysClient.hentAlias(identiteter: List<String>): List<LokaleAlias> = runBlocking {
getAlias(ANTALL_PARTISJONER, identiteter).alias
}

fun process(
txContextFactory: Transaction.() -> TransactionContext,
hentAlias: (List<String>) -> List<LokaleAlias>,
record: ConsumerRecord<String, Aktor>
): IdOppdatering? =
transaction {
val txContext = txContextFactory()
val valid = txContext.updateHwm(
topic = Topic(record.topic()),
partition = record.partition(),
offset = record.offset(),
time = Instant.ofEpochMilli(record.timestamp()),
lastUpdated = Instant.now()
)
if (valid) {
hentData(hentAlias, record)
.takeIf(::harAvvik)
?.let(::avviksMelding)
?.let(txContext::hentPerioder)
?.let(::genererIdOppdatering)
} else {
null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package no.nav.paw.kafkakeymaintenance.functions

import no.nav.paw.kafkakeymaintenance.vo.AutomatiskIdOppdatering
import no.nav.paw.kafkakeymaintenance.vo.IdOppdatering
import no.nav.paw.kafkakeymaintenance.perioder.PeriodeRad
import no.nav.paw.kafkakeymaintenance.vo.AvviksMelding
import no.nav.paw.kafkakeymaintenance.vo.IdMap

fun genererAutomatiskIdOppdatering(avvik: AvviksMelding, periodeRad: PeriodeRad): IdOppdatering {
requireNotNull(avvik.lokaleAlias.find { it.identitetsnummer == periodeRad.identitetsnummer }) {
"Intern logiskfeil, lokal data for identietetsnummer mangler"
}.let { alias ->
return AutomatiskIdOppdatering(
oppdatertData = IdMap(
gjeldeneIdentitetsnummer = avvik.gjeldeneIdentitetsnummer,
arbeidsoekerId = alias.arbeidsoekerId,
recordKey = alias.recordKey,
partisjon = alias.partition,
identiteter = avvik.lokaleAliasSomSkalPekePaaPdlPerson()
),
frieIdentiteter = avvik.lokaleAliasSomIkkeSkalPekePaaPdlPerson()
)
}

}

fun genererAutomatiskIdOppdatering(avvik: AvviksMelding): IdOppdatering {
val frieIdentiteter = avvik.lokaleAliasSomIkkeSkalPekePaaPdlPerson()
val identerSomSkalPekePaaPdlPerson = avvik.lokaleAliasSomSkalPekePaaPdlPerson()
return if (identerSomSkalPekePaaPdlPerson.isEmpty()) {
AutomatiskIdOppdatering(
oppdatertData = null,
frieIdentiteter = frieIdentiteter
)
} else {
val data = identerSomSkalPekePaaPdlPerson.maxBy { it.arbeidsoekerId }
AutomatiskIdOppdatering(
oppdatertData = IdMap(
gjeldeneIdentitetsnummer = avvik.gjeldeneIdentitetsnummer,
arbeidsoekerId = data.arbeidsoekerId,
recordKey = data.recordKey,
partisjon = data.partition,
identiteter = identerSomSkalPekePaaPdlPerson
),
frieIdentiteter = frieIdentiteter
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package no.nav.paw.kafkakeymaintenance.functions

import arrow.core.NonEmptyList
import arrow.core.nonEmptyListOf
import arrow.core.tail
import no.nav.paw.kafkakeygenerator.client.Alias
import no.nav.paw.kafkakeymaintenance.vo.IdOppdatering
import no.nav.paw.kafkakeymaintenance.vo.ManuellIdOppdatering
import no.nav.paw.kafkakeymaintenance.perioder.PeriodeRad
import no.nav.paw.kafkakeymaintenance.vo.AvviksMelding
import no.nav.paw.kafkakeymaintenance.vo.AvvvikOgPerioder

fun genererIdOppdatering(avvikOgPerioder: AvvvikOgPerioder): IdOppdatering {
val (avvik, perioder) = avvikOgPerioder
val periode = perioder.firstOrNull()
return if (periode == null) {
genererAutomatiskIdOppdatering(avvik)
} else {
genererIdOppdatering(avvik, nonEmptyListOf(periode, *perioder.tail().toTypedArray()))
}
}

fun genererIdOppdatering(avvik: AvviksMelding, perioder: NonEmptyList<PeriodeRad>): IdOppdatering {
val aktivPerioder = perioder.filter(PeriodeRad::erAktiv)
return when (aktivPerioder.size) {
0 -> genererAutomatiskIdOppdatering(avvik, perioder.maxBy(PeriodeRad::fra))
1 -> genererAutomatiskIdOppdatering(avvik, aktivPerioder.first())
else -> ManuellIdOppdatering(
gjeldeneIdentitetsnummer = avvik.gjeldeneIdentitetsnummer,
pdlIdentitetsnummer = avvik.lokaleAliasSomSkalPekePaaPdlPerson().map(Alias::identitetsnummer),
lokaleAlias = avvik.lokaleAlias,
perioder = perioder
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package no.nav.paw.kafkakeymaintenance.functions

import no.nav.paw.kafkakeymaintenance.vo.Data

fun harAvvik(data: Data): Boolean =
data.alias
.flatMap { it.kobliner }
.map { it.arbeidsoekerId }
.distinct().size > 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package no.nav.paw.kafkakeymaintenance.functions

import no.nav.paw.kafkakeygenerator.client.LokaleAlias
import no.nav.paw.kafkakeymaintenance.vo.Data
import no.nav.person.pdl.aktor.v2.Aktor
import no.nav.person.pdl.aktor.v2.Type
import org.apache.kafka.clients.consumer.ConsumerRecord

fun hentData(
hentAlias: (List<String>) -> List<LokaleAlias>,
record: ConsumerRecord<String, Aktor>,
): Data =
record.value().identifikatorer
.filter { it.type == Type.FOLKEREGISTERIDENT }
.map { it.idnummer }
.let(hentAlias)
.let { Data(record, it) }
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package no.nav.paw.kafkakeymaintenance.functions

import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext
import no.nav.paw.kafkakeymaintenance.perioder.periodeRad
import no.nav.paw.kafkakeymaintenance.vo.AvviksMelding
import no.nav.paw.kafkakeymaintenance.vo.AvvvikOgPerioder

fun TransactionContext.hentPerioder(avviksMelding: AvviksMelding): AvvvikOgPerioder {
val perioder = avviksMelding
.pdlIdentitetsnummer
.plus(avviksMelding.lokaleAlias.map { it.identitetsnummer })
.distinct()
.mapNotNull(::periodeRad)
return AvvvikOgPerioder(avviksMelding, perioder)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package no.nav.paw.kafkakeymaintenance.perioder

import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import java.time.Instant
import java.util.*

data class PeriodeRad(
val periodeId: UUID,
val identitetsnummer: String,
val fra: Instant,
val til: Instant?
) {
val erAktiv: Boolean = til == null
}

fun periodeRad(periodeId: UUID, identitetsnummer: String, fra: Instant, til: Instant) = PeriodeRad(periodeId, identitetsnummer, fra, til)
fun periodeRad(periode: Periode) = PeriodeRad(periode.id, periode.identitetsnummer, periode.startet.tidspunkt, periode.avsluttet?.tidspunkt)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package no.nav.paw.kafkakeymaintenance.perioder

import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.selectAll
import org.jetbrains.exposed.sql.update

fun TransactionContext.insertOrUpdate(periode: PeriodeRad) {
periodeRad(periode.identitetsnummer)
.let { lagretDate ->
if (lagretDate == null) {
PerioderTable.insert {
it[version] = appContext.consumerVersion
it[periodeId] = periode.periodeId
it[identitetsnummer] = periode.identitetsnummer
it[fra] = periode.fra
it[til] = periode.til
}
} else {
PerioderTable.update(
where = { (PerioderTable.version eq appContext.consumerVersion) and (PerioderTable.periodeId eq periode.periodeId) }
) {
it[identitetsnummer] = periode.identitetsnummer
it[fra] = periode.fra
it[til] = periode.til
}
}
}
}

fun TransactionContext.periodeRad(identitetsnummer: String): PeriodeRad? =
PerioderTable
.selectAll()
.where {
(PerioderTable.version eq appContext.consumerVersion) and
(PerioderTable.identitetsnummer eq identitetsnummer)
}.firstOrNull()?.let {
PeriodeRad(
periodeId = it[PerioderTable.periodeId],
identitetsnummer = it[PerioderTable.identitetsnummer],
fra = it[PerioderTable.fra],
til = it[PerioderTable.til]
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package no.nav.paw.kafkakeymaintenance.perioder

import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.javatime.timestamp

object PerioderTable: Table("perioder") {
val version = integer("version")
val periodeId = uuid("periode_id")
val identitetsnummer = varchar("identitetsnummer", 11)
val fra = timestamp("fra")
val til = timestamp("til").nullable()

override val primaryKey = PrimaryKey(version, periodeId)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package no.nav.paw.kafkakeymaintenance.vo

import no.nav.paw.kafkakeygenerator.client.Alias
import no.nav.person.pdl.aktor.v2.Type

fun avviksMelding(data: Data): AvviksMelding {
val fregIder = data.record.value()
.identifikatorer
.filter { it.type == Type.FOLKEREGISTERIDENT }
return AvviksMelding(
gjeldeneIdentitetsnummer = fregIder
.filter { it.gjeldende }
.map { it.idnummer }
.firstOrNull(),
pdlIdentitetsnummer = fregIder.map { it.idnummer },
lokaleAlias = data.alias.flatMap { it.kobliner }
)
}

data class AvviksMelding(
val gjeldeneIdentitetsnummer: String?,
val pdlIdentitetsnummer: List<String>,
val lokaleAlias: List<Alias>
) {
fun lokaleAliasSomSkalPekePaaPdlPerson() = lokaleAlias.filter { it.identitetsnummer in pdlIdentitetsnummer }

fun lokaleAliasSomIkkeSkalPekePaaPdlPerson() = lokaleAlias.filter { it.identitetsnummer !in pdlIdentitetsnummer }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package no.nav.paw.kafkakeymaintenance.vo

import no.nav.paw.kafkakeymaintenance.perioder.PeriodeRad

data class AvvvikOgPerioder(
val avviksMelding: AvviksMelding,
val perioder: List<PeriodeRad>
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package no.nav.paw.kafkakeymaintenance.vo

import no.nav.paw.kafkakeygenerator.client.LokaleAlias
import no.nav.person.pdl.aktor.v2.Aktor
import org.apache.kafka.clients.consumer.ConsumerRecord

data class Data(
val record: ConsumerRecord<String, Aktor>,
val alias: List<LokaleAlias>
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package no.nav.paw.kafkakeymaintenance.vo

import no.nav.paw.kafkakeygenerator.client.Alias

data class IdMap(
val gjeldeneIdentitetsnummer: String?,
val arbeidsoekerId: Long,
val recordKey: Long,
val partisjon: Int,
val identiteter: List<Alias>
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package no.nav.paw.kafkakeymaintenance.vo

import no.nav.paw.kafkakeygenerator.client.Alias
import no.nav.paw.kafkakeymaintenance.perioder.PeriodeRad

interface IdOppdatering {}
data class ManuellIdOppdatering(
val gjeldeneIdentitetsnummer: String?,
val pdlIdentitetsnummer: List<String>,
val lokaleAlias: List<Alias>,
val perioder: List<PeriodeRad>
) : IdOppdatering

data class AutomatiskIdOppdatering(
val oppdatertData: IdMap?,
val frieIdentiteter: List<Alias>
) : IdOppdatering

0 comments on commit 2030527

Please sign in to comment.