Skip to content

Commit

Permalink
IS-2091: Implement ArbeidstakervarselProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
andersrognstad committed Feb 27, 2024
1 parent 5a18052 commit 1d72a4a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,33 @@ package no.nav.syfo.infrastructure.kafka.esyfovarsel
import no.nav.syfo.application.IVarselProducer
import no.nav.syfo.domain.PersonIdent
import no.nav.syfo.domain.Varsel
import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.*
import java.util.*
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory

// TODO: Add esyfovarsel-hendelse as kafkaproducer value
class ArbeidstakervarselProducer(private val kafkaArbeidstakervarselProducer: KafkaProducer<String, String>) : IVarselProducer {
class ArbeidstakervarselProducer(private val kafkaArbeidstakervarselProducer: KafkaProducer<String, EsyfovarselHendelse>) : IVarselProducer {

override fun sendArbeidstakerVarsel(personIdent: PersonIdent, varsel: Varsel) {
val varselHendelse = ArbeidstakerHendelse(
type = HendelseType.SM_ARBEIDSUFORHET_FORHANDSVARSEL,
arbeidstakerFnr = personIdent.value,
data = VarselData(
journalpost = VarselDataJournalpost(
uuid = varsel.uuid.toString(),
id = varsel.journalpostId,
),
),
orgnummer = null,
)

try {
kafkaArbeidstakervarselProducer.send(
ProducerRecord(
ESYFOVARSEL_TOPIC,
UUID.randomUUID().toString(),
"",
varselHendelse,
)
).get()
} catch (e: Exception) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package no.nav.syfo.infrastructure.kafka.esyfovarsel

import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.EsyfovarselHendelse
import no.nav.syfo.util.configuredJacksonMapper
import org.apache.kafka.common.serialization.Serializer

class KafkaArbeidstakervarselSerializer : Serializer<EsyfovarselHendelse> {
private val mapper = configuredJacksonMapper()
override fun serialize(topic: String?, data: EsyfovarselHendelse?): ByteArray =
mapper.writeValueAsBytes(data)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package no.nav.syfo.infrastructure.kafka.esyfovarsel.dto

import com.fasterxml.jackson.annotation.JsonTypeInfo
import java.io.Serializable

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
sealed interface EsyfovarselHendelse : Serializable {
val type: HendelseType
var data: Any?
}

data class ArbeidstakerHendelse(
override val type: HendelseType,
override var data: Any?,
val arbeidstakerFnr: String,
val orgnummer: String?
) : EsyfovarselHendelse

data class VarselData(
val journalpost: VarselDataJournalpost? = null,
)

data class VarselDataJournalpost(
val uuid: String,
val id: String?
)

enum class HendelseType {
SM_ARBEIDSUFORHET_FORHANDSVARSEL,
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import no.nav.syfo.infrastructure.database.VarselRepository
import no.nav.syfo.infrastructure.database.VurderingRepository
import no.nav.syfo.infrastructure.database.dropData
import no.nav.syfo.infrastructure.kafka.esyfovarsel.ArbeidstakervarselProducer
import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.ArbeidstakerHendelse
import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.EsyfovarselHendelse
import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.HendelseType
import no.nav.syfo.infrastructure.kafka.esyfovarsel.dto.VarselData
import org.amshove.kluent.shouldBeEmpty
import org.amshove.kluent.shouldBeEqualTo
import org.amshove.kluent.shouldNotBeNull
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.spekframework.spek2.Spek
import org.spekframework.spek2.style.specification.describe
Expand All @@ -27,7 +32,7 @@ class VarselServiceSpek : Spek({

val varselRepository = VarselRepository(database = database)
val vurderingRepository = VurderingRepository(database = database)
val kafkaProducer = mockk<KafkaProducer<String, String>>()
val kafkaProducer = mockk<KafkaProducer<String, EsyfovarselHendelse>>()

val varselProducer = ArbeidstakervarselProducer(kafkaArbeidstakervarselProducer = kafkaProducer)
val varselService = VarselService(varselRepository = varselRepository, varselProducer = varselProducer)
Expand Down Expand Up @@ -60,7 +65,8 @@ class VarselServiceSpek : Spek({
failed.size shouldBeEqualTo 0
success.size shouldBeEqualTo 1

verify(exactly = 1) { kafkaProducer.send(any()) }
val producerRecordSlot = slot<ProducerRecord<String, EsyfovarselHendelse>>()
verify(exactly = 1) { kafkaProducer.send(capture(producerRecordSlot)) }

val publishedVarsel = success.first().getOrThrow()
publishedVarsel.uuid.shouldBeEqualTo(unpublishedVarsel.uuid)
Expand All @@ -69,7 +75,12 @@ class VarselServiceSpek : Spek({

varselRepository.getUnpublishedVarsler().shouldBeEmpty()

// TODO: Test kafkaproducer record value
val esyfovarselHendelse = producerRecordSlot.captured.value() as ArbeidstakerHendelse
esyfovarselHendelse.type.shouldBeEqualTo(HendelseType.SM_ARBEIDSUFORHET_FORHANDSVARSEL)
esyfovarselHendelse.arbeidstakerFnr.shouldBeEqualTo(UserConstants.ARBEIDSTAKER_PERSONIDENT.value)
val varselData = esyfovarselHendelse.data as VarselData
varselData.journalpost?.uuid.shouldBeEqualTo(publishedVarsel.uuid.toString())
varselData.journalpost?.id.shouldBeEqualTo(publishedVarsel.journalpostId)
}

it("publishes nothing when no unpublished varsel") {
Expand Down

0 comments on commit 1d72a4a

Please sign in to comment.