Skip to content

Commit

Permalink
La opp topology for lesing av aktør meldinger fra pdl
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Nov 6, 2024
1 parent ac96c5a commit 525b53a
Show file tree
Hide file tree
Showing 13 changed files with 321 additions and 102 deletions.
2 changes: 2 additions & 0 deletions apps/kafka-key-maintenance/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
implementation(project(":domain:pdl-aktoer-schema"))
implementation(project(":domain:main-avro-schema"))
implementation(project(":lib:kafka"))
implementation(project(":lib:kafka-streams"))
implementation(project(":lib:hoplite-config"))
implementation(project(":lib:kafka-key-generator-client"))

Expand Down Expand Up @@ -45,6 +46,7 @@ dependencies {
implementation(libs.logbackClassic)
implementation(libs.logstashLogbackEncoder)
implementation(libs.kafka.clients)
implementation(libs.kafka.streams.core)
implementation(libs.avro.core)
implementation(libs.avro.kafkaSerializer)
implementation(libs.exposed.core)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package no.nav.paw.kafkakeymaintenance

import kotlinx.coroutines.runBlocking
import no.nav.paw.config.kafka.streams.mapRecord
import no.nav.paw.config.kafka.streams.supressByWallClock
import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient
import no.nav.paw.kafkakeygenerator.client.LokaleAlias
import no.nav.paw.kafkakeymaintenance.functions.processPdlRecord
import no.nav.paw.kafkakeymaintenance.perioder.Perioder
import no.nav.person.pdl.aktor.v2.Aktor
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import java.time.Duration
import java.time.Instant

fun KafkaKeysClient.hentAlias(identiteter: List<String>): List<LokaleAlias> = runBlocking {
getAlias(ANTALL_PARTISJONER, identiteter).alias
}

fun StreamsBuilder.buildAktorTopology(
topic: String,
stateStoreName: String,
supressionDelay: Duration,
interval: Duration = supressionDelay.dividedBy(10),
perioder: Perioder,
hentAlias: (List<String>) -> List<LokaleAlias>
) {
stream<String, Aktor>(topic)
.supressByWallClock(
name = stateStoreName,
duration = supressionDelay,
checkInterval = interval
).mapRecord("aktor_til_hendelse") { record ->
record.withValue(
processPdlRecord(
aktorTopic = topic,
hentAlias = hentAlias,
perioder = perioder,
record = record
)
)
}.flatMap { _, hendelser ->
hendelser.map { hendelseRecord ->
KeyValue.pair(
hendelseRecord.key,
hendelseRecord.hendelse
)
}
}.mapRecord("set_timestamp") { it.withTimestamp(Instant.now().epochSecond) }
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package no.nav.paw.kafkakeymaintenance

import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.kafkakeymaintenance.kafka.topic
import no.nav.paw.kafkakeymaintenance.kafka.txContext
import no.nav.paw.kafkakeymaintenance.kafka.updateHwm
import no.nav.paw.kafkakeymaintenance.perioder.insertOrUpdate
import no.nav.paw.kafkakeymaintenance.perioder.periodeRad
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.jetbrains.exposed.sql.transactions.transaction
import java.time.Instant

class PeriodeConsumer(
applicationContext: ApplicationContext,
private val periodeConsumer: Sequence<List<ConsumerRecord<Long, Periode>>>,
) {

private val ctxFactory = txContext(applicationContext)

fun run() {
periodeConsumer.forEach { batch ->
transaction {
val tx = ctxFactory()
batch.forEach { periodeRecord ->
val hwmValid = tx.updateHwm(
topic = topic(periodeRecord.topic()),
partition = periodeRecord.partition(),
offset = periodeRecord.offset(),
time = Instant.ofEpochMilli(periodeRecord.timestamp()),
lastUpdated = Instant.now()
)
if (hwmValid) {
val rad = periodeRad(periodeRecord.value())
tx.insertOrUpdate(rad)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package no.nav.paw.kafkakeymaintenance

const val ANTALL_PARTISJONER = 6
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package no.nav.paw.kafkakeymaintenance.functions

import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
import no.nav.paw.arbeidssokerregisteret.intern.v1.IdentitetsnummerOpphoert
import no.nav.paw.arbeidssokerregisteret.intern.v1.IdentitetsnummerSammenslaatt
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Metadata
import no.nav.paw.kafkakeymaintenance.vo.AutomatiskIdOppdatering
import no.nav.paw.kafkakeymaintenance.vo.IdMap
import no.nav.paw.kafkakeymaintenance.vo.IdOppdatering
import no.nav.paw.kafkakeymaintenance.vo.ManuellIdOppdatering
import java.util.*

data class HendelseRecord<V: Hendelse>(
val key: Long,
val hendelse: V
)

fun genererHendelser(metadata: Metadata, idOppdatering: IdOppdatering): List<HendelseRecord<Hendelse>> {
return when (idOppdatering) {
is AutomatiskIdOppdatering -> genererHendelse(metadata, idOppdatering)
is ManuellIdOppdatering -> emptyList()
}
}

fun genererHendelse(metadata: Metadata, idOppdatering: AutomatiskIdOppdatering): List<HendelseRecord<Hendelse>> {
val identitetsnummerOpphoert = idOppdatering
.frieIdentiteter
.groupBy { it.arbeidsoekerId }
.map { (arbeidsoekerId, alias) ->
val identiteter = alias.map { it.identitetsnummer }
val hendelse: Hendelse = IdentitetsnummerOpphoert(
id = arbeidsoekerId,
hendelseId = UUID.randomUUID(),
identitetsnummer = identiteter.first(),
metadata = metadata,
alleIdentitetsnummer = identiteter
)
HendelseRecord(alias.first().recordKey, hendelse)
}
val identitetsnummerSammenslaatt = idOppdatering.oppdatertData?.let { genererHendelse(metadata, it) } ?: emptyList()
return identitetsnummerSammenslaatt + identitetsnummerOpphoert
}

fun genererHendelse(metadata: Metadata, idMap: IdMap): List<HendelseRecord<Hendelse>> =
idMap.identiteter
.filter { it.arbeidsoekerId != idMap.arbeidsoekerId }
.groupBy { it.arbeidsoekerId }
.map { (arbeidsoekerId, alias) ->
val identiteter = alias.map { it.identitetsnummer }
val hendelse = IdentitetsnummerSammenslaatt(
id = arbeidsoekerId,
hendelseId = UUID.randomUUID(),
identitetsnummer = identiteter.first(),
metadata = metadata,
alleIdentitetsnummer = identiteter,
flyttetTilArbeidssoekerId = idMap.arbeidsoekerId
)
HendelseRecord(alias.first().recordKey, hendelse)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord

fun hentData(
hentAlias: (List<String>) -> List<LokaleAlias>,
record: ConsumerRecord<String, Aktor>,
aktor: Aktor,
): Data =
record.value().identifikatorer
aktor.identifikatorer
.filter { it.type == Type.FOLKEREGISTERIDENT }
.map { it.idnummer }
.let(hentAlias)
.let { Data(record, it) }
.let { Data(aktor, it) }
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package no.nav.paw.kafkakeymaintenance.functions

import arrow.core.partially1
import kotlinx.coroutines.runBlocking
import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.AvviksType
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Metadata
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.TidspunktFraKilde
import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient
import no.nav.paw.kafkakeygenerator.client.LokaleAlias
import no.nav.paw.kafkakeymaintenance.ANTALL_PARTISJONER
import no.nav.paw.kafkakeymaintenance.kafka.Topic
import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext
import no.nav.paw.kafkakeymaintenance.kafka.updateHwm
import no.nav.paw.kafkakeymaintenance.metadata
import no.nav.paw.kafkakeymaintenance.perioder.Perioder
import no.nav.paw.kafkakeymaintenance.vo.*
import no.nav.person.pdl.aktor.v2.Aktor
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Suppressed
import org.apache.kafka.streams.processor.api.Record
import java.time.Duration
import java.time.Instant


fun processPdlRecord(
aktorTopic: String,
hentAlias: (List<String>) -> List<LokaleAlias>,
perioder: Perioder,
record: Record<String, Aktor>
): List<HendelseRecord<Hendelse>> {
val metadata = metadata(
kilde = aktorTopic,
tidspunkt = Instant.now(),
tidspunktFraKilde = TidspunktFraKilde(
tidspunkt = Instant.ofEpochMilli(record.timestamp()),
avviksType = AvviksType.FORSINKELSE
)
)
return prosesser(hentAlias, record.value(), perioder, metadata)
}

fun Perioder.hentPerioder(avviksMelding: AvviksMelding): AvvvikOgPerioder {
val identiteter = avviksMelding.lokaleAlias
.map { it.identitetsnummer } +
avviksMelding.pdlIdentitetsnummer
return AvvvikOgPerioder(
avviksMelding = avviksMelding,
perioder = get(identiteter)
)
}

fun prosesser(
hentAlias: (List<String>) -> List<LokaleAlias>,
aktor: Aktor,
perioder: Perioder,
metadata: Metadata
) = (hentData(hentAlias, aktor)
.takeIf(::harAvvik)
?.let(::genererAvviksMelding)
?.let(perioder::hentPerioder)
?.let(::genererIdOppdatering)
?.let(::genererHendelser.partially1(metadata))
?: emptyList())
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package no.nav.paw.kafkakeymaintenance.perioder

import no.nav.paw.kafkakeymaintenance.ApplicationContext
import no.nav.paw.kafkakeymaintenance.kafka.txContext
import org.jetbrains.exposed.sql.transactions.transaction

interface Perioder {
operator fun get(identitetsnummer: String): PeriodeRad?

operator fun get(identitetsnummer: Collection<String>): List<PeriodeRad> =
identitetsnummer.mapNotNull { get(it) }
}

fun dbPerioder(appContext: ApplicationContext): Perioder = ExpsedPerioder(appContext)

fun statiskePerioder(rader: Map<String, PeriodeRad>): Perioder = object : Perioder {
override fun get(identitetsnummer: String): PeriodeRad? = rader[identitetsnummer]
}

private class ExpsedPerioder(appContext: ApplicationContext) : Perioder {
private val txFactory = txContext(appContext)

override fun get(identitetsnummer: String): PeriodeRad? =
transaction { txFactory().periodeRad(identitetsnummer) }

override fun get(identitetsnummer: Collection<String>): List<PeriodeRad> =
transaction {
val tx = txFactory()
identitetsnummer.mapNotNull { tx.periodeRad(it) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package no.nav.paw.kafkakeymaintenance.vo
import no.nav.paw.kafkakeygenerator.client.Alias
import no.nav.person.pdl.aktor.v2.Type

fun avviksMelding(data: Data): AvviksMelding {
val fregIder = data.record.value()
fun genererAvviksMelding(data: Data): AvviksMelding {
val fregIder = data.aktor
.identifikatorer
.filter { it.type == Type.FOLKEREGISTERIDENT }
return AvviksMelding(
Expand Down
Loading

0 comments on commit 525b53a

Please sign in to comment.