Skip to content

Commit

Permalink
Erstattet kafka-streams med egen consumer->db->prosessor løsning
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Nov 15, 2024
1 parent a854132 commit 8e1d85d
Show file tree
Hide file tree
Showing 32 changed files with 713 additions and 430 deletions.
4 changes: 2 additions & 2 deletions apps/kafka-key-maintenance/nais/nais-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ spec:
image: {{ image }}
port: 8080
replicas:
min: 1
max: 1
min: 0
max: 0
resources:
limits:
memory: 4096Mi
Expand Down
Original file line number Diff line number Diff line change
@@ -1,71 +1,109 @@
package no.nav.paw.kafkakeymaintenance

import arrow.core.partially1
import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerializer
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
import no.nav.paw.config.kafka.*
import no.nav.paw.health.model.HealthStatus
import no.nav.paw.health.model.LivenessHealthIndicator
import no.nav.paw.health.model.ReadinessHealthIndicator
import no.nav.paw.config.kafka.KAFKA_CONFIG_WITH_SCHEME_REG
import no.nav.paw.config.kafka.KafkaFactory
import no.nav.paw.health.repository.HealthIndicatorRepository
import no.nav.paw.kafkakeygenerator.client.createKafkaKeyGeneratorClient
import no.nav.paw.kafkakeymaintenance.db.DatabaseConfig
import no.nav.paw.kafkakeymaintenance.db.dataSource
import no.nav.paw.kafkakeymaintenance.db.migrateDatabase
import no.nav.paw.kafkakeymaintenance.kafka.Topic
import no.nav.paw.kafkakeymaintenance.kafka.initHwm
import no.nav.paw.kafkakeymaintenance.kafka.txContext
import no.nav.paw.kafkakeymaintenance.pdlprocessor.AktorTopologyConfig
import no.nav.paw.kafkakeymaintenance.kafka.topic
import no.nav.paw.kafkakeymaintenance.pdlprocessor.AktorConfig
import no.nav.paw.kafkakeymaintenance.pdlprocessor.DbReaderContext
import no.nav.paw.kafkakeymaintenance.pdlprocessor.DbReaderTask
import no.nav.paw.kafkakeymaintenance.pdlprocessor.functions.hentAlias
import no.nav.paw.kafkakeymaintenance.perioder.consume
import no.nav.paw.kafkakeymaintenance.pdlprocessor.sendSync
import no.nav.paw.kafkakeymaintenance.perioder.dbPerioder
import org.apache.kafka.common.serialization.LongSerializer
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.transactions.transaction
import org.slf4j.LoggerFactory
import java.util.concurrent.CompletableFuture.runAsync
import java.time.Duration
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.system.exitProcess

