Skip to content

Commit

Permalink
Tester ut async merge detector
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Oct 29, 2024
1 parent dbdcdeb commit 1138866
Showing 1 changed file with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,16 @@ import io.ktor.server.response.respondText
import io.ktor.server.routing.Routing
import io.ktor.server.routing.get
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import kotlinx.coroutines.runBlocking
import no.nav.paw.kafkakeygenerator.Either
import no.nav.paw.kafkakeygenerator.Failure
import no.nav.paw.kafkakeygenerator.merge.MergeDetector
import org.slf4j.LoggerFactory
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletableFuture.supplyAsync
import java.util.concurrent.atomic.AtomicReference

private val task = AtomicReference<CompletableFuture<Either<Failure, Long>>?>(null)

fun Routing.konfigurereHelse(
prometheusMeterRegistry: PrometheusMeterRegistry,
Expand All @@ -24,13 +32,22 @@ fun Routing.konfigurereHelse(
call.respond(prometheusMeterRegistry.scrape())
}
get("internal/mergeDetector") {
call.respondText(
mergeDetector
.findMerges(900)
.map { "Number of pending merges: $it "}
.onRight { mergeLogger.info(it) }
.onLeft { mergeLogger.error("Error: ${it.system}:${it.code}", it.exception) }
.fold( { "Error: ${it.system}:${it.code}" }, { it } )
)
val t = task.get()
if (t == null) {
task.set(supplyAsync { runBlocking { mergeDetector.findMerges(900) } })
call.respondText("Merge detection started")
} else {
if (t.isDone) {
call.respondText(
t.get()
.map { "Number of pending merges: $it "}
.onRight { mergeLogger.info(it) }
.onLeft { mergeLogger.error("Error: ${it.system}:${it.code}", it.exception) }
.fold( { "Error: ${it.system}:${it.code}" }, { it } )
)
} else {
call.respondText("Merge detection in progress")
}
}
}
}

0 comments on commit 1138866

Please sign in to comment.