Skip to content

Commit

Permalink
La til initiell funksjonalitet for håndtering bekreftelse-ansvar
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Oct 23, 2024
1 parent 2dbbe38 commit b90c011
Show file tree
Hide file tree
Showing 24 changed files with 526 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package no.nav.paw.bekreftelse.api.services

import io.opentelemetry.api.trace.Span
import io.opentelemetry.instrumentation.annotations.WithSpan
import no.nav.paw.bekreftelse.api.config.ApplicationConfig
import no.nav.paw.bekreftelse.api.config.ServerConfig
Expand Down Expand Up @@ -54,7 +55,6 @@ class AuthorizationService(
accessToken = accessToken,
tilgangType = tilgangType
)

return authorize(securityContext)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.ktor.server.engine.addShutdownHook
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import io.ktor.server.routing.routing
import no.nav.paw.bekreftelsetjeneste.ansvar.AnsvarSerde
import no.nav.paw.bekreftelsetjeneste.config.APPLICATION_CONFIG_FILE_NAME
import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig
import no.nav.paw.bekreftelsetjeneste.config.SERVER_CONFIG_FILE_NAME
Expand All @@ -14,11 +15,14 @@ import no.nav.paw.bekreftelsetjeneste.plugins.buildKafkaStreams
import no.nav.paw.bekreftelsetjeneste.plugins.configureKafka
import no.nav.paw.bekreftelsetjeneste.plugins.configureMetrics
import no.nav.paw.bekreftelsetjeneste.routes.metricsRoutes
import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde
import no.nav.paw.bekreftelsetjeneste.topology.buildTopology
import no.nav.paw.config.env.appNameOrDefaultForLocal
import no.nav.paw.config.env.currentRuntimeEnvironment
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
import no.nav.paw.health.route.healthRoutes
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.state.Stores
import org.slf4j.LoggerFactory

Expand All @@ -42,11 +46,22 @@ fun main() {

fun Application.module(applicationConfig: ApplicationConfig) {
val applicationContext = ApplicationContext.create(applicationConfig)

val kafkaTopology = buildTopology(
applicationContext = applicationContext,
keyValueStateStoreSupplier = Stores::persistentKeyValueStore
val stream = StreamsBuilder()
stream.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(applicationContext.applicationConfig.kafkaTopology.internStateStoreName),
Serdes.UUID(),
InternTilstandSerde()
)
)
stream.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(applicationContext.applicationConfig.kafkaTopology.ansvarStateStoreName),
Serdes.UUID(),
AnsvarSerde()
)
)
val kafkaTopology = stream.buildTopology(applicationContext)
val kafkaStreams = buildKafkaStreams(applicationContext, kafkaTopology)

configureMetrics(applicationContext)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package no.nav.paw.bekreftelsetjeneste.ansvar

import no.nav.paw.bekreftelse.ansvar.v1.vo.Bekreftelsesloesning
import java.time.Duration
import java.util.*

Expand All @@ -8,6 +9,26 @@ data class Ansvar(
val ansvarlige: List<Ansvarlig>
)

data class Ansvarlig(
val loesning: Loesning,
val intervall: Duration,
val gracePeriode: Duration
)

enum class Loesning {
UKJENT_VERDI,
ARBEIDSSOEKERREGISTERET,
DAGPENGER;

companion object {
fun from(value: Bekreftelsesloesning): Loesning = when (value) {
Bekreftelsesloesning.UKJENT_VERDI -> UKJENT_VERDI
Bekreftelsesloesning.ARBEIDSSOEKERREGISTERET -> ARBEIDSSOEKERREGISTERET
Bekreftelsesloesning.DAGPENGER -> DAGPENGER
}
}
}

