Skip to content

Commit

Permalink
La til initiell støtte for henting av lagret data i kafka keys (for i…
Browse files Browse the repository at this point in the history
…d merge)
  • Loading branch information
nilsmsa committed Nov 4, 2024
1 parent 145208f commit 447c742
Show file tree
Hide file tree
Showing 20 changed files with 226 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pdlClientConfig:
scope: api://test.test.pdl-api/.default
kafkaKeysConfig:
url: http://localhost:8090/kafka-keys
urlLokalInfo: http://localhost:8090/kafka-keys/lokalinfo
scope: api://test.test.kafka-keys/.default
authProviders:
azure:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
url = "http://localhost:8090/kafka-keys"
urlLokalInfo = "http://localhost:8090/lokal-info"
scope = "api://test.test.kafka-keys/.default"
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ clientId = "paw-arbeidssoekerregisteret-bekreftelse-tjeneste"

[kafkaKeysClient]
url = "http://localhost:8081/api/v2/hentEllerOpprett"
urlLokalInfo = "http://localhost:8081/api/v2/lokalInfo"
scope = "api://local.paw.paw-kafka-key-generator/.default"

[kafkaStreams]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ clientId = "paw-arbeidssoekerregisteret-bekreftelse-utgang"

[kafkaKeysClient]
url = "http://localhost:8081/api/v2/hentEllerOpprett"
urlLokalInfo = "http://localhost:8081/api/v2/lokalInfo"
scope = "api://local.paw.paw-kafka-key-generator/.default"

