Skip to content

Commit

Permalink
Fikset håndtering av Kafka interactive query i bekreftelse-api
Browse files Browse the repository at this point in the history
  • Loading branch information
naviktthomas committed Oct 2, 2024
1 parent 082b072 commit d5ccb19
Show file tree
Hide file tree
Showing 18 changed files with 148 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import io.ktor.server.engine.addShutdownHook
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import io.ktor.server.routing.routing
import no.nav.paw.bekreftelse.api.config.APPLICATION_CONFIG_FILE_NAME
import no.nav.paw.bekreftelse.api.config.ApplicationConfig
import no.nav.paw.bekreftelse.api.config.SERVER_CONFIG_FILE_NAME
import no.nav.paw.bekreftelse.api.config.ServerConfig
import no.nav.paw.bekreftelse.api.context.ApplicationContext
import no.nav.paw.bekreftelse.api.plugins.configureAuthentication
import no.nav.paw.bekreftelse.api.plugins.configureHTTP
Expand All @@ -22,34 +18,30 @@ import no.nav.paw.bekreftelse.api.routes.metricsRoutes
import no.nav.paw.bekreftelse.api.routes.swaggerRoutes
import no.nav.paw.bekreftelse.api.utils.buildApplicationLogger
import no.nav.paw.config.env.appNameOrDefaultForLocal
import no.nav.paw.config.env.currentRuntimeEnvironment
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
import no.nav.paw.health.route.healthRoutes

