Skip to content

Commit

Permalink
Omdøping av internhendelser for bekreftelse
Browse files Browse the repository at this point in the history
  • Loading branch information
naviktthomas committed Sep 11, 2024
1 parent 2a714fc commit 217c353
Show file tree
Hide file tree
Showing 25 changed files with 105 additions and 98 deletions.
16 changes: 16 additions & 0 deletions apps/bekreftelse-api/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Bekreftelse API

```mermaid
graph TD
A[POST /rapportering] --> B(Rapportering-api instans)
B --> |POST identitetsnummer| C[Kafka-key-generator]
C -->|arbeidssøkerId| B
B --> D["streams.metadataForKey('arbeidssøkerId', 'RapporteringStateStore')"]
D --> E{Finne hvilke instans som har data for arbeidssøkerId}
E -->|Funnet remote: forward HTTP request| Z["Rapportering-api instans (med arbeidssøkerId)"]
Z -->|Returner respons| B
E -->|Funnet i lokal stateStore spør mot den| F[RapporteringStateStore<br>key = arbeidssøkerId<br/>value = List<RapporteringTilgjengelig>]
G[RapporteringTilgjengelig] -->|Legg til rapportering i liste| F
H[RapporteringsMeldingMottatt]-->|slett rapportId fra liste|F
I[PeriodeAvsluttet] --> |slett arbeidssøkerId|F
```
6 changes: 3 additions & 3 deletions apps/bekreftelse-tjeneste/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ dependencies {
implementation(project(":lib:kafka-streams"))
implementation(project(":lib:kafka-key-generator-client"))
implementation(project(":domain:main-avro-schema"))
implementation(project(":domain:rapportering-interne-hendelser"))
implementation(project(":domain:rapporteringsansvar-schema"))
implementation(project(":domain:rapporteringsmelding-schema"))
implementation(project(":domain:bekreftelse-interne-hendelser"))
implementation(project(":domain:bekreftelsesansvar-schema"))
implementation(project(":domain:bekreftelsesmelding-schema"))
implementation(orgApacheKafka.kafkaStreams)
implementation(jackson.datatypeJsr310)
implementation(jackson.kotlin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package no.nav.paw.meldeplikttjeneste

import no.nav.paw.config.kafka.streams.genericProcess
import no.nav.paw.rapportering.ansvar.v1.AnsvarEndret
import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import org.apache.kafka.streams.StreamsBuilder

context(ApplicationConfiguration, ApplicationContext)
fun StreamsBuilder.processAnsvarTopic() {
stream<Long, AnsvarEndret>(ansvarsTopic)
.genericProcess<Long, AnsvarEndret, Long, RapporteringsHendelse>("ansvarEndret", statStoreName) { record ->
.genericProcess<Long, AnsvarEndret, Long, BekreftelseHendelse>("ansvarEndret", statStoreName) { record ->

}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package no.nav.paw.meldeplikttjeneste

import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstandSerde
import no.nav.paw.rapportering.internehendelser.RapporteringsHendelseSerde
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde

class ApplicationContext(
val internTilstandSerde: InternTilstandSerde,
val rapporteringsHendelseSerde: RapporteringsHendelseSerde
val bekreftelseHendelseSerde: BekreftelseHendelseSerde
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import no.nav.paw.config.kafka.streams.genericProcess
import no.nav.paw.config.kafka.streams.mapWithContext
import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse
import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstand
import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstandSerde
import no.nav.paw.meldeplikttjeneste.tilstand.initTilstand
import no.nav.paw.rapportering.internehendelser.PeriodeAvsluttet
import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse
import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Produced
Expand All @@ -31,7 +30,7 @@ fun StreamsBuilder.processPeriodeTopic(kafkaKeyFunction: (String) -> KafkaKeysRe
else -> Action.DoNothing
}
}
.genericProcess<Long, Action, Long, RapporteringsHendelse>(
.genericProcess<Long, Action, Long, BekreftelseHendelse>(
name = "executeAction",
punctuation = null,
stateStoreNames = arrayOf(statStoreName)
Expand All @@ -47,15 +46,15 @@ fun StreamsBuilder.processPeriodeTopic(kafkaKeyFunction: (String) -> KafkaKeysRe
action.periode.id,
action.periode.identitetsnummer,
action.arbeidsoekerId
) as RapporteringsHendelse
) as BekreftelseHendelse
)
)
}

Action.DoNothing -> {}
is Action.UpdateState -> keyValueStore.put(action.state.periode.periodeId, action.state)
}
}.to(rapporteringsHendelsesloggTopic, Produced.with(Serdes.Long(), rapporteringsHendelseSerde))
}.to(rapporteringsHendelsesloggTopic, Produced.with(Serdes.Long(), bekreftelseHendelseSerde))
}

fun Periode.avsluttet(): Boolean = avsluttet != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@ package no.nav.paw.meldeplikttjeneste

import no.nav.paw.config.kafka.streams.genericProcess
import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstand
import no.nav.paw.rapportering.internehendelser.BaOmAaAvsluttePeriode
import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse
import no.nav.paw.rapportering.internehendelser.RapporteringsMeldingMottatt
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.rapportering.melding.v1.Melding
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.state.KeyValueStore
import org.slf4j.LoggerFactory
import java.util.*

context(ApplicationConfiguration, ApplicationContext)
fun StreamsBuilder.processRapporteringsMeldingTopic() {
stream<Long, Melding>(rapporteringsTopic)
.genericProcess<Long, Melding, Long, RapporteringsHendelse>(
.genericProcess<Long, Melding, Long, BekreftelseHendelse>(
name = "meldingMottatt",
statStoreName
) { record ->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package no.nav.paw.meldeplikttjeneste.tilstand

import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse
import java.time.Duration
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import java.time.Instant
import java.util.UUID
import kotlin.reflect.KClass
Expand All @@ -15,7 +14,7 @@ data class InternTilstand(

@JvmRecord
data class Rapportering(
val sisteHandling: KClass<RapporteringsHendelse>,
val sisteHandling: KClass<BekreftelseHendelse>,
val rapporteringsId: UUID,
val gjelderFra: Instant,
val gjelderTil: Instant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,16 @@ import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse
import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstandSerde
import no.nav.paw.rapportering.ansvar.v1.AnsvarEndret
import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse
import no.nav.paw.rapportering.internehendelser.RapporteringsHendelseSerde
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde
import no.nav.paw.rapportering.melding.v1.Melding
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Time
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder
import java.time.Duration
Expand All @@ -30,7 +28,7 @@ class ApplicationTestContext {
val ansvarsTopicSerde: Serde<AnsvarEndret> = opprettSerde()
val rapporteringMeldingSerde: Serde<Melding> = opprettSerde()
val periodeTopicSerde: Serde<Periode> = opprettSerde()
val hendelseLoggSerde: Serde<RapporteringsHendelse> = RapporteringsHendelseSerde()
val hendelseLoggSerde: Serde<BekreftelseHendelse> = BekreftelseHendelseSerde()
val applicationConfiguration = ApplicationConfiguration(
periodeTopic = "periodeTopic",
ansvarsTopic = "ansvarsTopic",
Expand All @@ -41,7 +39,7 @@ class ApplicationTestContext {
)
val applicationContext = ApplicationContext(
internTilstandSerde = InternTilstandSerde(),
rapporteringsHendelseSerde = RapporteringsHendelseSerde()
bekreftelseHendelseSerde = BekreftelseHendelseSerde()
)

val kafkaKeysService = kafkaKeyInstance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import no.nav.paw.arbeidssokerregisteret.api.v1.Bruker
import no.nav.paw.arbeidssokerregisteret.api.v1.BrukerType
import no.nav.paw.arbeidssokerregisteret.api.v1.Metadata
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.rapportering.internehendelser.LeveringsfristUtloept
import no.nav.paw.rapportering.internehendelser.RapporteringTilgjengelig
import no.nav.paw.rapportering.melding.v1.Melding
import no.nav.paw.bekreftelse.internehendelser.LeveringsfristUtloept
import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig
import java.time.Duration
import java.time.Instant
import java.util.*
Expand All @@ -30,7 +29,7 @@ class IngenAndreTarAnsvarTest: FreeSpec({
hendelseLoggTopic.isEmpty shouldBe false
val kv = hendelseLoggTopic.readKeyValue()
kv.key shouldBe kafkaKeyResponse.key
with(kv.value.shouldBeInstanceOf<RapporteringTilgjengelig>()) {
with(kv.value.shouldBeInstanceOf<BekreftelseTilgjengelig>()) {
periodeId shouldBe periode.id
identitetsnummer shouldBe periode.identitetsnummer
arbeidssoekerId shouldBe kafkaKeyResponse.id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package no.nav.paw.rapportering.internehendelser
package no.nav.paw.bekreftelse.internehendelser

import java.util.*

const val beOmAaAvsluttePeriodeHendelsesType = "rapportering.be_om_aa_avslutte_periode"
const val beOmAaAvsluttePeriodeHendelsesType = "bekreftelse.be_om_aa_avslutte_periode"

class BaOmAaAvsluttePeriode(
override val hendelseId: UUID,
override val periodeId: UUID,
override val identitetsnummer: String,
override val arbeidssoekerId: Long,
) : RapporteringsHendelse {
) : BekreftelseHendelse {
override val hendelseType: String = beOmAaAvsluttePeriodeHendelsesType
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package no.nav.paw.rapportering.internehendelser
package no.nav.paw.bekreftelse.internehendelser

import java.util.*

interface RapporteringsHendelse {
interface BekreftelseHendelse {
val hendelseType: String
val hendelseId: UUID
val periodeId: UUID
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package no.nav.paw.rapportering.internehendelser
package no.nav.paw.bekreftelse.internehendelser

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
Expand All @@ -12,31 +12,31 @@ private val objectMapper: ObjectMapper = ObjectMapper()
.registerKotlinModule().
registerModules(JavaTimeModule())

class RapporteringsHendelseSerde: Serde<RapporteringsHendelse> {
override fun serializer(): Serializer<RapporteringsHendelse> {
return RapporteringsHendelseSerializer
class BekreftelseHendelseSerde: Serde<BekreftelseHendelse> {
override fun serializer(): Serializer<BekreftelseHendelse> {
return BekreftelseHendelseSerializer
}

override fun deserializer(): Deserializer<RapporteringsHendelse> {
return RapporteringsHendelseDeserializer
override fun deserializer(): Deserializer<BekreftelseHendelse> {
return BekreftelseHendelseDeserializer
}
}

object RapporteringsHendelseSerializer: Serializer<RapporteringsHendelse> {
override fun serialize(topic: String?, data: RapporteringsHendelse?): ByteArray? {
object BekreftelseHendelseSerializer: Serializer<BekreftelseHendelse> {
override fun serialize(topic: String?, data: BekreftelseHendelse?): ByteArray? {
return data?.let { objectMapper.writeValueAsBytes(it) }
}
}

object RapporteringsHendelseDeserializer: Deserializer<RapporteringsHendelse> {
override fun deserialize(topic: String?, data: ByteArray?): RapporteringsHendelse {
object BekreftelseHendelseDeserializer: Deserializer<BekreftelseHendelse> {
override fun deserialize(topic: String?, data: ByteArray?): BekreftelseHendelse {
val node = objectMapper.readTree(data)
return when (val hendelseType = node.get("hendelseType")?.asText()) {
leveringsfristUtloeptHendelseType -> objectMapper.readValue<LeveringsfristUtloept>(node.traverse())
eksternGracePeriodeUtloeptHendelseType -> objectMapper.readValue<EksternGracePeriodeUtloept>(node.traverse())
registerGracePeriodeUtloeptHendelseType -> objectMapper.readValue<RegisterGracePeriodeUtloept>(node.traverse())
rapporteringTilgjengeligHendelseType -> objectMapper.readValue<RapporteringTilgjengelig>(node.traverse())
meldingMottattHendelseType -> objectMapper.readValue<RapporteringsMeldingMottatt>(node.traverse())
bekreftelseTilgjengeligHendelseType -> objectMapper.readValue<BekreftelseTilgjengelig>(node.traverse())
meldingMottattHendelseType -> objectMapper.readValue<BekreftelseMeldingMottatt>(node.traverse())
periodeAvsluttetHendelsesType -> objectMapper.readValue<PeriodeAvsluttet>(node.traverse())
registerGracePeriodeGjenstaandeTid -> objectMapper.readValue<RegisterGracePeriodeGjendstaaendeTid>(node.traverse())
else -> throw IllegalArgumentException("Ukjent hendelseType: $hendelseType")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package no.nav.paw.rapportering.internehendelser
package no.nav.paw.bekreftelse.internehendelser

import java.util.*

const val meldingMottattHendelseType = "rapportering.melding_mottatt"
const val meldingMottattHendelseType = "bekreftelse.melding_mottatt"

data class RapporteringsMeldingMottatt(
data class BekreftelseMeldingMottatt(
override val hendelseId: UUID,
override val periodeId: UUID,
override val identitetsnummer: String,
override val arbeidssoekerId: Long,
val rapporteringsId: UUID
) : RapporteringsHendelse {
val bekreftelseId: UUID
) : BekreftelseHendelse {
override val hendelseType: String = meldingMottattHendelseType
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package no.nav.paw.bekreftelse.internehendelser

import java.time.Instant
import java.util.*

const val bekreftelseTilgjengeligHendelseType = "bekreftelse.tilgjengelig"

data class BekreftelseTilgjengelig (
override val hendelseId: UUID,
override val periodeId: UUID,
override val identitetsnummer: String,
override val arbeidssoekerId: Long,
val bekreftelseId: UUID,
val gjelderFra: Instant,
val gjelderTil: Instant
) : BekreftelseHendelse {
override val hendelseType: String = bekreftelseTilgjengeligHendelseType
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package no.nav.paw.rapportering.internehendelser
package no.nav.paw.bekreftelse.internehendelser

import java.util.*

const val eksternGracePeriodeUtloeptHendelseType = "rapportering.ekstern_grace_periode_utloept"
const val eksternGracePeriodeUtloeptHendelseType = "bekreftelse.ekstern_grace_periode_utloept"

data class EksternGracePeriodeUtloept(
override val hendelseId: UUID,
Expand All @@ -11,6 +11,6 @@ data class EksternGracePeriodeUtloept(
override val arbeidssoekerId: Long,
val ansvarligNamespace: String,
val ansvarligId: String
) : RapporteringsHendelse {
) : BekreftelseHendelse {
override val hendelseType: String = eksternGracePeriodeUtloeptHendelseType
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package no.nav.paw.rapportering.internehendelser
package no.nav.paw.bekreftelse.internehendelser

import java.util.*

const val leveringsfristUtloeptHendelseType = "rapportering.leveringsfrist_utloept"
const val leveringsfristUtloeptHendelseType = "bekreftelse.leveringsfrist_utloept"

data class LeveringsfristUtloept(
override val hendelseId: UUID,
override val periodeId: UUID,
override val identitetsnummer: String,
override val arbeidssoekerId: Long,
val rapporteringsId: UUID
) : RapporteringsHendelse {
val bekreftelseId: UUID
) : BekreftelseHendelse {
override val hendelseType: String = leveringsfristUtloeptHendelseType
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package no.nav.paw.rapportering.internehendelser
package no.nav.paw.bekreftelse.internehendelser

import java.util.*


const val periodeAvsluttetHendelsesType = "rapportering.periode_avsluttet"
const val periodeAvsluttetHendelsesType = "bekreftelse.periode_avsluttet"

data class PeriodeAvsluttet(
override val hendelseId: UUID,
override val periodeId: UUID,
override val identitetsnummer: String,
override val arbeidssoekerId: Long
) : RapporteringsHendelse {
) : BekreftelseHendelse {
override val hendelseType: String = periodeAvsluttetHendelsesType
}
Loading

0 comments on commit 217c353

Please sign in to comment.