From 8314cbd347e09fec69edf489cf226dfd8ecf20da Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Wed, 20 Nov 2024 08:55:05 +0100 Subject: [PATCH] =?UTF-8?q?Lagt=20til=20validering=20av=20arbeidss=C3=B8ke?= =?UTF-8?q?rIder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../no/nav/paw/kafkakeygenerator/Application.kt | 3 ++- .../repository/IdentitetRepository.kt | 9 --------- .../repository/KafkaKeysRepository.kt | 7 +++++++ .../service/KafkaConsumerService.kt | 17 +++++++++++++++++ .../service/KafkaConsumerServiceTest.kt | 4 ++-- .../nav/paw/kafkakeygenerator/test/TestData.kt | 12 ------------ 6 files changed, 28 insertions(+), 24 deletions(-) diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/Application.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/Application.kt index fbe8e57d..a2797694 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/Application.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/Application.kt @@ -21,7 +21,6 @@ import no.nav.paw.kafkakeygenerator.config.KAFKA_TOPOLOGY_CONFIG import no.nav.paw.kafkakeygenerator.config.KafkaTopologyConfig import no.nav.paw.kafkakeygenerator.config.PDL_CLIENT_CONFIG import no.nav.paw.kafkakeygenerator.config.PdlClientConfig -import no.nav.paw.kafkakeygenerator.utils.createDataSource import no.nav.paw.kafkakeygenerator.merge.MergeDetector import no.nav.paw.kafkakeygenerator.plugin.configSerialization import no.nav.paw.kafkakeygenerator.plugin.configureAuthentication @@ -37,6 +36,7 @@ import no.nav.paw.kafkakeygenerator.repository.KafkaKeysRepository import no.nav.paw.kafkakeygenerator.service.KafkaConsumerService import no.nav.paw.kafkakeygenerator.service.KafkaKeysService import no.nav.paw.kafkakeygenerator.service.PdlService +import no.nav.paw.kafkakeygenerator.utils.createDataSource import no.nav.paw.kafkakeygenerator.utils.createPdlClient import no.nav.paw.pdl.PdlClient import org.apache.kafka.common.serialization.LongDeserializer @@ -69,6 +69,7 @@ fun startApplication( healthIndicatorRepository, prometheusMeterRegistry, identitetRepository, + kafkaKeysRepository, kafkaKeysAuditRepository ) val pdlService = PdlService(pdlClient) diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/repository/IdentitetRepository.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/repository/IdentitetRepository.kt index c0a08a73..dfb24ce0 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/repository/IdentitetRepository.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/repository/IdentitetRepository.kt @@ -21,15 +21,6 @@ class IdentitetRepository( } } - fun find(arbeidssoekerId: ArbeidssoekerId): Pair? = transaction(database) { - IdentitetTabell.selectAll() - .where { IdentitetTabell.kafkaKey eq arbeidssoekerId.value } - .singleOrNull() - ?.let { - Identitetsnummer(it[IdentitetTabell.identitetsnummer]) to ArbeidssoekerId(it[IdentitetTabell.kafkaKey]) - } - } - fun insert( ident: Identitetsnummer, arbeidssoekerId: ArbeidssoekerId diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/repository/KafkaKeysRepository.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/repository/KafkaKeysRepository.kt index 3a17abba..0fd2ba8d 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/repository/KafkaKeysRepository.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/repository/KafkaKeysRepository.kt @@ -22,6 +22,13 @@ import org.jetbrains.exposed.sql.transactions.transaction class KafkaKeysRepository(private val database: Database) { + fun find(arbeidssoekerId: ArbeidssoekerId): ArbeidssoekerId? = + transaction(database) { + KafkaKeysTabell.selectAll() + .where { KafkaKeysTabell.id eq arbeidssoekerId.value } + .singleOrNull()?.let { ArbeidssoekerId(it[KafkaKeysTabell.id]) } + } + fun hentSisteArbeidssoekerId(): Either = attempt { transaction(database) { diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/service/KafkaConsumerService.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/service/KafkaConsumerService.kt index 6d6d4e4a..b3a26077 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/service/KafkaConsumerService.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/service/KafkaConsumerService.kt @@ -12,8 +12,10 @@ import no.nav.paw.health.model.ReadinessHealthIndicator import no.nav.paw.health.repository.HealthIndicatorRepository import no.nav.paw.kafkakeygenerator.repository.IdentitetRepository import no.nav.paw.kafkakeygenerator.repository.KafkaKeysAuditRepository +import no.nav.paw.kafkakeygenerator.repository.KafkaKeysRepository import no.nav.paw.kafkakeygenerator.utils.buildErrorLogger import no.nav.paw.kafkakeygenerator.utils.buildLogger +import no.nav.paw.kafkakeygenerator.utils.countKafkaFailed import no.nav.paw.kafkakeygenerator.utils.countKafkaIgnored import no.nav.paw.kafkakeygenerator.utils.countKafkaInserted import no.nav.paw.kafkakeygenerator.utils.countKafkaProcessed @@ -33,6 +35,7 @@ class KafkaConsumerService( private val healthIndicatorRepository: HealthIndicatorRepository, private val meterRegistry: MeterRegistry, private val identitetRepository: IdentitetRepository, + private val kafkaKeysRepository: KafkaKeysRepository, private val kafkaKeysAuditRepository: KafkaKeysAuditRepository, ) { private val logger = buildLogger @@ -82,6 +85,20 @@ class KafkaConsumerService( fraArbeidssoekerId: ArbeidssoekerId, tilArbeidssoekerId: ArbeidssoekerId ) { + kafkaKeysRepository.find(fraArbeidssoekerId).let { arbeidssoekerId -> + if (arbeidssoekerId == null) { + meterRegistry.countKafkaFailed() + throw IllegalStateException("ArbeidssøkerId ikke funnet") + } + } + + kafkaKeysRepository.find(tilArbeidssoekerId).let { arbeidssoekerId -> + if (arbeidssoekerId == null) { + meterRegistry.countKafkaFailed() + throw IllegalStateException("ArbeidssøkerId ikke funnet") + } + } + transaction(database) { identitetsnummerSet.forEach { identitetsnummer -> val kafkaKey = identitetRepository.find(identitetsnummer) diff --git a/apps/kafka-key-generator/src/test/kotlin/no/nav/paw/kafkakeygenerator/service/KafkaConsumerServiceTest.kt b/apps/kafka-key-generator/src/test/kotlin/no/nav/paw/kafkakeygenerator/service/KafkaConsumerServiceTest.kt index d95b84aa..744389ac 100644 --- a/apps/kafka-key-generator/src/test/kotlin/no/nav/paw/kafkakeygenerator/service/KafkaConsumerServiceTest.kt +++ b/apps/kafka-key-generator/src/test/kotlin/no/nav/paw/kafkakeygenerator/service/KafkaConsumerServiceTest.kt @@ -41,6 +41,7 @@ class KafkaConsumerServiceTest : FreeSpec({ meterRegistry = LoggingMeterRegistry(), healthIndicatorRepository = healthIndicatorRepository, identitetRepository = IdentitetRepository(database), + kafkaKeysRepository = kafkaKeysRepository, kafkaKeysAuditRepository = kafkaKeysAuditRepository ) } @@ -56,8 +57,7 @@ class KafkaConsumerServiceTest : FreeSpec({ TestData.getPeriodeStartet(identitetsnummer, arbeidssoekerId), TestData.getPeriodeAvsluttet(identitetsnummer, arbeidssoekerId), TestData.getPeriodeStartAvvist(identitetsnummer, arbeidssoekerId), - TestData.getPeriodeAvsluttetAvvist(identitetsnummer, arbeidssoekerId), - TestData.getIdentitetsnummerOpphoert(identitetsnummer, arbeidssoekerId) + TestData.getPeriodeAvsluttetAvvist(identitetsnummer, arbeidssoekerId) ) kafkaConsumerService.handleRecords(hendelser.asConsumerRecords()) diff --git a/apps/kafka-key-generator/src/test/kotlin/no/nav/paw/kafkakeygenerator/test/TestData.kt b/apps/kafka-key-generator/src/test/kotlin/no/nav/paw/kafkakeygenerator/test/TestData.kt index 87977de8..0cb6a204 100644 --- a/apps/kafka-key-generator/src/test/kotlin/no/nav/paw/kafkakeygenerator/test/TestData.kt +++ b/apps/kafka-key-generator/src/test/kotlin/no/nav/paw/kafkakeygenerator/test/TestData.kt @@ -13,7 +13,6 @@ import no.nav.paw.arbeidssokerregisteret.intern.v1.Avsluttet import no.nav.paw.arbeidssokerregisteret.intern.v1.Avvist import no.nav.paw.arbeidssokerregisteret.intern.v1.AvvistStoppAvPeriode import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse -import no.nav.paw.arbeidssokerregisteret.intern.v1.IdentitetsnummerOpphoert import no.nav.paw.arbeidssokerregisteret.intern.v1.IdentitetsnummerSammenslaatt import no.nav.paw.arbeidssokerregisteret.intern.v1.Startet import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Bruker @@ -175,17 +174,6 @@ object TestData { flyttetTilArbeidssoekerId = tilArbeidssoekerId.value, ) - fun getIdentitetsnummerOpphoert( - identitetsnummer: Identitetsnummer, - arbeidssoekerId: ArbeidssoekerId - ): IdentitetsnummerOpphoert = IdentitetsnummerOpphoert( - id = arbeidssoekerId.value, - hendelseId = UUID.randomUUID(), - identitetsnummer = identitetsnummer.value, - metadata = getMetadata(), - alleIdentitetsnummer = listOf(identitetsnummer.value) - ) - fun getPeriodeStartet( identitetsnummer: Identitetsnummer, arbeidssoekerId: ArbeidssoekerId