Set up bekreftelse-utgang
robertkittilsen committed Oct 2, 2024
1 parent 33d16a4 commit f3d5b26
Showing 20 changed files with 670 additions and 1 deletion.
78 changes: 78 additions & 0 deletions apps/bekreftelse-utgang/build.gradle.kts
@@ -0,0 +1,78 @@

plugins {
    kotlin("jvm")
    id("com.google.cloud.tools.jib")
    application
}

val jvmMajorVersion: String by project
val baseImage: String by project
val image: String? by project


dependencies {
    // Project
    implementation(project(":lib:hoplite-config"))
    implementation(project(":lib:error-handling"))
    implementation(project(":lib:kafka-streams"))
    implementation(project(":lib:kafka-key-generator-client"))
    implementation(project(":domain:main-avro-schema"))
    implementation(project(":domain:bekreftelse-interne-hendelser"))
    implementation(project(":domain:interne-hendelser"))

    // Server
    implementation(libs.bundles.ktorServerWithNettyAndMicrometer)

    // Serialization
    implementation(libs.ktorSerializationJackson)
    implementation(libs.ktorSerializationJson)
    implementation(libs.jacksonDatatypeJsr310)

    // Tooling
    implementation(libs.arrowCore)

    // Logging
    implementation(libs.logbackClassic)
    implementation(libs.logstashLogbackEncoder)
    implementation(libs.log)
    implementation(libs.auditLog)

    // Instrumentation
    implementation(libs.micrometerRegistryPrometheus)
    implementation(libs.opentelemetryAnnotations)

    // Kafka
    implementation(libs.kafkaStreams)
    implementation(libs.kafkaStreamsAvroSerde)

    // Testing
    testImplementation(libs.ktorServerTestsJvm)
    testImplementation(libs.streamsTest)
    testImplementation(libs.bundles.testLibsWithUnitTesting)
    testImplementation(project(":test:test-data-lib"))
}

java {
    toolchain {
        languageVersion.set(JavaLanguageVersion.of(jvmMajorVersion))
    }
}

application {
    mainClass.set("no.nav.paw.bekreftelseutgang.ApplicationKt")
}

tasks.withType<Test>().configureEach {
    useJUnitPlatform()
}

jib {
    from.image = "$baseImage:$jvmMajorVersion"
    to.image = "${image ?: project.name}:${project.version}"
    container {
        environment = mapOf(
            "IMAGE_WITH_VERSION" to "${image ?: project.name}:${project.version}"
        )
        jvmFlags = listOf("-XX:ActiveProcessorCount=4", "-XX:+UseZGC", "-XX:+ZGenerational")
    }
}
52 changes: 52 additions & 0 deletions apps/bekreftelse-utgang/nais/nais-dev.yaml
@@ -0,0 +1,52 @@
apiVersion: nais.io/v1alpha1
kind: Application
metadata:
  name: paw-arbeidssoekerregisteret-bekreftelse-utgang
  namespace: paw
  labels:
    team: paw
spec:
  image: {{ image }}
  port: 8080
  env:
    - name: KAFKA_STREAMS_ID_SUFFIX
      value: "v1"
    - name: KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_HENDELSELOGG_TOPIC
      value: "paw.arbeidssoker-bekreftelse-hendelseslogg-beta-v1"
    - name: KAFKA_PAW_ARBEIDSSOKER_HENDELSELOGG_TOPIC
      value: "paw.arbeidssoker-hendelseslogg-v1"
    - name: KAFKA_PUNCTUATOR_INTERVAL
      value: "PT1M"
  azure:
    application:
      enabled: true
  replicas:
    min: 1
    max: 1
  resources:
    limits:
      memory: 1024Mi
    requests:
      cpu: 20m
      memory: 256Mi
  liveness:
    path: /internal/isAlive
    initialDelay: 10
  readiness:
    path: /internal/isReady
    initialDelay: 10
  prometheus:
    enabled: true
    path: /internal/metrics
  observability:
    autoInstrumentation:
      enabled: true
      runtime: java
  kafka:
    pool: nav-dev
    streams: true
  accessPolicy:
    outbound:
      rules:
        - application: paw-kafka-key-generator
          namespace: paw
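
Note: the "PT1M" value for KAFKA_PUNCTUATOR_INTERVAL is an ISO-8601 duration. A minimal, illustrative Kotlin sketch (not part of this commit) of how such a string maps onto a java.time.Duration-typed config field such as punctuationInterval:

import java.time.Duration

fun main() {
    // "PT1M" is ISO-8601 for one minute; java.time.Duration.parse understands it directly.
    val punctuationInterval = Duration.parse("PT1M")
    println(punctuationInterval.toSeconds()) // prints 60
}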
@@ -0,0 +1,55 @@
package no.nav.paw.bekreftelseutgang

import io.ktor.server.application.Application
import io.ktor.server.engine.addShutdownHook
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import io.ktor.server.routing.routing
import no.nav.paw.bekreftelseutgang.config.APPLICATION_CONFIG_FILE_NAME
import no.nav.paw.bekreftelseutgang.config.ApplicationConfig
import no.nav.paw.bekreftelseutgang.config.SERVER_CONFIG_FILE_NAME
import no.nav.paw.bekreftelseutgang.config.ServerConfig
import no.nav.paw.bekreftelseutgang.context.ApplicationContext
import no.nav.paw.bekreftelseutgang.plugins.buildKafkaStreams
import no.nav.paw.bekreftelseutgang.plugins.configureKafka
import no.nav.paw.bekreftelseutgang.plugins.configureMetrics
import no.nav.paw.bekreftelseutgang.routes.metricsRoutes
import no.nav.paw.bekreftelseutgang.topology.buildTopology
import no.nav.paw.config.env.appNameOrDefaultForLocal
import no.nav.paw.config.env.currentRuntimeEnvironment
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
import no.nav.paw.health.route.healthRoutes
import org.slf4j.LoggerFactory

fun main() {
    val logger = LoggerFactory.getLogger("no.nav.paw.logger.application")

    val serverConfig = loadNaisOrLocalConfiguration<ServerConfig>(SERVER_CONFIG_FILE_NAME)
    val applicationConfig = loadNaisOrLocalConfiguration<ApplicationConfig>(APPLICATION_CONFIG_FILE_NAME)

    logger.info("Starter: ${currentRuntimeEnvironment.appNameOrDefaultForLocal()}")

    with(serverConfig) {
        embeddedServer(Netty, port = port) {
            module(applicationConfig)
        }.apply {
            addShutdownHook { stop(gracePeriodMillis, timeoutMillis) }
            start(wait = true)
        }
    }
}

fun Application.module(applicationConfig: ApplicationConfig) {
    val applicationContext = ApplicationContext.create(applicationConfig)

    val kafkaTopology = buildTopology(applicationContext)
    val kafkaStreams = buildKafkaStreams(applicationContext, kafkaTopology)

    configureMetrics(applicationContext)
    configureKafka(applicationContext, kafkaStreams)

    routing {
        healthRoutes(applicationContext.healthIndicatorRepository)
        metricsRoutes(applicationContext)
    }
}
@@ -0,0 +1,30 @@
package no.nav.paw.bekreftelseutgang.config

import no.nav.paw.config.env.RuntimeEnvironment
import no.nav.paw.config.env.currentRuntimeEnvironment
import no.nav.paw.config.kafka.KafkaConfig
import no.nav.paw.kafkakeygenerator.auth.AzureM2MConfig
import no.nav.paw.kafkakeygenerator.client.KafkaKeyConfig
import java.net.InetAddress
import java.time.Duration

const val APPLICATION_CONFIG_FILE_NAME = "application_config.toml"

data class ApplicationConfig(
    val kafkaTopology: KafkaTopologyConfig,
    val kafkaStreams: KafkaConfig,
    val azureM2M: AzureM2MConfig,
    val kafkaKeysClient: KafkaKeyConfig,
    // Env
    val runtimeEnvironment: RuntimeEnvironment = currentRuntimeEnvironment,
    val hostname: String = InetAddress.getLocalHost().hostName
)

data class KafkaTopologyConfig(
    val applicationIdSuffix: String,
    val internStateStoreName: String,
    val hendelseloggTopic: String,
    val bekreftelseHendelseloggTopic: String,
    val punctuationInterval: Duration,
    val shutdownTimeout: Duration = Duration.ofMinutes(5),
)
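
These data classes are populated from application_config.toml through the shared loader in :lib:hoplite-config (loadNaisOrLocalConfiguration). As a rough sketch of the underlying mechanism only, plain Hoplite could bind the same class as follows; this is an illustrative assumption, not the helper used in this repo, and it presumes hoplite-core plus hoplite-toml on the classpath and that the snippet lives in the same package as APPLICATION_CONFIG_FILE_NAME:

import com.sksamuel.hoplite.ConfigLoaderBuilder
import com.sksamuel.hoplite.addResourceSource

// Illustrative only: binds the TOML resource directly into ApplicationConfig.
// The actual application uses loadNaisOrLocalConfiguration from :lib:hoplite-config instead.
fun loadApplicationConfigSketch(): ApplicationConfig =
    ConfigLoaderBuilder.default()
        .addResourceSource("/$APPLICATION_CONFIG_FILE_NAME")
        .build()
        .loadConfigOrThrow<ApplicationConfig>()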
@@ -0,0 +1,12 @@
package no.nav.paw.bekreftelseutgang.config

const val SERVER_CONFIG_FILE_NAME = "server_config.toml"

data class ServerConfig(
    val port: Int,
    val callGroupSize: Int,
    val workerGroupSize: Int,
    val connectionGroupSize: Int,
    val gracePeriodMillis: Long,
    val timeoutMillis: Long
)
@@ -0,0 +1,39 @@
package no.nav.paw.bekreftelseutgang.context

import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.bekreftelseutgang.config.ApplicationConfig
import no.nav.paw.health.repository.HealthIndicatorRepository
import no.nav.paw.kafkakeygenerator.auth.azureAdM2MTokenClient
import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient
import no.nav.paw.kafkakeygenerator.client.kafkaKeysClient

class ApplicationContext(
    val applicationConfig: ApplicationConfig,
    val prometheusMeterRegistry: PrometheusMeterRegistry,
    val healthIndicatorRepository: HealthIndicatorRepository,
    val kafkaKeysClient: KafkaKeysClient
) {
    companion object {
        fun create(applicationConfig: ApplicationConfig): ApplicationContext {
            val prometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)

            val healthIndicatorRepository = HealthIndicatorRepository()

            val azureM2MTokenClient = azureAdM2MTokenClient(
                applicationConfig.runtimeEnvironment, applicationConfig.azureM2M
            )

            val kafkaKeysClient = kafkaKeysClient(applicationConfig.kafkaKeysClient) {
                azureM2MTokenClient.createMachineToMachineToken(applicationConfig.kafkaKeysClient.scope)
            }

            return ApplicationContext(
                applicationConfig,
                prometheusMeterRegistry,
                healthIndicatorRepository,
                kafkaKeysClient
            )
        }
    }
}
@@ -0,0 +1,17 @@
package no.nav.paw.bekreftelseutgang.plugins


import io.ktor.server.application.Application
import io.ktor.server.application.install
import no.nav.paw.bekreftelseutgang.context.ApplicationContext
import org.apache.kafka.streams.KafkaStreams

fun Application.configureKafka(
    applicationContext: ApplicationContext,
    kafkaStreams: KafkaStreams
) {
    install(KafkaStreamsPlugin) {
        shutDownTimeout = applicationContext.applicationConfig.kafkaTopology.shutdownTimeout
        kafkaStreamsList = listOf(kafkaStreams)
    }
}
@@ -0,0 +1,70 @@
package no.nav.paw.bekreftelseutgang.plugins

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import io.ktor.server.application.ApplicationPlugin
import io.ktor.server.application.ApplicationStarted
import io.ktor.server.application.ApplicationStopping
import io.ktor.server.application.createApplicationPlugin
import io.ktor.server.application.hooks.MonitoringEvent
import io.ktor.server.application.log
import io.ktor.util.KtorDsl
import no.nav.paw.bekreftelseutgang.context.ApplicationContext
import no.nav.paw.config.kafka.streams.KafkaStreamsFactory
import no.nav.paw.error.handler.withApplicationTerminatingExceptionHandler
import no.nav.paw.health.listener.withHealthIndicatorStateListener
import no.nav.paw.health.model.LivenessHealthIndicator
import no.nav.paw.health.model.ReadinessHealthIndicator
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
import java.time.Duration

@KtorDsl
class KafkaStreamsPluginConfig {
    var shutDownTimeout: Duration? = null
    var kafkaStreamsList: List<KafkaStreams>? = null
}

val KafkaStreamsPlugin: ApplicationPlugin<KafkaStreamsPluginConfig> =
    createApplicationPlugin("KafkaStreams", ::KafkaStreamsPluginConfig) {
        val shutDownTimeout = requireNotNull(pluginConfig.shutDownTimeout) { "ShutDownTimeout er null" }
        val kafkaStreamsList = requireNotNull(pluginConfig.kafkaStreamsList) { "KafkaStreams er null" }

        on(MonitoringEvent(ApplicationStarted)) { application ->
            application.log.info("Starter Kafka Streams")
            kafkaStreamsList.forEach { stream -> stream.start() }
        }

        on(MonitoringEvent(ApplicationStopping)) { application ->
            application.log.info("Stopper Kafka Streams")
            kafkaStreamsList.forEach { stream -> stream.close(shutDownTimeout) }
        }
    }

fun buildKafkaStreams(
    applicationContext: ApplicationContext,
    topology: Topology
): KafkaStreams {
    val applicationConfig = applicationContext.applicationConfig
    val healthIndicatorRepository = applicationContext.healthIndicatorRepository

    val livenessIndicator = healthIndicatorRepository.addLivenessIndicator(LivenessHealthIndicator())
    val readinessIndicator = healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator())

    val streamsFactory = KafkaStreamsFactory(
        applicationConfig.kafkaTopology.applicationIdSuffix,
        applicationConfig.kafkaStreams
    )
        .withDefaultKeySerde(Serdes.Long()::class)
        .withDefaultValueSerde(SpecificAvroSerde::class)
        .apply { properties["application.server"] = applicationConfig.hostname }

    val kafkaStreams = KafkaStreams(
        topology,
        StreamsConfig(streamsFactory.properties)
    )
    kafkaStreams.withHealthIndicatorStateListener(livenessIndicator, readinessIndicator)
    kafkaStreams.withApplicationTerminatingExceptionHandler()
    return kafkaStreams
}
@@ -0,0 +1,32 @@
package no.nav.paw.bekreftelseutgang.plugins

import io.ktor.server.application.Application
import io.ktor.server.application.install
import io.ktor.server.metrics.micrometer.MicrometerMetrics
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
import io.micrometer.core.instrument.binder.system.ProcessorMetrics
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig
import no.nav.paw.bekreftelseutgang.context.ApplicationContext
import java.time.Duration

fun Application.configureMetrics(applicationContext: ApplicationContext) {
    install(MicrometerMetrics) {
        registry = applicationContext.prometheusMeterRegistry
        meterBinders = listOf(
            JvmMemoryMetrics(),
            JvmGcMetrics(),
            ProcessorMetrics()
        )
        distributionStatisticConfig =
            DistributionStatisticConfig.builder()
                .percentilesHistogram(true)
                .maximumExpectedValue(Duration.ofSeconds(1).toNanos().toDouble())
                .minimumExpectedValue(Duration.ofMillis(20).toNanos().toDouble())
                .serviceLevelObjectives(
                    Duration.ofMillis(150).toNanos().toDouble(),
                    Duration.ofMillis(500).toNanos().toDouble()
                )
                .build()
    }
}
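
The metricsRoutes function referenced in Application.module is not part of this excerpt; a minimal sketch of what such a Ktor route typically looks like with a PrometheusMeterRegistry (an assumption for illustration, not the repo's actual implementation), matching the prometheus.path setting (/internal/metrics) in the nais manifest above:

import io.ktor.server.application.call
import io.ktor.server.response.respondText
import io.ktor.server.routing.Route
import io.ktor.server.routing.get
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry

// Hypothetical sketch: exposes the Prometheus scrape endpoint.
fun Route.metricsRoutesSketch(registry: PrometheusMeterRegistry) {
    get("/internal/metrics") {
        call.respondText(registry.scrape())
    }
}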