Skip to content

Commit

Permalink
Fullførte første utkast av kafka-key-maintenance
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Nov 8, 2024
1 parent 049adb7 commit 85a10dd
Show file tree
Hide file tree
Showing 25 changed files with 502 additions and 134 deletions.
4 changes: 4 additions & 0 deletions apps/kafka-key-maintenance/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation(project(":lib:kafka-streams"))
implementation(project(":lib:hoplite-config"))
implementation(project(":lib:kafka-key-generator-client"))
implementation(project(":lib:error-handling"))

implementation(libs.arrow.core.core)
implementation(libs.bundles.ktorServerWithNettyAndMicrometer)
Expand Down Expand Up @@ -49,6 +50,7 @@ dependencies {
implementation(libs.kafka.streams.core)
implementation(libs.avro.core)
implementation(libs.avro.kafkaSerializer)
implementation(libs.avro.kafkaStreamsSerde)
implementation(libs.exposed.core)
implementation(libs.exposed.jdbc)
implementation(libs.exposed.javaTime)
Expand All @@ -65,6 +67,8 @@ dependencies {
testImplementation(libs.test.testContainers.core)
testImplementation(libs.test.testContainers.postgresql)
testImplementation(libs.ktor.server.testJvm)
testImplementation(libs.kafka.streams.test)
testImplementation(project(":test:kafka-streams-test-functions"))
}

java {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package no.nav.paw.kafkakeymaintenance

import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
import no.nav.paw.config.kafka.*
import no.nav.paw.health.model.HealthStatus
import no.nav.paw.health.model.LivenessHealthIndicator
import no.nav.paw.health.model.ReadinessHealthIndicator
import no.nav.paw.health.repository.HealthIndicatorRepository
import no.nav.paw.kafkakeygenerator.client.createKafkaKeyGeneratorClient
import no.nav.paw.kafkakeymaintenance.db.DatabaseConfig
import no.nav.paw.kafkakeymaintenance.db.dataSource
import no.nav.paw.kafkakeymaintenance.db.migrateDatabase
import no.nav.paw.kafkakeymaintenance.kafka.txContext
import no.nav.paw.kafkakeymaintenance.pdlprocessor.AktorTopologyConfig
import no.nav.paw.kafkakeymaintenance.pdlprocessor.functions.hentAlias
import no.nav.paw.kafkakeymaintenance.perioder.consume
import no.nav.paw.kafkakeymaintenance.perioder.dbPerioder
import org.jetbrains.exposed.sql.Database
import org.slf4j.LoggerFactory
import java.util.concurrent.CompletableFuture.runAsync
import java.util.concurrent.atomic.AtomicBoolean

/**
 * Application entry point: wires up the database, the periode consumer, the
 * aktor Kafka Streams topology and the Ktor server, then blocks until the
 * server stops.
 *
 * Shutdown is coordinated through the shared [AtomicBoolean] in
 * [ApplicationContext]: it is flipped by a JVM shutdown hook, by the consumer
 * task completing/failing, or when the Ktor server returns.
 */
fun main() {
    val applicationContext = ApplicationContext(
        consumerVersion = PERIODE_CONSUMER_GROUP_VERSION,
        logger = LoggerFactory.getLogger("app"),
        meterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT),
        shutdownCalled = AtomicBoolean(false)
    )
    // SIGTERM/Ctrl-C only sets the flag; the consumer sequence polls it and stops itself.
    Runtime.getRuntime().addShutdownHook( Thread { applicationContext.shutdownCalled.set(true) })
    val healthIndicatorRepository = HealthIndicatorRepository()
    // Run migrations first, then register the datasource as Exposed's default database.
    with(loadNaisOrLocalConfiguration<DatabaseConfig>("database_configuration.toml").dataSource()) {
        migrateDatabase(this)
        Database.connect(this)
    }
    // Lazy sequence of record batches from the periode topic; nothing is polled until consumed.
    val periodeSequence = with(KafkaFactory(loadNaisOrLocalConfiguration(KAFKA_CONFIG_WITH_SCHEME_REG))) {
        initPeriodeConsumer(
            periodeTopic = PERIODE_TOPIC,
            applicationContext = applicationContext
        )
    }
    // Starts UNHEALTHY; flipped to healthy once the consumer task is actually running.
    val consumerLivenessHealthIndicator = healthIndicatorRepository.addLivenessIndicator(
        LivenessHealthIndicator(HealthStatus.UNHEALTHY)
    )
    val consumerReadinessHealthIndicator = healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator())
    // Consume on a background thread (CompletableFuture.runAsync -> common pool);
    // the main thread continues on to start streams and Ktor.
    runAsync {
        consumerReadinessHealthIndicator.setHealthy()
        consumerLivenessHealthIndicator.setHealthy()
        periodeSequence.consume(txContext(applicationContext))
    }.handle { _, throwable ->
        // Runs on both normal completion and failure: log the error (if any),
        // request shutdown and go unhealthy so the platform restarts the pod.
        throwable?.also { applicationContext.logger.error("Consumer task failed", throwable) }
        applicationContext.shutdownCalled.set(true)
        consumerReadinessHealthIndicator.setUnhealthy()
        consumerLivenessHealthIndicator.setUnhealthy()
    }
    // Kafka Streams application for the aktor topic; registers its own health indicators.
    initStreams(
        aktorTopologyConfig = loadNaisOrLocalConfiguration(AktorTopologyConfig.configFile),
        healthIndicatorRepository = healthIndicatorRepository,
        perioder = dbPerioder(applicationContext),
        hentAlias = createKafkaKeyGeneratorClient()::hentAlias
    ).start()

    // Blocks until the server stops; anything past this line is shutdown handling.
    initKtor(
        healthIndicatorRepository = healthIndicatorRepository,
        prometheusMeterRegistry = applicationContext.meterRegistry
    ).start(wait = true)
    applicationContext.shutdownCalled.set(true)
    applicationContext.logger.info("Applikasjonen er stoppet")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package no.nav.paw.kafkakeymaintenance

import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.health.repository.HealthIndicatorRepository
import no.nav.paw.health.route.healthRoutes

/**
 * Creates (but does not start) an embedded Netty server on port 8080 with
 * health routes, an optional Prometheus scrape endpoint and any extra routes.
 *
 * @param healthIndicatorRepository backing store for the health routes
 * @param prometheusMeterRegistry when non-null, `/internal/metrics` serves its scrape output
 * @param additionalRoutes extra route definitions installed after the built-in ones
 */
fun initKtor(
    healthIndicatorRepository: HealthIndicatorRepository,
    prometheusMeterRegistry: PrometheusMeterRegistry? = null,
    vararg additionalRoutes: Route.() -> Unit
) =
    embeddedServer(Netty, port = 8080) {
        routing {
            healthRoutes(healthIndicatorRepository)
            // Only expose metrics when a registry was supplied.
            if (prometheusMeterRegistry != null) {
                get("/internal/metrics") {
                    call.respondText(prometheusMeterRegistry.scrape())
                }
            }
            for (installRoute in additionalRoutes) {
                installRoute()
            }
        }
    }
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package no.nav.paw.kafkakeymaintenance

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
import no.nav.paw.config.kafka.KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG
import no.nav.paw.config.kafka.streams.KafkaStreamsFactory
import no.nav.paw.error.handler.withApplicationTerminatingExceptionHandler
import no.nav.paw.health.listener.withHealthIndicatorStateListener
import no.nav.paw.health.model.LivenessHealthIndicator
import no.nav.paw.health.model.ReadinessHealthIndicator
import no.nav.paw.health.repository.HealthIndicatorRepository
import no.nav.paw.kafkakeygenerator.client.LokaleAlias
import no.nav.paw.kafkakeymaintenance.pdlprocessor.AktorTopologyConfig
import no.nav.paw.kafkakeymaintenance.pdlprocessor.buildAktorTopology
import no.nav.paw.kafkakeymaintenance.perioder.Perioder
import no.nav.person.pdl.aktor.v2.Aktor
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier
import org.apache.kafka.streams.state.Stores

/**
 * Builds and wires the Kafka Streams instance for the aktor topology.
 *
 * Registers liveness/readiness indicators driven by the streams state listener
 * and installs an exception handler that terminates the application on
 * unhandled stream failures. The returned instance is NOT started.
 *
 * @param aktorTopologyConfig topic/state-store configuration for the topology
 * @param healthIndicatorRepository repository the new health indicators are registered in
 * @param perioder periode lookup backing the topology
 * @param hentAlias resolves lokale aliases for a list of identities
 * @param streamsName name passed to [KafkaStreamsFactory] — presumably feeds the
 *   application id (confirm against KafkaStreamsFactory); the default preserves
 *   the previously hard-coded "beta-v1"
 * @return an initialized, not-yet-started [KafkaStreams]
 */
fun initStreams(
    aktorTopologyConfig: AktorTopologyConfig,
    healthIndicatorRepository: HealthIndicatorRepository,
    perioder: Perioder,
    hentAlias: (List<String>) -> List<LokaleAlias>,
    streamsName: String = "beta-v1"
): KafkaStreams {
    val kafkaStreamsFactory = KafkaStreamsFactory(
        streamsName,
        loadNaisOrLocalConfiguration(KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG)
    ).withExactlyOnce()
        .withDefaultKeySerde(Serdes.StringSerde::class)
        .withDefaultValueSerde(SpecificAvroSerde::class)

    val topology = initTopology(
        aktorSerde = kafkaStreamsFactory.createSpecificAvroSerde<Aktor>(),
        aktorTopologyConfig = aktorTopologyConfig,
        perioder = perioder,
        hentAlias = hentAlias,
        // Production uses a persistent store; tests can inject an in-memory supplier.
        stateStoreBuilderFactory = Stores::persistentTimestampedKeyValueStore
    )

    val streams = KafkaStreams(topology, kafkaStreamsFactory.properties)
    streams
        .withHealthIndicatorStateListener(
            livenessIndicator = healthIndicatorRepository.addLivenessIndicator(LivenessHealthIndicator()),
            readinessIndicator = healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator())
        )
    // Any uncaught exception in the topology takes the whole application down.
    streams.withApplicationTerminatingExceptionHandler()
    return streams
}

