Skip to content

Commit

Permalink
La til initiell håndtering ev Melding mottatt
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Sep 11, 2024
1 parent 3add725 commit fe9076a
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 20 deletions.
17 changes: 17 additions & 0 deletions apps/bekreftelse-tjeneste/oversikt.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
Periode starter -> - oppretter intern state med start tidspunkt fra periode
- opprette bekreftelse i tilstand "ikke klar"
Schedule task:
for hver bekreftelse i "ikke klar", sett til til "klar for utfylling" x tid før gjelderTil og send hendelse BekreftelseTilgjenelig

for hver bekreftelse i "klar for utfylling" og gjelderTil passert, sett til til "venter på svar" og send hendelse LeveringsFristUtloept

for hver bekreftelse i "venter på svar" og ingen purring sendt og x tid passert siden frist, send RegisterGracePeriodeGjenstaaendeTid og sett purring sendt timestamp til now()

for hver bekreftelse i "venter på svar" og grace periode utløpt, send RegisterGracePeriodeUtloept

Melding med svar fra bruker mottatt -> - finn matchene berkreftelse i status "venter på svar" || "klar for utfylling" og set til "levert", ta vare på x siste bekreftelser.
- Send ut BekreftelseMeldingMottatt
- Dersom ønsker å avslutte: send BaOmAaAvsluttePeriode

Periode avsluttet -> slett intern state og send PeriodeAvsluttet hendelse

Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ data class ApplicationConfiguration(
val bekreftelseHendelseloggTopic: String,
val stateStoreName: String,
val punctuateInterval: Duration
)
) {
val pawNamespace = "paw"
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,76 @@
package no.nav.paw.bekretelsetjeneste

import no.nav.paw.config.kafka.streams.genericProcess
import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand
import no.nav.paw.bekreftelse.internehendelser.BaOmAaAvsluttePeriode
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt
import no.nav.paw.bekretelsetjeneste.tilstand.Bekreftelse
import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand
import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand
import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand.KlarForUtfylling
import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand.VenterSvar
import no.nav.paw.config.kafka.streams.genericProcess
import no.nav.paw.rapportering.melding.v1.Melding
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.processor.api.Record
import org.slf4j.LoggerFactory
import java.util.*

context(ApplicationConfiguration, ApplicationContext)
fun StreamsBuilder.processRapporteringsMeldingTopic() {
fun StreamsBuilder.processBekreftelseMeldingTopic() {
stream<Long, Melding>(bekreftelseTopic)
.genericProcess<Long, Melding, Long, BekreftelseHendelse>(
name = "meldingMottatt",
stateStoreName
) { record ->
val gjeldeneTilstand: InternTilstand? = getStateStore<StateStore>(stateStoreName)[record.value().periodeId]
val stateStore = getStateStore<StateStore>(stateStoreName)
val gjeldeneTilstand: InternTilstand? = stateStore[record.value().periodeId]
if (gjeldeneTilstand == null) {
meldingsLogger.warn("Melding mottatt for periode som ikke er aktiv/eksisterer")
} else {
TODO()
return@genericProcess
}
if (record.value().namespace == pawNamespace) {
val bekreftelse = gjeldeneTilstand.bekreftelser.find { bekreftelse -> bekreftelse.bekreftelseId == record.value().id }
when {
bekreftelse == null -> {
meldingsLogger.warn("Melding {} har ingen matchene bekreftelse", record.value().id)
}
bekreftelse.tilstand is VenterSvar || bekreftelse.tilstand is KlarForUtfylling -> {
val (hendelser, oppdatertBekreftelse) = behandleGyldigSvar(gjeldeneTilstand.periode.arbeidsoekerId, record, bekreftelse)
val oppdatertBekreftelser = gjeldeneTilstand.bekreftelser
.filterNot { t -> t.bekreftelseId == oppdatertBekreftelse.bekreftelseId } + oppdatertBekreftelse
val oppdatertTilstand = gjeldeneTilstand.copy(bekreftelser = oppdatertBekreftelser)
stateStore.put(oppdatertTilstand.periode.periodeId, oppdatertTilstand)
hendelser
.map (record::withValue)
.forEach (::forward)
}
else -> {
meldingsLogger.warn("Melding {} har ikke forventet tilstand, tilstand={}", record.value().id, bekreftelse.tilstand)
}
}
}

}
}

fun behandleGyldigSvar(arbeidssoekerId: Long, record: Record<Long, Melding>, bekreftelse: Bekreftelse): Pair<List<BekreftelseHendelse>, Bekreftelse> {
val oppdatertBekreftelse = bekreftelse.copy(tilstand = Tilstand.Levert)
val baOmAaAvslutte = if (!record.value().svar.vilFortsetteSomArbeidssoeker) {
BaOmAaAvsluttePeriode(
hendelseId = UUID.randomUUID(),
periodeId = record.value().periodeId,
arbeidssoekerId = arbeidssoekerId
)
} else null
val meldingMottatt = BekreftelseMeldingMottatt(
hendelseId = UUID.randomUUID(),
periodeId = record.value().periodeId,
arbeidssoekerId = arbeidssoekerId,
bekreftelseId = bekreftelse.bekreftelseId
)
return listOfNotNull(meldingMottatt, baOmAaAvslutte) to oppdatertBekreftelse
}


private val meldingsLogger = LoggerFactory.getLogger("meldingsLogger")


Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ context(ApplicationConfiguration, ApplicationContext)
fun StreamsBuilder.appTopology(): Topology {
processPeriodeTopic()
processAnsvarTopic()
processRapporteringsMeldingTopic()
processBekreftelseMeldingTopic()

return build()
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package no.nav.paw.bekretelsetjeneste.tilstand

import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import java.time.Instant
import java.util.UUID

Expand All @@ -13,22 +12,18 @@ data class InternTilstand(

@JvmRecord
data class Bekreftelse(
val tilstand: Set<BekreftelseTilstand>,
val rapporteringsId: UUID,
val tilstand: Tilstand,
val sistePurring: Instant?,
val bekreftelseId: UUID,
val gjelderFra: Instant,
val gjelderTil: Instant
)

@JvmRecord
data class BekreftelseTilstand(
val tidspunkt: Instant,
val tilstand: Tilstand
)

sealed interface Tilstand{
data object IkkeKlarForUtfylling: Tilstand
data object KlarForUtfylling: Tilstand
data object VenterSvar: Tilstand
data object Levert : Tilstand
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ class SerdeTest : FreeSpec({
val hendelse = LeveringsfristUtloept(
hendelseId = UUID.randomUUID(),
periodeId = UUID.randomUUID(),
identitetsnummer = "12345678901",
bekreftelseId = UUID.randomUUID(),
arbeidssoekerId = 1234567890L
)
Expand Down
3 changes: 2 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ dependencyResolutionManagement {
val pawPdlClientVersion = "24.08.08.40-1"
val pawAaregClientVersion = "24.07.04.18-1"
val arbeidssokerregisteretVersion = "1.9348086045.48-1"
val rapporteringsSchemaVersion = "24.09.11.8-1"


//Arrow
val arrowVersion = "1.2.4"
Expand Down Expand Up @@ -97,7 +99,6 @@ dependencyResolutionManagement {
val otelInstrumentationVersion = "2.4.0"
val otelInstrumentationKtorVersion = "2.4.0-alpha"
val coroutinesVersion = "1.8.1"
val rapporteringsSchemaVersion = "24.05.15.2-1"
val postgresDriverVersion = "42.7.3"
val flywayVersion = "10.15.0"
val hikariVersion = "5.1.0"
Expand Down

0 comments on commit fe9076a

Please sign in to comment.