[kafkaStreams]
Expand Down
4 changes: 3 additions & 1 deletion apps/kafka-key-generator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ dependencies {
implementation(libs.nav.security.tokenClientCore)
implementation(libs.nav.security.tokenValidationKtorV2)

// Kafka (for å beregne partisjonsnummer)
implementation(libs.kafka.clients)

// Ktor
implementation(libs.ktor.serialization.jackson)

Expand Down Expand Up @@ -69,7 +72,6 @@ dependencies {
testImplementation(libs.test.testContainers.postgresql)
testImplementation(libs.ktor.server.testJvm)
testImplementation(libs.ktor.client.mock)
testImplementation(libs.kafka.clients)
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,50 @@ import no.nav.paw.kafkakeygenerator.mergedetector.hentLagretData
import no.nav.paw.kafkakeygenerator.mergedetector.vo.MergeDetected
import no.nav.paw.kafkakeygenerator.pdl.PdlIdentitesTjeneste
import no.nav.paw.kafkakeygenerator.vo.*
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner.partitionForKey
import org.apache.kafka.common.serialization.Serdes

class Applikasjon(
private val kafkaKeys: KafkaKeys,
private val identitetsTjeneste: PdlIdentitesTjeneste
) {
private val keySerializer = Serdes.Long().serializer()

@WithSpan
fun hentLokaleAlias(
antallPartisjoner: Int,
identitet: Identitetsnummer
): Either<Failure, LokaleAlias> {
return kafkaKeys.hent(identitet)
.map { arbeidssoekerId ->
val recordKey = publicTopicKeyFunction(arbeidssoekerId)
Alias(
identitetsnummer = identitet.value,
arbeidsoekerId = arbeidssoekerId.value,
recordKey = recordKey.value,
partition = partitionForKey(keySerializer.serialize("", recordKey.value), antallPartisjoner)
)
}.flatMap { alias ->
kafkaKeys.hent(ArbeidssoekerId(alias.arbeidsoekerId))
.map { identiteter ->
identiteter.map { identitetsnummer ->
Alias(
identitetsnummer = identitetsnummer.value,
arbeidsoekerId = alias.arbeidsoekerId,
recordKey = alias.recordKey,
partition = alias.partition
)
}
}
}.map { aliases ->
LokaleAlias(
identitetsnummer = identitet.value,
kobliner = aliases
)
}
}

@WithSpan
suspend fun validerLagretData(callId: CallId, identitet: Identitetsnummer): Either<Failure, InfoResponse> {
return hentInfo(callId, identitet)
Expand All @@ -23,7 +62,7 @@ class Applikasjon(
info = info
).map { info to it }
}
.map { (info, lagretDatra ) -> info to findMerge(lagretDatra) }
.map { (info, lagretDatra) -> info to findMerge(lagretDatra) }
.map { (info, merge) ->
InfoResponse(
info = info,
Expand Down Expand Up @@ -54,11 +93,13 @@ class Applikasjon(
{
PdlData(
error = null,
id = it.map { identInfo -> PdlId(
gruppe = identInfo.gruppe.name,
id = identInfo.ident,
gjeldende = !identInfo.historisk
) })
id = it.map { identInfo ->
PdlId(
gruppe = identInfo.gruppe.name,
id = identInfo.ident,
gjeldende = !identInfo.historisk
)
})
}
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ class KafkaKeys(private val database: Database) {
.map { id -> id?.let(::ArbeidssoekerId) }
.flatMap { id -> id?.let(::right) ?: left(Failure("database", FailureCode.DB_NOT_FOUND)) }

fun hent(arbeidssoekerId: ArbeidssoekerId): Either<Failure, List<Identitetsnummer>> =
attempt {
transaction(database) {
IdentitetTabell
.selectAll()
.where { IdentitetTabell.kafkaKey eq arbeidssoekerId.value }
.map { Identitetsnummer(it[IdentitetTabell.identitetsnummer]) }
}
}.mapToFailure { exception ->
Failure("database", FailureCode.INTERNAL_TECHINCAL_ERROR, exception)
}


fun lagre(identitet: Identitetsnummer, arbeidssoekerId: ArbeidssoekerId): Either<Failure, Unit> =
attempt {
transaction(database) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,31 @@ fun Routing.konfigurerApiV2(
post("/api/v2/info") {
hentInfo(applikasjon, logger)
}
post("/api/v2/lokalInfo") {
hentLokalInfo(applikasjon, logger)
}
}
}

@WithSpan
suspend fun PipelineContext<Unit, ApplicationCall>.hentLokalInfo(
applikasjon: Applikasjon,
logger: Logger
) {
val request = call.receive<AliasRequest>()
when (val resultat = applikasjon.hentLokaleAlias(request.antallPartisjoner, request.identer)) {
is Right -> call.respond(
OK, AliasResponse(
alias = resultat.right
)
)
is Left -> {
logger.error("Kunne ikke hente alias for identer: {}", resultat.left.code, resultat.left.exception)
call.respond(
status = InternalServerError,
message = resultat.left.code.name
)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package no.nav.paw.kafkakeygenerator.api.v2

import no.nav.paw.kafkakeygenerator.*
import no.nav.paw.kafkakeygenerator.vo.Identitetsnummer

fun Applikasjon.hentLokaleAlias(antallPartisjoner: Int, identiteter: List<String>): Either<Failure, List<LokaleAlias>> {
return identiteter.mapNotNull { identitet ->
hentLokaleAlias(antallPartisjoner, Identitetsnummer(identitet))
.recover(FailureCode.DB_NOT_FOUND) { right(null) }
}.flatten()
.map(List<LokaleAlias?>::filterNotNull) }
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,20 @@ import no.nav.paw.kafkakeygenerator.vo.Info
data class InfoResponse(
val info: Info,
val mergeDetected: MergeDetected?
)

data class AliasResponse(
val alias: List<LokaleAlias>
)

data class LokaleAlias(
val identitetsnummer: String,
val kobliner: List<Alias>
)

data class Alias(
val identitetsnummer: String,
val arbeidsoekerId: Long,
val recordKey: Long,
val partition: Int,
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,9 @@ package no.nav.paw.kafkakeygenerator.api.v2

data class RequestV2(
val ident: String
)

data class AliasRequest(
val antallPartisjoner: Int,
val identer: List<String>
)
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package no.nav.paw.kafkakeygenerator

import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.kotest.matchers.types.shouldBeInstanceOf
import io.ktor.client.*
import io.ktor.client.engine.mock.*
import kotlinx.coroutines.runBlocking
import no.nav.paw.kafkakeygenerator.api.v2.LokaleAlias
import no.nav.paw.kafkakeygenerator.pdl.PdlIdentitesTjeneste
import no.nav.paw.kafkakeygenerator.vo.ArbeidssoekerId
import no.nav.paw.kafkakeygenerator.vo.CallId
import no.nav.paw.kafkakeygenerator.vo.Identitetsnummer
import no.nav.paw.pdl.PdlClient
import org.jetbrains.exposed.sql.Database
import org.junit.jupiter.api.fail
import java.util.*

class ApplikasjonsTest : StringSpec({
Expand Down Expand Up @@ -46,6 +49,18 @@ class ApplikasjonsTest : StringSpec({
person1KafkaNøkler.filterIsInstance<Right<ArbeidssoekerId>>()
.map { it.right }
.distinct().size shouldBe 1
val lokaleAlias = app.hentLokaleAlias(2, Identitetsnummer(person1_dnummer))
hentEllerOpprett(person3_fødselsnummer).shouldBeInstanceOf<Right<ArbeidssoekerId>>()
lokaleAlias
.onLeft { fail { "Uventet feil: $it" } }
.onRight { alias ->
alias.identitetsnummer shouldBe person1_dnummer
alias.kobliner.size shouldBe 4
alias.kobliner.any { it.identitetsnummer == person1_fødselsnummer } shouldBe true
alias.kobliner.any { it.identitetsnummer == person1_dnummer } shouldBe true
alias.kobliner.any { it.identitetsnummer == person1_aktor_id } shouldBe true
alias.kobliner.any { it.identitetsnummer == person1_annen_ident } shouldBe true
}
}
"alle identer for person2 skal gi samme nøkkel" {
val person2KafkaNøkler = listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const val person1_dnummer = "09127821913"
const val person1_annen_ident = "12129127821913"
const val person2_fødselsnummer = "01017012345"
const val person2_aktor_id = "1649500819544"
const val person3_fødselsnummer = "01017012344"

fun hentSvar(ident: String) =
when(ident) {
Expand All @@ -20,6 +21,7 @@ fun hentSvar(ident: String) =
person1_annen_ident -> person1MockSvar
person2_fødselsnummer -> person2MockSvar
person2_aktor_id -> person2MockSvar
person3_fødselsnummer -> person3MockSvar
else -> ingenTreffMockSvar
}

Expand Down Expand Up @@ -63,6 +65,22 @@ const val person1MockSvar = """
}
"""

const val person3MockSvar = """
{
"data": {
"hentIdenter": {
"identer": [
{
"ident": "$person3_fødselsnummer",
"gruppe": "FOLKEREGISTERIDENT",
"historisk": false
}
]
}
}
}
"""

const val person2MockSvar = """
{
"data": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package no.nav.paw.kafkakeygenerator.client


data class AliasRequest(
val antallPartisjoner: Int,
val identer: List<String>
)

data class AliasResponse(
val alias: List<LokaleAlias>
)

data class LokaleAlias(
val identitetsnummer: String,
val kobliner: List<Alias>
)

data class Alias(
val identitetsnummer: String,
val arbeidsoekerId: Long,
val recordKey: Long,
val partition: Int,
)
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ private fun kafkaKeysMedHttpClient(config: KafkaKeyConfig, m2mTokenFactory: () -
}
}
return StandardKafkaKeysClient(
httpClient,
config.url
httpClient = httpClient,
kafkaKeysUrl = config.url,
kafkaKeysLokalInfoUrl = config.urlLokalInfo
) { m2mTokenFactory() }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ const val KAFKA_KEY_GENERATOR_CLIENT_CONFIG = "kafka_key_generator_client_config

data class KafkaKeyConfig(
val url: String,
val urlLokalInfo: String,
val scope: String
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ interface KafkaKeysClient {
suspend fun getIdAndKeyOrNull(identitetsnummer: String): KafkaKeysResponse?
suspend fun getIdAndKey(identitetsnummer: String): KafkaKeysResponse =
getIdAndKeyOrNull(identitetsnummer) ?: throw IllegalStateException("Kafka-key-client: Uventet feil mot server: http-status=404")

suspend fun getAlias(antallPartisjoner: Int, identitetsnummer: List<String>): AliasResponse
}

class StandardKafkaKeysClient(
private val httpClient: HttpClient,
private val kafkaKeysUrl: String,
private val kafkaKeysLokalInfoUrl: String,
private val getAccessToken: () -> String
) : KafkaKeysClient {
override suspend fun getIdAndKeyOrNull(identitetsnummer: String): KafkaKeysResponse? =
Expand All @@ -46,4 +49,22 @@ class StandardKafkaKeysClient(
}
}
}

override suspend fun getAlias(antallPartisjoner: Int, identitetsnummer: List<String>): AliasResponse {
return httpClient.post(kafkaKeysLokalInfoUrl) {
header("Authorization", "Bearer ${getAccessToken()}")
contentType(ContentType.Application.Json)
setBody(AliasRequest(antallPartisjoner, identitetsnummer))
}.let { response ->
when (response.status) {
io.ktor.http.HttpStatusCode.OK -> {
response.body<AliasResponse>()
}

else -> {
throw Exception("Kunne ikke hente alias, http_status=${response.status}}")
}
}
}
}
}
Loading

0 comments on commit 447c742

Please sign in to comment.