diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt index 125f5a31..7c71202c 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt @@ -23,12 +23,14 @@ class KafkaKeys(private val database: Database) { .map { id -> id?.let(::ArbeidssoekerId) } .flatMap { id -> id?.let(::right) ?: left(Failure("database", FailureCode.DB_NOT_FOUND)) } - fun hent(arbeidssoekerIdRange: LongRange): Either> { + fun hent(currentPos: Long, maxSize: Int): Either> { return attempt { transaction(database) { IdentitetTabell .selectAll() - .where { IdentitetTabell.kafkaKey inList arbeidssoekerIdRange.toList() } + .where { IdentitetTabell.kafkaKey greaterEq currentPos and (IdentitetTabell.kafkaKey less (currentPos + maxSize)) } + .orderBy(column = IdentitetTabell.kafkaKey, order = SortOrder.DESC) + .limit(maxSize) .associate { Identitetsnummer(it[IdentitetTabell.identitetsnummer]) to ArbeidssoekerId(it[IdentitetTabell.kafkaKey]) } diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt index f3167a97..d4ab9f81 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt @@ -26,7 +26,7 @@ fun Routing.konfigurereHelse( get("internal/mergeDetector") { call.respondText( mergeDetector - .findMerges(500) + .findMerges(900) .map { "Number of pending merges: ${it.size } "} .onRight { mergeLogger.info(it) } .onLeft { mergeLogger.error("Error: ${it.system}:${it.code}", it.exception) } diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt index 4cba6a98..a7358069 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt @@ -11,16 +11,24 @@ class MergeDetector( private val pdlIdentitesTjeneste: PdlIdentitesTjeneste, private val kafkaKeys: KafkaKeys ) { - suspend fun findMerges(batchSize: Long): Either> { + suspend fun findMerges(batchSize: Int): Either> { require(batchSize > 0) { "Batch size must be greater than 0" } return kafkaKeys.hentSisteArbeidssoekerId() .map { it.value } - .map { (it - batchSize)..it } - .suspendingFlatMap { processRange(it, right(emptyList())) } + .suspendingFlatMap { max -> + processRange( + stopAt = max, + maxSize = batchSize, + currentPos = 0L, + right(emptyList()) + ) + } } tailrec suspend fun processRange( - range: LongRange, + stopAt: Long, + maxSize: Int, + currentPos: Long, results: Either> ): Either> { return when (results) { @@ -29,19 +37,20 @@ class MergeDetector( } is Right -> { - if (range.isEmpty()) { + if (currentPos >= stopAt) { results } else { - val detected = kafkaKeys.hent(range) + val storedData = kafkaKeys.hent(currentPos, maxSize) + val detected = storedData .suspendingFlatMap { pdlIdentitesTjeneste.hentIdenter(it.keys.toList()).map { res -> it to res } } .map { (local, pdl) -> detectMerges(local, pdl) }.map(results.right::plus) - val newStart = range.first - range.count() - val newEnd = range.first - processRange(newStart..