/**
 * Assembles the aktor [Topology]: one timestamped key/value state store plus
 * the processing steps contributed by [buildAktorTopology].
 *
 * @param stateStoreBuilderFactory supplies the backing store for a given store name
 *   (persistent in production, in-memory in tests)
 * @param aktorTopologyConfig topic/state-store configuration
 * @param perioder periode lookup used by the topology
 * @param hentAlias resolves lokale aliases for a list of identities
 * @param aktorSerde serde for [Aktor] values, also used for the store
 */
fun initTopology(
    stateStoreBuilderFactory: (String) -> KeyValueBytesStoreSupplier,
    aktorTopologyConfig: AktorTopologyConfig,
    perioder: Perioder,
    hentAlias: (List<String>) -> List<LokaleAlias>,
    aktorSerde: Serde<Aktor>
): Topology {
    val storeBuilder = Stores.timestampedKeyValueStoreBuilder(
        stateStoreBuilderFactory(aktorTopologyConfig.stateStoreName),
        Serdes.StringSerde(),
        aktorSerde
    )
    return StreamsBuilder()
        .addStateStore(storeBuilder)
        .apply {
            buildAktorTopology(
                aktorSerde = aktorSerde,
                aktorTopologyConfig = aktorTopologyConfig,
                perioder = perioder,
                hentAlias = hentAlias
            )
        }
        .build()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package no.nav.paw.kafkakeymaintenance

import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.config.kafka.KafkaFactory
import no.nav.paw.config.kafka.asSequence
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.LongDeserializer
import java.time.Duration

/**
 * Creates a periode consumer, subscribes it to [periodeTopic] and returns a
 * lazy sequence of record batches. Polling does not start until the sequence
 * is iterated; it stops when the shared shutdown flag is set.
 *
 * Offsets are not auto-committed — committing is left to the consuming side.
 *
 * @param periodeTopic topic to subscribe to
 * @param applicationContext supplies the consumer-group version and the shutdown flag
 */
fun KafkaFactory.initPeriodeConsumer(
    periodeTopic: String,
    applicationContext: ApplicationContext
): Sequence<Iterable<ConsumerRecord<Long, Periode>>> =
    createConsumer(
        groupId = "kafka-key-maintenance-v${applicationContext.consumerVersion}",
        clientId = "kafka-key-maintenance-client-v${applicationContext.consumerVersion}",
        keyDeserializer = LongDeserializer::class,
        valueDeserializer = PeriodeDeserializer::class,
        autoCommit = false,
        autoOffsetReset = "earliest"
    ).apply { subscribe(listOf(periodeTopic)) }
        .asSequence(
            stop = applicationContext.shutdownCalled,
            pollTimeout = Duration.ofMillis(500),
            closeTimeout = Duration.ofSeconds(1)
        )

/**
 * Avro deserializer for [Periode] records. A concrete subclass exists so the
 * consumer factory can be given a plain class reference
 * (`PeriodeDeserializer::class`) rather than a parameterized type.
 */
class PeriodeDeserializer : SpecificAvroDeserializer<Periode>()

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
package no.nav.paw.kafkakeymaintenance

// Partition count for the relevant topics — assumed to mirror the actual
// topic configuration; TODO confirm where it is enforced (usage not visible here).
const val ANTALL_PARTISJONER = 6
// Feeds the consumer group/client ids ("kafka-key-maintenance-v<N>"); bumping it
// creates a fresh consumer group, which re-reads the topic from "earliest".
const val PERIODE_CONSUMER_GROUP_VERSION = 1
// Source topic for arbeidssøker-perioder.
const val PERIODE_TOPIC = "paw.arbeidssokerperioder-v1"

This file was deleted.

Loading

0 comments on commit 85a10dd

Please sign in to comment.