Skip to content

Commit

Permalink
Rettet navn i ApplicationTestContext, lagt til to i BekreftelseMeldin…
Browse files Browse the repository at this point in the history
…gTopology.kt, flyttet opprettelsen av bekreftelse til punctuator, lagt til fungerende punctuator test, endret logikk i KontrolerFrister.kt.
  • Loading branch information
robertkittilsen committed Sep 19, 2024
1 parent 3860f8a commit 08badc2
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.KlarForUtfylling
import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.VenterSvar
import no.nav.paw.config.kafka.streams.Punctuation
import no.nav.paw.config.kafka.streams.genericProcess
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.Record
import org.slf4j.LoggerFactory
Expand All @@ -37,7 +39,7 @@ fun StreamsBuilder.processBekreftelseMeldingTopic() {
bekreftelse == null -> {
meldingsLogger.warn("Melding {} har ingen matchene bekreftelse", record.value().id)
}
bekreftelse.tilstand is VenterSvar || bekreftelse.tilstand is KlarForUtfylling -> {
bekreftelse.tilstand == VenterSvar || bekreftelse.tilstand == KlarForUtfylling -> {
val (hendelser, oppdatertBekreftelse) = behandleGyldigSvar(gjeldeneTilstand.periode.arbeidsoekerId, record, bekreftelse)
val oppdatertBekreftelser = gjeldeneTilstand.bekreftelser
.filterNot { t -> t.bekreftelseId == oppdatertBekreftelse.bekreftelseId } + oppdatertBekreftelse
Expand All @@ -52,7 +54,7 @@ fun StreamsBuilder.processBekreftelseMeldingTopic() {
}
}
}
}
}.to(bekreftelseHendelseloggTopic, Produced.with(Serdes.Long(), bekreftelseHendelseSerde))
}

fun behandleGyldigSvar(arbeidssoekerId: Long, record: Record<Long, no.nav.paw.bekreftelse.melding.v1.Bekreftelse>, bekreftelse: Bekreftelse): Pair<List<BekreftelseHendelse>, Bekreftelse> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package no.nav.paw.bekreftelsetjeneste

