From 1a83c5e871c3ee9c3e8ca333f8f035f989a204bc Mon Sep 17 00:00:00 2001 From: robertkittilsen Date: Tue, 17 Sep 2024 20:49:02 +0200 Subject: [PATCH] Implementert scheduleUpdateTilstand, rettet navn i package, laget felles verdier for bekreftelse, lagt til en case i oversikt.txt, lagt in stateStoreNames parameter i Punctuation --- apps/bekreftelse-tjeneste/oversikt.txt | 6 +- .../AnsvarTopology.kt | 2 +- .../ApplicationConfiguration.kt | 2 +- .../ApplicationContext.kt | 4 +- .../BekreftelseMeldingTopology.kt} | 17 +- .../bekreftelsetjeneste/KontrolerFrister.kt | 26 +++ .../PeriodeTopology.kt | 13 +- .../ScheduleUpdateTilstand.kt | 149 ++++++++++++++++++ .../Startup.kt | 4 +- .../Topology.kt | 4 +- .../tilstand/InternTilstand.kt | 20 ++- .../tilstand/InternTilstandSerde.kt | 2 +- .../tilstand/RegisterInstillinger.kt | 27 +++- .../bekretelsetjeneste/KontrolerFrister.kt | 19 --- .../ApplicationTestContext.kt | 6 +- .../IngenAndreTarAnsvarTest.kt | 2 +- .../config/kafka/streams/GenericProcessor.kt | 17 +- 17 files changed, 253 insertions(+), 67 deletions(-) rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/AnsvarTopology.kt (92%) rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/ApplicationConfiguration.kt (88%) rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/ApplicationContext.kt (83%) rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/{bekretelsetjeneste/RapportingsMeldingTopology.kt => bekreftelsetjeneste/BekreftelseMeldingTopology.kt} (85%) create mode 100644 apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/PeriodeTopology.kt (90%) create mode 100644 apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ScheduleUpdateTilstand.kt rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/Startup.kt (95%) rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/Topology.kt (80%) rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/tilstand/InternTilstand.kt (69%) rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/tilstand/InternTilstandSerde.kt (96%) rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/tilstand/RegisterInstillinger.kt (59%) delete mode 100644 apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/KontrolerFrister.kt rename apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/ApplicationTestContext.kt (96%) rename apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/{bekretelsetjeneste => bekreftelsetjeneste}/IngenAndreTarAnsvarTest.kt (98%) diff --git a/apps/bekreftelse-tjeneste/oversikt.txt b/apps/bekreftelse-tjeneste/oversikt.txt index c558d52e..dcb2c725 100644 --- a/apps/bekreftelse-tjeneste/oversikt.txt +++ b/apps/bekreftelse-tjeneste/oversikt.txt @@ -2,14 +2,16 @@ Periode starter -> - oppretter intern state med start tidspunkt fra periode - opprette bekreftelse i tilstand "ikke klar" Schedule task: - for hver bekreftelse i "ikke klar", sett til til "klar for utfylling" x tid før gjelderTil og send hendelse BekreftelseTilgjenelig + for hver bekreftelse i "ikke klar", sett til "klar for utfylling" x tid før gjelderTil og send hendelse BekreftelseTilgjengelig - for hver bekreftelse i "klar for utfylling" og gjelderTil passert, sett til til "venter på svar" og send hendelse LeveringsFristUtloept + for hver bekreftelse i "klar for utfylling" og gjelderTil passert, sett til "venter på svar" og send hendelse LeveringsFristUtloept for hver bekreftelse i "venter på svar" og ingen purring sendt og x tid passert siden frist, send RegisterGracePeriodeGjenstaaendeTid og sett purring sendt timestamp til now() for hver bekreftelse i "venter på svar" og grace periode utløpt, send RegisterGracePeriodeUtloept + for hver periode hvis det er mindre enn x dager til den siste bekreftelse perioden utgår lag ny bekreftelse periode + Melding med svar fra bruker mottatt -> - finn matchene berkreftelse i status "venter på svar" || "klar for utfylling" og set til "levert", ta vare på x siste bekreftelser. - Send ut BekreftelseMeldingMottatt - Dersom ønsker å avslutte: send BaOmAaAvsluttePeriode diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/AnsvarTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/AnsvarTopology.kt similarity index 92% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/AnsvarTopology.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/AnsvarTopology.kt index 73853648..aaf63e3c 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/AnsvarTopology.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/AnsvarTopology.kt @@ -1,4 +1,4 @@ -package no.nav.paw.bekretelsetjeneste +package no.nav.paw.bekreftelsetjeneste import no.nav.paw.config.kafka.streams.genericProcess import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationConfiguration.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationConfiguration.kt similarity index 88% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationConfiguration.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationConfiguration.kt index d94642a5..6d166753 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationConfiguration.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationConfiguration.kt @@ -1,4 +1,4 @@ -package no.nav.paw.bekretelsetjeneste +package no.nav.paw.bekreftelsetjeneste import java.time.Duration diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationContext.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationContext.kt similarity index 83% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationContext.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationContext.kt index 3e2fc2b3..40c13bac 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationContext.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationContext.kt @@ -1,9 +1,9 @@ -package no.nav.paw.bekretelsetjeneste +package no.nav.paw.bekreftelsetjeneste import kotlinx.coroutines.runBlocking import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse -import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstandSerde +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde class ApplicationContext( diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/RapportingsMeldingTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopology.kt similarity index 85% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/RapportingsMeldingTopology.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopology.kt index 70e12797..9575d1d8 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/RapportingsMeldingTopology.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopology.kt @@ -1,15 +1,17 @@ -package no.nav.paw.bekretelsetjeneste +package no.nav.paw.bekreftelsetjeneste import no.nav.paw.bekreftelse.internehendelser.BaOmAaAvsluttePeriode import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt -import no.nav.paw.bekretelsetjeneste.tilstand.Bekreftelse -import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand -import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand -import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand.KlarForUtfylling -import no.nav.paw.bekretelsetjeneste.tilstand.Tilstand.VenterSvar +import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand +import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand +import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.KlarForUtfylling +import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.VenterSvar +import no.nav.paw.config.kafka.streams.Punctuation import no.nav.paw.config.kafka.streams.genericProcess import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.processor.PunctuationType import org.apache.kafka.streams.processor.api.Record import org.slf4j.LoggerFactory import java.util.* @@ -19,7 +21,8 @@ fun StreamsBuilder.processBekreftelseMeldingTopic() { stream(bekreftelseTopic) .genericProcess( name = "meldingMottatt", - stateStoreName + stateStoreName, + punctuation = Punctuation(punctuateInterval, PunctuationType.WALL_CLOCK_TIME, ::scheduleUpdateTilstand) ) { record -> val stateStore = getStateStore(stateStoreName) val gjeldeneTilstand: InternTilstand? = stateStore[record.value().periodeId] diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt new file mode 100644 index 00000000..b5dc9498 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt @@ -0,0 +1,26 @@ +package no.nav.paw.bekreftelsetjeneste + +import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse +import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig +import java.time.Instant + +fun Bekreftelse.erKlarForUtfylling(now: Instant): Boolean = + gjelderTil.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).isAfter(now) + +fun Bekreftelse.harFristUtloept(now: Instant): Boolean = + gjelderTil.isBefore(now) + +fun Bekreftelse.skalPurres(now: Instant): Boolean = + sistePurring == null && gjelderTil.plus( + BekreftelseConfig.varselFoerGracePeriodeUtloept + ).isAfter(now) + +fun Bekreftelse.harGracePeriodeUtloept(now: Instant): Boolean = + gjelderTil.plus(BekreftelseConfig.gracePeriode) + .isAfter(now) + +fun Bekreftelse.skalLageNyBekreftelseTilgjengelig(now: Instant, bekreftelser: List): Boolean = + gjelderTil.plus(BekreftelseConfig.bekreftelseInterval) + .minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).isAfter(now) && bekreftelser.size < 2 + + diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/PeriodeTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/PeriodeTopology.kt similarity index 90% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/PeriodeTopology.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/PeriodeTopology.kt index 272b2981..e4bbe8a8 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/PeriodeTopology.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/PeriodeTopology.kt @@ -1,12 +1,13 @@ -package no.nav.paw.bekretelsetjeneste +package no.nav.paw.bekreftelsetjeneste import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse +import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet +import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand +import no.nav.paw.bekreftelsetjeneste.tilstand.initTilstand import no.nav.paw.config.kafka.streams.genericProcess import no.nav.paw.config.kafka.streams.mapWithContext -import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand -import no.nav.paw.bekretelsetjeneste.tilstand.initTilstand -import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet -import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.kstream.Produced @@ -25,7 +26,7 @@ fun StreamsBuilder.processPeriodeTopic() { when { currentState == null && periode.avsluttet() -> Action.DoNothing periode.avsluttet() -> Action.DeleteStateAndEmit(arbeidsoekerId, periode) - currentState == null -> Action.UpdateState(initTilstand(id = arbeidsoekerId, key = kafkaKey, periode = periode)) + currentState == null -> Action.UpdateState(initTilstand(id = arbeidsoekerId, key = kafkaKey, periode = periode, BekreftelseConfig)) else -> Action.DoNothing } } diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ScheduleUpdateTilstand.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ScheduleUpdateTilstand.kt new file mode 100644 index 00000000..c3e05098 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ScheduleUpdateTilstand.kt @@ -0,0 +1,149 @@ +package no.nav.paw.bekreftelsetjeneste + +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse +import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig +import no.nav.paw.bekreftelse.internehendelser.LeveringsfristUtloept +import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeGjendstaaendeTid +import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeUtloept +import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig +import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand +import no.nav.paw.bekreftelsetjeneste.tilstand.fristForNesteBekreftelse +import no.nav.paw.bekreftelsetjeneste.tilstand.gjenstaendeGracePeriode +import org.apache.kafka.streams.KeyValue +import org.apache.kafka.streams.processor.api.ProcessorContext +import org.apache.kafka.streams.processor.api.Record +import java.time.Instant +import java.util.* + +fun scheduleUpdateTilstand(timestamp: Instant, ctx: ProcessorContext, stateStoreNames: Array) { + val stateStore: StateStore = ctx.getStateStore(stateStoreNames[0]) + + stateStore.all().use { states -> + states.forEach { (key, value) -> + value.bekreftelser.forEach { bekreftelse -> + when { + bekreftelse.tilstand == Tilstand.IkkeKlarForUtfylling + && bekreftelse.erKlarForUtfylling(timestamp) -> { + val updatedBekreftelse = bekreftelse.copy(tilstand = Tilstand.KlarForUtfylling) + val updatedInternTilstand = + value.copy(bekreftelser = value.bekreftelser - bekreftelse + updatedBekreftelse) + + stateStore.put(key, updatedInternTilstand) + + val record = Record( + value.periode.recordKey, + BekreftelseTilgjengelig( + hendelseId = UUID.randomUUID(), + periodeId = value.periode.periodeId, + arbeidssoekerId = value.periode.arbeidsoekerId, + bekreftelseId = bekreftelse.bekreftelseId, + gjelderFra = bekreftelse.gjelderFra, + gjelderTil = bekreftelse.gjelderTil + ), + Instant.now().toEpochMilli() + ) + ctx.forward(record) + } + + bekreftelse.tilstand == Tilstand.KlarForUtfylling + && bekreftelse.harFristUtloept(timestamp) -> { + val updatedBekreftelse = bekreftelse.copy(tilstand = Tilstand.VenterSvar) + val updatedInternTilstand = + value.copy(bekreftelser = value.bekreftelser - bekreftelse + updatedBekreftelse) + + stateStore.put(key, updatedInternTilstand) + + val record = Record( + value.periode.recordKey, + LeveringsfristUtloept( + hendelseId = UUID.randomUUID(), + periodeId = value.periode.periodeId, + arbeidssoekerId = value.periode.arbeidsoekerId, + bekreftelseId = bekreftelse.bekreftelseId, + ), + Instant.now().toEpochMilli() + ) + ctx.forward(record) + } + + bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.skalPurres(timestamp) -> { + val updatedBekreftelse = bekreftelse.copy(sistePurring = timestamp) + val updatedInternTilstand = + value.copy(bekreftelser = value.bekreftelser - bekreftelse + updatedBekreftelse) + + stateStore.put(key, updatedInternTilstand) + + val record = Record( + value.periode.recordKey, + RegisterGracePeriodeGjendstaaendeTid( + hendelseId = UUID.randomUUID(), + periodeId = value.periode.periodeId, + arbeidssoekerId = value.periode.arbeidsoekerId, + bekreftelseId = bekreftelse.bekreftelseId, + gjenstaandeTid = gjenstaendeGracePeriode(timestamp, bekreftelse.gjelderTil) + ), + Instant.now().toEpochMilli() + ) + ctx.forward(record) + } + + bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.harGracePeriodeUtloept(timestamp) -> { + + // TODO: Mangler vi tilstand.utloept eller er det riktig at den skal fjernes her? + val updatedInternTilstand = value.copy( + bekreftelser = value.bekreftelser - bekreftelse + ) + + stateStore.put(key, updatedInternTilstand) + + val record = Record( + value.periode.recordKey, + RegisterGracePeriodeUtloept( + hendelseId = UUID.randomUUID(), + periodeId = value.periode.periodeId, + arbeidssoekerId = value.periode.arbeidsoekerId, + bekreftelseId = bekreftelse.bekreftelseId + ), + Instant.now().toEpochMilli() + ) + + ctx.forward(record) + } + + bekreftelse.skalLageNyBekreftelseTilgjengelig(timestamp, value.bekreftelser) -> { + val newBekreftelse = bekreftelse.copy( + tilstand = Tilstand.KlarForUtfylling, + sistePurring = null, + bekreftelseId = UUID.randomUUID(), + gjelderFra = bekreftelse.gjelderTil, + gjelderTil = fristForNesteBekreftelse(bekreftelse.gjelderTil, BekreftelseConfig.bekreftelseInterval) + + ) + val updatedInternTilstand = value.copy( + bekreftelser = value.bekreftelser + newBekreftelse + ) + stateStore.put(key, updatedInternTilstand) + + val record = Record( + value.periode.recordKey, + BekreftelseTilgjengelig( + hendelseId = UUID.randomUUID(), + periodeId = value.periode.periodeId, + arbeidssoekerId = value.periode.arbeidsoekerId, + bekreftelseId = newBekreftelse.bekreftelseId, + gjelderFra = newBekreftelse.gjelderFra, + gjelderTil = newBekreftelse.gjelderTil + ), + Instant.now().toEpochMilli() + ) + + ctx.forward(record) + } + } + } + } + } +} + +private operator fun KeyValue.component1(): K = key +private operator fun KeyValue.component2(): V = value diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/Startup.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Startup.kt similarity index 95% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/Startup.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Startup.kt index a11a306c..d1c825fe 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/Startup.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Startup.kt @@ -1,4 +1,4 @@ -package no.nav.paw.bekretelsetjeneste +package no.nav.paw.bekreftelsetjeneste import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde @@ -7,7 +7,7 @@ import no.nav.paw.config.kafka.KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG import no.nav.paw.config.kafka.KafkaConfig import no.nav.paw.config.kafka.streams.KafkaStreamsFactory import no.nav.paw.kafkakeygenerator.client.createKafkaKeyGeneratorClient -import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstandSerde +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.StreamsBuilder diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/Topology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Topology.kt similarity index 80% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/Topology.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Topology.kt index c4237e5a..4bdbeb06 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/Topology.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Topology.kt @@ -1,6 +1,6 @@ -package no.nav.paw.bekretelsetjeneste +package no.nav.paw.bekreftelsetjeneste -import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.Topology import org.apache.kafka.streams.state.KeyValueStore diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/InternTilstand.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt similarity index 69% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/InternTilstand.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt index 28c34bdb..11662a45 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/InternTilstand.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt @@ -1,8 +1,8 @@ -package no.nav.paw.bekretelsetjeneste.tilstand +package no.nav.paw.bekreftelsetjeneste.tilstand import no.nav.paw.arbeidssokerregisteret.api.v1.Periode import java.time.Instant -import java.util.UUID +import java.util.* @JvmRecord data class InternTilstand( @@ -26,7 +26,6 @@ sealed interface Tilstand{ data object Levert : Tilstand } - @JvmRecord data class PeriodeInfo( val periodeId: UUID, @@ -43,7 +42,8 @@ data class PeriodeInfo( fun initTilstand( id: Long, key: Long, - periode: Periode + periode: Periode, + bekreftelseConfig: BekreftelseConfig ): InternTilstand = InternTilstand( periode = PeriodeInfo( @@ -54,5 +54,13 @@ fun initTilstand( startet = periode.startet.tidspunkt, avsluttet = periode.avsluttet.tidspunkt ), - bekreftelser = emptyList() - ) + bekreftelser = listOf( + Bekreftelse( + tilstand = Tilstand.IkkeKlarForUtfylling, + sistePurring = null, + bekreftelseId = UUID.randomUUID(), + gjelderFra = periode.startet.tidspunkt, + gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, bekreftelseConfig.bekreftelseInterval) + ) + ) + ) \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/InternTilstandSerde.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstandSerde.kt similarity index 96% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/InternTilstandSerde.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstandSerde.kt index 79c349e6..5e7e75b7 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/InternTilstandSerde.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstandSerde.kt @@ -1,4 +1,4 @@ -package no.nav.paw.bekretelsetjeneste.tilstand +package no.nav.paw.bekreftelsetjeneste.tilstand import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/RegisterInstillinger.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt similarity index 59% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/RegisterInstillinger.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt index f9be1bf5..12a7a392 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/tilstand/RegisterInstillinger.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt @@ -1,4 +1,4 @@ -package no.nav.paw.bekretelsetjeneste.tilstand +package no.nav.paw.bekreftelsetjeneste.tilstand import java.time.DayOfWeek import java.time.Duration @@ -8,12 +8,16 @@ import java.time.ZoneId import java.time.temporal.TemporalAdjuster import java.time.temporal.TemporalAdjusters -data class RapporteringsKonfigurasjon( - val rapporteringTilgjengligOffset: Duration, - val varselFoerUtloepAvGracePeriode: Duration -) +// Felles verdier for bekreftelse. +data object BekreftelseConfig { + val bekreftelseInterval:Duration = Duration.ofDays(14) + val gracePeriode: Duration = Duration.ofDays(7) + val bekreftelseTilgjengeligOffset: Duration = Duration.ofDays(3) + val varselFoerGracePeriodeUtloept: Duration = gracePeriode.dividedBy(2) +} -fun fristForNesteRapportering(forrige: Instant, interval: Duration): Instant { +fun fristForNesteBekreftelse(forrige: Instant, interval: Duration): Instant { + // TODO: Finn regler for magic monday og gjør nødvendig justeringer val magicMondayAdjuster = MagicMondayAdjuster() val zoneId = ZoneId.of("Europe/Oslo") return forrige @@ -24,6 +28,17 @@ fun fristForNesteRapportering(forrige: Instant, interval: Duration): Instant { .atStartOfDay(zoneId).toInstant() } +fun gjenstaendeGracePeriode(timestamp: Instant, gjelderTil: Instant): Duration { + val gracePeriode = BekreftelseConfig.gracePeriode + val utvidetGjelderTil = gjelderTil.plus(gracePeriode) + + return if (utvidetGjelderTil.isBefore(timestamp)) { + Duration.ZERO + } else { + Duration.between(timestamp, utvidetGjelderTil) + } +} + class MagicMondayAdjuster: TemporalAdjuster { override fun adjustInto(temporal: java.time.temporal.Temporal): java.time.temporal.Temporal { val internalAdjuster = TemporalAdjusters.next(DayOfWeek.MONDAY) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/KontrolerFrister.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/KontrolerFrister.kt deleted file mode 100644 index dfdb0f58..00000000 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekretelsetjeneste/KontrolerFrister.kt +++ /dev/null @@ -1,19 +0,0 @@ -package no.nav.paw.bekretelsetjeneste - -import no.nav.paw.config.kafka.streams.Punctuation -import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstand -import no.nav.paw.bekretelsetjeneste.tilstand.RapporteringsKonfigurasjon -import org.apache.kafka.streams.KeyValue -import java.time.Instant - - -context(ApplicationConfiguration, RapporteringsKonfigurasjon) -fun kontrollerFrister(): Punctuation = TODO() - -operator fun KeyValue.component1(): K = key -operator fun KeyValue.component2(): V = value - -context(Instant, RapporteringsKonfigurasjon) -fun rapporteringSkalTilgjengeliggjoeres(tilstand: InternTilstand): Boolean { - TODO() -} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationTestContext.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt similarity index 96% rename from apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationTestContext.kt rename to apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt index f95ce5a3..214cdb78 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekretelsetjeneste/ApplicationTestContext.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt @@ -1,4 +1,4 @@ -package no.nav.paw.bekretelsetjeneste +package no.nav.paw.bekreftelsetjeneste import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry import io.confluent.kafka.serializers.KafkaAvroSerializerConfig @@ -8,7 +8,7 @@ import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde import no.nav.paw.bekreftelse.melding.v1.Bekreftelse -import no.nav.paw.bekretelsetjeneste.tilstand.InternTilstandSerde +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse import no.nav.paw.kafkakeygenerator.client.inMemoryKafkaKeysMock import org.apache.avro.specific.SpecificRecord @@ -69,7 +69,7 @@ class ApplicationTestContext { ansvarsTopicSerde.serializer() ) - val rapporteringsTopic = testDriver.createInputTopic( + val bekreftelseTopic = testDriver.createInputTopic( applicationConfiguration.bekreftelseTopic, Serdes.Long().serializer(), bekreftelseSerde.serializer() diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekretelsetjeneste/IngenAndreTarAnsvarTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/IngenAndreTarAnsvarTest.kt similarity index 98% rename from apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekretelsetjeneste/IngenAndreTarAnsvarTest.kt rename to apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/IngenAndreTarAnsvarTest.kt index 5429a549..9d753df0 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekretelsetjeneste/IngenAndreTarAnsvarTest.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/IngenAndreTarAnsvarTest.kt @@ -1,4 +1,4 @@ -package no.nav.paw.bekretelsetjeneste +package no.nav.paw.bekreftelsetjeneste import io.kotest.core.annotation.Ignored import io.kotest.core.spec.style.FreeSpec diff --git a/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt b/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt index 8dbabb79..e9472006 100644 --- a/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt +++ b/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt @@ -74,30 +74,31 @@ fun KStream.genericProcess( function: ProcessorContext.(Record) -> Unit ): KStream { val processor = { - GenericProcessor(function = function, punctuation = punctuation) + GenericProcessor(function = function, punctuation = punctuation, stateStoreNames = stateStoreNames) } return process(processor, Named.`as`(name), *stateStoreNames) } class GenericProcessor( private val punctuation: Punctuation? = null, - private val function: ProcessorContext.(Record) -> Unit + private vararg val stateStoreNames: String, + private val function: ProcessorContext.(Record) -> Unit, ) : Processor { - private var context: ProcessorContext? = null + private lateinit var context: ProcessorContext override fun init(context: ProcessorContext?) { super.init(context) - this.context = context + this.context = requireNotNull(context) { "ProcessorContext must not be null during init" } if (punctuation != null) { - context?.schedule(punctuation.interval, punctuation.type) { time -> - punctuation.function(Instant.ofEpochMilli(time), context) + context.schedule(punctuation.interval, punctuation.type) { timestamp -> + punctuation.function(Instant.ofEpochMilli(timestamp), this.context, stateStoreNames) } } } override fun process(record: Record?) { if (record == null) return - val ctx = requireNotNull(context) { "Context is not initialized" } + val ctx = requireNotNull(context) { "ProcessorContext is not initialized before processing records" } with(ctx) { function(record) } } } @@ -105,5 +106,5 @@ class GenericProcessor( data class Punctuation( val interval: Duration, val type: PunctuationType, - val function: (Instant, ProcessorContext) -> Unit + val function: (Instant, ProcessorContext, Array) -> Unit )