From 59c0a4d2fd29399038e325624fc1e8eccfdb86bf Mon Sep 17 00:00:00 2001 From: Nils Martin Sande Date: Mon, 28 Oct 2024 20:39:09 +0100 Subject: [PATCH] =?UTF-8?q?Fikset=20h=C3=A5ndtering=20av=20batch=20size=20?= =?UTF-8?q?for=20merge=20h=C3=A5ndtering?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../no/nav/paw/kafkakeygenerator/KafkaKeys.kt | 6 +++-- .../kafkakeygenerator/ktor/helse_endepunkt.kt | 2 +- .../kafkakeygenerator/merge/MergeDetector.kt | 27 ++++++++++++------- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt index 125f5a31..7c71202c 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt @@ -23,12 +23,14 @@ class KafkaKeys(private val database: Database) { .map { id -> id?.let(::ArbeidssoekerId) } .flatMap { id -> id?.let(::right) ?: left(Failure("database", FailureCode.DB_NOT_FOUND)) } - fun hent(arbeidssoekerIdRange: LongRange): Either> { + fun hent(currentPos: Long, maxSize: Int): Either> { return attempt { transaction(database) { IdentitetTabell .selectAll() - .where { IdentitetTabell.kafkaKey inList arbeidssoekerIdRange.toList() } + .where { IdentitetTabell.kafkaKey greaterEq currentPos and (IdentitetTabell.kafkaKey less (currentPos + maxSize)) } + .orderBy(column = IdentitetTabell.kafkaKey, order = SortOrder.DESC) + .limit(maxSize) .associate { Identitetsnummer(it[IdentitetTabell.identitetsnummer]) to ArbeidssoekerId(it[IdentitetTabell.kafkaKey]) } diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt index f3167a97..d4ab9f81 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt @@ -26,7 +26,7 @@ fun Routing.konfigurereHelse( get("internal/mergeDetector") { call.respondText( mergeDetector - .findMerges(500) + .findMerges(900) .map { "Number of pending merges: ${it.size } "} .onRight { mergeLogger.info(it) } .onLeft { mergeLogger.error("Error: ${it.system}:${it.code}", it.exception) } diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt index 4cba6a98..a7358069 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt @@ -11,16 +11,24 @@ class MergeDetector( private val pdlIdentitesTjeneste: PdlIdentitesTjeneste, private val kafkaKeys: KafkaKeys ) { - suspend fun findMerges(batchSize: Long): Either> { + suspend fun findMerges(batchSize: Int): Either> { require(batchSize > 0) { "Batch size must be greater than 0" } return kafkaKeys.hentSisteArbeidssoekerId() .map { it.value } - .map { (it - batchSize)..it } - .suspendingFlatMap { processRange(it, right(emptyList())) } + .suspendingFlatMap { max -> + processRange( + stopAt = max, + maxSize = batchSize, + currentPos = 0L, + right(emptyList()) + ) + } } tailrec suspend fun processRange( - range: LongRange, + stopAt: Long, + maxSize: Int, + currentPos: Long, results: Either> ): Either> { return when (results) { @@ -29,19 +37,20 @@ class MergeDetector( } is Right -> { - if (range.isEmpty()) { + if (currentPos >= stopAt) { results } else { - val detected = kafkaKeys.hent(range) + val storedData = kafkaKeys.hent(currentPos, maxSize) + val detected = storedData .suspendingFlatMap { pdlIdentitesTjeneste.hentIdenter(it.keys.toList()).map { res -> it to res } } .map { (local, pdl) -> detectMerges(local, pdl) }.map(results.right::plus) - val newStart = range.first - range.count() - val newEnd = range.first - processRange(newStart..