fun main() {
val logger = buildApplicationLogger

val serverConfig = loadNaisOrLocalConfiguration<ServerConfig>(SERVER_CONFIG_FILE_NAME)
val applicationConfig = loadNaisOrLocalConfiguration<ApplicationConfig>(APPLICATION_CONFIG_FILE_NAME)
val applicationContext = ApplicationContext.create()
val appName = applicationContext.serverConfig.runtimeEnvironment.appNameOrDefaultForLocal()

logger.info("Starter: ${currentRuntimeEnvironment.appNameOrDefaultForLocal()}")
logger.info("Starter: $appName")

with(serverConfig) {
with(applicationContext.serverConfig) {
embeddedServer(Netty, port = port) {
module(applicationConfig)
module(applicationContext)
}.apply {
addShutdownHook {
logger.info("Avslutter ${applicationConfig.runtimeEnvironment.appNameOrDefaultForLocal()}")
logger.info("Avslutter $appName")
stop(gracePeriodMillis, timeoutMillis)
}
start(wait = true)
}
}
}

fun Application.module(applicationConfig: ApplicationConfig) {
val applicationContext = ApplicationContext.create(applicationConfig)

fun Application.module(applicationContext: ApplicationContext) {
configureMetrics(applicationContext)
configureHTTP(applicationContext)
configureAuthentication(applicationContext)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package no.nav.paw.bekreftelse.api.config

import no.nav.paw.config.env.RuntimeEnvironment
import no.nav.paw.config.env.currentRuntimeEnvironment
import no.nav.paw.config.kafka.KafkaConfig
import no.nav.paw.kafkakeygenerator.auth.AzureM2MConfig
import no.nav.paw.kafkakeygenerator.client.KafkaKeyConfig
import java.net.InetAddress

const val APPLICATION_CONFIG_FILE_NAME = "application_config.toml"

Expand All @@ -16,10 +13,7 @@ data class ApplicationConfig(
val azureM2M: AzureM2MConfig,
val poaoClientConfig: ServiceClientConfig,
val kafkaKeysClient: KafkaKeyConfig,
val kafkaClients: KafkaConfig,
// Env
val runtimeEnvironment: RuntimeEnvironment = currentRuntimeEnvironment,
val hostname: String = InetAddress.getLocalHost().hostName
val kafkaClients: KafkaConfig
)

data class AutorisasjonConfig(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package no.nav.paw.bekreftelse.api.config

import no.nav.paw.config.env.RuntimeEnvironment
import no.nav.paw.config.env.currentRuntimeEnvironment

const val SERVER_CONFIG_FILE_NAME = "server_config.toml"

data class ServerConfig(
val port: Int,
val host: String,
val callGroupSize: Int,
val workerGroupSize: Int,
val connectionGroupSize: Int,
val gracePeriodMillis: Long,
val timeoutMillis: Long
val timeoutMillis: Long,
val runtimeEnvironment: RuntimeEnvironment = currentRuntimeEnvironment
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.serialization.jackson.jackson
import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.bekreftelse.api.config.APPLICATION_CONFIG_FILE_NAME
import no.nav.paw.bekreftelse.api.config.ApplicationConfig
import no.nav.paw.bekreftelse.api.config.SERVER_CONFIG_FILE_NAME
import no.nav.paw.bekreftelse.api.config.ServerConfig
import no.nav.paw.bekreftelse.api.consumer.BekreftelseHttpConsumer
import no.nav.paw.bekreftelse.api.plugins.buildKafkaStreams
import no.nav.paw.bekreftelse.api.producer.BekreftelseKafkaProducer
import no.nav.paw.bekreftelse.api.services.AuthorizationService
import no.nav.paw.bekreftelse.api.services.BekreftelseService
import no.nav.paw.bekreftelse.api.topology.buildBekreftelseTopology
import no.nav.paw.bekreftelse.api.utils.configureJackson
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
import no.nav.paw.health.repository.HealthIndicatorRepository
import no.nav.paw.kafkakeygenerator.auth.azureAdM2MTokenClient
import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient
Expand All @@ -22,6 +26,7 @@ import no.nav.poao_tilgang.client.PoaoTilgangHttpClient
import org.apache.kafka.streams.KafkaStreams

data class ApplicationContext(
val serverConfig: ServerConfig,
val applicationConfig: ApplicationConfig,
val kafkaKeysClient: KafkaKeysClient,
val prometheusMeterRegistry: PrometheusMeterRegistry,
Expand All @@ -31,9 +36,12 @@ data class ApplicationContext(
val bekreftelseService: BekreftelseService
) {
companion object {
fun create(applicationConfig: ApplicationConfig): ApplicationContext {
fun create(): ApplicationContext {
val serverConfig = loadNaisOrLocalConfiguration<ServerConfig>(SERVER_CONFIG_FILE_NAME)
val applicationConfig = loadNaisOrLocalConfiguration<ApplicationConfig>(APPLICATION_CONFIG_FILE_NAME)

val azureM2MTokenClient = azureAdM2MTokenClient(
applicationConfig.runtimeEnvironment, applicationConfig.azureM2M
serverConfig.runtimeEnvironment, applicationConfig.azureM2M
)

val kafkaKeysClient = kafkaKeysClient(applicationConfig.kafkaKeysClient) {
Expand All @@ -59,10 +67,16 @@ data class ApplicationContext(
)
)

val authorizationService = AuthorizationService(applicationConfig, kafkaKeysClient, poaoTilgangClient)
val authorizationService = AuthorizationService(
serverConfig,
applicationConfig,
kafkaKeysClient,
poaoTilgangClient
)

val bekreftelseTopology = buildBekreftelseTopology(applicationConfig, prometheusMeterRegistry)
val bekreftelseKafkaStreams = buildKafkaStreams(
serverConfig,
applicationConfig,
healthIndicatorRepository,
bekreftelseTopology
Expand All @@ -80,6 +94,7 @@ data class ApplicationContext(
)

return ApplicationContext(
serverConfig,
applicationConfig,
kafkaKeysClient,
prometheusMeterRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import io.ktor.http.HttpStatusCode
import no.nav.paw.error.exception.ServerResponseException

class DataIkkeFunnetException(message: String) :
ServerResponseException(HttpStatusCode.BadRequest, "PAW_DATA_IKKE_FUNNET", message, null)
ServerResponseException(HttpStatusCode.NotFound, "PAW_DATA_IKKE_FUNNET", message, null)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package no.nav.paw.bekreftelse.api.exception

import io.ktor.http.HttpStatusCode
import no.nav.paw.error.exception.ServerResponseException

class DataTilhoererIkkeBrukerException(message: String) :
ServerResponseException(HttpStatusCode.BadRequest, "PAW_DATA_TILHOERER_IKKE_BRUKER", message, null)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package no.nav.paw.bekreftelse.api.exception

import io.ktor.http.HttpStatusCode
import no.nav.paw.error.exception.ServerResponseException

class SystemfeilException(message: String) :
ServerResponseException(HttpStatusCode.InternalServerError, "PAW_SYSTEMFEIL", message, null)
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fun Application.configureHTTP(applicationContext: ApplicationContext) {
install(CORS) {
val origins = applicationContext.applicationConfig.autorisasjon.getCorsAllowOrigins()

when (applicationContext.applicationConfig.runtimeEnvironment) {
when (applicationContext.serverConfig.runtimeEnvironment) {
is Nais -> {
origins.forEach { allowHost(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.ktor.server.application.hooks.MonitoringEvent
import io.ktor.server.application.log
import io.ktor.util.KtorDsl
import no.nav.paw.bekreftelse.api.config.ApplicationConfig
import no.nav.paw.bekreftelse.api.config.ServerConfig
import no.nav.paw.config.kafka.streams.KafkaStreamsFactory
import no.nav.paw.error.handler.withApplicationTerminatingExceptionHandler
import no.nav.paw.health.listener.withHealthIndicatorStateListener
Expand Down Expand Up @@ -44,6 +45,7 @@ val KafkaStreamsPlugin: ApplicationPlugin<KafkaStreamsPluginConfig> =
}

fun buildKafkaStreams(
serverConfig: ServerConfig,
applicationConfig: ApplicationConfig,
healthIndicatorRepository: HealthIndicatorRepository,
topology: Topology
Expand All @@ -52,12 +54,12 @@ fun buildKafkaStreams(
val readinessIndicator = healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator())

val streamsFactory = KafkaStreamsFactory(
applicationConfig.kafkaTopology.applicationIdSuffix,
applicationConfig.kafkaClients
applicationIdSuffix = applicationConfig.kafkaTopology.applicationIdSuffix,
config = applicationConfig.kafkaClients,
)
.withDefaultKeySerde(Serdes.Long()::class)
.withDefaultValueSerde(SpecificAvroSerde::class)
.apply { properties["application.server"] = applicationConfig.hostname }
.withServerConfig(serverConfig.host, serverConfig.port)

val kafkaStreams = KafkaStreams(
topology,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.nav.paw.bekreftelse.api.services

import io.opentelemetry.instrumentation.annotations.WithSpan
import no.nav.paw.bekreftelse.api.config.ApplicationConfig
import no.nav.paw.bekreftelse.api.config.ServerConfig
import no.nav.paw.bekreftelse.api.context.RequestContext
import no.nav.paw.bekreftelse.api.context.SecurityContext
import no.nav.paw.bekreftelse.api.exception.BearerTokenManglerException
Expand All @@ -28,6 +29,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory

class AuthorizationService(
private val serverConfig: ServerConfig,
private val applicationConfig: ApplicationConfig,
private val kafkaKeysClient: KafkaKeysClient,
private val poaoTilgangClient: PoaoTilgangClient
Expand Down Expand Up @@ -79,7 +81,7 @@ class AuthorizationService(
} else {
logger.debug("NAV-ansatt har benyttet {}-tilgang til informasjon om bruker", tilgangType)
auditLogger.audit(
applicationConfig.runtimeEnvironment,
serverConfig.runtimeEnvironment,
sluttbruker.identitetsnummer,
navAnsatt,
tilgangType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import no.nav.paw.bekreftelse.api.config.ApplicationConfig
import no.nav.paw.bekreftelse.api.consumer.BekreftelseHttpConsumer
import no.nav.paw.bekreftelse.api.context.SecurityContext
import no.nav.paw.bekreftelse.api.exception.DataIkkeFunnetException
import no.nav.paw.bekreftelse.api.exception.DataTilhoererIkkeBrukerException
import no.nav.paw.bekreftelse.api.exception.SystemfeilException
import no.nav.paw.bekreftelse.api.model.BekreftelseRequest
import no.nav.paw.bekreftelse.api.model.InternState
import no.nav.paw.bekreftelse.api.model.TilgjengeligBekreftelserResponse
Expand Down Expand Up @@ -32,7 +34,7 @@ class BekreftelseService(

private fun getInternStateStore(): ReadOnlyKeyValueStore<Long, InternState> {
if (!kafkaStreams.state().isRunningOrRebalancing) {
throw IllegalStateException("Kafka Streams kjører ikke")
throw SystemfeilException("Kafka Streams kjører ikke")
}
if (internStateStore == null) {
internStateStore = kafkaStreams.store(
Expand All @@ -42,7 +44,7 @@ class BekreftelseService(
)
)
}
return checkNotNull(internStateStore) { "Intern state store er ikke initiert" }
return internStateStore ?: throw SystemfeilException("Intern state store er ikke initiert")
}

@WithSpan
Expand All @@ -59,7 +61,7 @@ class BekreftelseService(
val internState = getInternStateStore().get(securityContext.sluttbruker.arbeidssoekerId)

if (internState != null) {
logger.info("Fant ${internState.tilgjendeligeBekreftelser.size} tilgjengelige bekreftelser")
logger.info("Fant ${internState.tilgjendeligeBekreftelser.size} tilgjengelige bekreftelser i lokal state")
return internState.tilgjendeligeBekreftelser.toResponse()
} else {
return finnTilgjengeligBekreftelserFraAnnenNode(securityContext, request)
Expand All @@ -83,7 +85,10 @@ class BekreftelseService(
val tilgjengeligBekreftelse = internState.tilgjendeligeBekreftelser
.firstOrNull { it.bekreftelseId == request.bekreftelseId }
if (tilgjengeligBekreftelse != null) {
logger.info("Rapportering med id ${request.bekreftelseId} funnet")
logger.info("Mottok svar for bekreftelse som er i lokal state")
if (tilgjengeligBekreftelse.arbeidssoekerId != securityContext.sluttbruker.arbeidssoekerId) {
throw DataTilhoererIkkeBrukerException("Bekreftelse tilhører ikke bruker")
}
val bekreftelse = request.asApi(
periodeId = tilgjengeligBekreftelse.periodeId,
gjelderFra = tilgjengeligBekreftelse.gjelderFra,
Expand All @@ -92,7 +97,7 @@ class BekreftelseService(
)
bekreftelseKafkaProducer.produceMessage(securityContext.sluttbruker.kafkaKey, bekreftelse)
} else {
// TODO Rekreftelse ikke funnet. Hva gjør vi?
throw DataIkkeFunnetException("Fant ingen bekreftelse for gitt id")
}
} else {
sendBekreftelseTilAnnenNode(securityContext, request)
Expand All @@ -105,19 +110,23 @@ class BekreftelseService(
): TilgjengeligBekreftelserResponse {
val metadata = kafkaStreams.queryMetadataForKey(
applicationConfig.kafkaTopology.internStateStoreName,
securityContext.sluttbruker.kafkaKey,
securityContext.sluttbruker.arbeidssoekerId,
Serdes.Long().serializer()
)

if (metadata == null || metadata == KeyQueryMetadata.NOT_AVAILABLE) {
logger.warn("Fant ikke metadata for arbeidsoeker, $metadata")
throw DataIkkeFunnetException("Fant ikke data for arbeidsoeker")
logger.error("Fant ikke metadata for arbeidsoeker, $metadata")
throw SystemfeilException("Fant ikke metadata for arbeidsøker i Kafka Streams")
} else {
return bekreftelseHttpConsumer.finnTilgjengeligBekreftelser(
host = metadata.activeHost().host(),
val hostInfo = metadata.activeHost()
val host = "${hostInfo.host()}:${hostInfo.port()}"
val tilgjendeligeBekreftelser = bekreftelseHttpConsumer.finnTilgjengeligBekreftelser(
host = host,
bearerToken = securityContext.accessToken.jwt,
request = request
)
logger.info("Fant ${tilgjendeligeBekreftelser.size} tilgjengelige bekreftelser på node $host")
return tilgjendeligeBekreftelser
}
}

Expand All @@ -127,16 +136,19 @@ class BekreftelseService(
) {
val metadata = kafkaStreams.queryMetadataForKey(
applicationConfig.kafkaTopology.internStateStoreName,
securityContext.sluttbruker.kafkaKey,
securityContext.sluttbruker.arbeidssoekerId,
Serdes.Long().serializer()
)

if (metadata == null || metadata == KeyQueryMetadata.NOT_AVAILABLE) {
logger.warn("Fant ikke metadata for arbeidsoeker, $metadata")
throw DataIkkeFunnetException("Fant ikke data for arbeidsoeker")
logger.error("Fant ikke metadata for arbeidsoeker, $metadata")
throw SystemfeilException("Fant ikke metadata for arbeidsøker i Kafka Streams")
} else {
val hostInfo = metadata.activeHost()
val host = "${hostInfo.host()}:${hostInfo.port()}"
logger.info("Mottok svar for bekreftelse som er på node $host")
bekreftelseHttpConsumer.sendBekreftelse(
host = metadata.activeHost().host(),
host = "${hostInfo.host()}:${hostInfo.port()}",
bearerToken = securityContext.accessToken.jwt,
request = request
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ private fun StreamsBuilder.addBekreftelseKStream(
applicationConfig: ApplicationConfig,
meterRegistry: MeterRegistry
) {
stream(
applicationConfig.kafkaTopology.bekreftelseHendelsesloggTopic,
Consumed.with(Serdes.Long(), BekreftelseHendelseSerde())
)
.oppdaterBekreftelseHendelseState(applicationConfig.kafkaTopology.internStateStoreName, meterRegistry)
with(applicationConfig.kafkaTopology) {
stream(bekreftelseHendelsesloggTopic, Consumed.with(Serdes.Long(), BekreftelseHendelseSerde()))
.oppdaterBekreftelseHendelseState(internStateStoreName, meterRegistry)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
port = 8080
host = "localhost"
callGroupSize = 16
workerGroupSize = 8
connectionGroupSize = 8
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
port = 8080
host = "${HOSTNAME}"
callGroupSize = 16
workerGroupSize = 8
connectionGroupSize = 8
Expand Down
Loading

0 comments on commit d5ccb19

Please sign in to comment.