Skip to content

Commit

Permalink
Bruk ObjectRiver i FeilLytter (#816)
Browse files Browse the repository at this point in the history
* Bruk ObjectRiver i FeilLytter

* Logg penere json
  • Loading branch information
bjerga authored Jan 2, 2025
1 parent b974369 commit 3aae2b0
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fun RapidsConnection.createFeilLytter(database: Database): RapidsConnection =

fun RapidsConnection.createFeilLytter(repository: PostgresBakgrunnsjobbRepository): RapidsConnection =
also {
FeilLytter(it, repository)
FeilLytter(repository).connect(it)
val bgService = BakgrunnsjobbService(repository)
bgService.registrer(FeilProsessor(it))
bgService.startAsync(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,30 @@ package no.nav.helsearbeidsgiver.inntektsmelding.feilbehandler.prosessor
import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
import no.nav.hag.utils.bakgrunnsjobb.Bakgrunnsjobb
import no.nav.hag.utils.bakgrunnsjobb.BakgrunnsjobbProsesserer
import no.nav.helsearbeidsgiver.utils.json.parseJson
import no.nav.helsearbeidsgiver.utils.json.toPretty
import no.nav.helsearbeidsgiver.utils.log.logger
import no.nav.helsearbeidsgiver.utils.log.sikkerLogger

class FeilProsessor(
private val rapid: RapidsConnection,
) : BakgrunnsjobbProsesserer {
private val logger = logger()
private val sikkerLogger = sikkerLogger()

override val type: String
get() = JOB_TYPE

companion object {
const val JOB_TYPE = "kafka-retry-message"
}

private val sikkerLogger = sikkerLogger()

override fun prosesser(jobb: Bakgrunnsjobb) {
sikkerLogger.info("Prosesserer jobb - rekjører melding med id ${jobb.uuid}")
sikkerLogger.debug("Sender melding: ${jobb.data}")
"Prosesserer jobb - rekjører melding med ID '${jobb.uuid}'.".also {
logger.info(it)
sikkerLogger.info(it)
}
sikkerLogger.debug("Sender melding.\n${jobb.data.parseJson().toPretty()}")
rapid.publish(jobb.data)
}
}
Original file line number Diff line number Diff line change
@@ -1,37 +1,42 @@
package no.nav.helsearbeidsgiver.inntektsmelding.feilbehandler.river

import com.github.navikt.tbd_libs.rapids_and_rivers.JsonMessage
import com.github.navikt.tbd_libs.rapids_and_rivers.River
import com.github.navikt.tbd_libs.rapids_and_rivers_api.MessageContext
import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
import kotlinx.serialization.json.JsonElement
import no.nav.hag.utils.bakgrunnsjobb.Bakgrunnsjobb
import no.nav.hag.utils.bakgrunnsjobb.BakgrunnsjobbRepository
import no.nav.hag.utils.bakgrunnsjobb.BakgrunnsjobbStatus
import no.nav.helsearbeidsgiver.felles.BehovType
import no.nav.helsearbeidsgiver.felles.EventName
import no.nav.helsearbeidsgiver.felles.Key
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.lesOrNull
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.json.toPretty
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
import no.nav.helsearbeidsgiver.inntektsmelding.feilbehandler.prosessor.FeilProsessor
import no.nav.helsearbeidsgiver.utils.json.fromJson
import no.nav.helsearbeidsgiver.utils.json.parseJson
import no.nav.helsearbeidsgiver.utils.json.serializer.UuidSerializer
import no.nav.helsearbeidsgiver.utils.json.toJson
import no.nav.helsearbeidsgiver.utils.json.toPretty
import no.nav.helsearbeidsgiver.utils.log.logger
import no.nav.helsearbeidsgiver.utils.log.sikkerLogger
import java.sql.SQLException
import java.util.UUID

data class Melding(
val fail: Fail,
)

class FeilLytter(
rapidsConnection: RapidsConnection,
private val repository: BakgrunnsjobbRepository,
) : River.PacketListener {
private val jobbType = FeilProsessor.JOB_TYPE

) : ObjectRiver<Melding>() {
private val logger = logger()
private val sikkerLogger = sikkerLogger()

private val jobbType = FeilProsessor.JOB_TYPE
private val eventerSomHaandteres =
listOf(
setOf(
EventName.FORESPOERSEL_MOTTATT,
EventName.FORESPOERSEL_BESVART,
EventName.FORESPOERSEL_FORKASTET,
Expand All @@ -44,45 +49,28 @@ class FeilLytter(
EventName.SELVBESTEMT_IM_LAGRET,
)

init {
sikkerLogger.info("Starter applikasjon - lytter på innkommende feil!")
River(rapidsConnection)
.apply {
validate { msg ->
msg.demandKey(Key.FAIL.toString())
}
}.register(this)
}
override fun les(json: Map<Key, JsonElement>): Melding =
Melding(
fail = Key.FAIL.les(Fail.serializer(), json),
)

override fun onPacket(
packet: JsonMessage,
context: MessageContext,
) {
sikkerLogger.info("Mottok feil: ${packet.toJson().parseJson().toPretty()}")
val fail =
packet
.toJson()
.parseJson()
.toMap()[Key.FAIL]
?.runCatching {
fromJson(Fail.serializer())
}?.getOrNull()

if (fail == null) {
sikkerLogger.warn("Kunne ikke parse feil-objekt, ignorerer...")
return
}
override fun Melding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement>? {
logger.info("Mottok feil.")
sikkerLogger.info("Mottok feil.\n${json.toPretty()}")

if (eventSkalHaandteres(fail.utloesendeMelding)) {
// slå opp transaksjonID. Hvis den finnes, kan det være en annen feilende melding i samme transaksjon: Lagre i så fall
// med egen id. Denne id vil så sendes med som ny transaksjonID ved rekjøring.
val jobbId = fail.kontekstId
val eksisterendeJobb = repository.getById(jobbId)
val eksisterendeJobb = repository.getById(fail.kontekstId)

when {
// Første gang denne flyten feiler
eksisterendeJobb == null -> {
sikkerLogger.info("Lagrer mottatt pakke!")
"Lagrer mottatt pakke.".also {
logger.info(it)
sikkerLogger.info(it)
}

lagre(
Bakgrunnsjobb(
uuid = fail.kontekstId,
Expand All @@ -100,54 +88,99 @@ class FeilLytter(

// Feil i flyt som tidligere har opplevd annen type feil
else -> {
val nyTransaksjonId = UUID.randomUUID()
val utloesendeMeldingMedNyTransaksjonId = fail.utloesendeMelding.plus(Key.KONTEKST_ID to nyTransaksjonId.toJson())
val nyKontekstId = UUID.randomUUID()
val utloesendeMeldingMedNyKontekstId = fail.utloesendeMelding.plus(Key.KONTEKST_ID to nyKontekstId.toJson())

sikkerLogger.info("ID $jobbId finnes fra før med annen utløsende melding. Lagrer en ny jobb på ID '$nyTransaksjonId'.")
"ID '${eksisterendeJobb.uuid}' finnes fra før med annen utløsende melding. Lagrer en ny jobb på ID '$nyKontekstId'.".also {
logger.info(it)
sikkerLogger.info(it)
}

lagre(
Bakgrunnsjobb(
uuid = nyTransaksjonId,
uuid = nyKontekstId,
type = jobbType,
data = utloesendeMeldingMedNyTransaksjonId.toJson().toString(),
data = utloesendeMeldingMedNyKontekstId.toJson().toString(),
maksAntallForsoek = 10,
),
)
}
}
}

return null
}

private fun oppdater(jobb: Bakgrunnsjobb) {
// Dette må gjøres her fordi jobbene er asynkrone og bakgrunnsjobbService ikke får vite at jobben feiler i disse tilfellene
// BakgrunnsjobbService finnVentende() tar heller ikke hensyn til forsøk, kun status på jobben!
if (jobb.forsoek > jobb.maksAntallForsoek) {
jobb.status = BakgrunnsjobbStatus.STOPPET
sikkerLogger.error("Maks forsøk nådd, stopper jobb med id ${jobb.uuid} permanent!")
} else {
jobb.status = BakgrunnsjobbStatus.FEILET
}
try {
repository.update(jobb)
sikkerLogger.info("Oppdaterte eksisterende jobb med id ${jobb.uuid}")
} catch (ex: SQLException) {
sikkerLogger.error("Oppdatering av jobb med id ${jobb.uuid} feilet!", ex)
override fun Melding.haandterFeil(
json: Map<Key, JsonElement>,
error: Throwable,
): Map<Key, JsonElement>? {
"Klarte ikke håndtere fail.".also {
logger.error(it)
sikkerLogger.error(it, error)
}
return null
}

private fun lagre(jobb: Bakgrunnsjobb) {
try {
repository.save(jobb)
sikkerLogger.info("Lagret ny jobb med id ${jobb.uuid}")
} catch (ex: SQLException) {
sikkerLogger.error("Lagring av jobb med id ${jobb.uuid} feilet!", ex)
}
override fun Melding.loggfelt(): Map<String, String> {
val eventName = Key.EVENT_NAME.lesOrNull(EventName.serializer(), fail.utloesendeMelding)
val kontekstId = Key.KONTEKST_ID.lesOrNull(UuidSerializer, fail.utloesendeMelding)
val behovType = Key.BEHOV.lesOrNull(BehovType.serializer(), fail.utloesendeMelding)

val data = fail.utloesendeMelding[Key.DATA]?.toMap().orEmpty()
val forespoerselId = Key.FORESPOERSEL_ID.lesOrNull(UuidSerializer, data)
val selvbestemtId = Key.SELVBESTEMT_ID.lesOrNull(UuidSerializer, data)

return listOf(
Log.klasse(this@FeilLytter),
eventName?.let(Log::event),
kontekstId?.let(Log::transaksjonId),
behovType?.let(Log::behov),
forespoerselId?.let(Log::forespoerselId),
selvbestemtId?.let(Log::selvbestemtId),
).mapNotNull { it }
.toMap()
}

private fun eventSkalHaandteres(utloesendeMelding: Map<Key, JsonElement>): Boolean {
val eventFraMelding = utloesendeMelding[Key.EVENT_NAME]?.fromJson(EventName.serializer())
val skalHaandteres = eventerSomHaandteres.contains(eventFraMelding)
sikkerLogger.info("Event: $eventFraMelding skal håndteres: $skalHaandteres")

"Event '$eventFraMelding' skal håndteres: '$skalHaandteres'.".also {
logger.info(it)
sikkerLogger.info(it)
}

return skalHaandteres
}

private fun lagre(jobb: Bakgrunnsjobb) {
"Lagrer ny jobb med ID '${jobb.uuid}'.".also {
logger.info(it)
sikkerLogger.info(it)
}
repository.save(jobb)
}

private fun oppdater(jobb: Bakgrunnsjobb) {
// Dette må gjøres her fordi jobbene er asynkrone og bakgrunnsjobbService ikke får vite at jobben feiler i disse tilfellene
// BakgrunnsjobbService finnVentende() tar heller ikke hensyn til forsøk, kun status på jobben!
jobb.status =
if (jobb.forsoek > jobb.maksAntallForsoek) {
"Maks forsøk nådd, stopper jobb med ID '${jobb.uuid}' permanent!".also {
logger.error(it)
sikkerLogger.error(it)
}
BakgrunnsjobbStatus.STOPPET
} else {
BakgrunnsjobbStatus.FEILET
}

"Oppdaterer eksisterende jobb med ID '${jobb.uuid}'.".also {
logger.info(it)
sikkerLogger.info(it)
}

repository.update(jobb)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class FeilLytterTest :
val rapid = TestRapid()
val repository = MockBakgrunnsjobbRepository()

FeilLytter(rapid, repository)
FeilLytter(repository).connect(rapid)

afterTest {
repository.deleteAll()
Expand Down

0 comments on commit 3aae2b0

Please sign in to comment.