Skip to content

Commit

Permalink
Lagt til validering av arbeidssøkerIder
Browse files Browse the repository at this point in the history
  • Loading branch information
naviktthomas committed Nov 20, 2024
1 parent 8ec0b46 commit 8314cbd
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import no.nav.paw.kafkakeygenerator.config.KAFKA_TOPOLOGY_CONFIG
import no.nav.paw.kafkakeygenerator.config.KafkaTopologyConfig
import no.nav.paw.kafkakeygenerator.config.PDL_CLIENT_CONFIG
import no.nav.paw.kafkakeygenerator.config.PdlClientConfig
import no.nav.paw.kafkakeygenerator.utils.createDataSource
import no.nav.paw.kafkakeygenerator.merge.MergeDetector
import no.nav.paw.kafkakeygenerator.plugin.configSerialization
import no.nav.paw.kafkakeygenerator.plugin.configureAuthentication
Expand All @@ -37,6 +36,7 @@ import no.nav.paw.kafkakeygenerator.repository.KafkaKeysRepository
import no.nav.paw.kafkakeygenerator.service.KafkaConsumerService
import no.nav.paw.kafkakeygenerator.service.KafkaKeysService
import no.nav.paw.kafkakeygenerator.service.PdlService
import no.nav.paw.kafkakeygenerator.utils.createDataSource
import no.nav.paw.kafkakeygenerator.utils.createPdlClient
import no.nav.paw.pdl.PdlClient
import org.apache.kafka.common.serialization.LongDeserializer
Expand Down Expand Up @@ -69,6 +69,7 @@ fun startApplication(
healthIndicatorRepository,
prometheusMeterRegistry,
identitetRepository,
kafkaKeysRepository,
kafkaKeysAuditRepository
)
val pdlService = PdlService(pdlClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,6 @@ class IdentitetRepository(
}
}

fun find(arbeidssoekerId: ArbeidssoekerId): Pair<Identitetsnummer, ArbeidssoekerId>? = transaction(database) {
IdentitetTabell.selectAll()
.where { IdentitetTabell.kafkaKey eq arbeidssoekerId.value }
.singleOrNull()
?.let {
Identitetsnummer(it[IdentitetTabell.identitetsnummer]) to ArbeidssoekerId(it[IdentitetTabell.kafkaKey])
}
}

fun insert(
ident: Identitetsnummer,
arbeidssoekerId: ArbeidssoekerId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ import org.jetbrains.exposed.sql.transactions.transaction

class KafkaKeysRepository(private val database: Database) {

fun find(arbeidssoekerId: ArbeidssoekerId): ArbeidssoekerId? =
transaction(database) {
KafkaKeysTabell.selectAll()
.where { KafkaKeysTabell.id eq arbeidssoekerId.value }
.singleOrNull()?.let { ArbeidssoekerId(it[KafkaKeysTabell.id]) }
}

fun hentSisteArbeidssoekerId(): Either<Failure, ArbeidssoekerId> =
attempt {
transaction(database) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import no.nav.paw.health.model.ReadinessHealthIndicator
import no.nav.paw.health.repository.HealthIndicatorRepository
import no.nav.paw.kafkakeygenerator.repository.IdentitetRepository
import no.nav.paw.kafkakeygenerator.repository.KafkaKeysAuditRepository
import no.nav.paw.kafkakeygenerator.repository.KafkaKeysRepository
import no.nav.paw.kafkakeygenerator.utils.buildErrorLogger
import no.nav.paw.kafkakeygenerator.utils.buildLogger
import no.nav.paw.kafkakeygenerator.utils.countKafkaFailed
import no.nav.paw.kafkakeygenerator.utils.countKafkaIgnored
import no.nav.paw.kafkakeygenerator.utils.countKafkaInserted
import no.nav.paw.kafkakeygenerator.utils.countKafkaProcessed
Expand All @@ -33,6 +35,7 @@ class KafkaConsumerService(
private val healthIndicatorRepository: HealthIndicatorRepository,
private val meterRegistry: MeterRegistry,
private val identitetRepository: IdentitetRepository,
private val kafkaKeysRepository: KafkaKeysRepository,
private val kafkaKeysAuditRepository: KafkaKeysAuditRepository,
) {
private val logger = buildLogger
Expand Down Expand Up @@ -82,6 +85,20 @@ class KafkaConsumerService(
fraArbeidssoekerId: ArbeidssoekerId,
tilArbeidssoekerId: ArbeidssoekerId
) {
kafkaKeysRepository.find(fraArbeidssoekerId).let { arbeidssoekerId ->
if (arbeidssoekerId == null) {
meterRegistry.countKafkaFailed()
throw IllegalStateException("ArbeidssøkerId ikke funnet")
}
}

kafkaKeysRepository.find(tilArbeidssoekerId).let { arbeidssoekerId ->
if (arbeidssoekerId == null) {
meterRegistry.countKafkaFailed()
throw IllegalStateException("ArbeidssøkerId ikke funnet")
}
}

transaction(database) {
identitetsnummerSet.forEach { identitetsnummer ->
val kafkaKey = identitetRepository.find(identitetsnummer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class KafkaConsumerServiceTest : FreeSpec({
meterRegistry = LoggingMeterRegistry(),
healthIndicatorRepository = healthIndicatorRepository,
identitetRepository = IdentitetRepository(database),
kafkaKeysRepository = kafkaKeysRepository,
kafkaKeysAuditRepository = kafkaKeysAuditRepository
)
}
Expand All @@ -56,8 +57,7 @@ class KafkaConsumerServiceTest : FreeSpec({
TestData.getPeriodeStartet(identitetsnummer, arbeidssoekerId),
TestData.getPeriodeAvsluttet(identitetsnummer, arbeidssoekerId),
TestData.getPeriodeStartAvvist(identitetsnummer, arbeidssoekerId),
TestData.getPeriodeAvsluttetAvvist(identitetsnummer, arbeidssoekerId),
TestData.getIdentitetsnummerOpphoert(identitetsnummer, arbeidssoekerId)
TestData.getPeriodeAvsluttetAvvist(identitetsnummer, arbeidssoekerId)
)

kafkaConsumerService.handleRecords(hendelser.asConsumerRecords())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import no.nav.paw.arbeidssokerregisteret.intern.v1.Avsluttet
import no.nav.paw.arbeidssokerregisteret.intern.v1.Avvist
import no.nav.paw.arbeidssokerregisteret.intern.v1.AvvistStoppAvPeriode
import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
import no.nav.paw.arbeidssokerregisteret.intern.v1.IdentitetsnummerOpphoert
import no.nav.paw.arbeidssokerregisteret.intern.v1.IdentitetsnummerSammenslaatt
import no.nav.paw.arbeidssokerregisteret.intern.v1.Startet
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Bruker
Expand Down Expand Up @@ -175,17 +174,6 @@ object TestData {
flyttetTilArbeidssoekerId = tilArbeidssoekerId.value,
)

fun getIdentitetsnummerOpphoert(
identitetsnummer: Identitetsnummer,
arbeidssoekerId: ArbeidssoekerId
): IdentitetsnummerOpphoert = IdentitetsnummerOpphoert(
id = arbeidssoekerId.value,
hendelseId = UUID.randomUUID(),
identitetsnummer = identitetsnummer.value,
metadata = getMetadata(),
alleIdentitetsnummer = listOf(identitetsnummer.value)
)

fun getPeriodeStartet(
identitetsnummer: Identitetsnummer,
arbeidssoekerId: ArbeidssoekerId
Expand Down

0 comments on commit 8314cbd

Please sign in to comment.