Skip to content

Commit

Permalink
Implementert scheduleUpdateTilstand, rettet navn i package, laget fel…
Browse files Browse the repository at this point in the history
…les verdier for bekreftelse, lagt til en case i oversikt.txt, lagt in stateStoreNames parameter i Punctuation
  • Loading branch information
robertkittilsen committed Sep 17, 2024
1 parent 645ecad commit 1a83c5e
Show file tree
Hide file tree
Showing 17 changed files with 253 additions and 67 deletions.
6 changes: 4 additions & 2 deletions apps/bekreftelse-tjeneste/oversikt.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
Periode starter -> - oppretter intern state med start tidspunkt fra periode
- opprette bekreftelse i tilstand "ikke klar"
Schedule task:
for hver bekreftelse i "ikke klar", sett til til "klar for utfylling" x tid før gjelderTil og send hendelse BekreftelseTilgjenelig
for hver bekreftelse i "ikke klar", sett til "klar for utfylling" x tid før gjelderTil og send hendelse BekreftelseTilgjengelig

for hver bekreftelse i "klar for utfylling" og gjelderTil passert, sett til til "venter på svar" og send hendelse LeveringsFristUtloept
for hver bekreftelse i "klar for utfylling" og gjelderTil passert, sett til "venter på svar" og send hendelse LeveringsFristUtloept

for hver bekreftelse i "venter på svar" og ingen purring sendt og x tid passert siden frist, send RegisterGracePeriodeGjenstaaendeTid og sett purring sendt timestamp til now()

for hver bekreftelse i "venter på svar" og grace periode utløpt, send RegisterGracePeriodeUtloept

for hver periode hvis det er mindre enn x dager til den siste bekreftelse perioden utgår lag ny bekreftelse periode

Melding med svar fra bruker mottatt -> - finn matchene berkreftelse i status "venter på svar" || "klar for utfylling" og set til "levert", ta vare på x siste bekreftelser.
- Send ut BekreftelseMeldingMottatt
- Dersom ønsker å avslutte: send BaOmAaAvsluttePeriode
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package no.nav.paw.bekretelsetjeneste
package no.nav.paw.bekreftelsetjeneste

import no.nav.paw.config.kafka.streams.genericProcess
import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package no.nav.paw.bekretelsetjeneste
package no.nav.paw.bekreftelsetjeneste

import java.time.Duration

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package no.nav.paw.bekretelsetjeneste
package no.nav.paw.bekreftelsetjeneste

import kotlinx.coroutines.runBlocking
import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient
import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse
import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstandSerde
import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde

