From 5172ab33b84ea48c76d04a8b92873ab594da0912 Mon Sep 17 00:00:00 2001 From: Nils Martin Sande Date: Wed, 9 Oct 2024 07:46:16 +0200 Subject: [PATCH] Oppdaterte kafka streams tilstands klassen til bekreftelse-tjeneste --- .../build.gradle.kts | 2 +- apps/bekreftelse-tjeneste/build.gradle.kts | 2 +- apps/bekreftelse-tjeneste/nais/nais-dev.yaml | 2 +- .../tilstand/Bekreftelse.kt | 51 +++++ .../tilstand/BekreftelseTilstand.kt | 28 +++ .../tilstand/BekreftelseTilstandsLogg.kt | 34 +++ .../tilstand/InternTilstand.kt | 79 ------- .../tilstand/KontrolerFrister.kt | 47 +++- .../tilstand/PeriodeInfo.kt | 17 ++ .../tilstand/RegisterInstillinger.kt | 2 +- .../topology/BekreftelsePunctuator.kt | 43 ++-- .../topology/BekreftelseStream.kt | 22 +- .../BekreftelsePunctuatorTest.kt | 207 +++++++++--------- .../BekreftelseStreamTest.kt | 173 +++++++++------ apps/bekreftelse-utgang/build.gradle.kts | 2 +- apps/hendelselogg-backup/build.gradle.kts | 2 +- apps/utgang-pdl/build.gradle.kts | 2 +- domain/arbeidssoeker-regler/build.gradle.kts | 2 +- .../intern/v1/HendelseSerde.kt | 5 +- gradle/libs.versions.toml | 5 +- gradle/wrapper/gradle-wrapper.properties | 2 +- 21 files changed, 413 insertions(+), 316 deletions(-) create mode 100644 apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/Bekreftelse.kt create mode 100644 apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/BekreftelseTilstand.kt create mode 100644 apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/BekreftelseTilstandsLogg.kt create mode 100644 apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/PeriodeInfo.kt diff --git a/apps/bekreftelse-min-side-oppgaver/build.gradle.kts b/apps/bekreftelse-min-side-oppgaver/build.gradle.kts index 56a56cea..74f6ccb2 100644 --- a/apps/bekreftelse-min-side-oppgaver/build.gradle.kts +++ b/apps/bekreftelse-min-side-oppgaver/build.gradle.kts @@ -18,7 +18,7 @@ dependencies { implementation(project(":domain:main-avro-schema")) implementation(libs.jackson.datatypeJsr310) implementation(libs.jackson.kotlin) - implementation(libs.arrowCore) + implementation(libs.arrow.core.core) implementation(libs.hoplite.yaml) implementation(libs.nav.common.log) implementation(libs.logbackClassic) diff --git a/apps/bekreftelse-tjeneste/build.gradle.kts b/apps/bekreftelse-tjeneste/build.gradle.kts index cada12eb..3b7a788a 100644 --- a/apps/bekreftelse-tjeneste/build.gradle.kts +++ b/apps/bekreftelse-tjeneste/build.gradle.kts @@ -30,7 +30,7 @@ dependencies { implementation(libs.jackson.datatypeJsr310) // Tooling - implementation(libs.arrowCore) + implementation(libs.arrow.core.core) // Logging implementation(libs.logbackClassic) diff --git a/apps/bekreftelse-tjeneste/nais/nais-dev.yaml b/apps/bekreftelse-tjeneste/nais/nais-dev.yaml index e4cc9640..6b92016d 100644 --- a/apps/bekreftelse-tjeneste/nais/nais-dev.yaml +++ b/apps/bekreftelse-tjeneste/nais/nais-dev.yaml @@ -10,7 +10,7 @@ spec: port: 8080 env: - name: KAFKA_STREAMS_ID_SUFFIX - value: "v2" + value: "v3" - name: KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_TOPIC value: "paw.arbeidssoker-bekreftelse-beta-v1" - name: KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_HENDELSELOGG_TOPIC diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/Bekreftelse.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/Bekreftelse.kt new file mode 100644 index 00000000..f8cfc850 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/Bekreftelse.kt @@ -0,0 +1,51 @@ +package no.nav.paw.bekreftelsetjeneste.tilstand + +import arrow.core.NonEmptyList +import java.time.Duration +import java.time.Instant +import java.util.* + +@JvmRecord +data class Bekreftelse( + val tilstandsLogg: BekreftelseTilstandsLogg, + val bekreftelseId: UUID, + val gjelderFra: Instant, + val gjelderTil: Instant +) + +inline fun Bekreftelse.tilstand(): T? = tilstandsLogg.get() + +inline fun Bekreftelse.has(): Boolean = tilstand() != null + +fun Bekreftelse.sisteTilstand(): BekreftelseTilstand = tilstandsLogg.siste + +operator fun Bekreftelse.plus(bekreftelseTilstand: BekreftelseTilstand): Bekreftelse = + copy(tilstandsLogg = tilstandsLogg + bekreftelseTilstand) + +fun opprettFoersteBekreftelse( + periode: PeriodeInfo, + interval: Duration, +): Bekreftelse = + Bekreftelse( + BekreftelseTilstandsLogg(IkkeKlarForUtfylling(periode.startet), emptyList()), + bekreftelseId = UUID.randomUUID(), + gjelderFra = periode.startet, + gjelderTil = fristForNesteBekreftelse(periode.startet, interval) + ) + + +fun NonEmptyList.opprettNesteTilgjengeligeBekreftelse( + tilgjengeliggjort: Instant, + interval: Duration +): Bekreftelse { + val sisteBekreftelse = maxBy { it.gjelderTil } + return Bekreftelse( + bekreftelseId = UUID.randomUUID(), + gjelderFra = sisteBekreftelse.gjelderTil, + gjelderTil = fristForNesteBekreftelse(sisteBekreftelse.gjelderTil, interval), + tilstandsLogg = BekreftelseTilstandsLogg( + siste = KlarForUtfylling(tilgjengeliggjort), + tidligere = emptyList() + ) + ) +} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/BekreftelseTilstand.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/BekreftelseTilstand.kt new file mode 100644 index 00000000..33c470a5 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/BekreftelseTilstand.kt @@ -0,0 +1,28 @@ +package no.nav.paw.bekreftelsetjeneste.tilstand + +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo +import java.time.Instant + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type" +) +@JsonSubTypes( + JsonSubTypes.Type(value = IkkeKlarForUtfylling::class, name = "IkkeKlarForUtfylling"), + JsonSubTypes.Type(value = KlarForUtfylling::class, name = "KlarForUtfylling"), + JsonSubTypes.Type(value = VenterSvar::class, name = "VenterSvar"), + JsonSubTypes.Type(value = GracePeriodeUtloept::class, name = "GracePeriodeUtloept"), + JsonSubTypes.Type(value = Levert::class, name = "Levert") +) +sealed interface BekreftelseTilstand { + val timestamp: Instant +} + +data class GracePeriodeUtloept(override val timestamp: Instant) : BekreftelseTilstand +data class GracePeriodeVarselet(override val timestamp: Instant) : BekreftelseTilstand +data class IkkeKlarForUtfylling(override val timestamp: Instant) : BekreftelseTilstand +data class KlarForUtfylling(override val timestamp: Instant) : BekreftelseTilstand +data class Levert(override val timestamp: Instant) : BekreftelseTilstand +data class VenterSvar(override val timestamp: Instant) : BekreftelseTilstand \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/BekreftelseTilstandsLogg.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/BekreftelseTilstandsLogg.kt new file mode 100644 index 00000000..69338829 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/BekreftelseTilstandsLogg.kt @@ -0,0 +1,34 @@ +package no.nav.paw.bekreftelsetjeneste.tilstand + +import arrow.core.NonEmptyList +import arrow.core.nonEmptyListOf +import org.slf4j.LoggerFactory + +@JvmRecord +data class BekreftelseTilstandsLogg( + val siste: BekreftelseTilstand, + val tidligere: List +) + +private val bekreftelseTilstandsLoggProblemerLogger = LoggerFactory.getLogger(BekreftelseTilstandsLogg::class.java) +operator fun BekreftelseTilstandsLogg.plus(bekreftelseTilstand: BekreftelseTilstand): BekreftelseTilstandsLogg = + (tidligere + siste + bekreftelseTilstand) + .groupBy { it::class } + .values + .map { gruppe -> + if (gruppe.size == 1) gruppe.first() + else { + bekreftelseTilstandsLoggProblemerLogger.warn("Flere tilstander av samme type i tilstandslogg: $gruppe, beholder den nyeste") + gruppe.maxBy { it.timestamp } + } + } + .let { alle -> + val siste = alle.maxBy { it.timestamp } + val tidligere = alle.filterNot { it == siste }.sortedBy { it.timestamp } + BekreftelseTilstandsLogg(siste, tidligere) + } + +inline fun BekreftelseTilstandsLogg.get(): T? = + tidligere.filterIsInstance().firstOrNull() ?: siste as? T + +fun BekreftelseTilstandsLogg.asList(): NonEmptyList = nonEmptyListOf(siste) + tidligere diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt index 9e427599..f209645d 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt @@ -1,11 +1,6 @@ package no.nav.paw.bekreftelsetjeneste.tilstand -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo import no.nav.paw.arbeidssokerregisteret.api.v1.Periode -import java.time.Duration -import java.time.Instant -import java.util.* @JvmRecord data class InternTilstand( @@ -13,50 +8,6 @@ data class InternTilstand( val bekreftelser: List ) -@JvmRecord -data class Bekreftelse( - val tilstand: Tilstand, - val tilgjengeliggjort: Instant?, - val fristUtloept: Instant?, - val sisteVarselOmGjenstaaendeGraceTid: Instant?, - val bekreftelseId: UUID, - val gjelderFra: Instant, - val gjelderTil: Instant -) - -@JsonTypeInfo( - use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.PROPERTY, - property = "type" -) -@JsonSubTypes( - JsonSubTypes.Type(value = Tilstand.IkkeKlarForUtfylling::class, name = "IkkeKlarForUtfylling"), - JsonSubTypes.Type(value = Tilstand.KlarForUtfylling::class, name = "KlarForUtfylling"), - JsonSubTypes.Type(value = Tilstand.VenterSvar::class, name = "VenterSvar"), - JsonSubTypes.Type(value = Tilstand.GracePeriodeUtloept::class, name = "GracePeriodeUtloept"), - JsonSubTypes.Type(value = Tilstand.Levert::class, name = "Levert") -) -sealed class Tilstand { - data object IkkeKlarForUtfylling : Tilstand() - data object KlarForUtfylling : Tilstand() - data object VenterSvar : Tilstand() - data object GracePeriodeUtloept : Tilstand() - data object Levert : Tilstand() -} - -@JvmRecord -data class PeriodeInfo( - val periodeId: UUID, - val identitetsnummer: String, - val arbeidsoekerId: Long, - val recordKey: Long, - val startet: Instant, - val avsluttet: Instant? -) { - val erAvsluttet: Boolean - get() = avsluttet != null -} - fun initTilstand( id: Long, key: Long, @@ -73,33 +24,3 @@ fun initTilstand( ), bekreftelser = emptyList() ) - -fun initBekreftelsePeriode( - periode: PeriodeInfo, - interval: Duration, -): Bekreftelse = - Bekreftelse( - tilstand = Tilstand.IkkeKlarForUtfylling, - tilgjengeliggjort = null, - fristUtloept = null, - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = UUID.randomUUID(), - gjelderFra = periode.startet, - gjelderTil = fristForNesteBekreftelse(periode.startet, interval) - ) - -fun List.initNewBekreftelse( - tilgjengeliggjort: Instant, - interval: Duration -): Bekreftelse { - val sisteBekreftelse = maxBy { it.gjelderTil } - - return sisteBekreftelse.copy( - tilstand = Tilstand.KlarForUtfylling, - tilgjengeliggjort = tilgjengeliggjort, - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = UUID.randomUUID(), - gjelderFra = sisteBekreftelse.gjelderTil, - gjelderTil = fristForNesteBekreftelse(sisteBekreftelse.gjelderTil, interval) - ) -} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/KontrolerFrister.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/KontrolerFrister.kt index 09946623..058f8a93 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/KontrolerFrister.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/KontrolerFrister.kt @@ -8,30 +8,53 @@ import java.time.Instant const val MAKS_ANTALL_UTSTEENDE_BEKREFTELSER: Int = 100 private val maksAntallLogger = LoggerFactory.getLogger("maksAntallLogger") fun Bekreftelse.erKlarForUtfylling(now: Instant, tilgjengeligOffset: Duration): Boolean = - now.isAfter(gjelderTil.minus(tilgjengeligOffset)) + when (sisteTilstand()) { + is IkkeKlarForUtfylling -> now.isAfter(gjelderTil.minus(tilgjengeligOffset)) + else -> false + } + fun Bekreftelse.harFristUtloept(now: Instant, tilgjengeligOffset: Duration): Boolean = - now.isAfter(tilgjengeliggjort?.plus(tilgjengeligOffset) ?: gjelderTil) + when (val gjeldeneTilstand = sisteTilstand()) { + is KlarForUtfylling -> now.isAfter(gjeldeneTilstand.timestamp.plus(tilgjengeligOffset)) + else -> false + } + fun Bekreftelse.erSisteVarselOmGjenstaaendeGraceTid(now: Instant, varselFoerGraceperiodeUtloept: Duration): Boolean = - sisteVarselOmGjenstaaendeGraceTid == null && now.isAfter(fristUtloept?.plus(varselFoerGraceperiodeUtloept) ?: gjelderTil.plus( - varselFoerGraceperiodeUtloept - )) + when (val gjeldeneTilstand = sisteTilstand()) { + is VenterSvar -> !has() && now.isAfter( + gjeldeneTilstand.timestamp.plus( + varselFoerGraceperiodeUtloept + ) + ) -fun Bekreftelse.harGraceperiodeUtloept(now: Instant, graceperiode: Duration): Boolean = - now.isAfter(fristUtloept?.plus(graceperiode) ?: gjelderTil.plus(graceperiode)) + else -> false + } -fun NonEmptyList.shouldCreateNewBekreftelse(now: Instant, interval: Duration, tilgjengeligOffset: Duration): Boolean = +fun Bekreftelse.harGraceperiodeUtloept(now: Instant, graceperiode: Duration): Boolean = + when (val gjeldeneTilstand = sisteTilstand()) { + is VenterSvar -> now.isAfter(gjeldeneTilstand.timestamp.plus(graceperiode)) + is GracePeriodeVarselet -> tilstandsLogg.get()?.timestamp?.let { ts -> now.isAfter(ts.plus(graceperiode)) } ?: false + else -> false + } + + +fun NonEmptyList.shouldCreateNewBekreftelse( + now: Instant, + interval: Duration, + tilgjengeligOffset: Duration +): Boolean = (size < MAKS_ANTALL_UTSTEENDE_BEKREFTELSER) .also { underGrense -> if (!underGrense) { maksAntallLogger.warn("Maks antall bekreftelser er nådd!") } } && - maxBy { it.gjelderTil } - .let { - now.isAfter(it.gjelderTil.plus(interval.minus(tilgjengeligOffset))) - } + maxBy { it.gjelderTil } + .let { + now.isAfter(it.gjelderTil.plus(interval.minus(tilgjengeligOffset))) + } diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/PeriodeInfo.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/PeriodeInfo.kt new file mode 100644 index 00000000..56f9b9b8 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/PeriodeInfo.kt @@ -0,0 +1,17 @@ +package no.nav.paw.bekreftelsetjeneste.tilstand + +import java.time.Instant +import java.util.* + +@JvmRecord +data class PeriodeInfo( + val periodeId: UUID, + val identitetsnummer: String, + val arbeidsoekerId: Long, + val recordKey: Long, + val startet: Instant, + val avsluttet: Instant? +) { + val erAvsluttet: Boolean + get() = avsluttet != null +} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt index 46c1f255..9febd49b 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt @@ -13,7 +13,7 @@ fun fristForNesteBekreftelse(forrige: Instant, interval: Duration): Instant { } fun Bekreftelse.gjenstaendeGraceperiode(timestamp: Instant, graceperiode: Duration): Duration { - val utvidetGjelderTil = fristUtloept?.plus(graceperiode) ?: gjelderTil.plus(graceperiode) + val utvidetGjelderTil = tilstand()?.timestamp?.plus(graceperiode) ?: gjelderTil.plus(graceperiode) return if (timestamp.isAfter(utvidetGjelderTil)) { Duration.ZERO diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt index 4a58c5a3..5711587c 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt @@ -7,17 +7,7 @@ import no.nav.paw.bekreftelse.internehendelser.LeveringsfristUtloept import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeGjenstaaendeTid import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeUtloept import no.nav.paw.bekreftelsetjeneste.config.BekreftelseIntervals -import no.nav.paw.bekreftelsetjeneste.tilstand.erKlarForUtfylling -import no.nav.paw.bekreftelsetjeneste.tilstand.erSisteVarselOmGjenstaaendeGraceTid -import no.nav.paw.bekreftelsetjeneste.tilstand.harFristUtloept -import no.nav.paw.bekreftelsetjeneste.tilstand.harGraceperiodeUtloept -import no.nav.paw.bekreftelsetjeneste.tilstand.shouldCreateNewBekreftelse -import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse -import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.gjenstaendeGraceperiode -import no.nav.paw.bekreftelsetjeneste.tilstand.initBekreftelsePeriode -import no.nav.paw.bekreftelsetjeneste.tilstand.initNewBekreftelse +import no.nav.paw.bekreftelsetjeneste.tilstand.* import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.processor.api.ProcessorContext import org.apache.kafka.streams.processor.api.Record @@ -64,7 +54,7 @@ private fun processBekreftelser( } private fun InternTilstand.createInitialBekreftelse(interval: Duration): InternTilstand = - copy(bekreftelser = listOf(initBekreftelsePeriode(periode, interval))) + copy(bekreftelser = listOf(opprettFoersteBekreftelse(periode, interval))) private fun InternTilstand.checkAndCreateNewBekreftelse( timestamp: Instant, @@ -73,7 +63,10 @@ private fun InternTilstand.checkAndCreateNewBekreftelse( val nonEmptyBekreftelser = bekreftelser.toNonEmptyListOrNull() ?: return this to null return if (nonEmptyBekreftelser.shouldCreateNewBekreftelse(timestamp, bekreftelseIntervals.interval, bekreftelseIntervals.tilgjengeligOffset)) { - val newBekreftelse = bekreftelser.initNewBekreftelse(tilgjengeliggjort = timestamp, interval = bekreftelseIntervals.interval) + val newBekreftelse = nonEmptyBekreftelser.opprettNesteTilgjengeligeBekreftelse( + tilgjengeliggjort = timestamp, + interval = bekreftelseIntervals.interval + ) copy(bekreftelser = nonEmptyBekreftelser + newBekreftelse) to createNewBekreftelseTilgjengelig(newBekreftelse) } else { this to null @@ -105,8 +98,8 @@ private fun InternTilstand.getProcessedBekreftelseTilstandAndHendelse( bekreftelseIntervals: BekreftelseIntervals, ): Pair { return when { - bekreftelse.tilstand is Tilstand.IkkeKlarForUtfylling && bekreftelse.erKlarForUtfylling(timestamp, bekreftelseIntervals.tilgjengeligOffset) -> { - val updatedBekreftelse = bekreftelse.copy(tilstand = Tilstand.KlarForUtfylling, tilgjengeliggjort = timestamp) + bekreftelse.erKlarForUtfylling(timestamp, bekreftelseIntervals.tilgjengeligOffset) -> { + val updatedBekreftelse = bekreftelse + KlarForUtfylling(timestamp) val hendelse = BekreftelseTilgjengelig( hendelseId = UUID.randomUUID(), periodeId = periode.periodeId, @@ -118,8 +111,8 @@ private fun InternTilstand.getProcessedBekreftelseTilstandAndHendelse( updatedBekreftelse to hendelse } - bekreftelse.tilstand is Tilstand.KlarForUtfylling && bekreftelse.harFristUtloept(timestamp, bekreftelseIntervals.tilgjengeligOffset) -> { - val updatedBekreftelse = bekreftelse.copy(tilstand = Tilstand.VenterSvar, fristUtloept = timestamp) + bekreftelse.harFristUtloept(timestamp, bekreftelseIntervals.tilgjengeligOffset) -> { + val updatedBekreftelse = bekreftelse + VenterSvar(timestamp) val hendelse = LeveringsfristUtloept( hendelseId = UUID.randomUUID(), periodeId = periode.periodeId, @@ -130,25 +123,25 @@ private fun InternTilstand.getProcessedBekreftelseTilstandAndHendelse( updatedBekreftelse to hendelse } - bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.erSisteVarselOmGjenstaaendeGraceTid(timestamp, bekreftelseIntervals.varselFoerGraceperiodeUtloept) -> { - val updatedBekreftelse = bekreftelse.copy(sisteVarselOmGjenstaaendeGraceTid = timestamp) - val hendelse = RegisterGracePeriodeGjenstaaendeTid( + bekreftelse.harGraceperiodeUtloept(timestamp, bekreftelseIntervals.graceperiode) -> { + val updatedBekreftelse = bekreftelse + GracePeriodeUtloept(timestamp) + val hendelse = RegisterGracePeriodeUtloept( hendelseId = UUID.randomUUID(), periodeId = periode.periodeId, arbeidssoekerId = periode.arbeidsoekerId, bekreftelseId = bekreftelse.bekreftelseId, - gjenstaandeTid = bekreftelse.gjenstaendeGraceperiode(timestamp, bekreftelseIntervals.graceperiode), - hendelseTidspunkt = Instant.now()) + hendelseTidspunkt = Instant.now()) updatedBekreftelse to hendelse } - bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.harGraceperiodeUtloept(timestamp, bekreftelseIntervals.graceperiode) -> { - val updatedBekreftelse = bekreftelse.copy(tilstand = Tilstand.GracePeriodeUtloept) - val hendelse = RegisterGracePeriodeUtloept( + bekreftelse.erSisteVarselOmGjenstaaendeGraceTid(timestamp, bekreftelseIntervals.varselFoerGraceperiodeUtloept) -> { + val updatedBekreftelse = bekreftelse + GracePeriodeVarselet(timestamp) + val hendelse = RegisterGracePeriodeGjenstaaendeTid( hendelseId = UUID.randomUUID(), periodeId = periode.periodeId, arbeidssoekerId = periode.arbeidsoekerId, bekreftelseId = bekreftelse.bekreftelseId, + gjenstaandeTid = bekreftelse.gjenstaendeGraceperiode(timestamp, bekreftelseIntervals.graceperiode), hendelseTidspunkt = Instant.now()) updatedBekreftelse to hendelse } diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt index e5880ed0..83ec78e3 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt @@ -11,11 +11,7 @@ import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig -import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse -import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.KlarForUtfylling -import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.VenterSvar +import no.nav.paw.bekreftelsetjeneste.tilstand.* import no.nav.paw.config.kafka.streams.Punctuation import no.nav.paw.config.kafka.streams.genericProcess import org.apache.kafka.common.serialization.Serdes @@ -92,22 +88,22 @@ fun processPawNamespace( return } - when (bekreftelse.tilstand) { + when (val sisteTilstand = bekreftelse.sisteTilstand()) { is VenterSvar, is KlarForUtfylling -> { val (hendelser, oppdatertBekreftelse) = behandleGyldigSvar(gjeldeneTilstand, record, bekreftelse) oppdaterStateStore(stateStore, gjeldeneTilstand, oppdatertBekreftelse) - Span.current().setAttribute("bekreftelse.tilstand", bekreftelse.tilstand.toString()) + Span.current().setAttribute("bekreftelse.tilstand", sisteTilstand.toString()) forwardHendelser(record, hendelser, forward) } else -> { - Span.current().setAttribute("unexpected_tilstand", bekreftelse.tilstand.toString()) + Span.current().setAttribute("unexpected_tilstand", sisteTilstand.toString()) meldingsLogger.warn( "Melding {} har ikke forventet tilstand, tilstand={}", record.value().id, - bekreftelse.tilstand + sisteTilstand ) } } @@ -125,14 +121,14 @@ fun behandleGyldigSvar( bekreftelse: Bekreftelse ): Pair, Bekreftelse> { val arbeidssoekerId = gjeldeneTilstand.periode.arbeidsoekerId - + val sisteTilstand = bekreftelse.sisteTilstand() Span.current().setAttribute("arbeidsoekerId", arbeidssoekerId.toString()) Span.current().setAttribute("bekreftelseId", bekreftelse.bekreftelseId.toString()) Span.current().setAttribute("periodeId", record.value().periodeId.toString()) - Span.current().setAttribute("bekreftelse.oldTilstand", bekreftelse.tilstand.toString()) + Span.current().setAttribute("bekreftelse.oldTilstand", sisteTilstand.toString()) - val oppdatertBekreftelse = bekreftelse.copy(tilstand = Tilstand.Levert) - Span.current().setAttribute("bekreftelse.newTilstand", Tilstand.Levert.toString()) + val oppdatertBekreftelse = bekreftelse + Levert(Instant.now()) + Span.current().setAttribute("bekreftelse.newTilstand", oppdatertBekreftelse.sisteTilstand().toString()) val vilFortsette = record.value().svar.vilFortsetteSomArbeidssoeker Span.current().setAttribute("vilFortsetteSomArbeidssoeker", vilFortsette.toString()) diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt index a12bb7e1..f5de06e0 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt @@ -1,6 +1,7 @@ package no.nav.paw.bekreftelsetjeneste import io.kotest.core.spec.style.FreeSpec +import io.kotest.matchers.should import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf import no.nav.paw.arbeidssoekerregisteret.testdata.kafkaKeyContext @@ -15,11 +16,7 @@ import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeUtloept import no.nav.paw.bekreftelse.melding.v1.vo.BrukerType import no.nav.paw.bekreftelse.melding.v1.vo.Metadata import no.nav.paw.bekreftelse.melding.v1.vo.Svar -import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse -import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.PeriodeInfo -import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.fristForNesteBekreftelse +import no.nav.paw.bekreftelsetjeneste.tilstand.* import no.nav.paw.bekreftelsetjeneste.topology.StateStore import java.time.Duration import java.time.Instant @@ -31,9 +28,12 @@ class BekreftelsePunctuatorTest : FreeSpec({ "BekreftelsePunctuator sender riktig hendelser i rekkefølge" - { with(ApplicationTestContext(initialWallClockTime = startTime)) { - with(kafkaKeyContext()){ + with(kafkaKeyContext()) { val (interval, graceperiode, tilgjengeligOffset, varselFoerGraceperiodeUtloept) = applicationConfig.bekreftelseIntervals - val (id, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) + val (id, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) periodeTopic.pipeInput(key, periode) testDriver.advanceWallClockTime(Duration.ofSeconds(5)) @@ -41,29 +41,27 @@ class BekreftelsePunctuatorTest : FreeSpec({ bekreftelseHendelseloggTopicOut.isEmpty shouldBe true val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( + + stateStore.get(periode.id) should { currentState -> + currentState.periode shouldBe PeriodeInfo( periodeId = periode.id, identitetsnummer = periode.identitetsnummer, arbeidsoekerId = id, recordKey = key, startet = periode.startet.tidspunkt, avsluttet = periode.avsluttet?.tidspunkt - ), bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.IkkeKlarForUtfylling, - tilgjengeliggjort = null, - fristUtloept = null, - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse( - periode.startet.tidspunkt, interval - ) - ) ) - ) + currentState.bekreftelser.size shouldBe 1 + currentState.bekreftelser.first() should { bekreftelse -> + bekreftelse.gjelderFra shouldBe periode.startet.tidspunkt + bekreftelse.gjelderTil shouldBe fristForNesteBekreftelse( + periode.startet.tidspunkt, + interval + ) + bekreftelse.tilstandsLogg.asList().size shouldBe 1 + bekreftelse.has() shouldBe true + } + } } "Etter 11 dager skal det ha blitt sendt en BekreftelseTilgjengelig hendelse" { @@ -163,9 +161,12 @@ class BekreftelsePunctuatorTest : FreeSpec({ } "BekreftelsePunctuator håndterer BekreftelseMeldingMottatt hendelse" { with(ApplicationTestContext(initialWallClockTime = startTime)) { - with(kafkaKeyContext()){ + with(kafkaKeyContext()) { val (interval, graceperiode, tilgjengeligOffset, _) = applicationConfig.bekreftelseIntervals - val (_, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) + val (_, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) periodeTopic.pipeInput(key, periode) testDriver.advanceWallClockTime( @@ -269,28 +270,26 @@ class BekreftelsePunctuatorTest : FreeSpec({ val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( + currentState should { + it.periode shouldBe PeriodeInfo( periodeId = periode.id, identitetsnummer = periode.identitetsnummer, arbeidsoekerId = id, recordKey = key, startet = periode.startet.tidspunkt, avsluttet = periode.avsluttet?.tidspunkt - ), bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.IkkeKlarForUtfylling, - tilgjengeliggjort = null, - fristUtloept = null, - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse( - periode.startet.tidspunkt, interval - ) - ) ) - ) + it.bekreftelser.size shouldBe 1 + it.bekreftelser.first() should { bekreftelse -> + bekreftelse.gjelderFra shouldBe periode.startet.tidspunkt + bekreftelse.gjelderTil shouldBe fristForNesteBekreftelse( + periode.startet.tidspunkt, + interval + ) + bekreftelse.tilstandsLogg.asList().size shouldBe 1 + bekreftelse.has() shouldBe true + } + } bekreftelseHendelseloggTopicOut.isEmpty shouldBe true } } @@ -312,29 +311,27 @@ class BekreftelsePunctuatorTest : FreeSpec({ val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( + currentState should { + it.periode shouldBe PeriodeInfo( periodeId = periode.id, identitetsnummer = periode.identitetsnummer, arbeidsoekerId = id, recordKey = key, startet = periode.startet.tidspunkt, avsluttet = periode.avsluttet?.tidspunkt - ), bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.KlarForUtfylling, - tilgjengeliggjort = startTime.plus(interval) - .minus(tilgjengeligOffset).plusSeconds(5), - fristUtloept = null, - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse( - periode.startet.tidspunkt, interval - ) - ) ) - ) + it.bekreftelser.size shouldBe 1 + it.bekreftelser.first() should { bekreftelse -> + bekreftelse.gjelderFra shouldBe periode.startet.tidspunkt + bekreftelse.gjelderTil shouldBe fristForNesteBekreftelse( + periode.startet.tidspunkt, + interval + ) + bekreftelse.tilstandsLogg.asList().size shouldBe 2 + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + } + } bekreftelseHendelseloggTopicOut.isEmpty shouldBe false val hendelser = bekreftelseHendelseloggTopicOut.readKeyValuesToList() hendelser.size shouldBe 1 @@ -368,29 +365,28 @@ class BekreftelsePunctuatorTest : FreeSpec({ } } val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( + currentState should { + it.periode shouldBe PeriodeInfo( periodeId = periode.id, identitetsnummer = periode.identitetsnummer, arbeidsoekerId = id, recordKey = key, startet = periode.startet.tidspunkt, avsluttet = periode.avsluttet?.tidspunkt - ), bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.VenterSvar, - tilgjengeliggjort = startTime.plus(interval) - .minus(tilgjengeligOffset).plusSeconds(5), - fristUtloept = startTime.plus(interval).plusSeconds(10), - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse( - periode.startet.tidspunkt, interval - ) - ) ) - ) + it.bekreftelser.size shouldBe 1 + it.bekreftelser.first() should { bekreftelse -> + bekreftelse.gjelderFra shouldBe periode.startet.tidspunkt + bekreftelse.gjelderTil shouldBe fristForNesteBekreftelse( + periode.startet.tidspunkt, + interval + ) + bekreftelse.tilstandsLogg.asList().size shouldBe 3 + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + } + } bekreftelseHendelseloggTopicOut.isEmpty shouldBe false val hendelser = bekreftelseHendelseloggTopicOut.readKeyValuesToList() logger.info("hendelser: $hendelser") @@ -422,30 +418,29 @@ class BekreftelsePunctuatorTest : FreeSpec({ val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( + currentState should { + it.periode shouldBe PeriodeInfo( periodeId = periode.id, identitetsnummer = periode.identitetsnummer, arbeidsoekerId = id, recordKey = key, startet = periode.startet.tidspunkt, avsluttet = periode.avsluttet?.tidspunkt - ), bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.VenterSvar, - tilgjengeliggjort = startTime.plus(interval) - .minus(tilgjengeligOffset).plusSeconds(5), - fristUtloept = startTime.plus(interval).plusSeconds(10), - sisteVarselOmGjenstaaendeGraceTid = startTime.plus(interval) - .plus(varselFoerGraceperiodeUtloept).plusSeconds(15), - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse( - periode.startet.tidspunkt, interval - ) - ) ) - ) + it.bekreftelser.size shouldBe 1 + it.bekreftelser.first() should { bekreftelse -> + bekreftelse.gjelderFra shouldBe periode.startet.tidspunkt + bekreftelse.gjelderTil shouldBe fristForNesteBekreftelse( + periode.startet.tidspunkt, + interval + ) + bekreftelse.tilstandsLogg.asList().size shouldBe 4 + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + } + } bekreftelseHendelseloggTopicOut.isEmpty shouldBe false val hendelser = bekreftelseHendelseloggTopicOut.readKeyValuesToList() logger.info("hendelser: $hendelser") @@ -479,30 +474,30 @@ class BekreftelsePunctuatorTest : FreeSpec({ val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( + currentState should { + it.periode shouldBe PeriodeInfo( periodeId = periode.id, identitetsnummer = periode.identitetsnummer, arbeidsoekerId = id, recordKey = key, startet = periode.startet.tidspunkt, avsluttet = periode.avsluttet?.tidspunkt - ), bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.GracePeriodeUtloept, - tilgjengeliggjort = startTime.plus(interval) - .minus(tilgjengeligOffset).plusSeconds(5), - fristUtloept = startTime.plus(interval).plusSeconds(10), - sisteVarselOmGjenstaaendeGraceTid = startTime.plus(interval) - .plus(varselFoerGraceperiodeUtloept).plusSeconds(15), - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse( - periode.startet.tidspunkt, interval - ) - ) ) - ) + it.bekreftelser.size shouldBe 1 + it.bekreftelser.first() should { bekreftelse -> + bekreftelse.gjelderFra shouldBe periode.startet.tidspunkt + bekreftelse.gjelderTil shouldBe fristForNesteBekreftelse( + periode.startet.tidspunkt, + interval + ) + bekreftelse.tilstandsLogg.asList().size shouldBe 5 + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + } + } bekreftelseHendelseloggTopicOut.isEmpty shouldBe false val hendelser = bekreftelseHendelseloggTopicOut.readKeyValuesToList() logger.info("hendelser: $hendelser") diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseStreamTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseStreamTest.kt index 53da02f8..ef6c022e 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseStreamTest.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseStreamTest.kt @@ -1,21 +1,37 @@ package no.nav.paw.bekreftelsetjeneste +import io.kotest.assertions.withClue import io.kotest.core.spec.style.FreeSpec +import io.kotest.matchers.collections.shouldContainExactly +import io.kotest.matchers.comparables.shouldBeGreaterThan +import io.kotest.matchers.should import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf import no.nav.paw.arbeidssoekerregisteret.testdata.bekreftelse.bekreftelseMelding import no.nav.paw.arbeidssoekerregisteret.testdata.kafkaKeyContext import no.nav.paw.arbeidssoekerregisteret.testdata.mainavro.metadata import no.nav.paw.arbeidssoekerregisteret.testdata.mainavro.periode -import no.nav.paw.bekreftelse.internehendelser.BaOmAaAvsluttePeriode -import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt -import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig -import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.PeriodeInfo -import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand +import no.nav.paw.bekreftelse.internehendelser.* +import no.nav.paw.bekreftelsetjeneste.tilstand.* +import org.apache.kafka.streams.TestOutputTopic +import java.time.Duration import java.time.Instant import java.util.* +inline fun TestOutputTopic.assertEvent(function: (T) -> A): A = + assertEvent().let(function) + +inline fun TestOutputTopic.assertEvent(): T { + withClue("Expected event of type ${T::class.simpleName}, no event was found") { + isEmpty shouldBe false + } + withClue("Expected event of type ${T::class.simpleName}") { + val event = readValue() + return event.shouldBeInstanceOf() + } +} + +val Int.seconds: Duration get() = Duration.ofSeconds(this.toLong()) class BekreftelseStreamTest : FreeSpec({ @@ -40,22 +56,30 @@ class BekreftelseStreamTest : FreeSpec({ bekreftelseHendelseloggTopicOut.isEmpty shouldBe true } + println("hei" + Thread.currentThread().name) } "Mottatt melding med tilhørende tilstand GracePeriodeUtloept skal tilstand være uendret og hendelselogg skal være tom" { with(ApplicationTestContext(initialWallClockTime = startTime)) { with(kafkaKeyContext()) { - val (interval, _, tilgjengeligOffset, varselFoerGraceperiodeUtloept) = applicationConfig.bekreftelseIntervals - val (id, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) + val (interval, graceperiode, tilgjengeligOffset, varselFoerGraceperiodeUtloept) = applicationConfig.bekreftelseIntervals + (varselFoerGraceperiodeUtloept.multipliedBy(2) + 5.seconds) shouldBeGreaterThan graceperiode + val (id, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) periodeTopic.pipeInput(key, periode) - testDriver.advanceWallClockTime( - interval.minus(tilgjengeligOffset).plusSeconds(5) - ) - val bekreftelseId = (bekreftelseHendelseloggTopicOut.readValue() as BekreftelseTilgjengelig).bekreftelseId - testDriver.advanceWallClockTime(tilgjengeligOffset.plusSeconds(5)) - testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) - testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) + testDriver.advanceWallClockTime(interval - tilgjengeligOffset + 5.seconds) + val bekreftelseId = bekreftelseHendelseloggTopicOut.assertEvent { event: BekreftelseTilgjengelig -> + event.bekreftelseId + } + testDriver.advanceWallClockTime(tilgjengeligOffset + 5.seconds) + bekreftelseHendelseloggTopicOut.assertEvent() + testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept + 5.seconds) + bekreftelseHendelseloggTopicOut.assertEvent() + testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept + 50.seconds) + bekreftelseHendelseloggTopicOut.assertEvent() bekreftelseHendelseloggTopicOut.readRecordsToList() val bekreftelseMelding = bekreftelseMelding( @@ -74,31 +98,34 @@ class BekreftelseStreamTest : FreeSpec({ testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) val internTilstand = stateStore[periode.id] - internTilstand shouldBe InternTilstand( - periode = PeriodeInfo( + internTilstand should { + it.periode shouldBe PeriodeInfo( periodeId = periode.id, identitetsnummer = periode.identitetsnummer, arbeidsoekerId = id, recordKey = key, startet = periode.startet.tidspunkt, avsluttet = periode.avsluttet?.tidspunkt - ), - bekreftelser = listOf( - no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse( - tilstand = Tilstand.GracePeriodeUtloept, - tilgjengeliggjort = startTime.plus( - interval.minus(tilgjengeligOffset) - .plusSeconds(5) - ), - fristUtloept = startTime.plus(interval).plusSeconds(10), - sisteVarselOmGjenstaaendeGraceTid = startTime.plus(interval).plus(varselFoerGraceperiodeUtloept).plusSeconds(15), - bekreftelseId = bekreftelseMelding.id, - gjelderFra = bekreftelseMelding.svar.gjelderFra, - gjelderTil = bekreftelseMelding.svar.gjelderTil - ) ) - ) - + it.bekreftelser.size shouldBe 1 + it.bekreftelser.first() should { bekreftelse -> + bekreftelse.bekreftelseId shouldBe bekreftelseMelding.id + bekreftelse.gjelderFra shouldBe bekreftelseMelding.svar.gjelderFra + bekreftelse.gjelderTil shouldBe bekreftelseMelding.svar.gjelderTil + bekreftelse.tilstandsLogg + .asList() + .sortedBy(BekreftelseTilstand::timestamp) + .map { it::class } + .shouldContainExactly( + IkkeKlarForUtfylling::class, + KlarForUtfylling::class, + VenterSvar::class, + GracePeriodeVarselet::class, + GracePeriodeUtloept::class + ) + bekreftelse.sisteTilstand().shouldBeInstanceOf() + } + } bekreftelseHendelseloggTopicOut.isEmpty shouldBe true } } @@ -108,14 +135,19 @@ class BekreftelseStreamTest : FreeSpec({ with(ApplicationTestContext(initialWallClockTime = startTime)) { with(kafkaKeyContext()) { val (interval, _, tilgjengeligOffset, _) = applicationConfig.bekreftelseIntervals - val (id, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) + val (id, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) periodeTopic.pipeInput(key, periode) testDriver.advanceWallClockTime( interval.minus(tilgjengeligOffset).plusSeconds(5) ) - val bekreftelseId = (bekreftelseHendelseloggTopicOut.readValue() as BekreftelseTilgjengelig).bekreftelseId + val bekreftelseId = bekreftelseHendelseloggTopicOut.assertEvent { event: BekreftelseTilgjengelig -> + event.bekreftelseId + } val bekreftelseMelding = bekreftelseMelding( id = bekreftelseId, periodeId = periode.id, @@ -131,30 +163,26 @@ class BekreftelseStreamTest : FreeSpec({ testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) val internTilstand = stateStore[periode.id] - internTilstand shouldBe InternTilstand( - periode = PeriodeInfo( + internTilstand should { + val periode = PeriodeInfo( periodeId = periode.id, identitetsnummer = periode.identitetsnummer, arbeidsoekerId = id, recordKey = key, startet = periode.startet.tidspunkt, avsluttet = periode.avsluttet?.tidspunkt - ), - bekreftelser = listOf( - no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse( - tilstand = Tilstand.Levert, - tilgjengeliggjort = startTime.plus( - interval.minus(tilgjengeligOffset) - .plusSeconds(5) - ), - fristUtloept = null, - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = bekreftelseMelding.id, - gjelderFra = bekreftelseMelding.svar.gjelderFra, - gjelderTil = bekreftelseMelding.svar.gjelderTil - ) ) - ) + it.periode shouldBe periode + it.bekreftelser.size shouldBe 1 + it.bekreftelser.first().should { bekreftelse -> + bekreftelse.gjelderFra shouldBe bekreftelseMelding.svar.gjelderFra + bekreftelse.gjelderTil shouldBe bekreftelseMelding.svar.gjelderTil + bekreftelseId shouldBe bekreftelseMelding.id + bekreftelse.has() shouldBe true + bekreftelse.has() shouldBe true + bekreftelse.sisteTilstand().shouldBeInstanceOf() + } + } bekreftelseHendelseloggTopicOut.isEmpty shouldBe false val hendelse = bekreftelseHendelseloggTopicOut.readKeyValue() @@ -174,7 +202,10 @@ class BekreftelseStreamTest : FreeSpec({ with(ApplicationTestContext(initialWallClockTime = startTime)) { with(kafkaKeyContext()) { val (interval, _, tilgjengeligOffset, _) = applicationConfig.bekreftelseIntervals - val (id, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) + val (id, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) periodeTopic.pipeInput(key, periode) testDriver.advanceWallClockTime( @@ -197,30 +228,32 @@ class BekreftelseStreamTest : FreeSpec({ testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) val internTilstand = stateStore[periode.id] - internTilstand shouldBe InternTilstand( - periode = PeriodeInfo( + internTilstand should { + it.periode shouldBe PeriodeInfo( periodeId = periode.id, identitetsnummer = periode.identitetsnummer, arbeidsoekerId = id, recordKey = key, startet = periode.startet.tidspunkt, avsluttet = periode.avsluttet?.tidspunkt - ), - bekreftelser = listOf( - no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse( - tilstand = Tilstand.Levert, - tilgjengeliggjort = startTime.plus( - interval.minus(tilgjengeligOffset) - .plusSeconds(5) - ), - fristUtloept = null, - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = bekreftelseMelding.id, - gjelderFra = bekreftelseMelding.svar.gjelderFra, - gjelderTil = bekreftelseMelding.svar.gjelderTil, - ) ) - ) + it.bekreftelser.size shouldBe 1 + it.bekreftelser.first().should { bekreftelse -> + bekreftelse.gjelderFra shouldBe bekreftelseMelding.svar.gjelderFra + bekreftelse.gjelderTil shouldBe bekreftelseMelding.svar.gjelderTil + bekreftelse.bekreftelseId shouldBe bekreftelseMelding.id + bekreftelse.tilstandsLogg + .asList() + .sortedBy(BekreftelseTilstand::timestamp) + .map { it::class } + .shouldContainExactly( + IkkeKlarForUtfylling::class, + KlarForUtfylling::class, + Levert::class + ) + bekreftelse.sisteTilstand().shouldBeInstanceOf() + } + } bekreftelseHendelseloggTopicOut.isEmpty shouldBe false val hendelse = bekreftelseHendelseloggTopicOut.readKeyValuesToList() diff --git a/apps/bekreftelse-utgang/build.gradle.kts b/apps/bekreftelse-utgang/build.gradle.kts index 7f14bf7f..184dbf02 100644 --- a/apps/bekreftelse-utgang/build.gradle.kts +++ b/apps/bekreftelse-utgang/build.gradle.kts @@ -29,7 +29,7 @@ dependencies { implementation(libs.jackson.datatypeJsr310) // Tooling - implementation(libs.arrowCore) + implementation(libs.arrow.core.core) // Logging implementation(libs.logbackClassic) diff --git a/apps/hendelselogg-backup/build.gradle.kts b/apps/hendelselogg-backup/build.gradle.kts index a0b134b5..ff94cd48 100644 --- a/apps/hendelselogg-backup/build.gradle.kts +++ b/apps/hendelselogg-backup/build.gradle.kts @@ -21,7 +21,7 @@ dependencies { implementation(project(":lib:hoplite-config")) implementation(project(":lib:kafka-key-generator-client")) - implementation(libs.arrowCore) + implementation(libs.arrow.core.core) implementation(libs.bundles.ktorServerWithNettyAndMicrometer) implementation(libs.ktor.server.cors) implementation(libs.ktor.server.swagger) diff --git a/apps/utgang-pdl/build.gradle.kts b/apps/utgang-pdl/build.gradle.kts index 6c0ea01a..43849d55 100644 --- a/apps/utgang-pdl/build.gradle.kts +++ b/apps/utgang-pdl/build.gradle.kts @@ -25,7 +25,7 @@ dependencies { implementation(project(":lib:kafka-streams")) implementation(project(":lib:hoplite-config")) - api(libs.arrowCore) + api(libs.arrow.core.core) implementation(libs.kafka.streams.core) implementation(libs.paw.pdl.client) diff --git a/domain/arbeidssoeker-regler/build.gradle.kts b/domain/arbeidssoeker-regler/build.gradle.kts index 8054b52b..ff3acf54 100644 --- a/domain/arbeidssoeker-regler/build.gradle.kts +++ b/domain/arbeidssoeker-regler/build.gradle.kts @@ -14,7 +14,7 @@ dependencies { implementation(project(":domain:interne-hendelser")) implementation(libs.paw.pdl.client) api(libs.micrometer.registryPrometheus) - api(libs.arrowCore) + api(libs.arrow.core.core) testImplementation(libs.ktor.server.testJvm) testImplementation(libs.test.junit5.runner) testImplementation(libs.test.kotest.assertionsCore) diff --git a/domain/interne-hendelser/src/main/kotlin/no/nav/paw/arbeidssokerregisteret/intern/v1/HendelseSerde.kt b/domain/interne-hendelser/src/main/kotlin/no/nav/paw/arbeidssokerregisteret/intern/v1/HendelseSerde.kt index 3bb36813..b889a58a 100644 --- a/domain/interne-hendelser/src/main/kotlin/no/nav/paw/arbeidssokerregisteret/intern/v1/HendelseSerde.kt +++ b/domain/interne-hendelser/src/main/kotlin/no/nav/paw/arbeidssokerregisteret/intern/v1/HendelseSerde.kt @@ -1,6 +1,9 @@ package no.nav.paw.arbeidssokerregisteret.intern.v1 +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.databind.DeserializationContext import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.JsonDeserializer import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.KotlinFeature @@ -76,4 +79,4 @@ fun eventTypeToClass(type: String?): KClass = avvistStoppAvPeriodeHendelseType -> AvvistStoppAvPeriode::class opplysningerOmArbeidssoekerHendelseType -> OpplysningerOmArbeidssoekerMottatt::class else -> throw IllegalArgumentException("Ukjent hendelse type: '$type'") - } \ No newline at end of file + } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d1afb867..dd938f3b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -4,6 +4,7 @@ pawAaregClientVersion = "24.07.04.18-1" arbeidssokerregisteretVersion = "1.9348086045.48-1" bekreftelseSchemaVersion = "24.09.16.1-1" arrowVersion = "1.2.4" +arrowJacksonIntegrationVersion = "0.14.1" noNavCommonVersion = "3.2024.05.23_05.46-2b29fa343e8e" noNavSecurityVersion = "5.0.5" comSksamuelHopliteVersion = "2.8.2" @@ -29,7 +30,9 @@ hikariVersion = "6.0.0" [libraries] coroutinesCore = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version.ref = "coroutinesVersion" } -arrowCore = { group = "io.arrow-kt", name = "arrow-core", version.ref = "arrowVersion" } +arrow-core-core = { group = "io.arrow-kt", name = "arrow-core", version.ref = "arrowVersion" } +arrow-core-serialization = { group = "io.arrow-kt", name = "arrow-core-serialization", version.ref = "arrowVersion" } +arrow-integration-jackson = { group = "io.arrow-kt", name = "arrow-integrations-jackson-module", version.ref = "arrowJacksonIntegrationVersion" } logbackClassic = { group = "ch.qos.logback", name = "logback-classic", version.ref = "logbackVersion" } logstashLogbackEncoder = { group = "net.logstash.logback", name = "logstash-logback-encoder", version.ref = "logstashVersion" } ktor-client-contentNegotiation = { group = "io.ktor", name = "ktor-client-content-negotiation", version = "2.3.12" } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index b82aa23a..df97d72b 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME