From 85a10dd73c1adf28c680b69ca81ddfaf9224c205 Mon Sep 17 00:00:00 2001 From: Nils Martin Sande Date: Fri, 8 Nov 2024 14:47:04 +0100 Subject: [PATCH] =?UTF-8?q?Fullf=C3=B8rte=20f=C3=B8rste=20utkast=20av=20ka?= =?UTF-8?q?fka-key-maintenance?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/kafka-key-maintenance/build.gradle.kts | 4 + .../paw/kafkakeymaintenance/AktorTopology.kt | 50 ------ .../nav/paw/kafkakeymaintenance/AppStartup.kt | 71 +++++++++ .../InitHealthAndMetrics.kt | 27 ++++ .../kafkakeymaintenance/InitKafkaStreams.kt | 78 +++++++++ .../InitPeriodeConsumer.kt | 31 ++++ .../kafkakeymaintenance/PeriodeConsumer.kt | 40 ----- .../paw/kafkakeymaintenance/StaticConfig.kt | 2 + .../functions/HentPerioder.kt | 15 -- .../pdlprocessor/AktorTopology.kt | 47 ++++++ .../pdlprocessor/AktorTopologyConfig.kt | 15 ++ .../PdlRecordProcessor.kt | 24 +-- .../functions}/ApplicationMetadata.kt | 2 +- .../GenererAutomatiskIdOppdatering.kt | 2 +- .../functions/GenererHendelse.kt | 2 +- .../functions/GenererIdOppdatering.kt | 2 +- .../{ => pdlprocessor}/functions/HarAvvik.kt | 2 +- .../pdlprocessor/functions/HentAlias.kt | 10 ++ .../{ => pdlprocessor}/functions/HentData.kt | 3 +- .../pdlprocessor/functions/HentPerioder.kt | 15 ++ .../perioder/PeriodeConsumer.kt | 33 ++++ .../resources/local/kafka_configuration.toml | 1 + .../resources/nais/aktor_topology_config.toml | 4 + .../nais/database_configuration.toml | 6 + .../paw/kafkakeymaintenance/TopologyTest.kt | 150 ++++++++++++++++++ 25 files changed, 502 insertions(+), 134 deletions(-) delete mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/AktorTopology.kt create mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/AppStartup.kt create mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitHealthAndMetrics.kt create mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitKafkaStreams.kt create mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitPeriodeConsumer.kt delete mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/PeriodeConsumer.kt delete mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentPerioder.kt create mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/AktorTopology.kt create mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/AktorTopologyConfig.kt rename apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/{functions => pdlprocessor}/PdlRecordProcessor.kt (61%) rename apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/{ => pdlprocessor/functions}/ApplicationMetadata.kt (92%) rename apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/{ => pdlprocessor}/functions/GenererAutomatiskIdOppdatering.kt (96%) rename apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/{ => pdlprocessor}/functions/GenererHendelse.kt (97%) rename apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/{ => pdlprocessor}/functions/GenererIdOppdatering.kt (95%) rename apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/{ => pdlprocessor}/functions/HarAvvik.kt (75%) create mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentAlias.kt rename apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/{ => pdlprocessor}/functions/HentData.kt (80%) create mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentPerioder.kt create mode 100644 apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PeriodeConsumer.kt create mode 100644 apps/kafka-key-maintenance/src/main/resources/local/kafka_configuration.toml create mode 100644 apps/kafka-key-maintenance/src/main/resources/nais/aktor_topology_config.toml create mode 100644 apps/kafka-key-maintenance/src/main/resources/nais/database_configuration.toml create mode 100644 apps/kafka-key-maintenance/src/test/kotlin/no/nav/paw/kafkakeymaintenance/TopologyTest.kt diff --git a/apps/kafka-key-maintenance/build.gradle.kts b/apps/kafka-key-maintenance/build.gradle.kts index f4af3be8..648997a5 100644 --- a/apps/kafka-key-maintenance/build.gradle.kts +++ b/apps/kafka-key-maintenance/build.gradle.kts @@ -19,6 +19,7 @@ dependencies { implementation(project(":lib:kafka-streams")) implementation(project(":lib:hoplite-config")) implementation(project(":lib:kafka-key-generator-client")) + implementation(project(":lib:error-handling")) implementation(libs.arrow.core.core) implementation(libs.bundles.ktorServerWithNettyAndMicrometer) @@ -49,6 +50,7 @@ dependencies { implementation(libs.kafka.streams.core) implementation(libs.avro.core) implementation(libs.avro.kafkaSerializer) + implementation(libs.avro.kafkaStreamsSerde) implementation(libs.exposed.core) implementation(libs.exposed.jdbc) implementation(libs.exposed.javaTime) @@ -65,6 +67,8 @@ dependencies { testImplementation(libs.test.testContainers.core) testImplementation(libs.test.testContainers.postgresql) testImplementation(libs.ktor.server.testJvm) + testImplementation(libs.kafka.streams.test) + testImplementation(project(":test:kafka-streams-test-functions")) } java { diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/AktorTopology.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/AktorTopology.kt deleted file mode 100644 index 520e0aa5..00000000 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/AktorTopology.kt +++ /dev/null @@ -1,50 +0,0 @@ -package no.nav.paw.kafkakeymaintenance - -import kotlinx.coroutines.runBlocking -import no.nav.paw.config.kafka.streams.mapRecord -import no.nav.paw.config.kafka.streams.supressByWallClock -import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient -import no.nav.paw.kafkakeygenerator.client.LokaleAlias -import no.nav.paw.kafkakeymaintenance.functions.processPdlRecord -import no.nav.paw.kafkakeymaintenance.perioder.Perioder -import no.nav.person.pdl.aktor.v2.Aktor -import org.apache.kafka.streams.KeyValue -import org.apache.kafka.streams.StreamsBuilder -import java.time.Duration -import java.time.Instant - -fun KafkaKeysClient.hentAlias(identiteter: List): List = runBlocking { - getAlias(ANTALL_PARTISJONER, identiteter).alias -} - -fun StreamsBuilder.buildAktorTopology( - topic: String, - stateStoreName: String, - supressionDelay: Duration, - interval: Duration = supressionDelay.dividedBy(10), - perioder: Perioder, - hentAlias: (List) -> List -) { - stream(topic) - .supressByWallClock( - name = stateStoreName, - duration = supressionDelay, - checkInterval = interval - ).mapRecord("aktor_til_hendelse") { record -> - record.withValue( - processPdlRecord( - aktorTopic = topic, - hentAlias = hentAlias, - perioder = perioder, - record = record - ) - ) - }.flatMap { _, hendelser -> - hendelser.map { hendelseRecord -> - KeyValue.pair( - hendelseRecord.key, - hendelseRecord.hendelse - ) - } - }.mapRecord("set_timestamp") { it.withTimestamp(Instant.now().epochSecond) } -} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/AppStartup.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/AppStartup.kt new file mode 100644 index 00000000..5c95c4fe --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/AppStartup.kt @@ -0,0 +1,71 @@ +package no.nav.paw.kafkakeymaintenance + +import io.micrometer.prometheusmetrics.PrometheusConfig +import io.micrometer.prometheusmetrics.PrometheusMeterRegistry +import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration +import no.nav.paw.config.kafka.* +import no.nav.paw.health.model.HealthStatus +import no.nav.paw.health.model.LivenessHealthIndicator +import no.nav.paw.health.model.ReadinessHealthIndicator +import no.nav.paw.health.repository.HealthIndicatorRepository +import no.nav.paw.kafkakeygenerator.client.createKafkaKeyGeneratorClient +import no.nav.paw.kafkakeymaintenance.db.DatabaseConfig +import no.nav.paw.kafkakeymaintenance.db.dataSource +import no.nav.paw.kafkakeymaintenance.db.migrateDatabase +import no.nav.paw.kafkakeymaintenance.kafka.txContext +import no.nav.paw.kafkakeymaintenance.pdlprocessor.AktorTopologyConfig +import no.nav.paw.kafkakeymaintenance.pdlprocessor.functions.hentAlias +import no.nav.paw.kafkakeymaintenance.perioder.consume +import no.nav.paw.kafkakeymaintenance.perioder.dbPerioder +import org.jetbrains.exposed.sql.Database +import org.slf4j.LoggerFactory +import java.util.concurrent.CompletableFuture.runAsync +import java.util.concurrent.atomic.AtomicBoolean + +fun main() { + val applicationContext = ApplicationContext( + consumerVersion = PERIODE_CONSUMER_GROUP_VERSION, + logger = LoggerFactory.getLogger("app"), + meterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT), + shutdownCalled = AtomicBoolean(false) + ) + Runtime.getRuntime().addShutdownHook( Thread { applicationContext.shutdownCalled.set(true) }) + val healthIndicatorRepository = HealthIndicatorRepository() + with(loadNaisOrLocalConfiguration("database_configuration.toml").dataSource()) { + migrateDatabase(this) + Database.connect(this) + } + val periodeSequence = with(KafkaFactory(loadNaisOrLocalConfiguration(KAFKA_CONFIG_WITH_SCHEME_REG))) { + initPeriodeConsumer( + periodeTopic = PERIODE_TOPIC, + applicationContext = applicationContext + ) + } + val consumerLivenessHealthIndicator = healthIndicatorRepository.addLivenessIndicator( + LivenessHealthIndicator(HealthStatus.UNHEALTHY) + ) + val consumerReadinessHealthIndicator = healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator()) + runAsync { + consumerReadinessHealthIndicator.setHealthy() + consumerLivenessHealthIndicator.setHealthy() + periodeSequence.consume(txContext(applicationContext)) + }.handle { _, throwable -> + throwable?.also { applicationContext.logger.error("Consumer task failed", throwable) } + applicationContext.shutdownCalled.set(true) + consumerReadinessHealthIndicator.setUnhealthy() + consumerLivenessHealthIndicator.setUnhealthy() + } + initStreams( + aktorTopologyConfig = loadNaisOrLocalConfiguration(AktorTopologyConfig.configFile), + healthIndicatorRepository = healthIndicatorRepository, + perioder = dbPerioder(applicationContext), + hentAlias = createKafkaKeyGeneratorClient()::hentAlias + ).start() + + initKtor( + healthIndicatorRepository = healthIndicatorRepository, + prometheusMeterRegistry = applicationContext.meterRegistry + ).start(wait = true) + applicationContext.shutdownCalled.set(true) + applicationContext.logger.info("Applikasjonen er stoppet") +} diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitHealthAndMetrics.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitHealthAndMetrics.kt new file mode 100644 index 00000000..477014ea --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitHealthAndMetrics.kt @@ -0,0 +1,27 @@ +package no.nav.paw.kafkakeymaintenance + +import io.ktor.server.application.* +import io.ktor.server.engine.* +import io.ktor.server.netty.* +import io.ktor.server.response.* +import io.ktor.server.routing.* +import io.micrometer.prometheusmetrics.PrometheusMeterRegistry +import no.nav.paw.health.repository.HealthIndicatorRepository +import no.nav.paw.health.route.healthRoutes + +fun initKtor( + healthIndicatorRepository: HealthIndicatorRepository, + prometheusMeterRegistry: PrometheusMeterRegistry? = null, + vararg additionalRoutes: Route.() -> Unit +) = + embeddedServer(Netty, port = 8080) { + routing { + healthRoutes(healthIndicatorRepository) + prometheusMeterRegistry?.let { + get("/internal/metrics") { + call.respondText(prometheusMeterRegistry.scrape()) + } + } + additionalRoutes.forEach { it() } + } + } \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitKafkaStreams.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitKafkaStreams.kt new file mode 100644 index 00000000..f4d4e1f8 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitKafkaStreams.kt @@ -0,0 +1,78 @@ +package no.nav.paw.kafkakeymaintenance + +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde +import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration +import no.nav.paw.config.kafka.KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG +import no.nav.paw.config.kafka.streams.KafkaStreamsFactory +import no.nav.paw.error.handler.withApplicationTerminatingExceptionHandler +import no.nav.paw.health.listener.withHealthIndicatorStateListener +import no.nav.paw.health.model.LivenessHealthIndicator +import no.nav.paw.health.model.ReadinessHealthIndicator +import no.nav.paw.health.repository.HealthIndicatorRepository +import no.nav.paw.kafkakeygenerator.client.LokaleAlias +import no.nav.paw.kafkakeymaintenance.pdlprocessor.AktorTopologyConfig +import no.nav.paw.kafkakeymaintenance.pdlprocessor.buildAktorTopology +import no.nav.paw.kafkakeymaintenance.perioder.Perioder +import no.nav.person.pdl.aktor.v2.Aktor +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.KafkaStreams +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier +import org.apache.kafka.streams.state.Stores + +fun initStreams( + aktorTopologyConfig: AktorTopologyConfig, + healthIndicatorRepository: HealthIndicatorRepository, + perioder: Perioder, + hentAlias: (List) -> List +): KafkaStreams { + val kafkaStreamsFactory = KafkaStreamsFactory( + "beta-v1", + loadNaisOrLocalConfiguration(KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG) + ).withExactlyOnce() + .withDefaultKeySerde(Serdes.StringSerde::class) + .withDefaultValueSerde(SpecificAvroSerde::class) + + val topology = initTopology( + aktorSerde = kafkaStreamsFactory.createSpecificAvroSerde(), + aktorTopologyConfig = aktorTopologyConfig, + perioder = perioder, + hentAlias = hentAlias, + stateStoreBuilderFactory = Stores::persistentTimestampedKeyValueStore + ) + + val streams = KafkaStreams(topology, kafkaStreamsFactory.properties) + streams + .withHealthIndicatorStateListener( + livenessIndicator = healthIndicatorRepository.addLivenessIndicator(LivenessHealthIndicator()), + readinessIndicator = healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator()) + ) + streams.withApplicationTerminatingExceptionHandler() + return streams +} + +fun initTopology( + stateStoreBuilderFactory: (String) -> KeyValueBytesStoreSupplier, + aktorTopologyConfig: AktorTopologyConfig, + perioder: Perioder, + hentAlias: (List) -> List, + aktorSerde: Serde +): Topology { + val streamsBuilder = StreamsBuilder() + .addStateStore( + Stores.timestampedKeyValueStoreBuilder( + stateStoreBuilderFactory(aktorTopologyConfig.stateStoreName), + Serdes.StringSerde(), + aktorSerde + ) + ) + streamsBuilder.buildAktorTopology( + aktorSerde = aktorSerde, + aktorTopologyConfig = aktorTopologyConfig, + perioder = perioder, + hentAlias = hentAlias + ) + return streamsBuilder.build() +} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitPeriodeConsumer.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitPeriodeConsumer.kt new file mode 100644 index 00000000..dfc6f1b5 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitPeriodeConsumer.kt @@ -0,0 +1,31 @@ +package no.nav.paw.kafkakeymaintenance + +import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import no.nav.paw.config.kafka.KafkaFactory +import no.nav.paw.config.kafka.asSequence +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.serialization.LongDeserializer +import java.time.Duration + +fun KafkaFactory.initPeriodeConsumer( + periodeTopic: String, + applicationContext: ApplicationContext +): Sequence>> { + val periodeConsumer = createConsumer( + groupId = "kafka-key-maintenance-v${applicationContext.consumerVersion}", + clientId = "kafka-key-maintenance-client-v${applicationContext.consumerVersion}", + keyDeserializer = LongDeserializer::class, + valueDeserializer = PeriodeDeserializer::class, + autoCommit = false, + autoOffsetReset = "earliest" + ) + periodeConsumer.subscribe(listOf(periodeTopic)) + return periodeConsumer.asSequence( + stop = applicationContext.shutdownCalled, + pollTimeout = Duration.ofMillis(500), + closeTimeout = Duration.ofSeconds(1) + ) +} + +class PeriodeDeserializer : SpecificAvroDeserializer() \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/PeriodeConsumer.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/PeriodeConsumer.kt deleted file mode 100644 index 0c0d5c84..00000000 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/PeriodeConsumer.kt +++ /dev/null @@ -1,40 +0,0 @@ -package no.nav.paw.kafkakeymaintenance - -import no.nav.paw.arbeidssokerregisteret.api.v1.Periode -import no.nav.paw.kafkakeymaintenance.kafka.topic -import no.nav.paw.kafkakeymaintenance.kafka.txContext -import no.nav.paw.kafkakeymaintenance.kafka.updateHwm -import no.nav.paw.kafkakeymaintenance.perioder.insertOrUpdate -import no.nav.paw.kafkakeymaintenance.perioder.periodeRad -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.jetbrains.exposed.sql.transactions.transaction -import java.time.Instant - -class PeriodeConsumer( - applicationContext: ApplicationContext, - private val periodeConsumer: Sequence>>, -) { - - private val ctxFactory = txContext(applicationContext) - - fun run() { - periodeConsumer.forEach { batch -> - transaction { - val tx = ctxFactory() - batch.forEach { periodeRecord -> - val hwmValid = tx.updateHwm( - topic = topic(periodeRecord.topic()), - partition = periodeRecord.partition(), - offset = periodeRecord.offset(), - time = Instant.ofEpochMilli(periodeRecord.timestamp()), - lastUpdated = Instant.now() - ) - if (hwmValid) { - val rad = periodeRad(periodeRecord.value()) - tx.insertOrUpdate(rad) - } - } - } - } - } -} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/StaticConfig.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/StaticConfig.kt index bdabf48f..9266fddc 100644 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/StaticConfig.kt +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/StaticConfig.kt @@ -1,3 +1,5 @@ package no.nav.paw.kafkakeymaintenance const val ANTALL_PARTISJONER = 6 +const val PERIODE_CONSUMER_GROUP_VERSION = 1 +const val PERIODE_TOPIC = "paw.arbeidssokerperioder-v1" \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentPerioder.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentPerioder.kt deleted file mode 100644 index b86ae2e7..00000000 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentPerioder.kt +++ /dev/null @@ -1,15 +0,0 @@ -package no.nav.paw.kafkakeymaintenance.functions - -import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext -import no.nav.paw.kafkakeymaintenance.perioder.periodeRad -import no.nav.paw.kafkakeymaintenance.vo.AvviksMelding -import no.nav.paw.kafkakeymaintenance.vo.AvvvikOgPerioder - -fun TransactionContext.hentPerioder(avviksMelding: AvviksMelding): AvvvikOgPerioder { - val perioder = avviksMelding - .pdlIdentitetsnummer - .plus(avviksMelding.lokaleAlias.map { it.identitetsnummer }) - .distinct() - .mapNotNull(::periodeRad) - return AvvvikOgPerioder(avviksMelding, perioder) -} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/AktorTopology.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/AktorTopology.kt new file mode 100644 index 00000000..ba1b6e61 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/AktorTopology.kt @@ -0,0 +1,47 @@ +package no.nav.paw.kafkakeymaintenance.pdlprocessor + +import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerde +import no.nav.paw.config.kafka.streams.mapRecord +import no.nav.paw.config.kafka.streams.supressByWallClock +import no.nav.paw.kafkakeygenerator.client.LokaleAlias +import no.nav.paw.kafkakeymaintenance.perioder.Perioder +import no.nav.person.pdl.aktor.v2.Aktor +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.KeyValue +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.kstream.Consumed +import org.apache.kafka.streams.kstream.Produced + +fun StreamsBuilder.buildAktorTopology( + aktorTopologyConfig: AktorTopologyConfig, + perioder: Perioder, + hentAlias: (List) -> List, + aktorSerde: Serde +) { + stream(aktorTopologyConfig.aktorTopic, Consumed.with(Serdes.String(), aktorSerde)) + .supressByWallClock( + name = aktorTopologyConfig.stateStoreName, + duration = aktorTopologyConfig.supressionDelay, + checkInterval = aktorTopologyConfig.interval + ).mapRecord("aktor_til_hendelse") { record -> + record.withValue( + processPdlRecord( + aktorTopic = aktorTopologyConfig.aktorTopic, + hentAlias = hentAlias, + perioder = perioder, + record = record + ) + ) + }.flatMap { _, hendelser -> + hendelser.map { hendelseRecord -> + KeyValue.pair( + hendelseRecord.key, + hendelseRecord.hendelse + ) + } + } + .mapRecord("set_timestamp") { it.withTimestamp(System.currentTimeMillis()) } + .to(aktorTopologyConfig.hendelseloggTopic, Produced.with(Serdes.Long(), HendelseSerde())) +} + diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/AktorTopologyConfig.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/AktorTopologyConfig.kt new file mode 100644 index 00000000..40b22cc8 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/AktorTopologyConfig.kt @@ -0,0 +1,15 @@ +package no.nav.paw.kafkakeymaintenance.pdlprocessor + +import java.time.Duration + +data class AktorTopologyConfig( + val aktorTopic: String, + val hendelseloggTopic: String, + val supressionDelay: Duration, + val interval: Duration, + val stateStoreName: String = "aktor_supression" +) { + companion object { + val configFile: String get() = "aktor_topology_config.toml" + } +} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/PdlRecordProcessor.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/PdlRecordProcessor.kt similarity index 61% rename from apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/PdlRecordProcessor.kt rename to apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/PdlRecordProcessor.kt index 1e8e9c57..ab9a68d9 100644 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/PdlRecordProcessor.kt +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/PdlRecordProcessor.kt @@ -1,26 +1,16 @@ -package no.nav.paw.kafkakeymaintenance.functions +package no.nav.paw.kafkakeymaintenance.pdlprocessor import arrow.core.partially1 -import kotlinx.coroutines.runBlocking import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.AvviksType import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Metadata import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.TidspunktFraKilde -import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient import no.nav.paw.kafkakeygenerator.client.LokaleAlias -import no.nav.paw.kafkakeymaintenance.ANTALL_PARTISJONER -import no.nav.paw.kafkakeymaintenance.kafka.Topic -import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext -import no.nav.paw.kafkakeymaintenance.kafka.updateHwm -import no.nav.paw.kafkakeymaintenance.metadata +import no.nav.paw.kafkakeymaintenance.pdlprocessor.functions.* import no.nav.paw.kafkakeymaintenance.perioder.Perioder import no.nav.paw.kafkakeymaintenance.vo.* import no.nav.person.pdl.aktor.v2.Aktor -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.streams.StreamsBuilder -import org.apache.kafka.streams.kstream.Suppressed import org.apache.kafka.streams.processor.api.Record -import java.time.Duration import java.time.Instant @@ -41,16 +31,6 @@ fun processPdlRecord( return prosesser(hentAlias, record.value(), perioder, metadata) } -fun Perioder.hentPerioder(avviksMelding: AvviksMelding): AvvvikOgPerioder { - val identiteter = avviksMelding.lokaleAlias - .map { it.identitetsnummer } + - avviksMelding.pdlIdentitetsnummer - return AvvvikOgPerioder( - avviksMelding = avviksMelding, - perioder = get(identiteter) - ) -} - fun prosesser( hentAlias: (List) -> List, aktor: Aktor, diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/ApplicationMetadata.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/ApplicationMetadata.kt similarity index 92% rename from apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/ApplicationMetadata.kt rename to apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/ApplicationMetadata.kt index ced3e24b..1ac35dc3 100644 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/ApplicationMetadata.kt +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/ApplicationMetadata.kt @@ -1,4 +1,4 @@ -package no.nav.paw.kafkakeymaintenance +package no.nav.paw.kafkakeymaintenance.pdlprocessor.functions import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Bruker import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.BrukerType diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererAutomatiskIdOppdatering.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/GenererAutomatiskIdOppdatering.kt similarity index 96% rename from apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererAutomatiskIdOppdatering.kt rename to apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/GenererAutomatiskIdOppdatering.kt index b1748fd4..ecfb6a27 100644 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererAutomatiskIdOppdatering.kt +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/GenererAutomatiskIdOppdatering.kt @@ -1,4 +1,4 @@ -package no.nav.paw.kafkakeymaintenance.functions +package no.nav.paw.kafkakeymaintenance.pdlprocessor.functions import no.nav.paw.kafkakeymaintenance.vo.AutomatiskIdOppdatering import no.nav.paw.kafkakeymaintenance.vo.IdOppdatering diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererHendelse.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/GenererHendelse.kt similarity index 97% rename from apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererHendelse.kt rename to apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/GenererHendelse.kt index 576ebd0f..5b12e87f 100644 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererHendelse.kt +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/GenererHendelse.kt @@ -1,4 +1,4 @@ -package no.nav.paw.kafkakeymaintenance.functions +package no.nav.paw.kafkakeymaintenance.pdlprocessor.functions import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse import no.nav.paw.arbeidssokerregisteret.intern.v1.IdentitetsnummerOpphoert diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererIdOppdatering.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/GenererIdOppdatering.kt similarity index 95% rename from apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererIdOppdatering.kt rename to apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/GenererIdOppdatering.kt index 04c608e5..f40f99b6 100644 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/GenererIdOppdatering.kt +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/GenererIdOppdatering.kt @@ -1,4 +1,4 @@ -package no.nav.paw.kafkakeymaintenance.functions +package no.nav.paw.kafkakeymaintenance.pdlprocessor.functions import arrow.core.NonEmptyList import arrow.core.nonEmptyListOf diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HarAvvik.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HarAvvik.kt similarity index 75% rename from apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HarAvvik.kt rename to apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HarAvvik.kt index 55e1e0fb..45d27c3f 100644 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HarAvvik.kt +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HarAvvik.kt @@ -1,4 +1,4 @@ -package no.nav.paw.kafkakeymaintenance.functions +package no.nav.paw.kafkakeymaintenance.pdlprocessor.functions import no.nav.paw.kafkakeymaintenance.vo.Data diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentAlias.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentAlias.kt new file mode 100644 index 00000000..c53af4fc --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentAlias.kt @@ -0,0 +1,10 @@ +package no.nav.paw.kafkakeymaintenance.pdlprocessor.functions + +import kotlinx.coroutines.runBlocking +import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient +import no.nav.paw.kafkakeygenerator.client.LokaleAlias +import no.nav.paw.kafkakeymaintenance.ANTALL_PARTISJONER + +fun KafkaKeysClient.hentAlias(identiteter: List): List = runBlocking { + getAlias(ANTALL_PARTISJONER, identiteter).alias +} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentData.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentData.kt similarity index 80% rename from apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentData.kt rename to apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentData.kt index 99b22e69..abf38967 100644 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/functions/HentData.kt +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentData.kt @@ -1,10 +1,9 @@ -package no.nav.paw.kafkakeymaintenance.functions +package no.nav.paw.kafkakeymaintenance.pdlprocessor.functions import no.nav.paw.kafkakeygenerator.client.LokaleAlias import no.nav.paw.kafkakeymaintenance.vo.Data import no.nav.person.pdl.aktor.v2.Aktor import no.nav.person.pdl.aktor.v2.Type -import org.apache.kafka.clients.consumer.ConsumerRecord fun hentData( hentAlias: (List) -> List, diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentPerioder.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentPerioder.kt new file mode 100644 index 00000000..533fd67e --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/pdlprocessor/functions/HentPerioder.kt @@ -0,0 +1,15 @@ +package no.nav.paw.kafkakeymaintenance.pdlprocessor.functions + +import no.nav.paw.kafkakeymaintenance.perioder.Perioder +import no.nav.paw.kafkakeymaintenance.vo.AvviksMelding +import no.nav.paw.kafkakeymaintenance.vo.AvvvikOgPerioder + +fun Perioder.hentPerioder(avviksMelding: AvviksMelding): AvvvikOgPerioder { + val identiteter = avviksMelding.lokaleAlias + .map { it.identitetsnummer } + + avviksMelding.pdlIdentitetsnummer + return AvvvikOgPerioder( + avviksMelding = avviksMelding, + perioder = get(identiteter.distinct()) + ) +} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PeriodeConsumer.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PeriodeConsumer.kt new file mode 100644 index 00000000..82ac4a0c --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/perioder/PeriodeConsumer.kt @@ -0,0 +1,33 @@ +package no.nav.paw.kafkakeymaintenance.perioder + +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import no.nav.paw.kafkakeymaintenance.kafka.TransactionContext +import no.nav.paw.kafkakeymaintenance.kafka.topic +import no.nav.paw.kafkakeymaintenance.kafka.updateHwm +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.jetbrains.exposed.sql.Transaction +import org.jetbrains.exposed.sql.transactions.transaction +import java.time.Instant + +fun Sequence>>.consume( + ctxFactory: Transaction.() -> TransactionContext +) { + forEach { batch -> + transaction { + val tx = ctxFactory() + batch.forEach { periodeRecord -> + val hwmValid = tx.updateHwm( + topic = topic(periodeRecord.topic()), + partition = periodeRecord.partition(), + offset = periodeRecord.offset(), + time = Instant.ofEpochMilli(periodeRecord.timestamp()), + lastUpdated = Instant.now() + ) + if (hwmValid) { + val rad = periodeRad(periodeRecord.value()) + tx.insertOrUpdate(rad) + } + } + } + } +} \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/resources/local/kafka_configuration.toml b/apps/kafka-key-maintenance/src/main/resources/local/kafka_configuration.toml new file mode 100644 index 00000000..3bf5dd12 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/resources/local/kafka_configuration.toml @@ -0,0 +1 @@ +brokers="localhost:9092" diff --git a/apps/kafka-key-maintenance/src/main/resources/nais/aktor_topology_config.toml b/apps/kafka-key-maintenance/src/main/resources/nais/aktor_topology_config.toml new file mode 100644 index 00000000..e882686a --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/resources/nais/aktor_topology_config.toml @@ -0,0 +1,4 @@ +aktorTopic = "pdl.aktor-v2" +hendelseloggTopic = "${HENDELSELOGG_TOPIC}" +supressionDelay = "PT24H" +internval = "PT1H" \ No newline at end of file diff --git a/apps/kafka-key-maintenance/src/main/resources/nais/database_configuration.toml b/apps/kafka-key-maintenance/src/main/resources/nais/database_configuration.toml new file mode 100644 index 00000000..291c8994 --- /dev/null +++ b/apps/kafka-key-maintenance/src/main/resources/nais/database_configuration.toml @@ -0,0 +1,6 @@ +host = "${NAIS_DATABASE_PAW_KAFKA_KEY_MAINTENANCE_PAWKAFKAKEYSMAINTENANCEMAINTENANCE_HOST}" +port = "${NAIS_DATABASE_PAW_KAFKA_KEY_MAINTENANCE_PAWKAFKAKEYSMAINTENANCE_PORT}" +username = "${NAIS_DATABASE_PAW_KAFKA_KEY_MAINTENANCE_PAWKAFKAKEYSMAINTENANCE_USERNAME}" +password = "${NAIS_DATABASE_PAW_KAFKA_KEY_MAINTENANCE_PAWKAFKAKEYSMAINTENANCE_PASSWORD}" +name = "${NAIS_DATABASE_PAW_KAFKA_KEY_MAINTENANCE_PAWKAFKAKEYSMAINTENANCE_DATABASE}" +jdbc = "${NAIS_DATABASE_PAW_KAFKA_KEY_MAINTENANCE_PAWKAFKAKEYSMAINTENANCE_URL}" diff --git a/apps/kafka-key-maintenance/src/test/kotlin/no/nav/paw/kafkakeymaintenance/TopologyTest.kt b/apps/kafka-key-maintenance/src/test/kotlin/no/nav/paw/kafkakeymaintenance/TopologyTest.kt new file mode 100644 index 00000000..d97f18c2 --- /dev/null +++ b/apps/kafka-key-maintenance/src/test/kotlin/no/nav/paw/kafkakeymaintenance/TopologyTest.kt @@ -0,0 +1,150 @@ +package no.nav.paw.kafkakeymaintenance + +import io.kotest.core.spec.style.FreeSpec +import io.kotest.matchers.should +import io.kotest.matchers.shouldBe +import io.kotest.matchers.types.shouldBeInstanceOf +import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerde +import no.nav.paw.arbeidssokerregisteret.intern.v1.IdentitetsnummerSammenslaatt +import no.nav.paw.kafkakeygenerator.client.Alias +import no.nav.paw.kafkakeygenerator.client.LokaleAlias +import no.nav.paw.kafkakeymaintenance.pdlprocessor.AktorTopologyConfig +import no.nav.paw.kafkakeymaintenance.perioder.PeriodeRad +import no.nav.paw.kafkakeymaintenance.perioder.statiskePerioder +import no.nav.paw.test.kafkaStreamProperties +import no.nav.paw.test.minutes +import no.nav.paw.test.opprettSerde +import no.nav.person.pdl.aktor.v2.Aktor +import no.nav.person.pdl.aktor.v2.Identifikator +import no.nav.person.pdl.aktor.v2.Type +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.KeyValue +import org.apache.kafka.streams.TopologyTestDriver +import org.apache.kafka.streams.state.Stores +import java.time.Duration +import java.time.Instant.parse +import java.util.* + +class TopologyTest : FreeSpec({ + val testTime = parse("2021-12-03T12:00:00Z") + + "Test standard merge" { + val person1Alias = LokaleAlias( + periodePerson1.identitetsnummer, listOf( + Alias( + identitetsnummer = periodePerson1.identitetsnummer, + arbeidsoekerId = 0L, + recordKey = 0L, + partition = 0 + ) + ) + ) + val person2Alias = LokaleAlias( + periodePerson2.identitetsnummer, listOf( + Alias( + identitetsnummer = periodePerson2.identitetsnummer, + arbeidsoekerId = 1L, + recordKey = 1L, + partition = 1 + ) + ) + ) + val aliasMap = mapOf( + periodePerson1.identitetsnummer to person1Alias, + periodePerson2.identitetsnummer to person2Alias + ) + + fun hentAlias(identitetsnummer: List): List { + return identitetsnummer.mapNotNull { aliasMap[it] } + } + + val topology = initTopology( + stateStoreBuilderFactory = Stores::inMemoryKeyValueStore, + aktorTopologyConfig = aktorTopologyConfig, + perioder = perioder, + hentAlias = ::hentAlias, + aktorSerde = opprettSerde() + ) + val testDriver = TopologyTestDriver(topology, kafkaStreamProperties, testTime) + val aktorTopic = testDriver.createInputTopic( + aktorTopologyConfig.aktorTopic, + Serdes.StringSerde().serializer(), + opprettSerde().serializer() + ) + val hendelseloggTopic = testDriver.createOutputTopic( + aktorTopologyConfig.hendelseloggTopic, + Serdes.Long().deserializer(), + HendelseSerde().deserializer() + ) + aktorTopic.pipeInput( + "p1", + aktor( + id(identifikasjonsnummer = periodePerson1.identitetsnummer, gjeldende = true), + id(identifikasjonsnummer = periodePerson2.identitetsnummer, gjeldende = false) + ), + testTime + ) + hendelseloggTopic.isEmpty shouldBe true + testDriver.advanceWallClockTime(aktorTopologyConfig.supressionDelay - 1.minutes) + hendelseloggTopic.isEmpty shouldBe true + testDriver.advanceWallClockTime(5.minutes) + hendelseloggTopic.isEmpty shouldBe false + hendelseloggTopic.readKeyValuesToList() should { + it.size shouldBe 1 + it.first() should { (key, hendelse) -> + key shouldBe 1L + hendelse.identitetsnummer shouldBe periodePerson2.identitetsnummer + hendelse.shouldBeInstanceOf() + hendelse.flyttetTilArbeidssoekerId shouldBe 0L + hendelse.alleIdentitetsnummer shouldBe listOf(periodePerson2.identitetsnummer) + } + } + + } +}) + +operator fun KeyValue.component1(): K = key +operator fun KeyValue.component2(): V = value + +fun aktor( + vararg id: Triple +): Aktor = Aktor( + id.map { Identifikator(it.third, it.first, it.second) } +) + +fun id( + type: Type = Type.FOLKEREGISTERIDENT, + gjeldende: Boolean = false, + identifikasjonsnummer: String +): Triple = Triple(type, gjeldende, identifikasjonsnummer) + +val perioder = statiskePerioder( + mapOf( + periodePerson1.identitetsnummer to periodePerson1, + periodePerson2.identitetsnummer to periodePerson2 + ) +) + +val periodePerson1 + get() = PeriodeRad( + periodeId = UUID.randomUUID(), + identitetsnummer = "12345678901", + fra = parse("2021-09-01T12:00:00Z"), + til = null + ) + +val periodePerson2 + get() = PeriodeRad( + periodeId = UUID.randomUUID(), + identitetsnummer = "12345678902", + fra = parse("2021-09-11T12:00:00Z"), + til = parse("2021-09-27T12:00:00Z") + ) +val aktorTopologyConfig + get() = AktorTopologyConfig( + aktorTopic = "aktor_topic", + hendelseloggTopic = "hendelselogg_topic", + supressionDelay = Duration.ofHours(1), + interval = Duration.ofMinutes(1), + stateStoreName = "suppression_store" + )