class ApplicationContext(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package no.nav.paw.bekretelsetjeneste
package no.nav.paw.bekreftelsetjeneste

import no.nav.paw.bekreftelse.internehendelser.BaOmAaAvsluttePeriode
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt
import no.nav.paw.bekretelsetjeneste.tilstand.Bekreftelse
import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand
import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand
import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand.KlarForUtfylling
import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand.VenterSvar
import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse
import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand
import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand
import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.KlarForUtfylling
import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.VenterSvar
import no.nav.paw.config.kafka.streams.Punctuation
import no.nav.paw.config.kafka.streams.genericProcess
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.Record
import org.slf4j.LoggerFactory
import java.util.*
Expand All @@ -19,7 +21,8 @@ fun StreamsBuilder.processBekreftelseMeldingTopic() {
stream<Long, no.nav.paw.bekreftelse.melding.v1.Bekreftelse>(bekreftelseTopic)
.genericProcess<Long, no.nav.paw.bekreftelse.melding.v1.Bekreftelse, Long, BekreftelseHendelse>(
name = "meldingMottatt",
stateStoreName
stateStoreName,
punctuation = Punctuation(punctuateInterval, PunctuationType.WALL_CLOCK_TIME, ::scheduleUpdateTilstand)
) { record ->
val stateStore = getStateStore<StateStore>(stateStoreName)
val gjeldeneTilstand: InternTilstand? = stateStore[record.value().periodeId]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package no.nav.paw.bekreftelsetjeneste

import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse
import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig
import java.time.Instant

fun Bekreftelse.erKlarForUtfylling(now: Instant): Boolean =
gjelderTil.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).isAfter(now)

fun Bekreftelse.harFristUtloept(now: Instant): Boolean =
gjelderTil.isBefore(now)

fun Bekreftelse.skalPurres(now: Instant): Boolean =
sistePurring == null && gjelderTil.plus(
BekreftelseConfig.varselFoerGracePeriodeUtloept
).isAfter(now)

fun Bekreftelse.harGracePeriodeUtloept(now: Instant): Boolean =
gjelderTil.plus(BekreftelseConfig.gracePeriode)
.isAfter(now)

fun Bekreftelse.skalLageNyBekreftelseTilgjengelig(now: Instant, bekreftelser: List<Bekreftelse>): Boolean =
gjelderTil.plus(BekreftelseConfig.bekreftelseInterval)
.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).isAfter(now) && bekreftelser.size < 2


Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package no.nav.paw.bekretelsetjeneste
package no.nav.paw.bekreftelsetjeneste

import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet
import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig
import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand
import no.nav.paw.bekreftelsetjeneste.tilstand.initTilstand
import no.nav.paw.config.kafka.streams.genericProcess
import no.nav.paw.config.kafka.streams.mapWithContext
import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand
import no.nav.paw.bekretelsetjeneste.tilstand.initTilstand
import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Produced
Expand All @@ -25,7 +26,7 @@ fun StreamsBuilder.processPeriodeTopic() {
when {
currentState == null && periode.avsluttet() -> Action.DoNothing
periode.avsluttet() -> Action.DeleteStateAndEmit(arbeidsoekerId, periode)
currentState == null -> Action.UpdateState(initTilstand(id = arbeidsoekerId, key = kafkaKey, periode = periode))
currentState == null -> Action.UpdateState(initTilstand(id = arbeidsoekerId, key = kafkaKey, periode = periode, BekreftelseConfig))
else -> Action.DoNothing
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package no.nav.paw.bekreftelsetjeneste

import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig
import no.nav.paw.bekreftelse.internehendelser.LeveringsfristUtloept
import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeGjendstaaendeTid
import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeUtloept
import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig
import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand
import no.nav.paw.bekreftelsetjeneste.tilstand.fristForNesteBekreftelse
import no.nav.paw.bekreftelsetjeneste.tilstand.gjenstaendeGracePeriode
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.processor.api.ProcessorContext
import org.apache.kafka.streams.processor.api.Record
import java.time.Instant
import java.util.*

fun scheduleUpdateTilstand(timestamp: Instant, ctx: ProcessorContext<Long, BekreftelseHendelse>, stateStoreNames: Array<out String>) {
val stateStore: StateStore = ctx.getStateStore(stateStoreNames[0])

stateStore.all().use { states ->
states.forEach { (key, value) ->
value.bekreftelser.forEach { bekreftelse ->
when {
bekreftelse.tilstand == Tilstand.IkkeKlarForUtfylling
&& bekreftelse.erKlarForUtfylling(timestamp) -> {
val updatedBekreftelse = bekreftelse.copy(tilstand = Tilstand.KlarForUtfylling)
val updatedInternTilstand =
value.copy(bekreftelser = value.bekreftelser - bekreftelse + updatedBekreftelse)

stateStore.put(key, updatedInternTilstand)

val record = Record<Long, BekreftelseHendelse>(
value.periode.recordKey,
BekreftelseTilgjengelig(
hendelseId = UUID.randomUUID(),
periodeId = value.periode.periodeId,
arbeidssoekerId = value.periode.arbeidsoekerId,
bekreftelseId = bekreftelse.bekreftelseId,
gjelderFra = bekreftelse.gjelderFra,
gjelderTil = bekreftelse.gjelderTil
),
Instant.now().toEpochMilli()
)
ctx.forward(record)
}

bekreftelse.tilstand == Tilstand.KlarForUtfylling
&& bekreftelse.harFristUtloept(timestamp) -> {
val updatedBekreftelse = bekreftelse.copy(tilstand = Tilstand.VenterSvar)
val updatedInternTilstand =
value.copy(bekreftelser = value.bekreftelser - bekreftelse + updatedBekreftelse)

stateStore.put(key, updatedInternTilstand)

val record = Record<Long, BekreftelseHendelse>(
value.periode.recordKey,
LeveringsfristUtloept(
hendelseId = UUID.randomUUID(),
periodeId = value.periode.periodeId,
arbeidssoekerId = value.periode.arbeidsoekerId,
bekreftelseId = bekreftelse.bekreftelseId,
),
Instant.now().toEpochMilli()
)
ctx.forward(record)
}

bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.skalPurres(timestamp) -> {
val updatedBekreftelse = bekreftelse.copy(sistePurring = timestamp)
val updatedInternTilstand =
value.copy(bekreftelser = value.bekreftelser - bekreftelse + updatedBekreftelse)

stateStore.put(key, updatedInternTilstand)

val record = Record<Long, BekreftelseHendelse>(
value.periode.recordKey,
RegisterGracePeriodeGjendstaaendeTid(
hendelseId = UUID.randomUUID(),
periodeId = value.periode.periodeId,
arbeidssoekerId = value.periode.arbeidsoekerId,
bekreftelseId = bekreftelse.bekreftelseId,
gjenstaandeTid = gjenstaendeGracePeriode(timestamp, bekreftelse.gjelderTil)
),
Instant.now().toEpochMilli()
)
ctx.forward(record)
}

bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.harGracePeriodeUtloept(timestamp) -> {

// TODO: Mangler vi tilstand.utloept eller er det riktig at den skal fjernes her?
val updatedInternTilstand = value.copy(
bekreftelser = value.bekreftelser - bekreftelse
)

stateStore.put(key, updatedInternTilstand)

val record = Record<Long, BekreftelseHendelse>(
value.periode.recordKey,
RegisterGracePeriodeUtloept(
hendelseId = UUID.randomUUID(),
periodeId = value.periode.periodeId,
arbeidssoekerId = value.periode.arbeidsoekerId,
bekreftelseId = bekreftelse.bekreftelseId
),
Instant.now().toEpochMilli()
)

ctx.forward(record)
}

bekreftelse.skalLageNyBekreftelseTilgjengelig(timestamp, value.bekreftelser) -> {
val newBekreftelse = bekreftelse.copy(
tilstand = Tilstand.KlarForUtfylling,
sistePurring = null,
bekreftelseId = UUID.randomUUID(),
gjelderFra = bekreftelse.gjelderTil,
gjelderTil = fristForNesteBekreftelse(bekreftelse.gjelderTil, BekreftelseConfig.bekreftelseInterval)

)
val updatedInternTilstand = value.copy(
bekreftelser = value.bekreftelser + newBekreftelse
)
stateStore.put(key, updatedInternTilstand)

val record = Record<Long, BekreftelseHendelse>(
value.periode.recordKey,
BekreftelseTilgjengelig(
hendelseId = UUID.randomUUID(),
periodeId = value.periode.periodeId,
arbeidssoekerId = value.periode.arbeidsoekerId,
bekreftelseId = newBekreftelse.bekreftelseId,
gjelderFra = newBekreftelse.gjelderFra,
gjelderTil = newBekreftelse.gjelderTil
),
Instant.now().toEpochMilli()
)

ctx.forward(record)
}
}
}
}
}
}

private operator fun <K, V> KeyValue<K, V>.component1(): K = key
private operator fun <K, V> KeyValue<K, V>.component2(): V = value
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package no.nav.paw.bekretelsetjeneste
package no.nav.paw.bekreftelsetjeneste

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde
Expand All @@ -7,7 +7,7 @@ import no.nav.paw.config.kafka.KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG
import no.nav.paw.config.kafka.KafkaConfig
import no.nav.paw.config.kafka.streams.KafkaStreamsFactory
import no.nav.paw.kafkakeygenerator.client.createKafkaKeyGeneratorClient
import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstandSerde
import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package no.nav.paw.bekretelsetjeneste
package no.nav.paw.bekreftelsetjeneste

import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand
import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.state.KeyValueStore
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package no.nav.paw.bekretelsetjeneste.tilstand
package no.nav.paw.bekreftelsetjeneste.tilstand

import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import java.time.Instant
import java.util.UUID
import java.util.*

@JvmRecord
data class InternTilstand(
Expand All @@ -26,7 +26,6 @@ sealed interface Tilstand{
data object Levert : Tilstand
}


@JvmRecord
data class PeriodeInfo(
val periodeId: UUID,
Expand All @@ -43,7 +42,8 @@ data class PeriodeInfo(
fun initTilstand(
id: Long,
key: Long,
periode: Periode
periode: Periode,
bekreftelseConfig: BekreftelseConfig
): InternTilstand =
InternTilstand(
periode = PeriodeInfo(
Expand All @@ -54,5 +54,13 @@ fun initTilstand(
startet = periode.startet.tidspunkt,
avsluttet = periode.avsluttet.tidspunkt
),
bekreftelser = emptyList()
)
bekreftelser = listOf(
Bekreftelse(
tilstand = Tilstand.IkkeKlarForUtfylling,
sistePurring = null,
bekreftelseId = UUID.randomUUID(),
gjelderFra = periode.startet.tidspunkt,
gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, bekreftelseConfig.bekreftelseInterval)
)
)
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package no.nav.paw.bekretelsetjeneste.tilstand
package no.nav.paw.bekreftelsetjeneste.tilstand

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
Expand Down
Loading

0 comments on commit 1a83c5e

Please sign in to comment.