From fe9076a163897c96b8cabad73542f83301f5fbca Mon Sep 17 00:00:00 2001 From: Nils Martin Sande Date: Wed, 11 Sep 2024 13:20:59 +0200 Subject: [PATCH] =?UTF-8?q?La=20til=20initiell=20h=C3=A5ndtering=20ev=20Me?= =?UTF-8?q?lding=20mottatt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/bekreftelse-tjeneste/oversikt.txt | 17 ++++++ .../ApplicationConfiguration.kt | 4 +- .../RapportingsMeldingTopology.kt | 60 ++++++++++++++++--- .../no/nav/paw/bekretelsetjeneste/Topology.kt | 2 +- .../tilstand/InternTilstand.kt | 13 ++-- .../bekreftelse/internehendelser/SerdeTest.kt | 1 - settings.gradle.kts | 3 +- 7 files changed, 80 insertions(+), 20 deletions(-) create mode 100644 apps/bekreftelse-tjeneste/oversikt.txt diff --git a/apps/bekreftelse-tjeneste/oversikt.txt b/apps/bekreftelse-tjeneste/oversikt.txt new file mode 100644 index 00000000..74399a31 --- /dev/null +++ b/apps/bekreftelse-tjeneste/oversikt.txt @@ -0,0 +1,17 @@ +Periode starter -> - oppretter intern state med start tidspunkt fra periode + - opprette bekreftelse i tilstand "ikke klar" +Schedule task: + for hver bekreftelse i "ikke klar", sett til til "klar for utfylling" x tid før gjelderTil og send hendelse BekreftelseTilgjenelig + + for hver bekreftelse i "klar for utfylling" og gjelderTil passert, sett til til "venter på svar" og send hendelse LeveringsFristUtloept + + for hver bekreftelse i "venter på svar" og ingen purring sendt og x tid passert siden frist, send RegisterGracePeriodeGjenstaaendeTid og sett purring sendt timestamp til now() + + for hver bekreftelse i "venter på svar" og grace periode utløpt, send RegisterGracePeriodeUtloept + +Melding med svar fra bruker mottatt -> - finn matchene berkreftelse i status "venter på svar" || "klar for utfylling" og set til "levert", ta vare på x siste bekreftelser. + - Send ut BekreftelseMeldingMottatt + - Dersom ønsker å avslutte: send BaOmAaAvsluttePeriode + +Periode avsluttet -> slett intern state og send PeriodeAvsluttet hendelse + diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationConfiguration.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationConfiguration.kt index 6a0ae1f6..d94642a5 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationConfiguration.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationConfiguration.kt @@ -9,4 +9,6 @@ data class ApplicationConfiguration( val bekreftelseHendelseloggTopic: String, val stateStoreName: String, val punctuateInterval: Duration -) +) { + val pawNamespace = "paw" +} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/RapportingsMeldingTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/RapportingsMeldingTopology.kt index c24a1926..5ec5f383 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/RapportingsMeldingTopology.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/RapportingsMeldingTopology.kt @@ -1,30 +1,76 @@ package no.nav.paw.bekretelsetjeneste -import no.nav.paw.config.kafka.streams.genericProcess -import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand +import no.nav.paw.bekreftelse.internehendelser.BaOmAaAvsluttePeriode import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse +import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt +import no.nav.paw.bekretelsetjeneste.tilstand.Bekreftelse +import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand +import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand +import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand.KlarForUtfylling +import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand.VenterSvar +import no.nav.paw.config.kafka.streams.genericProcess import no.nav.paw.rapportering.melding.v1.Melding import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.processor.api.Record import org.slf4j.LoggerFactory +import java.util.* context(ApplicationConfiguration, ApplicationContext) -fun StreamsBuilder.processRapporteringsMeldingTopic() { +fun StreamsBuilder.processBekreftelseMeldingTopic() { stream(bekreftelseTopic) .genericProcess( name = "meldingMottatt", stateStoreName ) { record -> - val gjeldeneTilstand: InternTilstand? = getStateStore(stateStoreName)[record.value().periodeId] + val stateStore = getStateStore(stateStoreName) + val gjeldeneTilstand: InternTilstand? = stateStore[record.value().periodeId] if (gjeldeneTilstand == null) { meldingsLogger.warn("Melding mottatt for periode som ikke er aktiv/eksisterer") - } else { - TODO() + return@genericProcess + } + if (record.value().namespace == pawNamespace) { + val bekreftelse = gjeldeneTilstand.bekreftelser.find { bekreftelse -> bekreftelse.bekreftelseId == record.value().id } + when { + bekreftelse == null -> { + meldingsLogger.warn("Melding {} har ingen matchene bekreftelse", record.value().id) + } + bekreftelse.tilstand is VenterSvar || bekreftelse.tilstand is KlarForUtfylling -> { + val (hendelser, oppdatertBekreftelse) = behandleGyldigSvar(gjeldeneTilstand.periode.arbeidsoekerId, record, bekreftelse) + val oppdatertBekreftelser = gjeldeneTilstand.bekreftelser + .filterNot { t -> t.bekreftelseId == oppdatertBekreftelse.bekreftelseId } + oppdatertBekreftelse + val oppdatertTilstand = gjeldeneTilstand.copy(bekreftelser = oppdatertBekreftelser) + stateStore.put(oppdatertTilstand.periode.periodeId, oppdatertTilstand) + hendelser + .map (record::withValue) + .forEach (::forward) + } + else -> { + meldingsLogger.warn("Melding {} har ikke forventet tilstand, tilstand={}", record.value().id, bekreftelse.tilstand) + } + } } - } +} +fun behandleGyldigSvar(arbeidssoekerId: Long, record: Record, bekreftelse: Bekreftelse): Pair, Bekreftelse> { + val oppdatertBekreftelse = bekreftelse.copy(tilstand = Tilstand.Levert) + val baOmAaAvslutte = if (!record.value().svar.vilFortsetteSomArbeidssoeker) { + BaOmAaAvsluttePeriode( + hendelseId = UUID.randomUUID(), + periodeId = record.value().periodeId, + arbeidssoekerId = arbeidssoekerId + ) + } else null + val meldingMottatt = BekreftelseMeldingMottatt( + hendelseId = UUID.randomUUID(), + periodeId = record.value().periodeId, + arbeidssoekerId = arbeidssoekerId, + bekreftelseId = bekreftelse.bekreftelseId + ) + return listOfNotNull(meldingMottatt, baOmAaAvslutte) to oppdatertBekreftelse } + private val meldingsLogger = LoggerFactory.getLogger("meldingsLogger") diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/Topology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/Topology.kt index 6db4cfdf..c4237e5a 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/Topology.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/Topology.kt @@ -12,7 +12,7 @@ context(ApplicationConfiguration, ApplicationContext) fun StreamsBuilder.appTopology(): Topology { processPeriodeTopic() processAnsvarTopic() - processRapporteringsMeldingTopic() + processBekreftelseMeldingTopic() return build() } diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/InternTilstand.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/InternTilstand.kt index 8d0a0ba3..28c34bdb 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/InternTilstand.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/InternTilstand.kt @@ -1,7 +1,6 @@ package no.nav.paw.bekretelsetjeneste.tilstand import no.nav.paw.arbeidssokerregisteret.api.v1.Periode -import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse import java.time.Instant import java.util.UUID @@ -13,22 +12,18 @@ data class InternTilstand( @JvmRecord data class Bekreftelse( - val tilstand: Set, - val rapporteringsId: UUID, + val tilstand: Tilstand, + val sistePurring: Instant?, + val bekreftelseId: UUID, val gjelderFra: Instant, val gjelderTil: Instant ) -@JvmRecord -data class BekreftelseTilstand( - val tidspunkt: Instant, - val tilstand: Tilstand -) - sealed interface Tilstand{ data object IkkeKlarForUtfylling: Tilstand data object KlarForUtfylling: Tilstand data object VenterSvar: Tilstand + data object Levert : Tilstand } diff --git a/domain/bekreftelse-interne-hendelser/src/test/kotlin/no/nav/paw/bekreftelse/internehendelser/SerdeTest.kt b/domain/bekreftelse-interne-hendelser/src/test/kotlin/no/nav/paw/bekreftelse/internehendelser/SerdeTest.kt index 40c345a6..b099a7fe 100644 --- a/domain/bekreftelse-interne-hendelser/src/test/kotlin/no/nav/paw/bekreftelse/internehendelser/SerdeTest.kt +++ b/domain/bekreftelse-interne-hendelser/src/test/kotlin/no/nav/paw/bekreftelse/internehendelser/SerdeTest.kt @@ -10,7 +10,6 @@ class SerdeTest : FreeSpec({ val hendelse = LeveringsfristUtloept( hendelseId = UUID.randomUUID(), periodeId = UUID.randomUUID(), - identitetsnummer = "12345678901", bekreftelseId = UUID.randomUUID(), arbeidssoekerId = 1234567890L ) diff --git a/settings.gradle.kts b/settings.gradle.kts index 8db10d5a..b17ed8c1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -65,6 +65,8 @@ dependencyResolutionManagement { val pawPdlClientVersion = "24.08.08.40-1" val pawAaregClientVersion = "24.07.04.18-1" val arbeidssokerregisteretVersion = "1.9348086045.48-1" + val rapporteringsSchemaVersion = "24.09.11.8-1" + //Arrow val arrowVersion = "1.2.4" @@ -97,7 +99,6 @@ dependencyResolutionManagement { val otelInstrumentationVersion = "2.4.0" val otelInstrumentationKtorVersion = "2.4.0-alpha" val coroutinesVersion = "1.8.1" - val rapporteringsSchemaVersion = "24.05.15.2-1" val postgresDriverVersion = "42.7.3" val flywayVersion = "10.15.0" val hikariVersion = "5.1.0"