diff --git a/apps/kafka-key-generator/nais/nais-prod.yaml b/apps/kafka-key-generator/nais/nais-prod.yaml index 25c0a00c..ebeaae13 100644 --- a/apps/kafka-key-generator/nais/nais-prod.yaml +++ b/apps/kafka-key-generator/nais/nais-prod.yaml @@ -13,10 +13,10 @@ spec: max: 2 resources: limits: - memory: 512Mi + memory: 4096Mi requests: cpu: 50m - memory: 256Mi + memory: 4096Mi liveness: path: /internal/isAlive initialDelay: 10 diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt index 7c71202c..70311b86 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/KafkaKeys.kt @@ -29,7 +29,7 @@ class KafkaKeys(private val database: Database) { IdentitetTabell .selectAll() .where { IdentitetTabell.kafkaKey greaterEq currentPos and (IdentitetTabell.kafkaKey less (currentPos + maxSize)) } - .orderBy(column = IdentitetTabell.kafkaKey, order = SortOrder.DESC) + .orderBy(column = IdentitetTabell.kafkaKey, order = SortOrder.ASC) .limit(maxSize) .associate { Identitetsnummer(it[IdentitetTabell.identitetsnummer]) to ArbeidssoekerId(it[IdentitetTabell.kafkaKey]) diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt index 47de0a4a..1ebc0561 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/ktor/helse_endepunkt.kt @@ -23,7 +23,7 @@ fun Routing.konfigurereHelse( get("/internal/metrics") { call.respond(prometheusMeterRegistry.scrape()) } - /*get("internal/mergeDetector") { + get("internal/mergeDetector") { call.respondText( mergeDetector .findMerges(900) @@ -32,5 +32,5 @@ fun Routing.konfigurereHelse( .onLeft { mergeLogger.error("Error: ${it.system}:${it.code}", it.exception) } .fold( { "Error: ${it.system}:${it.code}" }, { it } ) ) - }*/ + } } diff --git a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt index 6e61519f..ee4e93f7 100644 --- a/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt +++ b/apps/kafka-key-generator/src/main/kotlin/no/nav/paw/kafkakeygenerator/merge/MergeDetector.kt @@ -6,11 +6,14 @@ import no.nav.paw.kafkakeygenerator.pdl.PdlIdentitesTjeneste import no.nav.paw.kafkakeygenerator.vo.ArbeidssoekerId import no.nav.paw.kafkakeygenerator.vo.Identitetsnummer import no.nav.paw.pdl.graphql.generated.hentidenter.IdentInformasjon +import org.slf4j.LoggerFactory class MergeDetector( private val pdlIdentitesTjeneste: PdlIdentitesTjeneste, private val kafkaKeys: KafkaKeys ) { + private val logger = LoggerFactory.getLogger("MergeDetector") + suspend fun findMerges(batchSize: Int): Either { require(batchSize > 0) { "Batch size must be greater than 0" } return kafkaKeys.hentSisteArbeidssoekerId() @@ -31,6 +34,7 @@ class MergeDetector( currentPos: Long, results: Either ): Either { + logger.info("Processing range:stopAt={}, maxSize={}, currentPos={}, results={}", stopAt, maxSize, currentPos, results) return when (results) { is Left -> { return results