Skip to content

Commit

Permalink
Disablet health checks for Kafka consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
naviktthomas committed Dec 3, 2024
1 parent de5a710 commit 929b097
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,14 @@ fun <K, V> kafkaConsumerPlugin(): ApplicationPlugin<KafkaConsumerPluginConfig<K,
consumeFunction(records)
successFunction(records)
} catch (throwable: Throwable) {
kafkaConsumer.unsubscribe()
kafkaConsumer.close(closeTimeout)
shutdownFlag.set(true)
errorFunction(throwable)
}
}
logger.info("Kafka Consumer avsluttet")
consumeJob?.cancel()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class KafkaConsumerService(
fun handleException(throwable: Throwable) {
errorLogger.error("Kafka Consumer avslutter etter feil", throwable)
Span.current().setStatus(StatusCode.ERROR)
livenessIndicator.setUnhealthy()
readinessIndicator.setUnhealthy()
// livenessIndicator.setUnhealthy() TODO: Disabler for å unngå å ta ned appen
// readinessIndicator.setUnhealthy()
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package no.nav.paw.kafkakeymaintenance.pdlprocessor

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.*
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanContext
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.TraceFlags
import io.opentelemetry.api.trace.TraceState
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.annotations.WithSpan
import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
Expand All @@ -13,8 +17,8 @@ import no.nav.paw.kafkakeygenerator.client.LokaleAlias
import no.nav.paw.kafkakeymaintenance.ApplicationContext
import no.nav.paw.kafkakeymaintenance.ErrorOccurred
import no.nav.paw.kafkakeymaintenance.ShutdownSignal
import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext
import no.nav.paw.kafkakeymaintenance.kafka.Topic
import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext
import no.nav.paw.kafkakeymaintenance.kafka.txContext
import no.nav.paw.kafkakeymaintenance.pdlprocessor.functions.HendelseRecord
import no.nav.paw.kafkakeymaintenance.pdlprocessor.lagring.Data
Expand Down Expand Up @@ -94,7 +98,8 @@ class DbReaderTask(
}
.count()
if (batch.isEmpty()) {
applicationContext.logger.info("Ingen meldinger klare for prosessering, venter ${dbReaderContext.aktorConfig.interval}")
val sleepUntil = Instant.now().plus(dbReaderContext.aktorConfig.interval)
applicationContext.logger.info("Ingen meldinger klare for prosessering, venter til $sleepUntil (+ ${dbReaderContext.aktorConfig.interval})")
Thread.sleep(dbReaderContext.aktorConfig.interval.toMillis())
} else {
applicationContext.logger.info("Genererte {} hendelser fra {} meldinger", count, batch.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ fun harAvvik(data: Data): Boolean =
.map { it.arbeidsoekerId }
.distinct().size > 1)
.also { harAvvik ->
avviksDataLogger.debug("Har avvik: {}}, data: {}}", harAvvik, data.debugString())
avviksDataLogger.debug("Har avvik: {}, data: {}", harAvvik, data.debugString())
}

0 comments on commit 929b097

Please sign in to comment.