Skip to content

Commit

Permalink
Oppdaterte brukstøtte api med støtte for id merge
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Dec 4, 2024
1 parent 84d71db commit fc772a6
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import arrow.core.raise.either
import arrow.core.right
import no.nav.paw.arbeidssoekerregisteret.backup.api.brukerstoette.models.*
import no.nav.paw.arbeidssoekerregisteret.backup.api.oppslagsapi.models.ArbeidssoekerperiodeResponse
import no.nav.paw.arbeidssoekerregisteret.backup.database.readAllNestedRecordsForId
import no.nav.paw.arbeidssoekerregisteret.backup.database.readAllRecordsForId
import no.nav.paw.arbeidssoekerregisteret.backup.database.txContext
import no.nav.paw.arbeidssoekerregisteret.backup.vo.ApplicationContext
Expand All @@ -32,9 +33,9 @@ class BrukerstoetteService(
private val apiOppslagLogger = LoggerFactory.getLogger("api_oppslag_logger")
private val txCtx = txContext(applicationContext)
suspend fun hentDetaljer(identitetsnummer: String): DetaljerResponse? {
val (id, _) = kafkaKeysClient.getIdAndKey(identitetsnummer)
val (id, key) = kafkaKeysClient.getIdAndKey(identitetsnummer)
val hendelser = transaction {
txCtx().readAllRecordsForId(hendelseDeserializer, id)
txCtx().readAllNestedRecordsForId(hendelseDeserializer, id)
}
if (hendelser.isEmpty()) {
return null
Expand All @@ -54,17 +55,17 @@ class BrukerstoetteService(
{ it }
)

val partition = hendelser.firstOrNull()?.partition
val partition = hendelser.filterNot { it.merged }.firstOrNull()?.partition
return DetaljerResponse(
recordKey = hendelser.first().recordKey,
recordKey = key,
kafkaPartition = partition,
historikk = innkommendeHendelse.map { snapshot ->
snapshot.copy(
nyTilstand = snapshot.nyTilstand?.let { enrich(it, fraOppslagsApi) },
gjeldeneTilstand = snapshot.gjeldeneTilstand?.let { enrich(it, fraOppslagsApi) }
)
},
arbeidssoekerId = hendelser.first().arbeidssoekerId,
arbeidssoekerId = id,
gjeldeneTilstand = sistePeriode?.let { enrich(it, fraOppslagsApi) }
)
}
Expand Down Expand Up @@ -124,6 +125,7 @@ data class ApiData(

fun sistePeriode(hendelser: List<StoredData>): Tilstand? =
hendelser
.filterNot { it.merged }
.sortedBy { it.offset }
.fold(null as Tilstand?, ::beregnTilstand)

Expand All @@ -135,6 +137,9 @@ fun historiskeTilstander(hendelser: List<StoredData>): Iterable<Snapshot> =
hendelse = Hendelse(
hendelseId = hendelse.data.hendelseId,
hendelseType = hendelse.data.hendelseType,
merged = hendelse.merged,
kafkaPartition = hendelse.partition,
kafkaOffset = hendelse.offset,
metadata = HendelseMetadata(
tidspunkt = hendelse.data.metadata.tidspunkt,
utfoertAv = HendelseMetadataUtfoertAv(
Expand All @@ -150,7 +155,6 @@ fun historiskeTilstander(hendelser: List<StoredData>): Iterable<Snapshot> =
)
},
),
kafkaOffset = hendelse.offset,
data = hendelse.data,
api = null
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package no.nav.paw.arbeidssoekerregisteret.backup.database
import no.nav.paw.arbeidssoekerregisteret.backup.database.HendelseTable.recordKey
import no.nav.paw.arbeidssoekerregisteret.backup.vo.ApplicationContext
import no.nav.paw.arbeidssoekerregisteret.backup.vo.StoredData
import no.nav.paw.arbeidssokerregisteret.intern.v1.ArbeidssoekerIdFlettetInn
import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseDeserializer
import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerializer
Expand Down Expand Up @@ -40,11 +41,32 @@ fun TransactionContext.readRecord(hendelseDeserializer: HendelseDeserializer, pa
recordKey = it[recordKey],
arbeidssoekerId = it[HendelseTable.arbeidssoekerId],
traceparent = it[HendelseTable.traceparent],
data = hendelseDeserializer.deserializeFromString(it[HendelseTable.data])
data = hendelseDeserializer.deserializeFromString(it[HendelseTable.data]),
merged = false
)
}

fun TransactionContext.readAllRecordsForId(hendelseDeserializer: HendelseDeserializer, arbeidssoekerId: Long): List<StoredData> =
fun TransactionContext.readAllNestedRecordsForId(
hendelseDeserializer: HendelseDeserializer,
arbeidssoekerId: Long,
merged: Boolean = false
): List<StoredData> {
val tmp = readAllRecordsForId(hendelseDeserializer = hendelseDeserializer, arbeidssoekerId = arbeidssoekerId, merged = merged)
return tmp.asSequence()
.map(StoredData::data)
.filterIsInstance<ArbeidssoekerIdFlettetInn>()
.map { it.kilde.arbeidssoekerId }
.distinct()
.flatMap { readAllNestedRecordsForId(hendelseDeserializer = hendelseDeserializer, arbeidssoekerId = it, merged = true) }.toList()
.plus(tmp)
.sortedBy { it.data.metadata.tidspunkt }
}

fun TransactionContext.readAllRecordsForId(
hendelseDeserializer: HendelseDeserializer,
arbeidssoekerId: Long,
merged: Boolean = false
): List<StoredData> =
HendelseTable
.selectAll()
.where {
Expand All @@ -58,7 +80,8 @@ fun TransactionContext.readAllRecordsForId(hendelseDeserializer: HendelseDeseria
recordKey = it[recordKey],
arbeidssoekerId = it[HendelseTable.arbeidssoekerId],
traceparent = it[HendelseTable.traceparent],
data = hendelseDeserializer.deserializeFromString(it[HendelseTable.data])
data = hendelseDeserializer.deserializeFromString(it[HendelseTable.data]),
merged = merged
)
}

Expand All @@ -76,7 +99,8 @@ fun Transaction.getOneRecordForId(hendelseDeserializer: HendelseDeserializer, id
recordKey = rs.getLong(HendelseTable.recordKey.name),
arbeidssoekerId = rs.getLong(HendelseTable.arbeidssoekerId.name),
data = hendelseDeserializer.deserializeFromString(rs.getString(HendelseTable.data.name)),
traceparent = rs.getString(HendelseTable.traceparent.name)
traceparent = rs.getString(HendelseTable.traceparent.name),
merged = false
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ data class StoredData(
val recordKey: Long,
val arbeidssoekerId: Long,
val traceparent: String?,
val data: Hendelse
val data: Hendelse,
val merged: Boolean
)
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,20 @@ components:
example: "Startet"
metadata:
$ref: "#/components/schemas/HendelseMetadata"
kafkaPartition:
type: "integer"
format: "int32"
description: "Kafka partition"
example: 1
kafkaOffset:
type: "integer"
format: "int64"
description: "Kafka offset"
example: 123234
merged:
type: "boolean"
description: "Hendelsen stammer opprinnelig fra en annen arbeidssøker id"
example: true
api:
type: "boolean"
description: "Hendelsen finnes i API, bare inkludert dersom relevant"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package no.nav.paw.arbeidssoekerregisteret.backup

import io.kotest.core.spec.style.FreeSpec
import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder
import io.kotest.matchers.nulls.shouldNotBeNull
import io.kotest.matchers.shouldBe
import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssoekerregisteret.backup.database.getAllHwms
import no.nav.paw.arbeidssoekerregisteret.backup.database.initHwm
import no.nav.paw.arbeidssoekerregisteret.backup.database.readAllNestedRecordsForId
import no.nav.paw.arbeidssoekerregisteret.backup.database.readRecord
import no.nav.paw.arbeidssoekerregisteret.backup.database.txContext
import no.nav.paw.arbeidssoekerregisteret.backup.vo.ApplicationContext
import no.nav.paw.arbeidssokerregisteret.intern.v1.ArbeidssoekerIdFlettetInn
import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerde
import no.nav.paw.arbeidssokerregisteret.intern.v1.IdentitetsnummerSammenslaatt
import no.nav.paw.arbeidssokerregisteret.intern.v1.Kilde
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.jetbrains.exposed.sql.transactions.transaction
import org.slf4j.LoggerFactory
import java.util.*

class ApplicationHappyPathTest : FreeSpec({
"Verifiser enkel applikasjonsflyt" {
Expand All @@ -28,7 +35,7 @@ class ApplicationHappyPathTest : FreeSpec({
val txCtx = txContext(appCtx)
initDbContainer()
val partitionCount = 3
val testData = hendelser()
val (idA, idB, testData) = hendelser()
.take(15)
.mapIndexed { index, hendelse ->
val partition = index % partitionCount
Expand All @@ -41,11 +48,38 @@ class ApplicationHappyPathTest : FreeSpec({
)
}.chunked(5)
.toList()
.let { chunks ->
val (a, b) = chunks
.flatten()
.distinctBy { it.value().id }
.take(2)
Triple(a, b, chunks)
}
val merge = ConsumerRecord(
idA.topic(),
idA.partition(),
idA.offset() + 1000,
(idA.partition() + 100).toLong(),
ArbeidssoekerIdFlettetInn(
identitetsnummer = idA.value().identitetsnummer,
id = idA.value().id,
hendelseId = UUID.randomUUID(),
metadata = idA.value().metadata.copy(aarsak = "Merge"),
kilde = Kilde(
arbeidssoekerId = idB.value().id,
identitetsnummer = setOf(
idB.value().identitetsnummer
)
)
) as Hendelse
)
println("Testdata: $testData")
transaction {
txCtx().initHwm(partitionCount)
}
appCtx.runApplication(HendelseSerde().serializer(), testData.asSequence())
val mergeAsList = listOf(merge)
val input = testData.plusElement(mergeAsList)
appCtx.runApplication(HendelseSerde().serializer(), input.asSequence())
println("HWMs: ${transaction { txCtx().getAllHwms() }}")
testData.flatten().forEach { record ->
val partition = record.partition()
Expand All @@ -59,5 +93,12 @@ class ApplicationHappyPathTest : FreeSpec({
lagretHendelse.offset shouldBe offset
lagretHendelse.data shouldBe forventetHendelse
}
transaction {
val hendelser = txCtx().readAllNestedRecordsForId(
hendelseDeserializer = HendelseSerde().deserializer(),
arbeidssoekerId = idA.value().id
)
hendelser.map { it.arbeidssoekerId }.distinct() shouldContainExactlyInAnyOrder listOf(idA.value().id, idB.value().id)
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,24 @@ fun Hendelse.storedData(
offset: Long = 1,
recordKey: Long = 1,
arbeidssoekerId: Long = 1,
traceparent: String = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
traceparent: String = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
merged: Boolean = false
) = StoredData(
partition = 1,
offset = 1,
recordKey = 1,
arbeidssoekerId = 1,
traceparent = traceparent,
data = this
data = this,
merged = merged
)

fun StoredData.apiHendelse(): no.nav.paw.arbeidssoekerregisteret.backup.api.brukerstoette.models.Hendelse =
no.nav.paw.arbeidssoekerregisteret.backup.api.brukerstoette.models.Hendelse(
hendelseId = data.hendelseId,
hendelseType = data.hendelseType,
merged = merged,
kafkaPartition = partition,
metadata = HendelseMetadata(
tidspunkt = data.metadata.tidspunkt,
utfoertAv = HendelseMetadataUtfoertAv(
Expand Down

0 comments on commit fc772a6

Please sign in to comment.