From fc772a69374343e1a5d0a83c5f9782ea6a4485e4 Mon Sep 17 00:00:00 2001 From: Nils Martin Sande Date: Wed, 4 Dec 2024 13:13:50 +0100 Subject: [PATCH] =?UTF-8?q?Oppdaterte=20brukst=C3=B8tte=20api=20med=20st?= =?UTF-8?q?=C3=B8tte=20for=20id=20merge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../brukerstoette/BrukerstoetteService.kt | 16 ++++--- .../backup/database/DataFunctions.kt | 32 +++++++++++-- .../backup/vo/StoredData.kt | 3 +- .../main/resources/openapi/Brukerstoette.yaml | 9 ++++ .../backup/ApplicationHappyPathTest.kt | 45 ++++++++++++++++++- .../backup/TestData.kt | 8 +++- 6 files changed, 98 insertions(+), 15 deletions(-) diff --git a/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/brukerstoette/BrukerstoetteService.kt b/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/brukerstoette/BrukerstoetteService.kt index 6fc727eb..d77da5db 100644 --- a/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/brukerstoette/BrukerstoetteService.kt +++ b/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/brukerstoette/BrukerstoetteService.kt @@ -7,6 +7,7 @@ import arrow.core.raise.either import arrow.core.right import no.nav.paw.arbeidssoekerregisteret.backup.api.brukerstoette.models.* import no.nav.paw.arbeidssoekerregisteret.backup.api.oppslagsapi.models.ArbeidssoekerperiodeResponse +import no.nav.paw.arbeidssoekerregisteret.backup.database.readAllNestedRecordsForId import no.nav.paw.arbeidssoekerregisteret.backup.database.readAllRecordsForId import no.nav.paw.arbeidssoekerregisteret.backup.database.txContext import no.nav.paw.arbeidssoekerregisteret.backup.vo.ApplicationContext @@ -32,9 +33,9 @@ class BrukerstoetteService( private val apiOppslagLogger = LoggerFactory.getLogger("api_oppslag_logger") private val txCtx = txContext(applicationContext) suspend fun hentDetaljer(identitetsnummer: String): DetaljerResponse? { - val (id, _) = kafkaKeysClient.getIdAndKey(identitetsnummer) + val (id, key) = kafkaKeysClient.getIdAndKey(identitetsnummer) val hendelser = transaction { - txCtx().readAllRecordsForId(hendelseDeserializer, id) + txCtx().readAllNestedRecordsForId(hendelseDeserializer, id) } if (hendelser.isEmpty()) { return null @@ -54,9 +55,9 @@ class BrukerstoetteService( { it } ) - val partition = hendelser.firstOrNull()?.partition + val partition = hendelser.filterNot { it.merged }.firstOrNull()?.partition return DetaljerResponse( - recordKey = hendelser.first().recordKey, + recordKey = key, kafkaPartition = partition, historikk = innkommendeHendelse.map { snapshot -> snapshot.copy( @@ -64,7 +65,7 @@ class BrukerstoetteService( gjeldeneTilstand = snapshot.gjeldeneTilstand?.let { enrich(it, fraOppslagsApi) } ) }, - arbeidssoekerId = hendelser.first().arbeidssoekerId, + arbeidssoekerId = id, gjeldeneTilstand = sistePeriode?.let { enrich(it, fraOppslagsApi) } ) } @@ -124,6 +125,7 @@ data class ApiData( fun sistePeriode(hendelser: List): Tilstand? = hendelser + .filterNot { it.merged } .sortedBy { it.offset } .fold(null as Tilstand?, ::beregnTilstand) @@ -135,6 +137,9 @@ fun historiskeTilstander(hendelser: List): Iterable = hendelse = Hendelse( hendelseId = hendelse.data.hendelseId, hendelseType = hendelse.data.hendelseType, + merged = hendelse.merged, + kafkaPartition = hendelse.partition, + kafkaOffset = hendelse.offset, metadata = HendelseMetadata( tidspunkt = hendelse.data.metadata.tidspunkt, utfoertAv = HendelseMetadataUtfoertAv( @@ -150,7 +155,6 @@ fun historiskeTilstander(hendelser: List): Iterable = ) }, ), - kafkaOffset = hendelse.offset, data = hendelse.data, api = null ), diff --git a/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/database/DataFunctions.kt b/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/database/DataFunctions.kt index 45dcd404..8b99486f 100644 --- a/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/database/DataFunctions.kt +++ b/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/database/DataFunctions.kt @@ -3,6 +3,7 @@ package no.nav.paw.arbeidssoekerregisteret.backup.database import no.nav.paw.arbeidssoekerregisteret.backup.database.HendelseTable.recordKey import no.nav.paw.arbeidssoekerregisteret.backup.vo.ApplicationContext import no.nav.paw.arbeidssoekerregisteret.backup.vo.StoredData +import no.nav.paw.arbeidssokerregisteret.intern.v1.ArbeidssoekerIdFlettetInn import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseDeserializer import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerializer @@ -40,11 +41,32 @@ fun TransactionContext.readRecord(hendelseDeserializer: HendelseDeserializer, pa recordKey = it[recordKey], arbeidssoekerId = it[HendelseTable.arbeidssoekerId], traceparent = it[HendelseTable.traceparent], - data = hendelseDeserializer.deserializeFromString(it[HendelseTable.data]) + data = hendelseDeserializer.deserializeFromString(it[HendelseTable.data]), + merged = false ) } -fun TransactionContext.readAllRecordsForId(hendelseDeserializer: HendelseDeserializer, arbeidssoekerId: Long): List = +fun TransactionContext.readAllNestedRecordsForId( + hendelseDeserializer: HendelseDeserializer, + arbeidssoekerId: Long, + merged: Boolean = false +): List { + val tmp = readAllRecordsForId(hendelseDeserializer = hendelseDeserializer, arbeidssoekerId = arbeidssoekerId, merged = merged) + return tmp.asSequence() + .map(StoredData::data) + .filterIsInstance() + .map { it.kilde.arbeidssoekerId } + .distinct() + .flatMap { readAllNestedRecordsForId(hendelseDeserializer = hendelseDeserializer, arbeidssoekerId = it, merged = true) }.toList() + .plus(tmp) + .sortedBy { it.data.metadata.tidspunkt } +} + +fun TransactionContext.readAllRecordsForId( + hendelseDeserializer: HendelseDeserializer, + arbeidssoekerId: Long, + merged: Boolean = false +): List = HendelseTable .selectAll() .where { @@ -58,7 +80,8 @@ fun TransactionContext.readAllRecordsForId(hendelseDeserializer: HendelseDeseria recordKey = it[recordKey], arbeidssoekerId = it[HendelseTable.arbeidssoekerId], traceparent = it[HendelseTable.traceparent], - data = hendelseDeserializer.deserializeFromString(it[HendelseTable.data]) + data = hendelseDeserializer.deserializeFromString(it[HendelseTable.data]), + merged = merged ) } @@ -76,7 +99,8 @@ fun Transaction.getOneRecordForId(hendelseDeserializer: HendelseDeserializer, id recordKey = rs.getLong(HendelseTable.recordKey.name), arbeidssoekerId = rs.getLong(HendelseTable.arbeidssoekerId.name), data = hendelseDeserializer.deserializeFromString(rs.getString(HendelseTable.data.name)), - traceparent = rs.getString(HendelseTable.traceparent.name) + traceparent = rs.getString(HendelseTable.traceparent.name), + merged = false ) ) } diff --git a/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/vo/StoredData.kt b/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/vo/StoredData.kt index 625a8e7d..7e6c3606 100644 --- a/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/vo/StoredData.kt +++ b/apps/hendelselogg-backup/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/vo/StoredData.kt @@ -8,5 +8,6 @@ data class StoredData( val recordKey: Long, val arbeidssoekerId: Long, val traceparent: String?, - val data: Hendelse + val data: Hendelse, + val merged: Boolean ) \ No newline at end of file diff --git a/apps/hendelselogg-backup/src/main/resources/openapi/Brukerstoette.yaml b/apps/hendelselogg-backup/src/main/resources/openapi/Brukerstoette.yaml index 6d67ca44..d5c5ce85 100644 --- a/apps/hendelselogg-backup/src/main/resources/openapi/Brukerstoette.yaml +++ b/apps/hendelselogg-backup/src/main/resources/openapi/Brukerstoette.yaml @@ -203,11 +203,20 @@ components: example: "Startet" metadata: $ref: "#/components/schemas/HendelseMetadata" + kafkaPartition: + type: "integer" + format: "int32" + description: "Kafka partition" + example: 1 kafkaOffset: type: "integer" format: "int64" description: "Kafka offset" example: 123234 + merged: + type: "boolean" + description: "Hendelsen stammer opprinnelig fra en annen arbeidssøker id" + example: true api: type: "boolean" description: "Hendelsen finnes i API, bare inkludert dersom relevant" diff --git a/apps/hendelselogg-backup/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/ApplicationHappyPathTest.kt b/apps/hendelselogg-backup/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/ApplicationHappyPathTest.kt index 3f46aa39..7c70304a 100644 --- a/apps/hendelselogg-backup/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/ApplicationHappyPathTest.kt +++ b/apps/hendelselogg-backup/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/ApplicationHappyPathTest.kt @@ -1,20 +1,27 @@ package no.nav.paw.arbeidssoekerregisteret.backup import io.kotest.core.spec.style.FreeSpec +import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder import io.kotest.matchers.nulls.shouldNotBeNull import io.kotest.matchers.shouldBe import io.micrometer.prometheusmetrics.PrometheusConfig import io.micrometer.prometheusmetrics.PrometheusMeterRegistry import no.nav.paw.arbeidssoekerregisteret.backup.database.getAllHwms import no.nav.paw.arbeidssoekerregisteret.backup.database.initHwm +import no.nav.paw.arbeidssoekerregisteret.backup.database.readAllNestedRecordsForId import no.nav.paw.arbeidssoekerregisteret.backup.database.readRecord import no.nav.paw.arbeidssoekerregisteret.backup.database.txContext import no.nav.paw.arbeidssoekerregisteret.backup.vo.ApplicationContext +import no.nav.paw.arbeidssokerregisteret.intern.v1.ArbeidssoekerIdFlettetInn +import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerde +import no.nav.paw.arbeidssokerregisteret.intern.v1.IdentitetsnummerSammenslaatt +import no.nav.paw.arbeidssokerregisteret.intern.v1.Kilde import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration import org.apache.kafka.clients.consumer.ConsumerRecord import org.jetbrains.exposed.sql.transactions.transaction import org.slf4j.LoggerFactory +import java.util.* class ApplicationHappyPathTest : FreeSpec({ "Verifiser enkel applikasjonsflyt" { @@ -28,7 +35,7 @@ class ApplicationHappyPathTest : FreeSpec({ val txCtx = txContext(appCtx) initDbContainer() val partitionCount = 3 - val testData = hendelser() + val (idA, idB, testData) = hendelser() .take(15) .mapIndexed { index, hendelse -> val partition = index % partitionCount @@ -41,11 +48,38 @@ class ApplicationHappyPathTest : FreeSpec({ ) }.chunked(5) .toList() + .let { chunks -> + val (a, b) = chunks + .flatten() + .distinctBy { it.value().id } + .take(2) + Triple(a, b, chunks) + } + val merge = ConsumerRecord( + idA.topic(), + idA.partition(), + idA.offset() + 1000, + (idA.partition() + 100).toLong(), + ArbeidssoekerIdFlettetInn( + identitetsnummer = idA.value().identitetsnummer, + id = idA.value().id, + hendelseId = UUID.randomUUID(), + metadata = idA.value().metadata.copy(aarsak = "Merge"), + kilde = Kilde( + arbeidssoekerId = idB.value().id, + identitetsnummer = setOf( + idB.value().identitetsnummer + ) + ) + ) as Hendelse + ) println("Testdata: $testData") transaction { txCtx().initHwm(partitionCount) } - appCtx.runApplication(HendelseSerde().serializer(), testData.asSequence()) + val mergeAsList = listOf(merge) + val input = testData.plusElement(mergeAsList) + appCtx.runApplication(HendelseSerde().serializer(), input.asSequence()) println("HWMs: ${transaction { txCtx().getAllHwms() }}") testData.flatten().forEach { record -> val partition = record.partition() @@ -59,5 +93,12 @@ class ApplicationHappyPathTest : FreeSpec({ lagretHendelse.offset shouldBe offset lagretHendelse.data shouldBe forventetHendelse } + transaction { + val hendelser = txCtx().readAllNestedRecordsForId( + hendelseDeserializer = HendelseSerde().deserializer(), + arbeidssoekerId = idA.value().id + ) + hendelser.map { it.arbeidssoekerId }.distinct() shouldContainExactlyInAnyOrder listOf(idA.value().id, idB.value().id) + } } }) diff --git a/apps/hendelselogg-backup/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/TestData.kt b/apps/hendelselogg-backup/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/TestData.kt index 00c50e8e..86f5893a 100644 --- a/apps/hendelselogg-backup/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/TestData.kt +++ b/apps/hendelselogg-backup/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/backup/TestData.kt @@ -114,20 +114,24 @@ fun Hendelse.storedData( offset: Long = 1, recordKey: Long = 1, arbeidssoekerId: Long = 1, - traceparent: String = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01" + traceparent: String = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", + merged: Boolean = false ) = StoredData( partition = 1, offset = 1, recordKey = 1, arbeidssoekerId = 1, traceparent = traceparent, - data = this + data = this, + merged = merged ) fun StoredData.apiHendelse(): no.nav.paw.arbeidssoekerregisteret.backup.api.brukerstoette.models.Hendelse = no.nav.paw.arbeidssoekerregisteret.backup.api.brukerstoette.models.Hendelse( hendelseId = data.hendelseId, hendelseType = data.hendelseType, + merged = merged, + kafkaPartition = partition, metadata = HendelseMetadata( tidspunkt = data.metadata.tidspunkt, utfoertAv = HendelseMetadataUtfoertAv(