fun main() {
val applicationContext = ApplicationContext(
consumerVersion = PERIODE_CONSUMER_GROUP_VERSION,
periodeConsumerVersion = PERIODE_CONSUMER_GROUP_VERSION,
aktorConsumerVersion = AKTOR_CONSUMER_GROUP_VERSION,
logger = LoggerFactory.getLogger("app"),
meterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT),
shutdownCalled = AtomicBoolean(false)
)
Runtime.getRuntime().addShutdownHook( Thread { applicationContext.shutdownCalled.set(true) })
Runtime.getRuntime().addShutdownHook(Thread { applicationContext.eventOccured(ShutdownSignal("Shutdown hook")) })
val healthIndicatorRepository = HealthIndicatorRepository()
with(loadNaisOrLocalConfiguration<DatabaseConfig>("database_configuration.toml").dataSource()) {
migrateDatabase(this)
Database.connect(this)
}
val (hwmRebalacingListener, periodeSequence) = with(KafkaFactory(loadNaisOrLocalConfiguration(KAFKA_CONFIG_WITH_SCHEME_REG))) {
initPeriodeConsumer(
periodeTopic = PERIODE_TOPIC,
applicationContext = applicationContext
)
}
val consumerLivenessHealthIndicator = healthIndicatorRepository.addLivenessIndicator(
LivenessHealthIndicator(HealthStatus.UNHEALTHY)
val kafkaFactory = KafkaFactory(loadNaisOrLocalConfiguration(KAFKA_CONFIG_WITH_SCHEME_REG))
val periodeConsumer = kafkaFactory.initPeriodeConsumer(
healthIndicatorRepository = healthIndicatorRepository,
periodeTopic = PERIODE_TOPIC,
applicationContext = applicationContext
)
val consumerReadinessHealthIndicator = healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator())
transaction {
txContext(applicationContext)().initHwm(Topic(PERIODE_TOPIC), 6)
}
runAsync {
consumerReadinessHealthIndicator.setHealthy()
consumerLivenessHealthIndicator.setHealthy()
periodeSequence.consume(txContext(applicationContext))
}.handle { _, throwable ->
throwable?.also { applicationContext.logger.error("Consumer task failed", throwable) }
applicationContext.shutdownCalled.set(true)
consumerReadinessHealthIndicator.setUnhealthy()
consumerLivenessHealthIndicator.setUnhealthy()
}
applicationContext.logger.info("Applikasjonen er startet, consumer: {}", hwmRebalacingListener.currentlyAssignedPartitions)
val aktorConsumer = kafkaFactory.initAktorConsumer(
healthIndicatorRepository = healthIndicatorRepository,
aktorTopic = AKTOR_TOPIC,
applicationContext = applicationContext
)
val producer = kafkaFactory.createProducer(
clientId = "key-maintenance-producer",
keySerializer = LongSerializer::class,
valueSerializer = HendelseSerializer::class
)
val executor = ThreadPoolExecutor(4, 4, 10L, TimeUnit.SECONDS, LinkedBlockingQueue())
val periodeTask = periodeConsumer.run(executor)
val aktorTask = aktorConsumer.run(executor)
val aktorConfig = loadNaisOrLocalConfiguration<AktorConfig>(AktorConfig.configFile)
val antallHendelsePartisjoner = producer.partitionsFor(aktorConfig.hendelseloggTopic).size
val kafkaKeysClient = createKafkaKeyGeneratorClient()
val dbReaderTask = DbReaderTask(
healthIndicatorRepository = healthIndicatorRepository,
applicationContext = applicationContext,
dbReaderContext = DbReaderContext(
aktorConfig = aktorConfig,
receiver = producer::sendSync.partially1(topic(aktorConfig.aktorTopic)),
perioder = dbPerioder(applicationContext),
hentAlias = kafkaKeysClient::hentAlias.partially1(antallHendelsePartisjoner),
aktorDeSerializer = kafkaFactory.kafkaAvroDeSerializer()
)
).run(executor)

applicationContext.logger.info("Applikasjonen er startet")
initKtor(
healthIndicatorRepository = healthIndicatorRepository,
prometheusMeterRegistry = applicationContext.meterRegistry
).start(wait = true)
applicationContext.shutdownCalled.set(true)
applicationContext.logger.info("Applikasjonen er stoppet")
).start(wait = false)
awaitShutdownSignalOrError(applicationContext)
}

/**
 * Blocks the calling thread until a shutdown signal or an error message is
 * observed on the application's message queue, then terminates the JVM with
 * the matching exit code (0 for orderly shutdown, 1 for error).
 */
fun awaitShutdownSignalOrError(applicationContext: ApplicationContext) {
    val ctx = applicationContext
    while (true) {
        // Stop waiting once shutdown was requested or the thread was interrupted.
        if (ctx.shutdownCalled.get() || Thread.currentThread().isInterrupted) break
        when (val message = ctx.pollMessage(Duration.ofSeconds(2))) {
            is ErrorOccurred -> {
                ctx.shutdownCalled.set(true)
                // Grace period before logging/exit, preserved from the original flow.
                Thread.sleep(Duration.ofSeconds(2))
                ctx.logger.error("Error occurred", message.throwable)
                exitProcess(1)
            }

            is ShutdownSignal -> {
                ctx.shutdownCalled.set(true)
                ctx.logger.info("Shutdown signal received from ${message.source}")
                exitProcess(0)
            }

            Noop -> {}
        }
    }
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,42 @@
package no.nav.paw.kafkakeymaintenance

import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext
import org.jetbrains.exposed.sql.Transaction
import org.slf4j.Logger
import java.time.Duration
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

@JvmRecord
data class ApplicationContext(
val consumerVersion: Int,
val periodeConsumerVersion: Int,
val aktorConsumerVersion: Int,
val logger: Logger,
val meterRegistry: PrometheusMeterRegistry,
val shutdownCalled: AtomicBoolean = AtomicBoolean(false),
)
) {
private val messages: BlockingQueue<Message> = LinkedBlockingQueue()

fun eventOccured(message: Message) {
this.messages.add(message)
}

fun pollMessage(timeout: Duration = Duration.ofSeconds(1)): Message =
messages.poll(timeout.toMillis(), TimeUnit.MILLISECONDS) ?: Noop

}

