IS-2091: Implement ArbeidstakervarselProducer (#26)
* IS-2091: Implement ArbeidstakervarselProducer

* IS-2091: Rename producer implementation
andersrognstad authored Feb 28, 2024
1 parent a82122b commit 3f7b6e9
Showing 7 changed files with 104 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/main/kotlin/no/nav/syfo/application/IVarselProducer.kt
@@ -4,5 +4,5 @@ import no.nav.syfo.domain.PersonIdent
import no.nav.syfo.domain.Varsel

interface IVarselProducer {
-    fun sendArbeidstakerVarsel(personIdent: PersonIdent, varsel: Varsel)
+    fun sendArbeidstakerForhandsvarsel(personIdent: PersonIdent, varsel: Varsel)
}
@@ -13,7 +13,7 @@ class VarselService(

return unpublishedVarsler.map { (personident, varsel) ->
runCatching {
- varselProducer.sendArbeidstakerVarsel(personIdent = personident, varsel = varsel)
+ varselProducer.sendArbeidstakerForhandsvarsel(personIdent = personident, varsel = varsel)
val publishedVarsel = varsel.publish()
varselRepository.update(publishedVarsel)
publishedVarsel
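The fragment above sits inside VarselService. As a reading aid, here is a hedged sketch of how the surrounding publish method plausibly fits together; the method name publishUnpublishedVarsler and the exact signature are assumptions, while getUnpublishedVarsler, varsel.publish(), the constructor shape, and the Result-per-varsel pattern are taken from the diff and the test further down.

```kotlin
// Sketch, not the repository's actual code: method name and return type are assumed.
class VarselService(
    private val varselRepository: VarselRepository,
    private val varselProducer: IVarselProducer,
) {
    fun publishUnpublishedVarsler(): List<Result<Varsel>> =
        varselRepository.getUnpublishedVarsler().map { (personident, varsel) ->
            runCatching {
                // Send to Kafka first; the varsel is only marked as published if the send succeeds.
                varselProducer.sendArbeidstakerForhandsvarsel(personIdent = personident, varsel = varsel)
                val publishedVarsel = varsel.publish()
                varselRepository.update(publishedVarsel)
                publishedVarsel
            }
        }
}
```

Each varsel is wrapped in its own runCatching, so one failing Kafka send yields a failed Result without aborting the rest; the test below partitions the returned list into failed and success accordingly.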
@@ -0,0 +1,45 @@
package no.nav.syfo.infrastructure.kafka.esyfovarsel

import no.nav.syfo.application.IVarselProducer
import no.nav.syfo.domain.PersonIdent
import no.nav.syfo.domain.Varsel
import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.*
import java.util.*
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory

class ArbeidstakerForhandsvarselProducer(private val kafkaProducer: KafkaProducer<String, EsyfovarselHendelse>) : IVarselProducer {

    override fun sendArbeidstakerForhandsvarsel(personIdent: PersonIdent, varsel: Varsel) {
        val varselHendelse = ArbeidstakerHendelse(
            type = HendelseType.SM_ARBEIDSUFORHET_FORHANDSVARSEL,
            arbeidstakerFnr = personIdent.value,
            data = VarselData(
                journalpost = VarselDataJournalpost(
                    uuid = varsel.uuid.toString(),
                    id = varsel.journalpostId,
                ),
            ),
            orgnummer = null,
        )

        try {
            kafkaProducer.send(
                ProducerRecord(
                    ESYFOVARSEL_TOPIC,
                    UUID.randomUUID().toString(),
                    varselHendelse,
                )
            ).get()
        } catch (e: Exception) {
            log.error("Exception was thrown when attempting to send hendelse to esyfovarsel: ${e.message}")
            throw e
        }
    }

    companion object {
        private const val ESYFOVARSEL_TOPIC = "team-esyfo.varselbus"
        private val log = LoggerFactory.getLogger(ArbeidstakerForhandsvarselProducer::class.java)
    }
}
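A minimal sketch of how this producer might be wired up, assuming a plain Properties-based Kafka configuration; the helper function name, the brokerUrl parameter, and the property values are illustrative and not taken from this repository.

```kotlin
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Sketch only: the config values below are placeholders, not the repository's real setup.
fun buildArbeidstakerForhandsvarselProducer(brokerUrl: String): ArbeidstakerForhandsvarselProducer {
    val properties = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
        put(ProducerConfig.ACKS_CONFIG, "all")
        // Record keys are plain strings (a random UUID per record); values are serialized with
        // the Jackson-based KafkaArbeidstakervarselSerializer added in this commit.
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaArbeidstakervarselSerializer::class.java.name)
    }
    return ArbeidstakerForhandsvarselProducer(
        kafkaProducer = KafkaProducer<String, EsyfovarselHendelse>(properties),
    )
}
```

Note that sendArbeidstakerForhandsvarsel blocks on the send future with .get(), so a broker failure surfaces as an exception; VarselService's runCatching then records that varsel as a failed Result instead of marking it published.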

This file was deleted.

@@ -0,0 +1,11 @@
package no.nav.syfo.infrastructure.kafka.esyfovarsel

import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.EsyfovarselHendelse
import no.nav.syfo.util.configuredJacksonMapper
import org.apache.kafka.common.serialization.Serializer

class KafkaArbeidstakervarselSerializer : Serializer<EsyfovarselHendelse> {
    private val mapper = configuredJacksonMapper()
    override fun serialize(topic: String?, data: EsyfovarselHendelse?): ByteArray =
        mapper.writeValueAsBytes(data)
}
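A short usage sketch: the serializer ignores the topic argument and simply delegates to the project's configuredJacksonMapper, so the resulting bytes are ordinary JSON. The example values below (fnr, journalpost id) are placeholders.

```kotlin
// Placeholder values throughout; this only illustrates that serialization is plain Jackson JSON.
val serializer = KafkaArbeidstakervarselSerializer()
val hendelse: EsyfovarselHendelse = ArbeidstakerHendelse(
    type = HendelseType.SM_ARBEIDSUFORHET_FORHANDSVARSEL,
    data = VarselData(
        journalpost = VarselDataJournalpost(uuid = java.util.UUID.randomUUID().toString(), id = "123"),
    ),
    arbeidstakerFnr = "12345678910",
    orgnummer = null,
)
val bytes: ByteArray = serializer.serialize("team-esyfo.varselbus", hendelse)
```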
@@ -0,0 +1,30 @@
package no.nav.syfo.infrastructure.kafka.esyfovarsel.dto

import com.fasterxml.jackson.annotation.JsonTypeInfo
import java.io.Serializable

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
sealed interface EsyfovarselHendelse : Serializable {
    val type: HendelseType
    var data: Any?
}

data class ArbeidstakerHendelse(
    override val type: HendelseType,
    override var data: Any?,
    val arbeidstakerFnr: String,
    val orgnummer: String?
) : EsyfovarselHendelse

data class VarselData(
    val journalpost: VarselDataJournalpost? = null,
)

data class VarselDataJournalpost(
    val uuid: String,
    val id: String?
)

enum class HendelseType {
    SM_ARBEIDSUFORHET_FORHANDSVARSEL,
}
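Since EsyfovarselHendelse is annotated with @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) and no explicit subtype names are registered, Jackson adds a type discriminator to the payload (with default settings an "@type" property holding the simple class name). Roughly, the forhåndsvarsel hendelse built by the producer would serialize along these lines; the discriminator name and field order depend on the configuredJacksonMapper settings, so treat this as an illustration rather than a verified payload.

```kotlin
// Illustrative payload only; "@type" and field order are Jackson-default assumptions,
// and the uuid/id/fnr values are placeholders.
val exampleJson = """
{
  "@type": "ArbeidstakerHendelse",
  "type": "SM_ARBEIDSUFORHET_FORHANDSVARSEL",
  "data": {
    "journalpost": {
      "uuid": "00000000-0000-0000-0000-000000000000",
      "id": "123"
    }
  },
  "arbeidstakerFnr": "12345678910",
  "orgnummer": null
}
""".trimIndent()
```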
@@ -8,11 +8,16 @@ import no.nav.syfo.generator.generateForhandsvarselVurdering
import no.nav.syfo.infrastructure.database.repository.VarselRepository
import no.nav.syfo.infrastructure.database.repository.VurderingRepository
import no.nav.syfo.infrastructure.database.dropData
- import no.nav.syfo.infrastructure.kafka.esyfovarsel.ArbeidstakervarselProducer
+ import no.nav.syfo.infrastructure.kafka.esyfovarsel.ArbeidstakerForhandsvarselProducer
import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.ArbeidstakerHendelse
import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.EsyfovarselHendelse
import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.HendelseType
import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.VarselData
import org.amshove.kluent.shouldBeEmpty
import org.amshove.kluent.shouldBeEqualTo
import org.amshove.kluent.shouldNotBeNull
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.spekframework.spek2.Spek
import org.spekframework.spek2.style.specification.describe
@@ -27,9 +32,9 @@ class VarselServiceSpek : Spek({

val varselRepository = VarselRepository(database = database)
val vurderingRepository = VurderingRepository(database = database)
- val kafkaProducer = mockk<KafkaProducer<String, String>>()
+ val kafkaProducer = mockk<KafkaProducer<String, EsyfovarselHendelse>>()

- val varselProducer = ArbeidstakervarselProducer(kafkaArbeidstakervarselProducer = kafkaProducer)
+ val varselProducer = ArbeidstakerForhandsvarselProducer(kafkaProducer = kafkaProducer)
val varselService = VarselService(varselRepository = varselRepository, varselProducer = varselProducer)

beforeEachTest {
@@ -60,7 +65,8 @@
failed.size shouldBeEqualTo 0
success.size shouldBeEqualTo 1

- verify(exactly = 1) { kafkaProducer.send(any()) }
+ val producerRecordSlot = slot<ProducerRecord<String, EsyfovarselHendelse>>()
+ verify(exactly = 1) { kafkaProducer.send(capture(producerRecordSlot)) }

val publishedVarsel = success.first().getOrThrow()
publishedVarsel.uuid.shouldBeEqualTo(unpublishedVarsel.uuid)
@@ -69,7 +75,12 @@

varselRepository.getUnpublishedVarsler().shouldBeEmpty()

- // TODO: Test kafkaproducer record value
+ val esyfovarselHendelse = producerRecordSlot.captured.value() as ArbeidstakerHendelse
+ esyfovarselHendelse.type.shouldBeEqualTo(HendelseType.SM_ARBEIDSUFORHET_FORHANDSVARSEL)
+ esyfovarselHendelse.arbeidstakerFnr.shouldBeEqualTo(UserConstants.ARBEIDSTAKER_PERSONIDENT.value)
+ val varselData = esyfovarselHendelse.data as VarselData
+ varselData.journalpost?.uuid.shouldBeEqualTo(publishedVarsel.uuid.toString())
+ varselData.journalpost?.id.shouldBeEqualTo(publishedVarsel.journalpostId)
}

it("publishes nothing when no unpublished varsel") {
