diff --git a/apps/bekreftelse-api/src/main/kotlin/no/nav/paw/bekreftelse/api/services/AuthorizationService.kt b/apps/bekreftelse-api/src/main/kotlin/no/nav/paw/bekreftelse/api/services/AuthorizationService.kt index e714942d..eb56ec49 100644 --- a/apps/bekreftelse-api/src/main/kotlin/no/nav/paw/bekreftelse/api/services/AuthorizationService.kt +++ b/apps/bekreftelse-api/src/main/kotlin/no/nav/paw/bekreftelse/api/services/AuthorizationService.kt @@ -1,5 +1,6 @@ package no.nav.paw.bekreftelse.api.services +import io.opentelemetry.api.trace.Span import io.opentelemetry.instrumentation.annotations.WithSpan import no.nav.paw.bekreftelse.api.config.ApplicationConfig import no.nav.paw.bekreftelse.api.config.ServerConfig @@ -54,7 +55,6 @@ class AuthorizationService( accessToken = accessToken, tilgangType = tilgangType ) - return authorize(securityContext) } diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Application.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Application.kt index b7751fd2..802c5c05 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Application.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Application.kt @@ -5,6 +5,7 @@ import io.ktor.server.engine.addShutdownHook import io.ktor.server.engine.embeddedServer import io.ktor.server.netty.Netty import io.ktor.server.routing.routing +import no.nav.paw.bekreftelsetjeneste.ansvar.AnsvarSerde import no.nav.paw.bekreftelsetjeneste.config.APPLICATION_CONFIG_FILE_NAME import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig import no.nav.paw.bekreftelsetjeneste.config.SERVER_CONFIG_FILE_NAME @@ -14,11 +15,14 @@ import no.nav.paw.bekreftelsetjeneste.plugins.buildKafkaStreams import no.nav.paw.bekreftelsetjeneste.plugins.configureKafka import no.nav.paw.bekreftelsetjeneste.plugins.configureMetrics import no.nav.paw.bekreftelsetjeneste.routes.metricsRoutes +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde import no.nav.paw.bekreftelsetjeneste.topology.buildTopology import no.nav.paw.config.env.appNameOrDefaultForLocal import no.nav.paw.config.env.currentRuntimeEnvironment import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration import no.nav.paw.health.route.healthRoutes +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.state.Stores import org.slf4j.LoggerFactory @@ -42,11 +46,22 @@ fun main() { fun Application.module(applicationConfig: ApplicationConfig) { val applicationContext = ApplicationContext.create(applicationConfig) - - val kafkaTopology = buildTopology( - applicationContext = applicationContext, - keyValueStateStoreSupplier = Stores::persistentKeyValueStore + val stream = StreamsBuilder() + stream.addStateStore( + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(applicationContext.applicationConfig.kafkaTopology.internStateStoreName), + Serdes.UUID(), + InternTilstandSerde() + ) + ) + stream.addStateStore( + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(applicationContext.applicationConfig.kafkaTopology.ansvarStateStoreName), + Serdes.UUID(), + AnsvarSerde() + ) ) + val kafkaTopology = stream.buildTopology(applicationContext) val kafkaStreams = buildKafkaStreams(applicationContext, kafkaTopology) configureMetrics(applicationContext) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/Ansvar.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/Ansvar.kt index b7bbf656..38ac40aa 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/Ansvar.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/Ansvar.kt @@ -1,5 +1,6 @@ package no.nav.paw.bekreftelsetjeneste.ansvar +import no.nav.paw.bekreftelse.ansvar.v1.vo.Bekreftelsesloesning import java.time.Duration import java.util.* @@ -8,6 +9,26 @@ data class Ansvar( val ansvarlige: List ) +data class Ansvarlig( + val loesning: Loesning, + val intervall: Duration, + val gracePeriode: Duration +) + +enum class Loesning { + UKJENT_VERDI, + ARBEIDSSOEKERREGISTERET, + DAGPENGER; + + companion object { + fun from(value: Bekreftelsesloesning): Loesning = when (value) { + Bekreftelsesloesning.UKJENT_VERDI -> UKJENT_VERDI + Bekreftelsesloesning.ARBEIDSSOEKERREGISTERET -> ARBEIDSSOEKERREGISTERET + Bekreftelsesloesning.DAGPENGER -> DAGPENGER + } + } +} + fun ansvar( periodeId: UUID, ansvarlig: Ansvarlig? = null @@ -18,19 +39,11 @@ fun ansvar( operator fun Ansvar.plus(ansvarlig: Ansvarlig): Ansvar = copy(ansvarlige = ansvarlige - .filterNot { it.namespace == ansvarlig.namespace} + ansvarlig + .filterNot { it.loesning == ansvarlig.loesning} + ansvarlig ) -operator fun Ansvar.minus(namespace: String): Ansvar? = - ansvarlige.filterNot { it.namespace == namespace } - .takeIf(List::isNotEmpty) +operator fun Ansvar?.minus(loesning: Loesning): Ansvar? = + this?.ansvarlige + ?.filterNot { it.loesning == loesning } + ?.takeIf(List::isNotEmpty) ?.let { copy(ansvarlige = it) } - - - -data class Ansvarlig( - val namespace: String, - val id: String, - val intervall: Duration, - val gracePeriode: Duration -) \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/AnsvarSerde.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/AnsvarSerde.kt new file mode 100644 index 00000000..eb49ba97 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/AnsvarSerde.kt @@ -0,0 +1,34 @@ +package no.nav.paw.bekreftelsetjeneste.ansvar + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.kotlin.readValue +import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serializer + +private val ansvarObjectMapper = ObjectMapper() + .registerKotlinModule() + .registerModules(JavaTimeModule()) + +class AnsvarSerde: Serde { + private val ansvarSerializer = AnsvarSerializer() + private val ansvarDeserializer = AnsvarDeserializer() + + override fun serializer(): Serializer = ansvarSerializer + + override fun deserializer(): Deserializer = ansvarDeserializer +} + +class AnsvarSerializer: Serializer { + override fun serialize(topic: String?, data: Ansvar): ByteArray { + return ansvarObjectMapper.writeValueAsBytes(data) + } +} + +class AnsvarDeserializer: Deserializer { + override fun deserialize(topic: String?, data: ByteArray): Ansvar { + return ansvarObjectMapper.readValue(data) + } +} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/handterAnsvarEndret.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/handterAnsvarEndret.kt new file mode 100644 index 00000000..747c23cc --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/handterAnsvarEndret.kt @@ -0,0 +1,77 @@ +package no.nav.paw.bekreftelsetjeneste.ansvar + +import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret +import no.nav.paw.bekreftelse.ansvar.v1.vo.AvslutterAnsvar +import no.nav.paw.bekreftelse.ansvar.v1.vo.TarAnsvar +import no.nav.paw.bekreftelse.internehendelser.AndreHarOvertattAnsvar +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand +import java.time.Duration +import java.time.Instant +import java.util.* + +fun haandterAnsvarEndret( + tilstand: InternTilstand?, + ansvar: Ansvar?, + ansvarEndret: AnsvarEndret +): List { + return when (val handling = ansvarEndret.handling) { + is TarAnsvar -> tarAnsvar( + tilstand = tilstand, + ansvar = ansvar, + ansvarEndret = ansvarEndret, + handling = handling + ) + is AvslutterAnsvar -> avslutterAnsvar( + ansvar = ansvar, + ansvarEndret = ansvarEndret + ) + else -> emptyList() + } +} + +fun avslutterAnsvar( + ansvar: Ansvar?, + ansvarEndret: AnsvarEndret +): List { + val oppdatertAnsvar = ansvar - Loesning.from(ansvarEndret.bekreftelsesloesning) + val ansvarsHandling = when { + ansvar != null && oppdatertAnsvar == null -> SlettAnsvar(ansvarEndret.periodeId) + ansvar != null && oppdatertAnsvar != null -> SkrivAnsvar(ansvarEndret.periodeId, oppdatertAnsvar) + else -> null + } + return listOfNotNull(ansvarsHandling) +} + +fun tarAnsvar( + tilstand: InternTilstand?, + ansvar: Ansvar?, + ansvarEndret: AnsvarEndret, + handling: TarAnsvar +): List { + val oppdatertAnsvar = + (ansvar ?: ansvar(ansvarEndret.periodeId)) + + Ansvarlig( + loesning = Loesning.from(ansvarEndret.bekreftelsesloesning), + intervall = Duration.ofMillis(handling.intervalMS), + gracePeriode = Duration.ofMillis(handling.graceMS) + ) + val hendelse = tilstand?.let { + AndreHarOvertattAnsvar( + hendelseId = UUID.randomUUID(), + periodeId = ansvarEndret.periodeId, + arbeidssoekerId = tilstand.periode.arbeidsoekerId, + hendelseTidspunkt = Instant.now(), + ) + } + return listOfNotNull( + if (ansvar != oppdatertAnsvar) SkrivAnsvar(ansvarEndret.periodeId, oppdatertAnsvar) else null, + hendelse?.let(::SendHendelse) + ) +} + + +sealed interface Handling +data class SlettAnsvar(val id: UUID) : Handling +data class SkrivAnsvar(val id: UUID, val value: Ansvar) : Handling +data class SendHendelse(val hendelse: BekreftelseHendelse) : Handling diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/config/ApplicationConfig.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/config/ApplicationConfig.kt index ea527ca3..fc780bb5 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/config/ApplicationConfig.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/config/ApplicationConfig.kt @@ -31,9 +31,11 @@ data class BekreftelseIntervals( data class KafkaTopologyConfig( val applicationIdSuffix: String, val internStateStoreName: String, + val ansvarStateStoreName: String, val periodeTopic: String, val bekreftelseTopic: String, val bekreftelseHendelseloggTopic: String, + val ansvarsTopic: String, val punctuationInterval: Duration, val shutdownTimeout: Duration = Duration.ofMinutes(5), ) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/context/ApplicationContext.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/context/ApplicationContext.kt index 7e8203a1..65e17cb9 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/context/ApplicationContext.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/context/ApplicationContext.kt @@ -2,6 +2,7 @@ package no.nav.paw.bekreftelsetjeneste.context import io.micrometer.prometheusmetrics.PrometheusConfig import io.micrometer.prometheusmetrics.PrometheusMeterRegistry +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig import no.nav.paw.health.repository.HealthIndicatorRepository import no.nav.paw.kafkakeygenerator.auth.azureAdM2MTokenClient @@ -14,6 +15,8 @@ class ApplicationContext( val healthIndicatorRepository: HealthIndicatorRepository, val kafkaKeysClient: KafkaKeysClient ) { + val bekreftelseHendelseSerde = BekreftelseHendelseSerde() + companion object { fun create(applicationConfig: ApplicationConfig): ApplicationContext { val prometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/Bekreftelse.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/Bekreftelse.kt index eb35306c..4b4947bf 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/Bekreftelse.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/Bekreftelse.kt @@ -26,13 +26,17 @@ fun opprettFoersteBekreftelse( periode: PeriodeInfo, interval: Duration, currentTime: Instant -): Bekreftelse = - Bekreftelse( +): Bekreftelse { + val start = lastOf(periode.startet, currentTime - interval) + return Bekreftelse( BekreftelseTilstandsLogg(IkkeKlarForUtfylling(periode.startet), emptyList()), bekreftelseId = UUID.randomUUID(), gjelderFra = periode.startet, - gjelderTil = fristForNesteBekreftelse(periode.startet, interval, currentTime) + gjelderTil = fristForNesteBekreftelse(start, interval, periode.startet+interval) ) +} + +fun lastOf(a: Instant, b: Instant): Instant = if (a.isAfter(b)) a else b fun NonEmptyList.opprettNesteTilgjengeligeBekreftelse( diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/AnsvarStroem.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/AnsvarStroem.kt new file mode 100644 index 00000000..a4633c57 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/AnsvarStroem.kt @@ -0,0 +1,74 @@ +package no.nav.paw.bekreftelsetjeneste.topology + +import io.micrometer.core.instrument.Tag +import io.micrometer.core.instrument.Tags +import io.micrometer.prometheusmetrics.PrometheusMeterRegistry +import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret +import no.nav.paw.bekreftelse.ansvar.v1.vo.AvslutterAnsvar +import no.nav.paw.bekreftelse.ansvar.v1.vo.TarAnsvar +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde +import no.nav.paw.bekreftelsetjeneste.ansvar.* +import no.nav.paw.bekreftelsetjeneste.config.KafkaTopologyConfig +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand +import no.nav.paw.config.kafka.streams.mapNonNull +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.kstream.Produced +import org.apache.kafka.streams.state.KeyValueStore +import java.util.* + +fun StreamsBuilder.byggAnsvarsStroem( + registry: PrometheusMeterRegistry, + kafkaTopologyConfig: KafkaTopologyConfig, + bekreftelseHendelseSerde: BekreftelseHendelseSerde +) { + stream(kafkaTopologyConfig.ansvarsTopic) + .peek { _, message -> count(registry, message) } + .mapNonNull( + name = "endre_ansvar", + kafkaTopologyConfig.ansvarStateStoreName, + kafkaTopologyConfig.internStateStoreName + ) { message -> + val internStateStore = + getStateStore>(kafkaTopologyConfig.internStateStoreName) + val ansvarStateStore = getStateStore>(kafkaTopologyConfig.ansvarStateStoreName) + val internTilstand = internStateStore[message.periodeId] + val ansvar = ansvarStateStore[message.periodeId] + haandterAnsvarEndret( + tilstand = internTilstand, + ansvar = ansvar, + ansvarEndret = message + ).map { handling -> + when (handling) { + is SendHendelse -> handling.hendelse + is SkrivAnsvar -> ansvarStateStore.put(handling.id, handling.value) + is SlettAnsvar -> ansvarStateStore.delete(handling.id) + } + }.filterIsInstance() + } + .flatMapValues { _, value -> value } + .to( + kafkaTopologyConfig.bekreftelseHendelseloggTopic, + Produced.with(Serdes.Long(), bekreftelseHendelseSerde) + ) +} + +fun count( + registry: PrometheusMeterRegistry, + message: AnsvarEndret +) { + val action = when (message.handling) { + is TarAnsvar -> "tar_ansvar" + is AvslutterAnsvar -> "avslutter_ansvar" + else -> "ukjent" + } + val bekreftelsesloesning = message.bekreftelsesloesning + registry.counter( + "paw_bekreftelse_ansvar_endret", + Tags.of( + Tag.of("bekreftelsesloesing", bekreftelsesloesning.name), + Tag.of("handling", action) + ) + ) +} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt index 4d086777..4f2d35b1 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt @@ -11,28 +11,45 @@ import no.nav.paw.bekreftelsetjeneste.tilstand.* import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.processor.api.ProcessorContext import org.apache.kafka.streams.processor.api.Record +import org.slf4j.LoggerFactory import java.time.Duration import java.time.Instant import java.util.* - +private val punctuatorLogger = LoggerFactory.getLogger("punctuator.bekreftelse") fun bekreftelsePunctuator( - stateStoreName: String, + interntilstandStateStoreName: String, + ansvarStateStoreName: String, bekreftelseIntervals: BekreftelseIntervals, timestamp: Instant, ctx: ProcessorContext ) { - val stateStore: StateStore = ctx.getStateStore(stateStoreName) - - stateStore.all().use { states -> - states.forEach { (key, value) -> - val (updatedState, bekreftelseHendelser) = processBekreftelser(bekreftelseIntervals, value, timestamp) - - bekreftelseHendelser.forEach { - ctx.forward(Record(value.periode.recordKey, it, Instant.now().toEpochMilli())) - } - stateStore.put(key, updatedState) + val internTilstandStateStore: InternTilstandStateStore = ctx.getStateStore(interntilstandStateStoreName) + val ansvarStateStore: AnsvarStateStore = ctx.getStateStore(ansvarStateStoreName) + + internTilstandStateStore + .all() + .use { states -> + states + .asSequence() + .filter { (_, value) -> (ansvarStateStore.get(value.periode.periodeId) == null) + .also { result -> + punctuatorLogger.info("Periode ${value.periode.periodeId}, registeret har ansvar: $result") + }} + .forEach { (key, value) -> + val (updatedState, bekreftelseHendelser) = processBekreftelser( + bekreftelseIntervals, + value, + timestamp + ) + punctuatorLogger.info("Wallclocktime: $timestamp") + punctuatorLogger.info("Eksiterende bekreftelser: ${value.bekreftelser} ${if (value.bekreftelser.isEmpty()) ", periode startet: ${value.periode.startet}" else ""}") + punctuatorLogger.info("Oppdaterte bekreftelser: ${updatedState.bekreftelser}") + bekreftelseHendelser.forEach { + ctx.forward(Record(value.periode.recordKey, it, Instant.now().toEpochMilli())) + } + internTilstandStateStore.put(key, updatedState) + } } - } } fun processBekreftelser( @@ -62,7 +79,12 @@ private fun InternTilstand.checkAndCreateNewBekreftelse( ): Pair { val nonEmptyBekreftelser = bekreftelser.toNonEmptyListOrNull() ?: return this to null - return if (nonEmptyBekreftelser.shouldCreateNewBekreftelse(timestamp, bekreftelseIntervals.interval, bekreftelseIntervals.tilgjengeligOffset)) { + return if (nonEmptyBekreftelser.shouldCreateNewBekreftelse( + timestamp, + bekreftelseIntervals.interval, + bekreftelseIntervals.tilgjengeligOffset + ) + ) { val newBekreftelse = nonEmptyBekreftelser.opprettNesteTilgjengeligeBekreftelse( tilgjengeliggjort = timestamp, interval = bekreftelseIntervals.interval, @@ -79,13 +101,21 @@ private fun InternTilstand.handleUpdateBekreftelser( ): Pair { val updatedBekreftelser = bekreftelser.map { bekreftelse -> generateSequence(bekreftelse to null as BekreftelseHendelse?) { (currentBekreftelse, _) -> - getProcessedBekreftelseTilstandAndHendelse(currentBekreftelse, timestamp, bekreftelseIntervals).takeIf { it.second != null } + getProcessedBekreftelseTilstandAndHendelse( + currentBekreftelse, + timestamp, + bekreftelseIntervals + ).takeIf { it.second != null } }.last().first } val hendelse: BekreftelseHendelse? = bekreftelser.flatMap { bekreftelse -> generateSequence(bekreftelse to null as BekreftelseHendelse?) { (currentBekreftelse, _) -> - getProcessedBekreftelseTilstandAndHendelse(currentBekreftelse, timestamp, bekreftelseIntervals).takeIf { it.second != null } + getProcessedBekreftelseTilstandAndHendelse( + currentBekreftelse, + timestamp, + bekreftelseIntervals + ).takeIf { it.second != null } }.mapNotNull { it.second } }.lastOrNull() @@ -107,7 +137,8 @@ private fun InternTilstand.getProcessedBekreftelseTilstandAndHendelse( bekreftelseId = bekreftelse.bekreftelseId, gjelderFra = bekreftelse.gjelderFra, gjelderTil = bekreftelse.gjelderTil, - hendelseTidspunkt = Instant.now()) + hendelseTidspunkt = Instant.now() + ) updatedBekreftelse to hendelse } @@ -118,8 +149,9 @@ private fun InternTilstand.getProcessedBekreftelseTilstandAndHendelse( periodeId = periode.periodeId, arbeidssoekerId = periode.arbeidsoekerId, bekreftelseId = bekreftelse.bekreftelseId, - hendelseTidspunkt = Instant.now(), - leveringsfrist = bekreftelse.gjelderTil) + hendelseTidspunkt = Instant.now(), + leveringsfrist = bekreftelse.gjelderTil + ) updatedBekreftelse to hendelse } @@ -130,11 +162,15 @@ private fun InternTilstand.getProcessedBekreftelseTilstandAndHendelse( periodeId = periode.periodeId, arbeidssoekerId = periode.arbeidsoekerId, bekreftelseId = bekreftelse.bekreftelseId, - hendelseTidspunkt = Instant.now()) + hendelseTidspunkt = Instant.now() + ) updatedBekreftelse to hendelse } - bekreftelse.erSisteVarselOmGjenstaaendeGraceTid(timestamp, bekreftelseIntervals.varselFoerGraceperiodeUtloept) -> { + bekreftelse.erSisteVarselOmGjenstaaendeGraceTid( + timestamp, + bekreftelseIntervals.varselFoerGraceperiodeUtloept + ) -> { val updatedBekreftelse = bekreftelse + GracePeriodeVarselet(timestamp) val hendelse = RegisterGracePeriodeGjenstaaendeTid( hendelseId = UUID.randomUUID(), @@ -142,7 +178,8 @@ private fun InternTilstand.getProcessedBekreftelseTilstandAndHendelse( arbeidssoekerId = periode.arbeidsoekerId, bekreftelseId = bekreftelse.bekreftelseId, gjenstaandeTid = bekreftelse.gjenstaendeGraceperiode(timestamp, bekreftelseIntervals.graceperiode), - hendelseTidspunkt = Instant.now()) + hendelseTidspunkt = Instant.now() + ) updatedBekreftelse to hendelse } diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt index 1fd88436..02c4fdca 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt @@ -31,15 +31,19 @@ fun StreamsBuilder.buildBekreftelseStream(applicationConfig: ApplicationConfig) .genericProcess( name = "meldingMottatt", internStateStoreName, + ansvarStateStoreName, punctuation = Punctuation( punctuationInterval, PunctuationType.WALL_CLOCK_TIME, - ::bekreftelsePunctuator.partially1(internStateStoreName).partially1(applicationConfig.bekreftelseIntervals) + ::bekreftelsePunctuator + .partially1(internStateStoreName) + .partially1(ansvarStateStoreName) + .partially1(applicationConfig.bekreftelseIntervals) ), ) { record -> - val stateStore = getStateStore(internStateStoreName) + val internTilstandStateStore = getStateStore(internStateStoreName) - val gjeldendeTilstand: InternTilstand? = retrieveState(stateStore, record) + val gjeldendeTilstand: InternTilstand? = retrieveState(internTilstandStateStore, record) if (gjeldendeTilstand == null) { Span.current().setStatus(StatusCode.ERROR).addEvent("tilstand is null for record", Attributes.of( @@ -52,9 +56,8 @@ fun StreamsBuilder.buildBekreftelseStream(applicationConfig: ApplicationConfig) if (record.value().bekreftelsesloesning == Bekreftelsesloesning.ARBEIDSSOEKERREGISTERET) { val (nyTilstand, hendelser) = processPawNamespace(record.value(), gjeldendeTilstand) if (nyTilstand != gjeldendeTilstand) { - stateStore.put(gjeldendeTilstand.periode.periodeId, nyTilstand) + internTilstandStateStore.put(gjeldendeTilstand.periode.periodeId, nyTilstand) } - forwardHendelser(record, hendelser, this::forward) } } @@ -67,11 +70,11 @@ fun StreamsBuilder.buildBekreftelseStream(applicationConfig: ApplicationConfig) kind = SpanKind.INTERNAL ) fun retrieveState( - stateStore: StateStore, + internTilstandStateStore: InternTilstandStateStore, record: Record ): InternTilstand? { val periodeId = record.value().periodeId - val state = stateStore[periodeId] + val state = internTilstandStateStore[periodeId] Span.current().setAttribute("periodeId", periodeId.toString()) return state diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/Topology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/Topology.kt index 798a064f..369ad8f0 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/Topology.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/Topology.kt @@ -1,35 +1,31 @@ package no.nav.paw.bekreftelsetjeneste.topology import kotlinx.coroutines.runBlocking +import no.nav.paw.bekreftelsetjeneste.ansvar.Ansvar import no.nav.paw.bekreftelsetjeneste.context.ApplicationContext import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse -import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.Topology -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier import org.apache.kafka.streams.state.KeyValueStore -import org.apache.kafka.streams.state.Stores import java.util.* -typealias StateStore = KeyValueStore +typealias InternTilstandStateStore = KeyValueStore +typealias AnsvarStateStore = KeyValueStore -fun buildTopology( - applicationContext: ApplicationContext, - keyValueStateStoreSupplier: (String) -> KeyValueBytesStoreSupplier -): Topology = StreamsBuilder().apply { - addStateStore( - Stores.keyValueStoreBuilder( - keyValueStateStoreSupplier(applicationContext.applicationConfig.kafkaTopology.internStateStoreName), - Serdes.UUID(), - InternTilstandSerde() - ) - ) +fun StreamsBuilder.buildTopology( + applicationContext: ApplicationContext +): Topology { buildPeriodeStream(applicationContext.applicationConfig, applicationContext.kafkaKeysClient) buildBekreftelseStream(applicationContext.applicationConfig) -}.build() + byggAnsvarsStroem( + registry = applicationContext.prometheusMeterRegistry, + kafkaTopologyConfig = applicationContext.applicationConfig.kafkaTopology, + bekreftelseHendelseSerde = applicationContext.bekreftelseHendelseSerde + ) + return build() +} fun KafkaKeysClient.getIdAndKeyBlocking(identitetsnummer: String): KafkaKeysResponse = runBlocking { getIdAndKey(identitetsnummer) diff --git a/apps/bekreftelse-tjeneste/src/main/resources/local/application_config.toml b/apps/bekreftelse-tjeneste/src/main/resources/local/application_config.toml index 5d729881..798eccc2 100644 --- a/apps/bekreftelse-tjeneste/src/main/resources/local/application_config.toml +++ b/apps/bekreftelse-tjeneste/src/main/resources/local/application_config.toml @@ -6,8 +6,10 @@ tilgjengeligOffset = "P3D" [kafkaTopology] applicationIdSuffix = "v1" internStateStoreName = "intern-tilstand" +ansvarStateStoreName = "ansvar" periodeTopic = "paw.arbeidssokerperioder-v1" bekreftelseTopic = "paw.arbeidssoker-bekreftelse-v1" +ansvarsTopic = "paw.arbeidssoker-bekreftelse-ansvar-v1" bekreftelseHendelseloggTopic = "paw.arbeidssoker-bekreftelse-hendelseslogg-v1" punctuationInterval = "PT5S" diff --git a/apps/bekreftelse-tjeneste/src/main/resources/nais/application_config.toml b/apps/bekreftelse-tjeneste/src/main/resources/nais/application_config.toml index d95897c2..c2547390 100644 --- a/apps/bekreftelse-tjeneste/src/main/resources/nais/application_config.toml +++ b/apps/bekreftelse-tjeneste/src/main/resources/nais/application_config.toml @@ -6,9 +6,11 @@ tilgjengeligOffset = "${BEKREFTELSE_GRACEPERIODE}" [kafkaTopology] applicationIdSuffix = "${KAFKA_STREAMS_ID_SUFFIX}" internStateStoreName = "intern-tilstand" +ansvarStateStoreName = "ansvar" periodeTopic = "${KAFKA_PAW_ARBEIDSSOKERPERIODER_TOPIC}" bekreftelseTopic = "${KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_TOPIC}" bekreftelseHendelseloggTopic = "${KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_HENDELSELOGG_TOPIC}" +ansvarsTopic = "${KAFKA_PAW_ANSVARS_TOPIC}" punctuationInterval = "${KAFKA_PUNCTUATOR_INTERVAL}" [azureM2M] diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt index 8c15a995..b1c6b361 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt @@ -3,6 +3,7 @@ package no.nav.paw.bekreftelsetjeneste import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry import io.confluent.kafka.serializers.KafkaAvroSerializerConfig import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde +import io.micrometer.prometheusmetrics.PrometheusConfig import io.micrometer.prometheusmetrics.PrometheusMeterRegistry import io.mockk.mockk import no.nav.paw.arbeidssokerregisteret.api.v1.Periode @@ -10,9 +11,11 @@ import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde import no.nav.paw.bekreftelse.melding.v1.Bekreftelse +import no.nav.paw.bekreftelsetjeneste.ansvar.AnsvarSerde import no.nav.paw.bekreftelsetjeneste.config.APPLICATION_CONFIG_FILE_NAME import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig import no.nav.paw.bekreftelsetjeneste.context.ApplicationContext +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde import no.nav.paw.bekreftelsetjeneste.topology.buildTopology import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration import no.nav.paw.health.repository.HealthIndicatorRepository @@ -20,6 +23,7 @@ import no.nav.paw.kafkakeygenerator.client.inMemoryKafkaKeysMock import org.apache.avro.specific.SpecificRecord import org.apache.kafka.common.serialization.Serde import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.TestInputTopic import org.apache.kafka.streams.TestOutputTopic @@ -39,17 +43,29 @@ class ApplicationTestContext(initialWallClockTime: Instant = Instant.now()) { val kafkaKeysClient = inMemoryKafkaKeysMock() val applicationContext = ApplicationContext( applicationConfig = applicationConfig, - prometheusMeterRegistry = mockk(), + prometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT), healthIndicatorRepository = HealthIndicatorRepository(), kafkaKeysClient = kafkaKeysClient ) val logger: Logger = LoggerFactory.getLogger(ApplicationTestContext::class.java) - val topology = buildTopology( - applicationContext = applicationContext, - keyValueStateStoreSupplier = Stores::inMemoryKeyValueStore - ) + val topology = StreamsBuilder() + .addStateStore( + Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(applicationContext.applicationConfig.kafkaTopology.internStateStoreName), + Serdes.UUID(), + InternTilstandSerde() + ) + ) + .addStateStore( + Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(applicationContext.applicationConfig.kafkaTopology.ansvarStateStoreName), + Serdes.UUID(), + AnsvarSerde() + ) + ) + .buildTopology(applicationContext) val testDriver: TopologyTestDriver = TopologyTestDriver(topology, kafkaStreamProperties, initialWallClockTime) @@ -65,6 +81,12 @@ class ApplicationTestContext(initialWallClockTime: Instant = Instant.now()) { bekreftelseSerde.serializer() ) + val ansvarsTopic: TestInputTopic = testDriver.createInputTopic( + applicationConfig.kafkaTopology.ansvarsTopic, + Serdes.Long().serializer(), + ansvarsTopicSerde.serializer() + ) + val bekreftelseHendelseloggTopicOut: TestOutputTopic = testDriver.createOutputTopic( applicationConfig.kafkaTopology.bekreftelseHendelseloggTopic, Serdes.Long().deserializer(), diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt index 1aadd46f..836e796e 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt @@ -18,7 +18,7 @@ import no.nav.paw.bekreftelse.melding.v1.vo.BrukerType import no.nav.paw.bekreftelse.melding.v1.vo.Metadata import no.nav.paw.bekreftelse.melding.v1.vo.Svar import no.nav.paw.bekreftelsetjeneste.tilstand.* -import no.nav.paw.bekreftelsetjeneste.topology.StateStore +import no.nav.paw.bekreftelsetjeneste.topology.InternTilstandStateStore import java.time.Duration import java.time.Instant @@ -40,10 +40,10 @@ class BekreftelsePunctuatorTest : FreeSpec({ "Når perioden opprettes skal det opprettes en intern tilstand med en bekreftelse" { bekreftelseHendelseloggTopicOut.isEmpty shouldBe true - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - stateStore.get(periode.id) should { currentState -> + internTilstandStateStore.get(periode.id) should { currentState -> currentState.periode shouldBe PeriodeInfo( periodeId = periode.id, identitetsnummer = periode.identitetsnummer, @@ -68,9 +68,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ "Etter 11 dager skal det ha blitt sendt en BekreftelseTilgjengelig hendelse" { testDriver.advanceWallClockTime(interval.minus(tilgjengeligOffset)) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - stateStore.all().use { + internTilstandStateStore.all().use { it.forEach { logger.info("key: ${it.key}, value: ${it.value}") } @@ -86,9 +86,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ "Etter 14 dager skal det ha blitt sendt en LeveringsFristUtloept hendelse" { testDriver.advanceWallClockTime(tilgjengeligOffset.plusSeconds(5)) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - stateStore.all().use { + internTilstandStateStore.all().use { it.forEach { logger.info("key: ${it.key}, value: ${it.value}") } @@ -103,9 +103,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ } "Etter 17,5 dager uten svar skal det ha blitt sendt en RegisterGracePeriodeGjenstaaendeTid hendelse" { testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - stateStore.all().use { + internTilstandStateStore.all().use { it.forEach { logger.info("key: ${it.key}, value: ${it.value}") } @@ -120,9 +120,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ } "Etter 21 dager uten svar skal det ha blitt sendt en RegisterGracePeriodeUtloept hendelse" { testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - stateStore.all().use { + internTilstandStateStore.all().use { it.forEach { logger.info("key: ${it.key}, value: ${it.value}") } @@ -143,9 +143,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ .plus(interval.minus(tilgjengeligOffset)) ) ) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - stateStore.all().use { + internTilstandStateStore.all().use { it.forEach { logger.info("key: ${it.key}, value: ${it.value}") } @@ -175,9 +175,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ interval.minus(tilgjengeligOffset) .plusSeconds(5) ) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - val currentState = stateStore.get(periode.id) + val currentState = internTilstandStateStore.get(periode.id) bekreftelseTopic.pipeInput( key, no.nav.paw.bekreftelse.melding.v1.Bekreftelse( periode.id, Bekreftelsesloesning.ARBEIDSSOEKERREGISTERET, currentState.bekreftelser.first().bekreftelseId, Svar( @@ -197,7 +197,7 @@ class BekreftelsePunctuatorTest : FreeSpec({ testDriver.advanceWallClockTime(tilgjengeligOffset.plus(graceperiode)) val hendelseLoggOutput = bekreftelseHendelseloggTopicOut.readKeyValuesToList() logger.info("hendelseOutput: $hendelseLoggOutput") - stateStore.all().use { + internTilstandStateStore.all().use { it.forEach { logger.info("key: ${it.key}, value: ${it.value}") } @@ -223,9 +223,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ interval.minus(tilgjengeligOffset) .plusSeconds(5) ) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - val currentState = stateStore.get(periode.id) + val currentState = internTilstandStateStore.get(periode.id) bekreftelseTopic.pipeInput( key, no.nav.paw.bekreftelse.melding.v1.Bekreftelse( periode.id, Bekreftelsesloesning.ARBEIDSSOEKERREGISTERET, currentState.bekreftelser.first().bekreftelseId, Svar( @@ -245,7 +245,7 @@ class BekreftelsePunctuatorTest : FreeSpec({ testDriver.advanceWallClockTime(tilgjengeligOffset.plus(graceperiode)) val hendelseLoggOutput = bekreftelseHendelseloggTopicOut.readKeyValuesToList() logger.info("hendelseOutput: $hendelseLoggOutput") - stateStore.all().use { + internTilstandStateStore.all().use { it.forEach { logger.info("key: ${it.key}, value: ${it.value}") } @@ -269,9 +269,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ ) periodeTopic.pipeInput(key, periode) testDriver.advanceWallClockTime(Duration.ofSeconds(5)) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - val currentState = stateStore.get(periode.id) + val currentState = internTilstandStateStore.get(periode.id) currentState should { it.periode shouldBe PeriodeInfo( periodeId = periode.id, @@ -311,9 +311,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ interval.minus(tilgjengeligOffset) .plusSeconds(5) ) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - val currentState = stateStore.get(periode.id) + val currentState = internTilstandStateStore.get(periode.id) currentState should { it.periode shouldBe PeriodeInfo( periodeId = periode.id, @@ -361,14 +361,14 @@ class BekreftelsePunctuatorTest : FreeSpec({ ) // Spoler frem til LeveringsfristUtloept testDriver.advanceWallClockTime(tilgjengeligOffset.plusSeconds(5)) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - stateStore.all().use { + internTilstandStateStore.all().use { it.forEach { logger.info("key: ${it.key}, value: ${it.value}") } } - val currentState = stateStore.get(periode.id) + val currentState = internTilstandStateStore.get(periode.id) currentState should { it.periode shouldBe PeriodeInfo( periodeId = periode.id, @@ -420,9 +420,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ testDriver.advanceWallClockTime(tilgjengeligOffset.plusSeconds(5)) // Spoler frem til RegisterGracePeriodeGjenstaaende testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - val currentState = stateStore.get(periode.id) + val currentState = internTilstandStateStore.get(periode.id) currentState should { it.periode shouldBe PeriodeInfo( periodeId = periode.id, @@ -477,9 +477,9 @@ class BekreftelsePunctuatorTest : FreeSpec({ testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) // Spoler frem til RegisterGracePeriodeUtloept testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) - val stateStore: StateStore = + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - val currentState = stateStore.get(periode.id) + val currentState = internTilstandStateStore.get(periode.id) currentState should { it.periode shouldBe PeriodeInfo( periodeId = periode.id, diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/PeriodeStreamTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/PeriodeStreamTest.kt index e34e03fe..065e1932 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/PeriodeStreamTest.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/PeriodeStreamTest.kt @@ -9,7 +9,7 @@ import no.nav.paw.arbeidssoekerregisteret.testdata.mainavro.periode import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand import no.nav.paw.bekreftelsetjeneste.tilstand.initTilstand -import no.nav.paw.bekreftelsetjeneste.topology.StateStore +import no.nav.paw.bekreftelsetjeneste.topology.InternTilstandStateStore import java.time.Instant class PeriodeStreamTest : FreeSpec({ @@ -22,8 +22,8 @@ class PeriodeStreamTest : FreeSpec({ val (_, key, periode) = periode(identitetsnummer = identitetsnummer, avsluttetMetadata = metadata(tidspunkt = startTime)) periodeTopic.pipeInput(key, periode) - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - stateStore.get(periode.id) shouldBe null + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + internTilstandStateStore.get(periode.id) shouldBe null bekreftelseHendelseloggTopicOut.isEmpty shouldBe true } @@ -36,8 +36,8 @@ class PeriodeStreamTest : FreeSpec({ val (id, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) periodeTopic.pipeInput(key, periode) - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - val currentState = stateStore.get(periode.id) + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + val currentState = internTilstandStateStore.get(periode.id) currentState.shouldBeInstanceOf() currentState shouldBe initTilstand(id, key, periode) } @@ -49,14 +49,14 @@ class PeriodeStreamTest : FreeSpec({ with(kafkaKeyContext()) { val (_, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) periodeTopic.pipeInput(key, periode) - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - stateStore.get(periode.id).shouldBeInstanceOf() + internTilstandStateStore.get(periode.id).shouldBeInstanceOf() val (_, _, periode2) = periode(periodeId = periode.id, identitetsnummer = identitetsnummer, avsluttetMetadata = metadata(tidspunkt = startTime)) periodeTopic.pipeInput(key, periode2) - stateStore.get(periode.id) shouldBe null + internTilstandStateStore.get(periode.id) shouldBe null bekreftelseHendelseloggTopicOut.isEmpty shouldBe false val kv = bekreftelseHendelseloggTopicOut.readKeyValue() kv.key shouldBe key @@ -69,12 +69,12 @@ class PeriodeStreamTest : FreeSpec({ with(ApplicationTestContext(initialWallClockTime = startTime)) { with(kafkaKeyContext()) { val (id, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) - stateStore.put(periode.id, initTilstand(id, key, periode)) - val state = stateStore.get(periode.id) + val internTilstandStateStore: InternTilstandStateStore = testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + internTilstandStateStore.put(periode.id, initTilstand(id, key, periode)) + val state = internTilstandStateStore.get(periode.id) periodeTopic.pipeInput(key, periode) - stateStore.get(periode.id) shouldBe state + internTilstandStateStore.get(periode.id) shouldBe state bekreftelseHendelseloggTopicOut.isEmpty shouldBe true } } diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/AndreTarAnsvarMenDetSendesAldriInnNoeTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/AndreTarAnsvarMenDetSendesAldriInnNoeTest.kt new file mode 100644 index 00000000..542c256b --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/AndreTarAnsvarMenDetSendesAldriInnNoeTest.kt @@ -0,0 +1,55 @@ +package no.nav.paw.bekreftelsetjeneste.ansvar + +import io.kotest.core.spec.style.FreeSpec +import io.kotest.matchers.shouldBe +import no.nav.paw.arbeidssoekerregisteret.testdata.bekreftelse.avslutterAnsvar +import no.nav.paw.arbeidssoekerregisteret.testdata.bekreftelse.tarAnsvar +import no.nav.paw.arbeidssoekerregisteret.testdata.kafkaKeyContext +import no.nav.paw.arbeidssoekerregisteret.testdata.mainavro.periode +import no.nav.paw.bekreftelse.internehendelser.AndreHarOvertattAnsvar +import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig +import no.nav.paw.bekreftelsetjeneste.ApplicationTestContext +import no.nav.paw.test.assertEvent +import no.nav.paw.test.assertNoMessage + +class AndreTarAnsvarMenDetSendesAldriInnNoeTest: FreeSpec({ + with(ApplicationTestContext()) { + val intervall = applicationConfig.bekreftelseIntervals.interval + val grace = applicationConfig.bekreftelseIntervals.graceperiode + with(kafkaKeyContext()) { + "Applikasjonstest hvor noen tar ansvar etter at perioden er lest, men avslutter ansvar igjen før en eneste" + + " bekreftelse er levert" - { + val (id, key, periode) = periode(identitetsnummer = "12345678902") + periodeTopic.pipeInput(key, periode) + "Når perioden opprettes skal det ikke skje noe" { + bekreftelseHendelseloggTopicOut.assertNoMessage() + } + "Når andre tar ansvar sendes en event" { + val tarAnsvar = tarAnsvar(periodeId = periode.id) + ansvarsTopic.pipeInput(key, tarAnsvar) + logger.info("Tar ansvar: $tarAnsvar") + bekreftelseHendelseloggTopicOut.assertEvent { hedelse: AndreHarOvertattAnsvar -> + hedelse.periodeId shouldBe periode.id + hedelse.arbeidssoekerId shouldBe id + } + } + "Når leveringsfristen utløper når andre har ansvar skjer det ingenting" { + bekreftelseHendelseloggTopicOut.assertNoMessage() + testDriver.advanceWallClockTime(intervall + 1.day) + bekreftelseHendelseloggTopicOut.assertNoMessage() + } + "Når grace perioden utløpet når andre har ansvar skjer det ingenting" { + testDriver.advanceWallClockTime(grace + 1.day) + bekreftelseHendelseloggTopicOut.isEmpty shouldBe true + } + "Når andre avslutter ansvar sendes blie en ny bekreftelse tilgjengelig" { + ansvarsTopic.pipeInput(key, avslutterAnsvar(periodeId = periode.id)) + testDriver.advanceWallClockTime(1.day) + bekreftelseHendelseloggTopicOut.assertEvent { hendelse: BekreftelseTilgjengelig -> + logger.info("Bekreftelse tilgjengelig: $hendelse") + } + } + } + } + } +}) \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/IngenAndreTarAnsvarTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/IngenAndreTarAnsvarTest.kt similarity index 74% rename from apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/IngenAndreTarAnsvarTest.kt rename to apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/IngenAndreTarAnsvarTest.kt index 91a7e236..07abbb31 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/IngenAndreTarAnsvarTest.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ansvar/IngenAndreTarAnsvarTest.kt @@ -1,6 +1,5 @@ -package no.nav.paw.bekreftelsetjeneste +package no.nav.paw.bekreftelsetjeneste.ansvar -import io.kotest.core.annotation.Ignored import io.kotest.core.spec.style.FreeSpec import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf @@ -8,21 +7,23 @@ import no.nav.paw.arbeidssoekerregisteret.testdata.kafkaKeyContext import no.nav.paw.arbeidssoekerregisteret.testdata.mainavro.periode import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig import no.nav.paw.bekreftelse.internehendelser.LeveringsfristUtloept +import no.nav.paw.bekreftelsetjeneste.ApplicationTestContext import java.time.Duration -@Ignored("Midlertidig disablet av Thomas") class IngenAndreTarAnsvarTest : FreeSpec({ with(ApplicationTestContext()) { + val intervall = applicationConfig.bekreftelseIntervals.interval + val grace = applicationConfig.bekreftelseIntervals.graceperiode with(kafkaKeyContext()) { - "Applikasjons test hvor ingen andre tar ansvar" - { + "Applikasjonstest hvor ingen andre tar ansvar" - { "Bruker avslutter via rapportering" - { val (id, key, periode) = periode(identitetsnummer = "12345678901") periodeTopic.pipeInput(key, periode) - "Nå perioden opprettes skal det ikke skje noe" { + "Når perioden opprettes skal det ikke skje noe" { bekreftelseHendelseloggTopicOut.isEmpty shouldBe true } - "Etter 13 dager skal en rapportering være tilgjengelig" { - testDriver.advanceWallClockTime(Duration.ofDays(13)) + "Etter ${intervall.toDays()} dager skal en rapportering være tilgjengelig" { + testDriver.advanceWallClockTime(intervall) bekreftelseHendelseloggTopicOut.isEmpty shouldBe false val kv = bekreftelseHendelseloggTopicOut.readKeyValue() kv.key shouldBe key @@ -33,7 +34,7 @@ class IngenAndreTarAnsvarTest : FreeSpec({ } } "Når rapporteringen ikke blir besvart innen fristen sendes det ut en melding" { - testDriver.advanceWallClockTime(Duration.ofDays(4)) + testDriver.advanceWallClockTime(grace + 1.day) bekreftelseHendelseloggTopicOut.isEmpty shouldBe false val kv = bekreftelseHendelseloggTopicOut.readKeyValue() kv.key shouldBe key @@ -45,5 +46,8 @@ class IngenAndreTarAnsvarTest : FreeSpec({ } } } + } }) + +val Int.day: Duration get() = Duration.ofDays(this.toLong()) \ No newline at end of file diff --git a/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/AndreHarOvertattAnsvar.kt b/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/AndreHarOvertattAnsvar.kt new file mode 100644 index 00000000..f20a18de --- /dev/null +++ b/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/AndreHarOvertattAnsvar.kt @@ -0,0 +1,15 @@ +package no.nav.paw.bekreftelse.internehendelser + +import java.time.Instant +import java.util.* + +const val andreHarOvertattAnsvarHendelsesType = "bekreftelse.andre_har_overtatt_ansvar" + +class AndreHarOvertattAnsvar( + override val hendelseId: UUID, + override val periodeId: UUID, + override val arbeidssoekerId: Long, + override val hendelseTidspunkt: Instant, +) : BekreftelseHendelse { + override val hendelseType: String = andreHarOvertattAnsvarHendelsesType +} \ No newline at end of file diff --git a/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/BekreftelseHendelseSerde.kt b/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/BekreftelseHendelseSerde.kt index 8163b41d..6786c5cf 100644 --- a/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/BekreftelseHendelseSerde.kt +++ b/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/BekreftelseHendelseSerde.kt @@ -45,6 +45,7 @@ class BekreftelseHendelseDeserializer: Deserializer { periodeAvsluttetHendelsesType -> objectMapper.readValue(node.traverse()) registerGracePeriodeGjenstaaendeTid -> objectMapper.readValue(node.traverse()) baOmAaAvsluttePeriodeHendelsesType -> objectMapper.readValue(node.traverse()) + andreHarOvertattAnsvarHendelsesType -> objectMapper.readValue(node.traverse()) else -> throw IllegalArgumentException("Ukjent hendelseType: $hendelseType") } } diff --git a/test/kafka-streams-test-functions/src/main/kotlin/no/nav/paw/test/KafkaStreamsTestExtension.kt b/test/kafka-streams-test-functions/src/main/kotlin/no/nav/paw/test/KafkaStreamsTestExtension.kt index c60515c2..210b4c8d 100644 --- a/test/kafka-streams-test-functions/src/main/kotlin/no/nav/paw/test/KafkaStreamsTestExtension.kt +++ b/test/kafka-streams-test-functions/src/main/kotlin/no/nav/paw/test/KafkaStreamsTestExtension.kt @@ -19,4 +19,11 @@ inline fun TestOutputTopic<*, in T>.assertEvent(): T { } } +fun TestOutputTopic<*, *>.assertNoMessage() { + val message = if (isEmpty) null else readValue() + withClue("Expected no message, but found: $message") { + message shouldBe null + } +} + val Int.seconds: Duration get() = Duration.ofSeconds(this.toLong()) \ No newline at end of file diff --git a/test/test-data-lib/build.gradle.kts b/test/test-data-lib/build.gradle.kts index 9466b8b5..f58f54df 100644 --- a/test/test-data-lib/build.gradle.kts +++ b/test/test-data-lib/build.gradle.kts @@ -5,6 +5,7 @@ plugins { dependencies { implementation(project(":domain:bekreftelse-interne-hendelser")) implementation(project(":domain:bekreftelsesmelding-avro-schema")) + implementation(project(":domain:bekreftelsesansvar-avro-schema")) implementation(project(":domain:main-avro-schema")) implementation(project(":lib:kafka-key-generator-client")) diff --git a/test/test-data-lib/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/testdata/bekreftelse/AnsvarMeldingGenerator.kt b/test/test-data-lib/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/testdata/bekreftelse/AnsvarMeldingGenerator.kt new file mode 100644 index 00000000..29311eb1 --- /dev/null +++ b/test/test-data-lib/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/testdata/bekreftelse/AnsvarMeldingGenerator.kt @@ -0,0 +1,37 @@ +package no.nav.paw.arbeidssoekerregisteret.testdata.bekreftelse + +import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret +import no.nav.paw.bekreftelse.ansvar.v1.vo.AvslutterAnsvar +import no.nav.paw.bekreftelse.ansvar.v1.vo.Bekreftelsesloesning +import no.nav.paw.bekreftelse.ansvar.v1.vo.TarAnsvar +import java.time.Duration +import java.util.UUID + + +fun tarAnsvar( + periodeId: UUID = UUID.randomUUID(), + bekreftelsesloesning: Bekreftelsesloesning = Bekreftelsesloesning.DAGPENGER, + grace: Duration = Duration.ofDays(14), + interval: Duration = Duration.ofDays(7) +): AnsvarEndret = + AnsvarEndret.newBuilder() + .setHandling( + TarAnsvar + .newBuilder() + .setGraceMS(grace.toMillis()) + .setIntervalMS(interval.toMillis()) + .build() + ) + .setPeriodeId(periodeId) + .setBekreftelsesloesning(bekreftelsesloesning) + .build() + +fun avslutterAnsvar( + periodeId: UUID = UUID.randomUUID(), + bekreftelsesloesning: Bekreftelsesloesning = Bekreftelsesloesning.DAGPENGER +): AnsvarEndret = + AnsvarEndret.newBuilder() + .setHandling(AvslutterAnsvar()) + .setBekreftelsesloesning(bekreftelsesloesning) + .setPeriodeId(periodeId) + .build() \ No newline at end of file