// Builds a TransactionContext bound to the periode consumer's HWM group version,
// for use inside an Exposed transaction.
val ApplicationContext.periodeTxContext: Transaction.() -> TransactionContext get() = {
TransactionContext(periodeConsumerVersion, this)
}

// Same as periodeTxContext, but bound to the aktor consumer's HWM group version.
val ApplicationContext.aktorTxContext: Transaction.() -> TransactionContext get() = {
TransactionContext(aktorConsumerVersion, this)
}

// Internal messages delivered via ApplicationContext's queue and consumed by
// awaitShutdownSignalOrError.
sealed interface Message
// Poll timed out with no message pending.
data object Noop: Message
// A background task failed; leads to process exit with code 1.
data class ErrorOccurred(val throwable: Throwable): Message
// Orderly shutdown requested (e.g. from the JVM shutdown hook); exit code 0.
data class ShutdownSignal(val source: String): Message
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package no.nav.paw.kafkakeymaintenance

import no.nav.paw.config.kafka.KafkaFactory
import no.nav.paw.health.repository.HealthIndicatorRepository
import no.nav.paw.kafkakeymaintenance.kafka.*
import no.nav.paw.kafkakeymaintenance.pdlprocessor.lagring.lagreAktorMelding
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.jetbrains.exposed.sql.transactions.transaction
import java.time.Duration

/**
 * Creates and wires up the HWM-tracked consumer for the aktor topic:
 * initializes high-water-mark rows for every partition, registers a rebalance
 * listener that restores stored offsets, and returns the ready-to-run consumer.
 */
fun KafkaFactory.initAktorConsumer(
    healthIndicatorRepository: HealthIndicatorRepository,
    aktorTopic: Topic,
    applicationContext: ApplicationContext
): HwmConsumer<String, ByteArray> {

    // BUGFIX: groupId/clientId previously interpolated periodeConsumerVersion,
    // while all HWM bookkeeping below uses aktorConsumerVersion. Use the aktor
    // version consistently so a version bump resets the Kafka group and the
    // HWM state together instead of drifting apart.
    val aktorConsumer = createConsumer(
        groupId = "kafka-key-maintenance-aktor-v${applicationContext.aktorConsumerVersion}",
        clientId = "kafka-key-maintenance-aktor-client-v${applicationContext.aktorConsumerVersion}",
        keyDeserializer = StringDeserializer::class,
        valueDeserializer = ByteArrayDeserializer::class,
        // Offsets appear to be tracked via the HWM table rather than Kafka
        // commits (autoCommit off, HwmRebalanceListener below) — keep in sync.
        autoCommit = false,
        autoOffsetReset = "earliest",
        maxPollrecords = 1000
    )
    // Restores each newly-assigned partition to its stored high-water mark.
    val rebalanceListener = HwmRebalanceListener(
        contextFactory = applicationContext.aktorTxContext,
        context = applicationContext,
        consumer = aktorConsumer
    )
    // Ensure an HWM row exists for every partition before consumption starts.
    transaction {
        txContext(applicationContext.aktorConsumerVersion)().initHwm(
            aktorTopic,
            aktorConsumer.partitionsFor(aktorTopic.value).count()
        )
    }
    aktorConsumer.subscribe(listOf(aktorTopic.value), rebalanceListener)
    return HwmConsumer(
        name = "${aktorTopic}-consumer",
        healthIndicatorRepository = healthIndicatorRepository,
        applicationContext = applicationContext,
        contextFactory = { tx -> txContext(aktorConsumerVersion)(tx) },
        consumer = aktorConsumer,
        function = lagreAktorMelding,
        pollTimeout = Duration.ofMillis(1000)
    )
}

This file was deleted.

Loading

0 comments on commit 8e1d85d

Please sign in to comment.