Skip to content

Commit

Permalink
reversert genericProcess endringer, endret navn til bekreftelsePunctu…
Browse files Browse the repository at this point in the history
…ator, lagt til graceutloept tilstand, lagt til arrow core
  • Loading branch information
robertkittilsen committed Sep 18, 2024
1 parent 789575f commit feaebfd
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 14 deletions.
1 change: 1 addition & 0 deletions apps/bekreftelse-tjeneste/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {
implementation(jackson.datatypeJsr310)
implementation(jackson.kotlin)
implementation(apacheAvro.kafkaStreamsAvroSerde)
implementation(arrow.core)

implementation(ktorServer.bundles.withNettyAndMicrometer)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package no.nav.paw.bekreftelsetjeneste

import arrow.core.partially1
import no.nav.paw.bekreftelse.internehendelser.BaOmAaAvsluttePeriode
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt
Expand All @@ -22,7 +23,7 @@ fun StreamsBuilder.processBekreftelseMeldingTopic() {
.genericProcess<Long, no.nav.paw.bekreftelse.melding.v1.Bekreftelse, Long, BekreftelseHendelse>(
name = "meldingMottatt",
stateStoreName,
punctuation = Punctuation(punctuateInterval, PunctuationType.WALL_CLOCK_TIME, ::scheduleUpdateTilstand)
punctuation = Punctuation(punctuateInterval, PunctuationType.WALL_CLOCK_TIME, ::bekreftelsePunctuator.partially1(stateStoreName)),
) { record ->
val stateStore = getStateStore<StateStore>(stateStoreName)
val gjeldeneTilstand: InternTilstand? = stateStore[record.value().periodeId]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import org.apache.kafka.streams.processor.api.Record
import java.time.Instant
import java.util.*

fun scheduleUpdateTilstand(timestamp: Instant, ctx: ProcessorContext<Long, BekreftelseHendelse>, stateStoreNames: Array<out String>) {
val stateStore: StateStore = ctx.getStateStore(stateStoreNames[0])
fun bekreftelsePunctuator(stateStoreName: String, timestamp: Instant, ctx: ProcessorContext<Long, BekreftelseHendelse> ) {
val stateStore: StateStore = ctx.getStateStore(stateStoreName)

stateStore.all().use { states ->
states.forEach { (key, value) ->
Expand Down Expand Up @@ -67,10 +67,11 @@ fun scheduleUpdateTilstand(timestamp: Instant, ctx: ProcessorContext<Long, Bekre
}

bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.skalPurres(timestamp) -> {
val updatedBekreftelse = bekreftelse.copy(sistePurring = timestamp)
val updatedBekreftelse = bekreftelse.copy(sisteVarselOmGjenstaaendeGraceTid = timestamp)
val updatedInternTilstand =
value.copy(bekreftelser = value.bekreftelser - bekreftelse + updatedBekreftelse)


stateStore.put(key, updatedInternTilstand)

val record = Record<Long, BekreftelseHendelse>(
Expand All @@ -89,7 +90,7 @@ fun scheduleUpdateTilstand(timestamp: Instant, ctx: ProcessorContext<Long, Bekre

bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.harGracePeriodeUtloept(timestamp) -> {

// TODO: Mangler vi tilstand.utloept eller er det riktig at den skal fjernes her?
// TODO: oppdater til graceutloept
val updatedInternTilstand = value.copy(
bekreftelser = value.bekreftelser - bekreftelse
)
Expand All @@ -113,7 +114,7 @@ fun scheduleUpdateTilstand(timestamp: Instant, ctx: ProcessorContext<Long, Bekre
bekreftelse.skalLageNyBekreftelseTilgjengelig(timestamp, value.bekreftelser) -> {
val newBekreftelse = bekreftelse.copy(
tilstand = Tilstand.KlarForUtfylling,
sistePurring = null,
sisteVarselOmGjenstaaendeGraceTid = null,
bekreftelseId = UUID.randomUUID(),
gjelderFra = bekreftelse.gjelderTil,
gjelderTil = fristForNesteBekreftelse(bekreftelse.gjelderTil, BekreftelseConfig.bekreftelseInterval)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fun Bekreftelse.harFristUtloept(now: Instant): Boolean =
gjelderTil.isBefore(now)

fun Bekreftelse.skalPurres(now: Instant): Boolean =
sistePurring == null && gjelderTil.plus(
sisteVarselOmGjenstaaendeGraceTid == null && gjelderTil.plus(
BekreftelseConfig.varselFoerGracePeriodeUtloept
).isAfter(now)

Expand All @@ -20,7 +20,8 @@ fun Bekreftelse.harGracePeriodeUtloept(now: Instant): Boolean =
.isAfter(now)

fun Bekreftelse.skalLageNyBekreftelseTilgjengelig(now: Instant, bekreftelser: List<Bekreftelse>): Boolean =
// Bruk nyeste dato til å sjekke om det skal lages ny bekreftelse
gjelderTil.plus(BekreftelseConfig.bekreftelseInterval)
.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).isAfter(now) && bekreftelser.size < 2
.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).isAfter(now)


Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ fun main() {
.withDefaultKeySerde(Serdes.Long()::class)
.withDefaultValueSerde(SpecificAvroSerde::class)
val streams = KafkaStreams(topology, streamsFactory.properties)
// TODO: start streams
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ data class InternTilstand(
@JvmRecord
data class Bekreftelse(
val tilstand: Tilstand,
val sistePurring: Instant?,
val sisteVarselOmGjenstaaendeGraceTid: Instant?,
val bekreftelseId: UUID,
val gjelderFra: Instant,
val gjelderTil: Instant
Expand All @@ -23,6 +23,7 @@ sealed interface Tilstand{
data object IkkeKlarForUtfylling: Tilstand
data object KlarForUtfylling: Tilstand
data object VenterSvar: Tilstand
data object GracePeriodeUtlopt: Tilstand
data object Levert : Tilstand
}

Expand Down Expand Up @@ -57,7 +58,7 @@ fun initTilstand(
bekreftelser = listOf(
Bekreftelse(
tilstand = Tilstand.IkkeKlarForUtfylling,
sistePurring = null,
sisteVarselOmGjenstaaendeGraceTid = null,
bekreftelseId = UUID.randomUUID(),
gjelderFra = periode.startet.tidspunkt,
gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, bekreftelseConfig.bekreftelseInterval)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package no.nav.paw.bekreftelsetjeneste

import io.kotest.core.spec.style.FreeSpec

class BekreftelsePunctuatorTest : FreeSpec({

"Tilstand IkkeKlarForUtfylling og bekreftelseTilgjengeligOffset før gjelderTil settes til KlarForUtfylling og sender BekreftelseTilgjengelig hendelse" {

}

/*for hver bekreftelse i "klar for utfylling" og gjelderTil passert, sett til "venter på svar" og send hendelse LeveringsFristUtloept
for hver bekreftelse i "venter på svar" og ingen purring sendt og x tid passert siden frist, send RegisterGracePeriodeGjenstaaendeTid og sett purring sendt timestamp til now()
for hver bekreftelse i "venter på svar" og grace periode utløpt, send RegisterGracePeriodeUtloept
for hver periode hvis det er mindre enn x dager til den siste bekreftelse perioden utgår lag ny bekreftelse periode*/
})


Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@ fun <K_IN, V_IN, K_OUT, V_OUT> KStream<K_IN, V_IN>.genericProcess(
function: ProcessorContext<K_OUT, V_OUT>.(Record<K_IN, V_IN>) -> Unit
): KStream<K_OUT, V_OUT> {
val processor = {
GenericProcessor(function = function, punctuation = punctuation, stateStoreNames = stateStoreNames)
GenericProcessor(function = function, punctuation = punctuation)
}
return process(processor, Named.`as`(name), *stateStoreNames)
}

class GenericProcessor<K_IN, V_IN, K_OUT, V_OUT>(
private val punctuation: Punctuation<K_OUT, V_OUT>? = null,
private vararg val stateStoreNames: String,
private val function: ProcessorContext<K_OUT, V_OUT>.(Record<K_IN, V_IN>) -> Unit,
) : Processor<K_IN, V_IN, K_OUT, V_OUT> {
private lateinit var context: ProcessorContext<K_OUT, V_OUT>
Expand All @@ -91,7 +90,7 @@ class GenericProcessor<K_IN, V_IN, K_OUT, V_OUT>(
this.context = requireNotNull(context) { "ProcessorContext must not be null during init" }
if (punctuation != null) {
context.schedule(punctuation.interval, punctuation.type) { timestamp ->
punctuation.function(Instant.ofEpochMilli(timestamp), this.context, stateStoreNames)
punctuation.function(Instant.ofEpochMilli(timestamp), this.context)
}
}
}
Expand All @@ -106,5 +105,5 @@ class GenericProcessor<K_IN, V_IN, K_OUT, V_OUT>(
data class Punctuation<K, V>(
val interval: Duration,
val type: PunctuationType,
val function: (Instant, ProcessorContext<K, V>, Array<out String>) -> Unit
val function: (Instant, ProcessorContext<K, V>) -> Unit
)

0 comments on commit feaebfd

Please sign in to comment.