From 38952544508153a0107b3fdc8eab7408accd0904 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Wed, 10 Jul 2024 13:31:44 +0200 Subject: [PATCH] Lagt til store og punctuator i opplysninger aggregering --- .../nais/nais-dev.yaml | 4 + .../nais/nais-prod.yaml | 8 +- .../arbeidssoekerregisteret/Application.kt | 18 +-- .../config/KafkaSerialization.kt | 12 ++ .../arbeidssoekerregisteret/config/Metrics.kt | 30 ++++- .../context/ApplicationContext.kt | 4 +- .../plugins/kafka/KafkaStreamsPlugin.kt | 4 +- .../ApplicationProperties.kt} | 11 +- .../ServerProperties.kt} | 4 +- .../topology/OpplysningerTopology.kt | 104 ++++++++++++++++- .../local/application_configuration.toml | 3 + .../nais/application_configuration.toml | 3 + .../config/AppConfigTest.kt | 4 +- .../config/kafka/streams/GenericProcessor.kt | 109 ++++++++++++++++++ 14 files changed, 291 insertions(+), 27 deletions(-) rename apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/{config/AppConfig.kt => properties/ApplicationProperties.kt} (72%) rename apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/{config/ServerConfig.kt => properties/ServerProperties.kt} (74%) create mode 100644 lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt diff --git a/apps/opplysninger-aggregering/nais/nais-dev.yaml b/apps/opplysninger-aggregering/nais/nais-dev.yaml index 70e0ba0d..446ee8d5 100644 --- a/apps/opplysninger-aggregering/nais/nais-dev.yaml +++ b/apps/opplysninger-aggregering/nais/nais-dev.yaml @@ -13,6 +13,10 @@ spec: value: "opplysninger-stream-v1" - name: KAFKA_PAW_OPPLYSNINGER_OM_ARBEIDSSOEKER_TOPIC value: "paw.opplysninger-om-arbeidssoeker-v1" + - name: OPPLYSNINGER_PUNCTUATOR_SCHEDULE + value: "PT10M" + - name: OPPLYSNINGER_LAGRET_TIDSPERIODE + value: "PT1H" resources: limits: memory: 1024Mi diff --git a/apps/opplysninger-aggregering/nais/nais-prod.yaml b/apps/opplysninger-aggregering/nais/nais-prod.yaml index b610a081..dd33cffb 100644 --- a/apps/opplysninger-aggregering/nais/nais-prod.yaml +++ b/apps/opplysninger-aggregering/nais/nais-prod.yaml @@ -13,12 +13,16 @@ spec: value: "opplysninger-stream-v1" - name: KAFKA_PAW_OPPLYSNINGER_OM_ARBEIDSSOEKER_TOPIC value: "paw.opplysninger-om-arbeidssoeker-v1" + - name: OPPLYSNINGER_PUNCTUATOR_SCHEDULE + value: "PT10M" + - name: OPPLYSNINGER_LAGRET_TIDSPERIODE + value: "PT1H" resources: limits: - memory: 3072Mi + memory: 2048Mi requests: cpu: 250m - memory: 2048Mi + memory: 512Mi replicas: min: 2 max: 4 diff --git a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/Application.kt b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/Application.kt index d6b35fe7..a60160e8 100644 --- a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/Application.kt +++ b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/Application.kt @@ -6,12 +6,12 @@ import io.ktor.server.engine.embeddedServer import io.ktor.server.netty.Netty import io.micrometer.prometheusmetrics.PrometheusConfig import io.micrometer.prometheusmetrics.PrometheusMeterRegistry -import no.nav.paw.arbeidssoekerregisteret.config.APPLICATION_CONFIG_FILE_NAME -import no.nav.paw.arbeidssoekerregisteret.config.APPLICATION_LOGGER_NAME -import no.nav.paw.arbeidssoekerregisteret.config.AppConfig -import no.nav.paw.arbeidssoekerregisteret.config.SERVER_CONFIG_FILE_NAME -import no.nav.paw.arbeidssoekerregisteret.config.SERVER_LOGGER_NAME -import no.nav.paw.arbeidssoekerregisteret.config.ServerConfig +import no.nav.paw.arbeidssoekerregisteret.properties.APPLICATION_CONFIG_FILE_NAME +import no.nav.paw.arbeidssoekerregisteret.properties.APPLICATION_LOGGER_NAME +import no.nav.paw.arbeidssoekerregisteret.properties.ApplicationProperties +import no.nav.paw.arbeidssoekerregisteret.properties.SERVER_CONFIG_FILE_NAME +import no.nav.paw.arbeidssoekerregisteret.properties.SERVER_LOGGER_NAME +import no.nav.paw.arbeidssoekerregisteret.properties.ServerProperties import no.nav.paw.arbeidssoekerregisteret.context.ApplicationContext import no.nav.paw.arbeidssoekerregisteret.plugins.configureKafka import no.nav.paw.arbeidssoekerregisteret.plugins.configureMetrics @@ -23,8 +23,8 @@ import org.slf4j.LoggerFactory fun main() { val logger = LoggerFactory.getLogger(SERVER_LOGGER_NAME) - val serverProperties = loadNaisOrLocalConfiguration(SERVER_CONFIG_FILE_NAME) - val applicationProperties = loadNaisOrLocalConfiguration(APPLICATION_CONFIG_FILE_NAME) + val serverProperties = loadNaisOrLocalConfiguration(SERVER_CONFIG_FILE_NAME) + val applicationProperties = loadNaisOrLocalConfiguration(APPLICATION_CONFIG_FILE_NAME) logger.info("Starter ${applicationProperties.appId}") @@ -47,7 +47,7 @@ fun main() { server.start(wait = true) } -fun Application.module(properties: AppConfig) { +fun Application.module(properties: ApplicationProperties) { val logger = LoggerFactory.getLogger(APPLICATION_LOGGER_NAME) val healthIndicatorService = HealthIndicatorService() val meterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) diff --git a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/KafkaSerialization.kt b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/KafkaSerialization.kt index 5c8255a7..903c9707 100644 --- a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/KafkaSerialization.kt +++ b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/KafkaSerialization.kt @@ -3,8 +3,11 @@ package no.nav.paw.arbeidssoekerregisteret.config import com.fasterxml.jackson.core.JsonProcessingException import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.readValue +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde +import no.nav.paw.arbeidssokerregisteret.api.v4.OpplysningerOmArbeidssoeker import no.nav.paw.config.env.NaisEnv import no.nav.paw.config.env.currentNaisEnv +import org.apache.avro.specific.SpecificRecord import org.apache.kafka.common.serialization.Deserializer import org.apache.kafka.common.serialization.Serde import org.apache.kafka.common.serialization.Serializer @@ -46,3 +49,12 @@ inline fun buildJsonSerde(naisEnv: NaisEnv, objectMapper: ObjectMapp inline fun buildJsonSerde(): Serde { return buildJsonSerde(currentNaisEnv, buildObjectMapper) } + +inline fun buildAvroSerde(): Serde { + return SpecificAvroSerde() +} + +fun buildOpplysningerOmArbeidssoekerAvroSerde(): Serde { + return buildAvroSerde() +} + diff --git a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/Metrics.kt b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/Metrics.kt index 627f9e52..06a981ce 100644 --- a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/Metrics.kt +++ b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/Metrics.kt @@ -1,11 +1,39 @@ package no.nav.paw.arbeidssoekerregisteret.config import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tag +import io.micrometer.core.instrument.Tags +import java.time.Instant +import java.time.ZoneId +import java.util.concurrent.atomic.AtomicLong private const val METRIC_PREFIX = "paw_arbeidssoeker_opplysninger_aggregering" -fun MeterRegistry.tellAntallMottatteOpplysninger() { +fun MeterRegistry.tellMottatteOpplysninger() { counter( "${METRIC_PREFIX}_antall_mottatte_opplysninger_total", ).increment() } + +fun MeterRegistry.antallLagredeOpplysningerTotal(antallReference: AtomicLong) { + gauge( + "${METRIC_PREFIX}_antall_lagrede_opplysninger_total", + Tags.empty(), + antallReference + ) { + antallReference.get().toDouble() + } +} + +fun MeterRegistry.antallLagredeOpplysningerSumPerPeriode(timestamp: Instant, antallReference: AtomicLong) { + val zonedDateTime = timestamp.atZone(ZoneId.systemDefault()) + gauge( + "${METRIC_PREFIX}_antall_lagrede_opplysninger_sum_per_periode", + Tags.of( + Tag.of("minute", "${zonedDateTime.minute}") + ), + antallReference + ) { + antallReference.get().toDouble() + } +} diff --git a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/context/ApplicationContext.kt b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/context/ApplicationContext.kt index 8aa8f082..1c3c4810 100644 --- a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/context/ApplicationContext.kt +++ b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/context/ApplicationContext.kt @@ -1,6 +1,6 @@ package no.nav.paw.arbeidssoekerregisteret.context -import no.nav.paw.arbeidssoekerregisteret.config.AppConfig +import no.nav.paw.arbeidssoekerregisteret.properties.ApplicationProperties import org.slf4j.Logger -data class ApplicationContext(val logger: Logger, val properties: AppConfig) +data class ApplicationContext(val logger: Logger, val properties: ApplicationProperties) diff --git a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/plugins/kafka/KafkaStreamsPlugin.kt b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/plugins/kafka/KafkaStreamsPlugin.kt index e150354c..0a1e36bc 100644 --- a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/plugins/kafka/KafkaStreamsPlugin.kt +++ b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/plugins/kafka/KafkaStreamsPlugin.kt @@ -7,12 +7,12 @@ import io.ktor.server.application.createApplicationPlugin import io.ktor.server.application.hooks.MonitoringEvent import io.ktor.server.application.log import io.ktor.util.KtorDsl -import no.nav.paw.arbeidssoekerregisteret.config.KafkaStreamsConfig +import no.nav.paw.arbeidssoekerregisteret.properties.KafkaStreamsProperties import org.apache.kafka.streams.KafkaStreams @KtorDsl class KafkaStreamsPluginConfig { - var kafkaStreamsConfig: KafkaStreamsConfig? = null + var kafkaStreamsConfig: KafkaStreamsProperties? = null var kafkaStreams: List? = null } diff --git a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/AppConfig.kt b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/properties/ApplicationProperties.kt similarity index 72% rename from apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/AppConfig.kt rename to apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/properties/ApplicationProperties.kt index 959e73b1..b1cef038 100644 --- a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/AppConfig.kt +++ b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/properties/ApplicationProperties.kt @@ -1,4 +1,4 @@ -package no.nav.paw.arbeidssoekerregisteret.config +package no.nav.paw.arbeidssoekerregisteret.properties import no.nav.paw.config.env.NaisEnv import no.nav.paw.config.env.currentAppId @@ -11,16 +11,19 @@ const val SERVER_LOGGER_NAME = "no.nav.paw.server" const val APPLICATION_LOGGER_NAME = "no.nav.paw.application" const val APPLICATION_CONFIG_FILE_NAME = "application_configuration.toml" -data class AppConfig( +data class ApplicationProperties( val kafka: KafkaConfig, - val kafkaStreams: KafkaStreamsConfig, + val kafkaStreams: KafkaStreamsProperties, val appName: String = currentAppName ?: "paw-arbeidssoeker-opplysninger-aggregering", val appId: String = currentAppId ?: "paw-arbeidssoeker-opplysninger-aggregering:LOCAL", val naisEnv: NaisEnv = currentNaisEnv ) -data class KafkaStreamsConfig( +data class KafkaStreamsProperties( val shutDownTimeout: Duration, val opplysingerStreamIdSuffix: String, val opplysningerTopic: String, + val opplysningerStore: String, + val opplysningerPunctuatorSchedule: Duration, + val opplysningerLagretTidsperiode: Duration, ) diff --git a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/ServerConfig.kt b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/properties/ServerProperties.kt similarity index 74% rename from apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/ServerConfig.kt rename to apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/properties/ServerProperties.kt index 3957bb39..35693e88 100644 --- a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/ServerConfig.kt +++ b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/properties/ServerProperties.kt @@ -1,8 +1,8 @@ -package no.nav.paw.arbeidssoekerregisteret.config +package no.nav.paw.arbeidssoekerregisteret.properties const val SERVER_CONFIG_FILE_NAME = "server_configuration.toml" -data class ServerConfig( +data class ServerProperties( val port: Int, val callGroupSize: Int, val workerGroupSize: Int, diff --git a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/topology/OpplysningerTopology.kt b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/topology/OpplysningerTopology.kt index 8dac3288..d9ade0c8 100644 --- a/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/topology/OpplysningerTopology.kt +++ b/apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/topology/OpplysningerTopology.kt @@ -1,22 +1,118 @@ package no.nav.paw.arbeidssoekerregisteret.topology import io.micrometer.core.instrument.MeterRegistry -import no.nav.paw.arbeidssoekerregisteret.config.tellAntallMottatteOpplysninger +import no.nav.paw.arbeidssoekerregisteret.config.antallLagredeOpplysningerSumPerPeriode +import no.nav.paw.arbeidssoekerregisteret.config.antallLagredeOpplysningerTotal +import no.nav.paw.arbeidssoekerregisteret.config.buildOpplysningerOmArbeidssoekerAvroSerde +import no.nav.paw.arbeidssoekerregisteret.config.tellMottatteOpplysninger import no.nav.paw.arbeidssoekerregisteret.context.ApplicationContext import no.nav.paw.arbeidssokerregisteret.api.v4.OpplysningerOmArbeidssoeker +import no.nav.paw.config.kafka.streams.Punctuation +import no.nav.paw.config.kafka.streams.genericProcess +import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.kstream.Consumed +import org.apache.kafka.streams.processor.PunctuationType +import org.apache.kafka.streams.state.Stores +import org.apache.kafka.streams.state.TimestampedKeyValueStore +import org.apache.kafka.streams.state.ValueAndTimestamp +import java.time.Instant +import java.util.* +import java.util.concurrent.atomic.AtomicLong context(ApplicationContext) fun buildOpplysningerTopology( meterRegistry: MeterRegistry ): Topology = StreamsBuilder().apply { + addOpplysningerStateStore() + addOpplysningerKStream(meterRegistry) +}.build() + +context(ApplicationContext) +private fun StreamsBuilder.addOpplysningerStateStore() { + logger.info("Oppretter state store for opplysninger om arbeidssøker") + val kafkaStreamsProperties = properties.kafkaStreams + + this.addStateStore( + Stores.timestampedKeyValueStoreBuilder( + Stores.persistentKeyValueStore(kafkaStreamsProperties.opplysningerStore), + Serdes.Long(), + buildOpplysningerOmArbeidssoekerAvroSerde() + ) + ) +} + +context(ApplicationContext) +private fun StreamsBuilder.addOpplysningerKStream(meterRegistry: MeterRegistry) { logger.info("Oppretter KStream for opplysninger om arbeidssøker") val kafkaStreamsProperties = properties.kafkaStreams - this.stream(kafkaStreamsProperties.opplysningerTopic) + this + .stream( + kafkaStreamsProperties.opplysningerTopic, + Consumed.with(Serdes.Long(), buildOpplysningerOmArbeidssoekerAvroSerde()) + ) .peek { key, _ -> logger.debug("Mottok event på {} med key {}", kafkaStreamsProperties.opplysningerTopic, key) - meterRegistry.tellAntallMottatteOpplysninger() + meterRegistry.tellMottatteOpplysninger() + }.genericProcess( + name = "processOpplysningerOmArbeidssoeker", + stateStoreNames = arrayOf(kafkaStreamsProperties.opplysningerStore), + punctuation = buildPunctuation(meterRegistry) + ) { record -> + val stateStore: TimestampedKeyValueStore = + getStateStore(kafkaStreamsProperties.opplysningerStore) + logger.debug("Lagrer opplysninger for periode {}", record.value().periodeId) + stateStore.put(record.key(), ValueAndTimestamp.make(record.value(), Instant.now().toEpochMilli())) } -}.build() +} + +context(ApplicationContext) +private fun buildPunctuation(meterRegistry: MeterRegistry): Punctuation { + logger.info("Oppretter Punctuation for opplysninger om arbeidssøker") + val kafkaStreamsProperties = properties.kafkaStreams + + return Punctuation( + kafkaStreamsProperties.opplysningerPunctuatorSchedule, PunctuationType.WALL_CLOCK_TIME + ) { timestamp, context -> + logger.info("Punctuation kjører for tidspunkt {}", timestamp) + + with(context) { + val antallTotalt = AtomicLong(0) + val histogram = mutableMapOf() + + val stateStore: TimestampedKeyValueStore = + getStateStore(kafkaStreamsProperties.opplysningerStore) + for (keyValue in stateStore.all()) { + antallTotalt.incrementAndGet() + val lagretTidspunkt = Instant.ofEpochMilli(keyValue.value.timestamp()) + val utloepTidspunkt = Instant.now().minus(kafkaStreamsProperties.opplysningerLagretTidsperiode) + if (utloepTidspunkt.isAfter(lagretTidspunkt) + ) { + logger.debug( + "Sletter opplysninger for periode {} fordi de har vært lagret mer enn {}m (utløp {} > lagret {})", + keyValue.value.value().periodeId, + kafkaStreamsProperties.opplysningerLagretTidsperiode.toMinutes(), + utloepTidspunkt, + lagretTidspunkt + ) + stateStore.delete(keyValue.key) + continue + } + + val opplysninger = keyValue.value.value() + val antall = histogram[opplysninger.periodeId] + if (antall != null) { + antall.incrementAndGet() + histogram[opplysninger.periodeId] = antall + } else { + histogram[opplysninger.periodeId] = AtomicLong(1) + } + } + + histogram.forEach { (_, antall) -> meterRegistry.antallLagredeOpplysningerSumPerPeriode(timestamp, antall) } + meterRegistry.antallLagredeOpplysningerTotal(antallTotalt) + } + } +} diff --git a/apps/opplysninger-aggregering/src/main/resources/local/application_configuration.toml b/apps/opplysninger-aggregering/src/main/resources/local/application_configuration.toml index e5dba424..3f2a4199 100644 --- a/apps/opplysninger-aggregering/src/main/resources/local/application_configuration.toml +++ b/apps/opplysninger-aggregering/src/main/resources/local/application_configuration.toml @@ -9,3 +9,6 @@ url = "http://localhost:8082" shutDownTimeout = "PT1S" opplysingerStreamIdSuffix = "opplysninger-stream-v1" opplysningerTopic = "paw.opplysninger-om-arbeidssoeker-v1" +opplysningerStore = "opplysninger-store" +opplysningerPunctuatorSchedule = "PT1M" +opplysningerLagretTidsperiode = "PT10M" diff --git a/apps/opplysninger-aggregering/src/main/resources/nais/application_configuration.toml b/apps/opplysninger-aggregering/src/main/resources/nais/application_configuration.toml index 607c2dfb..b295cd2a 100644 --- a/apps/opplysninger-aggregering/src/main/resources/nais/application_configuration.toml +++ b/apps/opplysninger-aggregering/src/main/resources/nais/application_configuration.toml @@ -16,3 +16,6 @@ password = "${KAFKA_SCHEMA_REGISTRY_PASSWORD}" shutDownTimeout = "PT5S" opplysingerStreamIdSuffix = "${KAFKA_OPPLYSNINGER_OM_ARBEIDSSOEKER_STREAM_ID_SUFFIX}" opplysningerTopic = "${KAFKA_PAW_OPPLYSNINGER_OM_ARBEIDSSOEKER_TOPIC}" +opplysningerStore = "opplysninger-store" +opplysningerPunctuatorSchedule = "${OPPLYSNINGER_PUNCTUATOR_SCHEDULE}" +opplysningerLagretTidsperiode = "${OPPLYSNINGER_LAGRET_TIDSPERIODE}" diff --git a/apps/opplysninger-aggregering/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/config/AppConfigTest.kt b/apps/opplysninger-aggregering/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/config/AppConfigTest.kt index 7148d0e4..d373c20f 100644 --- a/apps/opplysninger-aggregering/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/config/AppConfigTest.kt +++ b/apps/opplysninger-aggregering/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/config/AppConfigTest.kt @@ -2,11 +2,13 @@ package no.nav.paw.arbeidssoekerregisteret.config import io.kotest.core.spec.style.FreeSpec import io.kotest.matchers.shouldNotBe +import no.nav.paw.arbeidssoekerregisteret.properties.APPLICATION_CONFIG_FILE_NAME +import no.nav.paw.arbeidssoekerregisteret.properties.ApplicationProperties import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration class AppConfigTest : FreeSpec({ "Skal laste config" { - val appConfig = loadNaisOrLocalConfiguration(APPLICATION_CONFIG_FILE_NAME) + val appConfig = loadNaisOrLocalConfiguration(APPLICATION_CONFIG_FILE_NAME) appConfig.kafka shouldNotBe null appConfig.kafkaStreams shouldNotBe null } diff --git a/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt b/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt new file mode 100644 index 00000000..8dbabb79 --- /dev/null +++ b/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt @@ -0,0 +1,109 @@ +package no.nav.paw.config.kafka.streams + + +import org.apache.kafka.streams.kstream.KStream +import org.apache.kafka.streams.kstream.Named +import org.apache.kafka.streams.processor.PunctuationType +import org.apache.kafka.streams.processor.api.Processor +import org.apache.kafka.streams.processor.api.ProcessorContext +import org.apache.kafka.streams.processor.api.Record +import java.time.Duration +import java.time.Instant + +fun KStream.filterWithContext( + name: String, + vararg stateStoreNames: String, + function: ProcessorContext.(V) -> Boolean +): KStream { + val processor = { + GenericProcessor { record -> + if (function(record.value())) forward(record) + } + } + return process(processor, Named.`as`(name), *stateStoreNames) +} + +fun KStream.mapNonNull( + name: String, + vararg stateStoreNames: String, + function: ProcessorContext.(V_IN) -> V_OUT? +): KStream { + val processor = { + GenericProcessor { record -> + val result = function(record.value()) + if (result != null) forward(record.withValue(result)) + } + } + return process(processor, Named.`as`(name), *stateStoreNames) +} + +fun KStream.mapWithContext( + name: String, + vararg stateStoreNames: String, + function: ProcessorContext.(V_IN) -> V_OUT +): KStream { + val processor = { + GenericProcessor { record -> + val result = function(record.value()) + forward(record.withValue(result)) + } + } + return process(processor, Named.`as`(name), *stateStoreNames) +} + +fun KStream.mapKeyAndValue( + name: String, + vararg stateStoreNames: String, + function: ProcessorContext.(K_IN, V_IN) -> Pair? +): KStream { + val processor = { + GenericProcessor { record -> + val result = function(record.key(), record.value()) + if (result != null) { + forward(record.withKey(result.first).withValue(result.second)) + } + } + } + return process(processor, Named.`as`(name), *stateStoreNames) +} + +fun KStream.genericProcess( + name: String, + vararg stateStoreNames: String, + punctuation: Punctuation? = null, + function: ProcessorContext.(Record) -> Unit +): KStream { + val processor = { + GenericProcessor(function = function, punctuation = punctuation) + } + return process(processor, Named.`as`(name), *stateStoreNames) +} + +class GenericProcessor( + private val punctuation: Punctuation? = null, + private val function: ProcessorContext.(Record) -> Unit +) : Processor { + private var context: ProcessorContext? = null + + override fun init(context: ProcessorContext?) { + super.init(context) + this.context = context + if (punctuation != null) { + context?.schedule(punctuation.interval, punctuation.type) { time -> + punctuation.function(Instant.ofEpochMilli(time), context) + } + } + } + + override fun process(record: Record?) { + if (record == null) return + val ctx = requireNotNull(context) { "Context is not initialized" } + with(ctx) { function(record) } + } +} + +data class Punctuation( + val interval: Duration, + val type: PunctuationType, + val function: (Instant, ProcessorContext) -> Unit +)