Skip to content

Commit

Permalink
La til støtte for å trigge telling av utestående id merges
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Oct 28, 2024
1 parent 589d9b7 commit 8788d7a
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import no.nav.paw.kafkakeygenerator.config.dataSource
import no.nav.paw.kafkakeygenerator.config.lastKonfigurasjon
import no.nav.paw.kafkakeygenerator.database.flywayMigrate
import no.nav.paw.kafkakeygenerator.ktor.initKtorServer
import no.nav.paw.kafkakeygenerator.merge.MergeDetector
import no.nav.paw.kafkakeygenerator.pdl.PdlIdentitesTjeneste
import no.nav.paw.kafkakeygenerator.pdl.opprettPdlKlient
import no.nav.paw.pdl.PdlClient
Expand Down Expand Up @@ -42,12 +43,20 @@ fun startApplikasjon(
val database = Database.connect(dataSource)
val prometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
flywayMigrate(dataSource)
val kafkaKeysDbTjeneste = KafkaKeys(database)
val pdlIdTjeneste = PdlIdentitesTjeneste(pdlKlient)
val applikasjon = Applikasjon(
kafkaKeysDbTjeneste,
pdlIdTjeneste
)
val mergeDetector = MergeDetector(
pdlIdTjeneste,
kafkaKeysDbTjeneste
)
initKtorServer(
autentiseringKonfig,
prometheusMeterRegistry,
Applikasjon(
KafkaKeys(database),
PdlIdentitesTjeneste(pdlKlient)
)
applikasjon,
mergeDetector
).start(wait = true)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ import org.jetbrains.exposed.sql.transactions.transaction

class KafkaKeys(private val database: Database) {

fun hentSisteArbeidssoekerId(): Either<Failure, ArbeidssoekerId> =
attempt {
transaction(database) {
IdentitetTabell
.selectAll()
.orderBy(IdentitetTabell.kafkaKey, SortOrder.DESC)
.firstOrNull()?.get(IdentitetTabell.kafkaKey)
}
}.mapToFailure { exception ->
Failure("database", FailureCode.INTERNAL_TECHINCAL_ERROR, exception)
}
.map { id -> id?.let(::ArbeidssoekerId) }
.flatMap { id -> id?.let(::right) ?: left(Failure("database", FailureCode.DB_NOT_FOUND)) }

fun hent(arbeidssoekerIdRange: LongRange): Either<Failure, Map<Identitetsnummer, ArbeidssoekerId>> {
return attempt {
transaction(database) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@ import io.ktor.server.response.respondText
import io.ktor.server.routing.Routing
import io.ktor.server.routing.get
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.kafkakeygenerator.merge.MergeDetector
import org.slf4j.LoggerFactory

fun Routing.konfigurereHelse(prometheusMeterRegistry: PrometheusMeterRegistry) {
fun Routing.konfigurereHelse(
prometheusMeterRegistry: PrometheusMeterRegistry,
mergeDetector: MergeDetector
) {
val mergeLogger = LoggerFactory.getLogger("MergeDetector")
get("/internal/isAlive") {
call.respondText("ALIVE")
}
Expand All @@ -17,4 +23,14 @@ fun Routing.konfigurereHelse(prometheusMeterRegistry: PrometheusMeterRegistry) {
get("/internal/metrics") {
call.respond(prometheusMeterRegistry.scrape())
}
get("internal/mergeDetector") {
call.respondText(
mergeDetector
.findMerges(1000)
.map { "Number of pending merges: ${it.size } "}
.onRight { mergeLogger.info(it) }
.onLeft { mergeLogger.error("Error: ${it.system}:${it.code}", it.exception) }
.fold( { "Error: ${it.system}:${it.code}" }, { it } )
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import io.ktor.server.netty.Netty
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import no.nav.paw.kafkakeygenerator.Applikasjon
import no.nav.paw.kafkakeygenerator.config.Autentiseringskonfigurasjon
import no.nav.paw.kafkakeygenerator.merge.MergeDetector

fun initKtorServer(
autentiseringKonfigurasjon: Autentiseringskonfigurasjon,
prometheusMeterRegistry: PrometheusMeterRegistry,
applikasjon: Applikasjon
applikasjon: Applikasjon,
mergeDetector: MergeDetector
) = embeddedServer(
factory = Netty,
port = 8080,
Expand All @@ -19,5 +21,10 @@ fun initKtorServer(
callGroupSize = 16
}
) {
konfigurerServer(autentiseringKonfigurasjon, prometheusMeterRegistry, applikasjon)
konfigurerServer(
autentiseringKonfigurasjon = autentiseringKonfigurasjon,
prometheusMeterRegistry = prometheusMeterRegistry,
applikasjon = applikasjon,
mergeDetector = mergeDetector
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import no.nav.paw.kafkakeygenerator.api.recordkey.configureRecordKeyApi
import no.nav.paw.kafkakeygenerator.api.v2.konfigurerApiV2
import no.nav.paw.kafkakeygenerator.config.Autentiseringskonfigurasjon
import no.nav.paw.kafkakeygenerator.masker
import no.nav.paw.kafkakeygenerator.merge.MergeDetector
import no.nav.security.token.support.v2.IssuerConfig
import no.nav.security.token.support.v2.RequiredClaims
import no.nav.security.token.support.v2.TokenSupportConfig
Expand All @@ -35,15 +36,19 @@ import java.time.Duration
fun Application.konfigurerServer(
autentiseringKonfigurasjon: Autentiseringskonfigurasjon,
prometheusMeterRegistry: PrometheusMeterRegistry,
applikasjon: Applikasjon
applikasjon: Applikasjon,
mergeDetector: MergeDetector
) {
autentisering(autentiseringKonfigurasjon)
micrometerMetrics(prometheusMeterRegistry)
configureLogging()
serialisering()
statusPages()
routing {
konfigurereHelse(prometheusMeterRegistry)
konfigurereHelse(
prometheusMeterRegistry = prometheusMeterRegistry,
mergeDetector = mergeDetector
)
konfigurerApiV2(autentiseringKonfigurasjon, applikasjon)
configureRecordKeyApi(autentiseringKonfigurasjon, applikasjon)
swaggerUI(path = "docs", swaggerFile = "openapi/documentation.yaml")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package no.nav.paw.kafkakeygenerator.merge

import no.nav.paw.kafkakeygenerator.*
import no.nav.paw.kafkakeygenerator.mergedetector.vo.MergeDetected
import no.nav.paw.kafkakeygenerator.pdl.PdlIdentitesTjeneste
import no.nav.paw.kafkakeygenerator.vo.ArbeidssoekerId
import no.nav.paw.kafkakeygenerator.vo.Identitetsnummer
import no.nav.paw.pdl.graphql.generated.hentidenter.IdentInformasjon

class MergeDetector(
private val pdlIdentitesTjeneste: PdlIdentitesTjeneste,
private val kafkaKeys: KafkaKeys
) {
suspend fun findMerges(batchSize: Long): Either<Failure, List<MergeDetected>> {
require(batchSize > 0) { "Batch size must be greater than 0" }
return kafkaKeys.hentSisteArbeidssoekerId()
.map { it.value }
.map { (it - batchSize)..it }
.suspendingFlatMap { processRange(it, right(emptyList())) }
}

tailrec suspend fun processRange(
range: LongRange,
results: Either<Failure, List<MergeDetected>>
): Either<Failure, List<MergeDetected>> {
return when (results) {
is Left -> {
return results
}

is Right -> {
if (range.isEmpty()) {
results
} else {
val detected = kafkaKeys.hent(range)
.suspendingFlatMap {
pdlIdentitesTjeneste.hentIdenter(it.keys.toList()).map { res -> it to res }
}
.map { (local, pdl) ->
detectMerges(local, pdl)
}.map(results.right::plus)
val newStart = range.first - range.count()
val newEnd = range.first
processRange(newStart..<newEnd, detected)
}
}
}
}
}

fun detectMerges(
local: Map<Identitetsnummer, ArbeidssoekerId>,
pdl: Map<String, List<IdentInformasjon>>
): List<MergeDetected> {
return pdl.asSequence()
.mapNotNull { (searchedId, resultIds) ->
val arbIds = resultIds
.map { Identitetsnummer(it.ident) }
.mapNotNull { pdlId ->
local[pdlId]?.let { pdlId to it }
}
if (arbIds.map { (_, arbId) -> arbId }.distinct().size > 1) {
MergeDetected(
id = Identitetsnummer(searchedId),
map = arbIds
.groupBy { (_, arbId) -> arbId }
.mapValues { (_, value) -> value.map { (pdlId, _) -> pdlId } }
)
} else {
null
}
}.toList()
}

suspend fun <L, R, A> Either<L, R>.suspendingFlatMap(f: suspend (R) -> Either<L, A>): Either<L, A> =
when (this) {
is Right -> f(this.right)
is Left -> this
}







0 comments on commit 8788d7a

Please sign in to comment.