fun ansvar(
periodeId: UUID,
ansvarlig: Ansvarlig? = null
Expand All @@ -18,19 +39,11 @@ fun ansvar(

operator fun Ansvar.plus(ansvarlig: Ansvarlig): Ansvar =
copy(ansvarlige = ansvarlige
.filterNot { it.namespace == ansvarlig.namespace} + ansvarlig
.filterNot { it.loesning == ansvarlig.loesning} + ansvarlig
)

operator fun Ansvar.minus(namespace: String): Ansvar? =
ansvarlige.filterNot { it.namespace == namespace }
.takeIf(List<Ansvarlig>::isNotEmpty)
operator fun Ansvar?.minus(loesning: Loesning): Ansvar? =
this?.ansvarlige
?.filterNot { it.loesning == loesning }
?.takeIf(List<Ansvarlig>::isNotEmpty)
?.let { copy(ansvarlige = it) }



data class Ansvarlig(
val namespace: String,
val id: String,
val intervall: Duration,
val gracePeriode: Duration
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package no.nav.paw.bekreftelsetjeneste.ansvar

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.readValue
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serializer

private val ansvarObjectMapper = ObjectMapper()
.registerKotlinModule()
.registerModules(JavaTimeModule())

class AnsvarSerde: Serde<Ansvar> {
private val ansvarSerializer = AnsvarSerializer()
private val ansvarDeserializer = AnsvarDeserializer()

override fun serializer(): Serializer<Ansvar> = ansvarSerializer

override fun deserializer(): Deserializer<Ansvar> = ansvarDeserializer
}

class AnsvarSerializer: Serializer<Ansvar> {
override fun serialize(topic: String?, data: Ansvar): ByteArray {
return ansvarObjectMapper.writeValueAsBytes(data)
}
}

class AnsvarDeserializer: Deserializer<Ansvar> {
override fun deserialize(topic: String?, data: ByteArray): Ansvar {
return ansvarObjectMapper.readValue(data)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package no.nav.paw.bekreftelsetjeneste.ansvar

import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret
import no.nav.paw.bekreftelse.ansvar.v1.vo.AvslutterAnsvar
import no.nav.paw.bekreftelse.ansvar.v1.vo.TarAnsvar
import no.nav.paw.bekreftelse.internehendelser.AndreHarOvertattAnsvar
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand
import java.time.Duration
import java.time.Instant
import java.util.*

fun haandterAnsvarEndret(
tilstand: InternTilstand?,
ansvar: Ansvar?,
ansvarEndret: AnsvarEndret
): List<Handling> {
return when (val handling = ansvarEndret.handling) {
is TarAnsvar -> tarAnsvar(
tilstand = tilstand,
ansvar = ansvar,
ansvarEndret = ansvarEndret,
handling = handling
)
is AvslutterAnsvar -> avslutterAnsvar(
ansvar = ansvar,
ansvarEndret = ansvarEndret
)
else -> emptyList()
}
}

fun avslutterAnsvar(
ansvar: Ansvar?,
ansvarEndret: AnsvarEndret
): List<Handling> {
val oppdatertAnsvar = ansvar - Loesning.from(ansvarEndret.bekreftelsesloesning)
val ansvarsHandling = when {
ansvar != null && oppdatertAnsvar == null -> SlettAnsvar(ansvarEndret.periodeId)
ansvar != null && oppdatertAnsvar != null -> SkrivAnsvar(ansvarEndret.periodeId, oppdatertAnsvar)
else -> null
}
return listOfNotNull(ansvarsHandling)
}

fun tarAnsvar(
tilstand: InternTilstand?,
ansvar: Ansvar?,
ansvarEndret: AnsvarEndret,
handling: TarAnsvar
): List<Handling> {
val oppdatertAnsvar =
(ansvar ?: ansvar(ansvarEndret.periodeId)) +
Ansvarlig(
loesning = Loesning.from(ansvarEndret.bekreftelsesloesning),
intervall = Duration.ofMillis(handling.intervalMS),
gracePeriode = Duration.ofMillis(handling.graceMS)
)
val hendelse = tilstand?.let {
AndreHarOvertattAnsvar(
hendelseId = UUID.randomUUID(),
periodeId = ansvarEndret.periodeId,
arbeidssoekerId = tilstand.periode.arbeidsoekerId,
hendelseTidspunkt = Instant.now(),
)
}
return listOfNotNull(
if (ansvar != oppdatertAnsvar) SkrivAnsvar(ansvarEndret.periodeId, oppdatertAnsvar) else null,
hendelse?.let(::SendHendelse)
)
}


sealed interface Handling
data class SlettAnsvar(val id: UUID) : Handling
data class SkrivAnsvar(val id: UUID, val value: Ansvar) : Handling
data class SendHendelse(val hendelse: BekreftelseHendelse) : Handling
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ data class BekreftelseIntervals(
data class KafkaTopologyConfig(
val applicationIdSuffix: String,
val internStateStoreName: String,
val ansvarStateStoreName: String,
val periodeTopic: String,
val bekreftelseTopic: String,
val bekreftelseHendelseloggTopic: String,
val ansvarsTopic: String,
val punctuationInterval: Duration,
val shutdownTimeout: Duration = Duration.ofMinutes(5),
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.nav.paw.bekreftelsetjeneste.context

import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde
import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig
import no.nav.paw.health.repository.HealthIndicatorRepository
import no.nav.paw.kafkakeygenerator.auth.azureAdM2MTokenClient
Expand All @@ -14,6 +15,8 @@ class ApplicationContext(
val healthIndicatorRepository: HealthIndicatorRepository,
val kafkaKeysClient: KafkaKeysClient
) {
val bekreftelseHendelseSerde = BekreftelseHendelseSerde()

companion object {
fun create(applicationConfig: ApplicationConfig): ApplicationContext {
val prometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ fun opprettFoersteBekreftelse(
periode: PeriodeInfo,
interval: Duration,
currentTime: Instant
): Bekreftelse =
Bekreftelse(
): Bekreftelse {
val start = lastOf(periode.startet, currentTime - interval)
return Bekreftelse(
BekreftelseTilstandsLogg(IkkeKlarForUtfylling(periode.startet), emptyList()),
bekreftelseId = UUID.randomUUID(),
gjelderFra = periode.startet,
gjelderTil = fristForNesteBekreftelse(periode.startet, interval, currentTime)
gjelderTil = fristForNesteBekreftelse(start, interval, periode.startet+interval)
)
}

fun lastOf(a: Instant, b: Instant): Instant = if (a.isAfter(b)) a else b


fun NonEmptyList<Bekreftelse>.opprettNesteTilgjengeligeBekreftelse(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package no.nav.paw.bekreftelsetjeneste.topology

import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Tags
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret
import no.nav.paw.bekreftelse.ansvar.v1.vo.AvslutterAnsvar
import no.nav.paw.bekreftelse.ansvar.v1.vo.TarAnsvar
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde
import no.nav.paw.bekreftelsetjeneste.ansvar.*
import no.nav.paw.bekreftelsetjeneste.config.KafkaTopologyConfig
import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand
import no.nav.paw.config.kafka.streams.mapNonNull
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.state.KeyValueStore
import java.util.*

fun StreamsBuilder.byggAnsvarsStroem(
registry: PrometheusMeterRegistry,
kafkaTopologyConfig: KafkaTopologyConfig,
bekreftelseHendelseSerde: BekreftelseHendelseSerde
) {
stream<Long, AnsvarEndret>(kafkaTopologyConfig.ansvarsTopic)
.peek { _, message -> count(registry, message) }
.mapNonNull(
name = "endre_ansvar",
kafkaTopologyConfig.ansvarStateStoreName,
kafkaTopologyConfig.internStateStoreName
) { message ->
val internStateStore =
getStateStore<KeyValueStore<UUID, InternTilstand>>(kafkaTopologyConfig.internStateStoreName)
val ansvarStateStore = getStateStore<KeyValueStore<UUID, Ansvar>>(kafkaTopologyConfig.ansvarStateStoreName)
val internTilstand = internStateStore[message.periodeId]
val ansvar = ansvarStateStore[message.periodeId]
haandterAnsvarEndret(
tilstand = internTilstand,
ansvar = ansvar,
ansvarEndret = message
).map { handling ->
when (handling) {
is SendHendelse -> handling.hendelse
is SkrivAnsvar -> ansvarStateStore.put(handling.id, handling.value)
is SlettAnsvar -> ansvarStateStore.delete(handling.id)
}
}.filterIsInstance<BekreftelseHendelse>()
}
.flatMapValues { _, value -> value }
.to(
kafkaTopologyConfig.bekreftelseHendelseloggTopic,
Produced.with(Serdes.Long(), bekreftelseHendelseSerde)
)
}

fun count(
registry: PrometheusMeterRegistry,
message: AnsvarEndret
) {
val action = when (message.handling) {
is TarAnsvar -> "tar_ansvar"
is AvslutterAnsvar -> "avslutter_ansvar"
else -> "ukjent"
}
val bekreftelsesloesning = message.bekreftelsesloesning
registry.counter(
"paw_bekreftelse_ansvar_endret",
Tags.of(
Tag.of("bekreftelsesloesing", bekreftelsesloesning.name),
Tag.of("handling", action)
)
)
}
Loading

0 comments on commit b90c011

Please sign in to comment.