From feaebfd8ffbd748832e9b1b7c5083a23d84b37be Mon Sep 17 00:00:00 2001 From: robertkittilsen Date: Wed, 18 Sep 2024 09:51:21 +0200 Subject: [PATCH] reversert genericProcess endringer, endret navn til bekreftelsePunctuator, lagt til graceutloept tilstand, lagt til arrow core --- apps/bekreftelse-tjeneste/build.gradle.kts | 1 + .../BekreftelseMeldingTopology.kt | 3 ++- ...teTilstand.kt => BekreftelsePunctuator.kt} | 11 +++++----- .../bekreftelsetjeneste/KontrolerFrister.kt | 5 +++-- .../no/nav/paw/bekreftelsetjeneste/Startup.kt | 1 + .../tilstand/InternTilstand.kt | 5 +++-- .../BekreftelsePunctuatorTest.kt | 20 +++++++++++++++++++ .../config/kafka/streams/GenericProcessor.kt | 7 +++---- 8 files changed, 39 insertions(+), 14 deletions(-) rename apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/{ScheduleUpdateTilstand.kt => BekreftelsePunctuator.kt} (95%) create mode 100644 apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt diff --git a/apps/bekreftelse-tjeneste/build.gradle.kts b/apps/bekreftelse-tjeneste/build.gradle.kts index 910e9c5c..ce740f0e 100644 --- a/apps/bekreftelse-tjeneste/build.gradle.kts +++ b/apps/bekreftelse-tjeneste/build.gradle.kts @@ -17,6 +17,7 @@ dependencies { implementation(jackson.datatypeJsr310) implementation(jackson.kotlin) implementation(apacheAvro.kafkaStreamsAvroSerde) + implementation(arrow.core) implementation(ktorServer.bundles.withNettyAndMicrometer) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopology.kt index 9575d1d8..4134a4c9 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopology.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopology.kt @@ -1,5 +1,6 @@ package no.nav.paw.bekreftelsetjeneste +import arrow.core.partially1 import no.nav.paw.bekreftelse.internehendelser.BaOmAaAvsluttePeriode import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt @@ -22,7 +23,7 @@ fun StreamsBuilder.processBekreftelseMeldingTopic() { .genericProcess( name = "meldingMottatt", stateStoreName, - punctuation = Punctuation(punctuateInterval, PunctuationType.WALL_CLOCK_TIME, ::scheduleUpdateTilstand) + punctuation = Punctuation(punctuateInterval, PunctuationType.WALL_CLOCK_TIME, ::bekreftelsePunctuator.partially1(stateStoreName)), ) { record -> val stateStore = getStateStore(stateStoreName) val gjeldeneTilstand: InternTilstand? = stateStore[record.value().periodeId] diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ScheduleUpdateTilstand.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuator.kt similarity index 95% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ScheduleUpdateTilstand.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuator.kt index c3e05098..9b58847d 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ScheduleUpdateTilstand.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuator.kt @@ -15,8 +15,8 @@ import org.apache.kafka.streams.processor.api.Record import java.time.Instant import java.util.* -fun scheduleUpdateTilstand(timestamp: Instant, ctx: ProcessorContext, stateStoreNames: Array) { - val stateStore: StateStore = ctx.getStateStore(stateStoreNames[0]) +fun bekreftelsePunctuator(stateStoreName: String, timestamp: Instant, ctx: ProcessorContext ) { + val stateStore: StateStore = ctx.getStateStore(stateStoreName) stateStore.all().use { states -> states.forEach { (key, value) -> @@ -67,10 +67,11 @@ fun scheduleUpdateTilstand(timestamp: Instant, ctx: ProcessorContext { - val updatedBekreftelse = bekreftelse.copy(sistePurring = timestamp) + val updatedBekreftelse = bekreftelse.copy(sisteVarselOmGjenstaaendeGraceTid = timestamp) val updatedInternTilstand = value.copy(bekreftelser = value.bekreftelser - bekreftelse + updatedBekreftelse) + stateStore.put(key, updatedInternTilstand) val record = Record( @@ -89,7 +90,7 @@ fun scheduleUpdateTilstand(timestamp: Instant, ctx: ProcessorContext { - // TODO: Mangler vi tilstand.utloept eller er det riktig at den skal fjernes her? + // TODO: oppdater til graceutloept val updatedInternTilstand = value.copy( bekreftelser = value.bekreftelser - bekreftelse ) @@ -113,7 +114,7 @@ fun scheduleUpdateTilstand(timestamp: Instant, ctx: ProcessorContext { val newBekreftelse = bekreftelse.copy( tilstand = Tilstand.KlarForUtfylling, - sistePurring = null, + sisteVarselOmGjenstaaendeGraceTid = null, bekreftelseId = UUID.randomUUID(), gjelderFra = bekreftelse.gjelderTil, gjelderTil = fristForNesteBekreftelse(bekreftelse.gjelderTil, BekreftelseConfig.bekreftelseInterval) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt index b5dc9498..5bb89c59 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt @@ -11,7 +11,7 @@ fun Bekreftelse.harFristUtloept(now: Instant): Boolean = gjelderTil.isBefore(now) fun Bekreftelse.skalPurres(now: Instant): Boolean = - sistePurring == null && gjelderTil.plus( + sisteVarselOmGjenstaaendeGraceTid == null && gjelderTil.plus( BekreftelseConfig.varselFoerGracePeriodeUtloept ).isAfter(now) @@ -20,7 +20,8 @@ fun Bekreftelse.harGracePeriodeUtloept(now: Instant): Boolean = .isAfter(now) fun Bekreftelse.skalLageNyBekreftelseTilgjengelig(now: Instant, bekreftelser: List): Boolean = + // Bruk nyeste dato til å sjekke om det skal lages ny bekreftelse gjelderTil.plus(BekreftelseConfig.bekreftelseInterval) - .minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).isAfter(now) && bekreftelser.size < 2 + .minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).isAfter(now) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Startup.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Startup.kt index d1c825fe..c7b26513 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Startup.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Startup.kt @@ -48,4 +48,5 @@ fun main() { .withDefaultKeySerde(Serdes.Long()::class) .withDefaultValueSerde(SpecificAvroSerde::class) val streams = KafkaStreams(topology, streamsFactory.properties) + // TODO: start streams } \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt index 11662a45..3b7cd129 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt @@ -13,7 +13,7 @@ data class InternTilstand( @JvmRecord data class Bekreftelse( val tilstand: Tilstand, - val sistePurring: Instant?, + val sisteVarselOmGjenstaaendeGraceTid: Instant?, val bekreftelseId: UUID, val gjelderFra: Instant, val gjelderTil: Instant @@ -23,6 +23,7 @@ sealed interface Tilstand{ data object IkkeKlarForUtfylling: Tilstand data object KlarForUtfylling: Tilstand data object VenterSvar: Tilstand + data object GracePeriodeUtlopt: Tilstand data object Levert : Tilstand } @@ -57,7 +58,7 @@ fun initTilstand( bekreftelser = listOf( Bekreftelse( tilstand = Tilstand.IkkeKlarForUtfylling, - sistePurring = null, + sisteVarselOmGjenstaaendeGraceTid = null, bekreftelseId = UUID.randomUUID(), gjelderFra = periode.startet.tidspunkt, gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, bekreftelseConfig.bekreftelseInterval) diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt new file mode 100644 index 00000000..83fd17ed --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt @@ -0,0 +1,20 @@ +package no.nav.paw.bekreftelsetjeneste + +import io.kotest.core.spec.style.FreeSpec + +class BekreftelsePunctuatorTest : FreeSpec({ + + "Tilstand IkkeKlarForUtfylling og bekreftelseTilgjengeligOffset før gjelderTil settes til KlarForUtfylling og sender BekreftelseTilgjengelig hendelse" { + + } + + /*for hver bekreftelse i "klar for utfylling" og gjelderTil passert, sett til "venter på svar" og send hendelse LeveringsFristUtloept + + for hver bekreftelse i "venter på svar" og ingen purring sendt og x tid passert siden frist, send RegisterGracePeriodeGjenstaaendeTid og sett purring sendt timestamp til now() + + for hver bekreftelse i "venter på svar" og grace periode utløpt, send RegisterGracePeriodeUtloept + + for hver periode hvis det er mindre enn x dager til den siste bekreftelse perioden utgår lag ny bekreftelse periode*/ +}) + + diff --git a/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt b/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt index e9472006..23bae121 100644 --- a/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt +++ b/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt @@ -74,14 +74,13 @@ fun KStream.genericProcess( function: ProcessorContext.(Record) -> Unit ): KStream { val processor = { - GenericProcessor(function = function, punctuation = punctuation, stateStoreNames = stateStoreNames) + GenericProcessor(function = function, punctuation = punctuation) } return process(processor, Named.`as`(name), *stateStoreNames) } class GenericProcessor( private val punctuation: Punctuation? = null, - private vararg val stateStoreNames: String, private val function: ProcessorContext.(Record) -> Unit, ) : Processor { private lateinit var context: ProcessorContext @@ -91,7 +90,7 @@ class GenericProcessor( this.context = requireNotNull(context) { "ProcessorContext must not be null during init" } if (punctuation != null) { context.schedule(punctuation.interval, punctuation.type) { timestamp -> - punctuation.function(Instant.ofEpochMilli(timestamp), this.context, stateStoreNames) + punctuation.function(Instant.ofEpochMilli(timestamp), this.context) } } } @@ -106,5 +105,5 @@ class GenericProcessor( data class Punctuation( val interval: Duration, val type: PunctuationType, - val function: (Instant, ProcessorContext, Array) -> Unit + val function: (Instant, ProcessorContext) -> Unit )