Skip to content

Commit

Permalink
Fikset håndtering av batch size for merge håndtering
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Oct 28, 2024
1 parent 868e1ca commit 59c0a4d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ class KafkaKeys(private val database: Database) {
.map { id -> id?.let(::ArbeidssoekerId) }
.flatMap { id -> id?.let(::right) ?: left(Failure("database", FailureCode.DB_NOT_FOUND)) }

fun hent(arbeidssoekerIdRange: LongRange): Either<Failure, Map<Identitetsnummer, ArbeidssoekerId>> {
fun hent(currentPos: Long, maxSize: Int): Either<Failure, Map<Identitetsnummer, ArbeidssoekerId>> {
return attempt {
transaction(database) {
IdentitetTabell
.selectAll()
.where { IdentitetTabell.kafkaKey inList arbeidssoekerIdRange.toList() }
.where { IdentitetTabell.kafkaKey greaterEq currentPos and (IdentitetTabell.kafkaKey less (currentPos + maxSize)) }
.orderBy(column = IdentitetTabell.kafkaKey, order = SortOrder.DESC)
.limit(maxSize)
.associate {
Identitetsnummer(it[IdentitetTabell.identitetsnummer]) to ArbeidssoekerId(it[IdentitetTabell.kafkaKey])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fun Routing.konfigurereHelse(
get("internal/mergeDetector") {
call.respondText(
mergeDetector
.findMerges(500)
.findMerges(900)
.map { "Number of pending merges: ${it.size } "}
.onRight { mergeLogger.info(it) }
.onLeft { mergeLogger.error("Error: ${it.system}:${it.code}", it.exception) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,24 @@ class MergeDetector(
private val pdlIdentitesTjeneste: PdlIdentitesTjeneste,
private val kafkaKeys: KafkaKeys
) {
suspend fun findMerges(batchSize: Long): Either<Failure, List<MergeDetected>> {
suspend fun findMerges(batchSize: Int): Either<Failure, List<MergeDetected>> {
require(batchSize > 0) { "Batch size must be greater than 0" }
return kafkaKeys.hentSisteArbeidssoekerId()
.map { it.value }
.map { (it - batchSize)..it }
.suspendingFlatMap { processRange(it, right(emptyList())) }
.suspendingFlatMap { max ->
processRange(
stopAt = max,
maxSize = batchSize,
currentPos = 0L,
right(emptyList())
)
}
}

tailrec suspend fun processRange(
range: LongRange,
stopAt: Long,
maxSize: Int,
currentPos: Long,
results: Either<Failure, List<MergeDetected>>
): Either<Failure, List<MergeDetected>> {
return when (results) {
Expand All @@ -29,19 +37,20 @@ class MergeDetector(
}

is Right -> {
if (range.isEmpty()) {
if (currentPos >= stopAt) {
results
} else {
val detected = kafkaKeys.hent(range)
val storedData = kafkaKeys.hent(currentPos, maxSize)
val detected = storedData
.suspendingFlatMap {
pdlIdentitesTjeneste.hentIdenter(it.keys.toList()).map { res -> it to res }
}
.map { (local, pdl) ->
detectMerges(local, pdl)
}.map(results.right::plus)
val newStart = range.first - range.count()
val newEnd = range.first
processRange(newStart..<newEnd, detected)
val newStart =
storedData.fold({ -1L }, { it.values.maxOfOrNull(ArbeidssoekerId::value)?.plus(1) ?: -1 })
processRange(stopAt, maxSize, newStart, detected)
}
}
}
Expand Down

0 comments on commit 59c0a4d

Please sign in to comment.