From 1138866f68fe3b8d1734254eda3db506aecb139c Mon Sep 17 00:00:00 2001 From: Nils Martin Sande Date: Tue, 29 Oct 2024 06:33:38 +0100 Subject: [PATCH] Tester ut async merge detector --- .../kafkakeygenerator/ktor/helse_endepunkt.kt | 33 ++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt index 1ebc0561..346b786e 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt @@ -6,8 +6,16 @@ import io.ktor.server.response.respondText import io.ktor.server.routing.Routing import io.ktor.server.routing.get import io.micrometer.prometheusmetrics.PrometheusMeterRegistry +import kotlinx.coroutines.runBlocking +import no.nav.paw.kafkakeygenerator.Either +import no.nav.paw.kafkakeygenerator.Failure import no.nav.paw.kafkakeygenerator.merge.MergeDetector import org.slf4j.LoggerFactory +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletableFuture.supplyAsync +import java.util.concurrent.atomic.AtomicReference + +private val task = AtomicReference>?>(null) fun Routing.konfigurereHelse( prometheusMeterRegistry: PrometheusMeterRegistry, @@ -24,13 +32,22 @@ fun Routing.konfigurereHelse( call.respond(prometheusMeterRegistry.scrape()) } get("internal/mergeDetector") { - call.respondText( - mergeDetector - .findMerges(900) - .map { "Number of pending merges: $it "} - .onRight { mergeLogger.info(it) } - .onLeft { mergeLogger.error("Error: ${it.system}:${it.code}", it.exception) } - .fold( { "Error: ${it.system}:${it.code}" }, { it } ) - ) + val t = task.get() + if (t == null) { + task.set(supplyAsync { runBlocking { mergeDetector.findMerges(900) } }) + call.respondText("Merge detection started") + } else { + if (t.isDone) { + call.respondText( + t.get() + .map { "Number of pending merges: $it "} + .onRight { mergeLogger.info(it) } + .onLeft { mergeLogger.error("Error: ${it.system}:${it.code}", it.exception) } + .fold( { "Error: ${it.system}:${it.code}" }, { it } ) + ) + } else { + call.respondText("Merge detection in progress") + } + } } }