import arrow.core.nonEmptyListOf
import arrow.core.tail
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig
import no.nav.paw.bekreftelse.internehendelser.LeveringsfristUtloept
import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeGjendstaaendeTid
import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeUtloept
import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse
import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig
import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand
import no.nav.paw.bekreftelsetjeneste.tilstand.fristForNesteBekreftelse
Expand All @@ -20,7 +23,20 @@ fun bekreftelsePunctuator(stateStoreName: String, timestamp: Instant, ctx: Proce

stateStore.all().use { states ->
states.forEach { (key, value) ->
if (skalLageNyBekreftelseTilgjengelig(timestamp, value.bekreftelser)) {
val existingBekreftelse = value.bekreftelser.firstOrNull()
if (existingBekreftelse == null){
val newBekreftelse = Bekreftelse(
tilstand = Tilstand.IkkeKlarForUtfylling,
sisteVarselOmGjenstaaendeGraceTid = null,
bekreftelseId = UUID.randomUUID(),
gjelderFra = value.periode.startet,
gjelderTil = fristForNesteBekreftelse(value.periode.startet, BekreftelseConfig.bekreftelseInterval)
)
val updatedInternTilstand = value.copy(
bekreftelser = value.bekreftelser + newBekreftelse
)
stateStore.put(key, updatedInternTilstand)
} else if (skalLageNyBekreftelseTilgjengelig(timestamp, nonEmptyListOf(existingBekreftelse, *value.bekreftelser.tail().toTypedArray()))) {
val newBekreftelse = value.bekreftelser.last().copy(
tilstand = Tilstand.KlarForUtfylling,
sisteVarselOmGjenstaaendeGraceTid = null,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
package no.nav.paw.bekreftelsetjeneste

import arrow.core.NonEmptyList
import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse
import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig
import java.time.Instant

fun Bekreftelse.erKlarForUtfylling(now: Instant): Boolean =
gjelderTil.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).isAfter(now)
now.isAfter(gjelderTil.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset))

fun Bekreftelse.harFristUtloept(now: Instant): Boolean =
gjelderTil.isBefore(now)
now.isAfter(gjelderTil)

fun Bekreftelse.erSisteVarselOmGjenstaaendeGraceTid(now: Instant): Boolean =
sisteVarselOmGjenstaaendeGraceTid == null && gjelderTil.plus(
sisteVarselOmGjenstaaendeGraceTid == null && now.isAfter(gjelderTil.plus(
BekreftelseConfig.varselFoerGracePeriodeUtloept
).isAfter(now)
))

fun Bekreftelse.harGracePeriodeUtloept(now: Instant): Boolean =
gjelderTil.plus(BekreftelseConfig.gracePeriode)
.isAfter(now)
now.isAfter(gjelderTil.plus(BekreftelseConfig.gracePeriode))

fun skalLageNyBekreftelseTilgjengelig(now: Instant, bekreftelser: List<Bekreftelse>): Boolean =
bekreftelser.maxOf { it.gjelderTil }
.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset)
.isAfter(now)
fun skalLageNyBekreftelseTilgjengelig(now: Instant, bekreftelser: NonEmptyList<Bekreftelse>): Boolean =
now.isAfter(
bekreftelser.maxOf { it.gjelderTil }.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset)
)


Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package no.nav.paw.bekreftelsetjeneste
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet
import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig
import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand
import no.nav.paw.bekreftelsetjeneste.tilstand.initTilstand
import no.nav.paw.config.kafka.streams.genericProcess
Expand All @@ -18,15 +17,15 @@ import java.util.*
context(ApplicationConfiguration, ApplicationContext)
fun StreamsBuilder.processPeriodeTopic() {
stream<Long, Periode>(periodeTopic)
.mapWithContext("lagreEllerSlettPeriode", stateStoreName) { periode ->
.mapWithContext<Long, Periode, Action>("lagreEllerSlettPeriode", stateStoreName) { periode ->
val keyValueStore: KeyValueStore<UUID, InternTilstand> = getStateStore(stateStoreName)
val currentState = keyValueStore[periode.id]
val (arbeidsoekerId, kafkaKey) = currentState?.let { it.periode.arbeidsoekerId to it.periode.recordKey } ?:
kafkaKeyFunction(periode.identitetsnummer).let { it.id to it.key}
when {
currentState == null && periode.avsluttet() -> Action.DoNothing
periode.avsluttet() -> Action.DeleteStateAndEmit(arbeidsoekerId, periode)
currentState == null -> Action.UpdateState(initTilstand(id = arbeidsoekerId, key = kafkaKey, periode = periode, BekreftelseConfig))
currentState == null -> Action.UpdateState(initTilstand(id = arbeidsoekerId, key = kafkaKey, periode = periode))
else -> Action.DoNothing
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ data class Bekreftelse(
val gjelderTil: Instant
)

sealed interface Tilstand{
data object IkkeKlarForUtfylling: Tilstand
data object KlarForUtfylling: Tilstand
data object VenterSvar: Tilstand
data object GracePeriodeUtlopt: Tilstand
data object Levert : Tilstand
enum class Tilstand {
IkkeKlarForUtfylling,
KlarForUtfylling,
VenterSvar,
GracePeriodeUtlopt,
Levert
}

@JvmRecord
Expand All @@ -44,7 +44,6 @@ fun initTilstand(
id: Long,
key: Long,
periode: Periode,
bekreftelseConfig: BekreftelseConfig
): InternTilstand =
InternTilstand(
periode = PeriodeInfo(
Expand All @@ -53,15 +52,7 @@ fun initTilstand(
arbeidsoekerId = id,
recordKey = key,
startet = periode.startet.tidspunkt,
avsluttet = periode.avsluttet.tidspunkt
avsluttet = periode.avsluttet?.tidspunkt
),
bekreftelser = listOf(
Bekreftelse(
tilstand = Tilstand.IkkeKlarForUtfylling,
sisteVarselOmGjenstaaendeGraceTid = null,
bekreftelseId = UUID.randomUUID(),
gjelderFra = periode.startet.tidspunkt,
gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, bekreftelseConfig.bekreftelseInterval)
)
)
bekreftelser = emptyList()
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ fun fristForNesteBekreftelse(forrige: Instant, interval: Duration): Instant {
return forrige
.plus(interval)
.let { LocalDate.ofInstant(it, ZoneId.systemDefault()) }
.with(magicMondayAdjuster)
.plus(Duration.ofDays(1))
//.with(magicMondayAdjuster)
.plusDays(1)
.atStartOfDay(zoneId).toInstant()
}

fun gjenstaendeGracePeriode(timestamp: Instant, gjelderTil: Instant): Duration {
val gracePeriode = BekreftelseConfig.gracePeriode
val utvidetGjelderTil = gjelderTil.plus(gracePeriode)

return if (utvidetGjelderTil.isBefore(timestamp)) {
return if (timestamp.isAfter(utvidetGjelderTil)) {
Duration.ZERO
} else {
Duration.between(timestamp, utvidetGjelderTil)
Expand All @@ -50,7 +50,7 @@ class MagicMondayAdjuster: TemporalAdjuster {
return internalTemporal
.with(internalAdjuster)
.skipForwardIfNotMagicMonday()
.plus(Duration.ofDays(1))
.plusDays(1)
.atStartOfDay(ZoneId.systemDefault())
.toInstant()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*

Expand All @@ -31,9 +32,9 @@ class ApplicationTestContext {
val applicationConfiguration = ApplicationConfiguration(
periodeTopic = "periodeTopic",
ansvarsTopic = "ansvarsTopic",
bekreftelseTopic = "rapporteringsTopic",
bekreftelseHendelseloggTopic = "rapporteringsHendelsesloggTopic",
stateStoreName = "statStoreName",
bekreftelseTopic = "bekreftelseTopic",
bekreftelseHendelseloggTopic = "bekreftelseHendelsesloggTopic",
stateStoreName = "stateStoreName",
punctuateInterval = Duration.ofSeconds(1)
)
val applicationContext = ApplicationContext(
Expand All @@ -42,6 +43,8 @@ class ApplicationTestContext {
kafkaKeysClient = inMemoryKafkaKeysMock()
)

val logger = LoggerFactory.getLogger(ApplicationTestContext::class.java)

val testDriver: TopologyTestDriver =
with(applicationContext) {
with(applicationConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,40 @@
package no.nav.paw.bekreftelsetjeneste

import io.kotest.core.spec.style.FreeSpec
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeInstanceOf
import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig
import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig
import no.nav.paw.bekreftelsetjeneste.tilstand.fristForNesteBekreftelse

class BekreftelsePunctuatorTest : FreeSpec({

"Tilstand IkkeKlarForUtfylling og bekreftelseTilgjengeligOffset før gjelderTil settes til KlarForUtfylling og sender BekreftelseTilgjengelig hendelse" {

"Når KlarForUtfylling sendes BekreftelseTilgjengelig hendelse" - {
with(ApplicationTestContext()){
val (periode, kafkaKeyResponse) = periode(identitetsnummer = "12345678901")
periodeTopic.pipeInput(kafkaKeyResponse.key, periode)
"Når perioden opprettes skal det ikke skje noe" {
hendelseLoggTopic.isEmpty shouldBe true
}
"Etter 11 dager skal det sendes en BekreftelseTilgjengelig hendelse" {
testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseInterval.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusDays(1))
val stateStore:StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName)
stateStore.all().use {
it.forEach {
logger.info("key: ${it.key}, value: ${it.value}")
}
}
hendelseLoggTopic.isEmpty shouldBe false
val kv = hendelseLoggTopic.readKeyValue()
kv.key shouldBe kafkaKeyResponse.key
with(kv.value.shouldBeInstanceOf<BekreftelseTilgjengelig>()) {
periodeId shouldBe periode.id
arbeidssoekerId shouldBe kafkaKeyResponse.id
gjelderFra shouldBe periode.startet.tidspunkt
gjelderTil shouldBe fristForNesteBekreftelse(periode.startet.tidspunkt, BekreftelseConfig.bekreftelseInterval)
}
}
}
}

/*for hver bekreftelse i "klar for utfylling" og gjelderTil passert, sett til "venter på svar" og send hendelse LeveringsFristUtloept
Expand Down

0 comments on commit 08badc2

Please sign in to comment.