Skip to content

Commit

Permalink
Logikk for å sende Avsluttet hendelse i BekreftelseUtgangStream.kt, l…
Browse files Browse the repository at this point in the history
…aget PeriodeStream for å lagre identitetsnummer i state for åpne perioder, og slette state for avsluttede perioder
  • Loading branch information
robertkittilsen committed Oct 2, 2024
1 parent 92fca31 commit c19d3ac
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 30 deletions.
4 changes: 3 additions & 1 deletion apps/bekreftelse-utgang/nais/nais-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ spec:
env:
- name: KAFKA_STREAMS_ID_SUFFIX
value: "v1"
- name: KAFKA_PAW_ARBEIDSSOKERPERIODER_TOPIC
value: "paw.arbeidssokerperioder-v1"
- name: KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_HENDELSELOGG_TOPIC
value: "paw.arbeidssoker-bekreftelse-hendelseslogg-beta-v1"
- name: KAFKA_PAW_ARBEIDSSOKER_HENDELSELOGG_TOPIC
value: "paw.arbeidssoker-hendelseslogg-v1"
- name: KAFKA_PUNCTUATOR_INTERVAL
value: "PT1M"
value: "PT5M"
azure:
application:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ data class ApplicationConfig(
data class KafkaTopologyConfig(
val applicationIdSuffix: String,
val internStateStoreName: String,
val periodeTopic: String,
val hendelseloggTopic: String,
val bekreftelseHendelseloggTopic: String,
val punctuationInterval: Duration,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,78 @@
package no.nav.paw.bekreftelseutgang.topology

import no.nav.paw.arbeidssokerregisteret.intern.v1.Avsluttet
import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerde
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Bruker
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.BrukerType
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Metadata
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
import no.nav.paw.bekreftelse.internehendelser.baOmAaAvsluttePeriodeHendelsesType
import no.nav.paw.bekreftelse.internehendelser.registerGracePeriodeUtloeptHendelseType
import no.nav.paw.bekreftelseutgang.config.ApplicationConfig
import no.nav.paw.config.kafka.streams.Punctuation
import no.nav.paw.config.env.appImageOrDefaultForLocal
import no.nav.paw.config.kafka.streams.genericProcess
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.processor.PunctuationType
import org.slf4j.LoggerFactory
import org.apache.kafka.streams.state.KeyValueStore
import java.time.Instant
import java.util.*

fun StreamsBuilder.buildBekreftelseUtgangStream(applicationConfig: ApplicationConfig) {
with(applicationConfig.kafkaTopology) {
stream<Long, no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse>(bekreftelseHendelseloggTopic)
.genericProcess<Long, no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse, Long, Hendelse>(
stream<Long, BekreftelseHendelse>(bekreftelseHendelseloggTopic)
.genericProcess<Long, BekreftelseHendelse, Long, Hendelse>(
name = "bekreftelseUtgangStream",
internStateStoreName,
punctuation = Punctuation(
punctuationInterval,
PunctuationType.WALL_CLOCK_TIME,
{ _, _ -> true }, // TODO()
),
punctuation = null,
) { record ->
// TODO()
}
.to(hendelseloggTopic, Produced.with(Serdes.Long(), HendelseSerde()))
val stateStore = getStateStore<KeyValueStore<UUID, String>>(internStateStoreName)
// TODO: Må jeg ta høyde for at periodeStream ikke er ajoure og har lagt inn identitetsnummer i state?
val identitetsnummer = stateStore[record.value().periodeId] ?: return@genericProcess

when(record.value().hendelseType) {
registerGracePeriodeUtloeptHendelseType -> avsluttetHendelse(
identitetsnummer = identitetsnummer,
periodeId = record.value().periodeId,
arbeidssoekerId = record.value().arbeidssoekerId,
utfoertAv = Bruker(
type = BrukerType.SYSTEM,
id = applicationConfig.getAppImage()
),
aarsak = "Graceperiode utløpt"
)
baOmAaAvsluttePeriodeHendelsesType -> avsluttetHendelse(
identitetsnummer = identitetsnummer,
periodeId = record.value().periodeId,
arbeidssoekerId = record.value().arbeidssoekerId,
utfoertAv = Bruker(
type = BrukerType.SLUTTBRUKER,
id = identitetsnummer
),
aarsak = "Svarte NEI på spørsmål 'Vil du fortsatt være registrert som arbeidssøker?'"
)
else -> {
return@genericProcess
}
}
}.to(hendelseloggTopic, Produced.with(Serdes.Long(), HendelseSerde()))
}
}

private val bekreftelseUtgangLogger = LoggerFactory.getLogger("bekreftelseUtgangLogger")
fun ApplicationConfig.getAppImage() = runtimeEnvironment.appImageOrDefaultForLocal("paw-arbeidssoekerregisteret-bekreftelse-utgang:LOCAL")

fun avsluttetHendelse(identitetsnummer: String, periodeId: UUID, arbeidssoekerId: Long, utfoertAv: Bruker, aarsak: String) = Avsluttet(
hendelseId = UUID.randomUUID(),
id = arbeidssoekerId,
identitetsnummer = identitetsnummer,
metadata = metadata(utfoertAv, aarsak),
periodeId = periodeId,
)

fun metadata(utfoertAv: Bruker, aarsak: String) = Metadata(
tidspunkt = Instant.now(),
utfoertAv = utfoertAv,
kilde = "paw.arbeidssoekerregisteret.bekreftelse-utgang",
aarsak = aarsak,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package no.nav.paw.bekreftelseutgang.topology

import no.nav.paw.bekreftelseutgang.config.ApplicationConfig
import org.apache.kafka.streams.StreamsBuilder
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.config.kafka.streams.mapNonNull
import org.apache.kafka.streams.state.KeyValueStore
import java.util.*

fun StreamsBuilder.buildPeriodeStream(applicationConfig: ApplicationConfig){
with(applicationConfig.kafkaTopology){
stream<Long, Periode>(periodeTopic)
.mapNonNull(
"lagreEllerSlettPeriode",
internStateStoreName
) { periode ->
val stateStore: KeyValueStore<UUID, String> = getStateStore(internStateStoreName)
if(periode.avsluttet == null) {
stateStore.put(periode.id, periode.identitetsnummer)
} else {
stateStore.delete(periode.id)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
package no.nav.paw.bekreftelseutgang.topology

import kotlinx.coroutines.runBlocking
import no.nav.paw.bekreftelseutgang.config.ApplicationConfig
import no.nav.paw.bekreftelseutgang.context.ApplicationContext
import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient
import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.state.Stores

fun buildTopology(
applicationContext: ApplicationContext
): Topology = StreamsBuilder().apply {
//buildInternStateStore(applicationContext.applicationConfig)
buildInternStateStore(applicationContext.applicationConfig)
buildPeriodeStream(applicationContext.applicationConfig)
buildBekreftelseUtgangStream(applicationContext.applicationConfig)
}.build()

/*fun StreamsBuilder.buildInternStateStore(applicationConfig: ApplicationConfig) {
fun StreamsBuilder.buildInternStateStore(applicationConfig: ApplicationConfig) {
with(applicationConfig.kafkaTopology) {
addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(internStateStoreName),
Serdes.UUID(),
InternTilstandSerde()
Serdes.String()
)
)
}
}*/

fun KafkaKeysClient.getIdAndKeyBlocking(identitetsnummer: String): KafkaKeysResponse = runBlocking {
getIdAndKey(identitetsnummer)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[kafkaTopology]
applicationIdSuffix = "v1"
internStateStoreName = "intern-tilstand"
periodeTopic = "paw.arbeidssokerperioder-v1
hendelseloggTopic = "paw.arbeidssoker-hendelseslogg-v1"
bekreftelseHendelseloggTopic = "paw.arbeidssoker-bekreftelse-hendelseslogg-v1"
punctuationInterval = "PT5S"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[kafkaTopology]
applicationIdSuffix = "${KAFKA_STREAMS_ID_SUFFIX}"
internStateStoreName = "intern-tilstand"
periodeTopic = "${KAFKA_PAW_ARBEIDSSOKERPERIODER_TOPIC}"
bekreftelseHendelseloggTopic = "${KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_HENDELSELOGG_TOPIC}"
hendelseloggTopic = "${KAFKA_PAW_ARBEIDSSOKER_HENDELSELOGG_TOPIC}"
punctuationInterval = "${KAFKA_PUNCTUATOR_INTERVAL}"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package no.nav.paw.bekreftelseutgang

import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import io.mockk.mockk
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerde
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
Expand All @@ -17,21 +19,26 @@ import no.nav.paw.kafkakeygenerator.client.inMemoryKafkaKeysMock
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Time
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TestInputTopic
import org.apache.kafka.streams.TestOutputTopic
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Instant
import java.util.*

class ApplicationTestContext(initialWallClockTime: Instant = Instant.now()) {
val periodeTopicSerde: Serde<Periode> = opprettSerde()
val bekreftelseHendelseLoggSerde: Serde<BekreftelseHendelse> = BekreftelseHendelseSerde()
val hendelseLoggSerde: Serde<Hendelse> = HendelseSerde()
val applicationConfig = loadNaisOrLocalConfiguration<ApplicationConfig>(APPLICATION_CONFIG_FILE_NAME)
val kafkaKeysClient = inMemoryKafkaKeysMock()

val applicationContext = ApplicationContext(
applicationConfig = applicationConfig,
prometheusMeterRegistry = mockk<PrometheusMeterRegistry>(),
Expand All @@ -42,19 +49,24 @@ class ApplicationTestContext(initialWallClockTime: Instant = Instant.now()) {
val logger: Logger = LoggerFactory.getLogger(ApplicationTestContext::class.java)

val topology = StreamsBuilder().apply {
/*addStateStore(
addStateStore(
KeyValueStoreBuilder(
InMemoryKeyValueBytesStoreSupplier(applicationConfig.kafkaTopology.internStateStoreName),
Serdes.UUID(),
InternTilstandSerde(),
Serdes.String(),
Time.SYSTEM
)
)*/
// TODO()
)
}.build()

val testDriver: TopologyTestDriver = TopologyTestDriver(topology, kafkaStreamProperties, initialWallClockTime)

val periodeTopic: TestInputTopic<Long, Periode> = testDriver.createInputTopic(
applicationConfig.kafkaTopology.periodeTopic,
Serdes.Long().serializer(),
periodeTopicSerde.serializer()
)

val bekreftelseHendelseLoggTopic: TestInputTopic<Long, BekreftelseHendelse> = testDriver.createInputTopic(
applicationConfig.kafkaTopology.bekreftelseHendelseloggTopic,
Serdes.Long().serializer(),
Expand All @@ -76,6 +88,19 @@ class ApplicationTestContext(initialWallClockTime: Instant = Instant.now()) {

const val SCHEMA_REGISTRY_SCOPE = "juni-registry"

fun <T : SpecificRecord> opprettSerde(): Serde<T> {
val schemaRegistryClient = MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE)
val serde: Serde<T> = SpecificAvroSerde(schemaRegistryClient)
serde.configure(
mapOf(
KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS to "true",
KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to "mock://$SCHEMA_REGISTRY_SCOPE"
),
false
)
return serde
}

val kafkaStreamProperties = Properties().apply {
this[StreamsConfig.APPLICATION_ID_CONFIG] = "test"
this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package no.nav.paw.bekreftelse.internehendelser
import java.time.Instant
import java.util.*


const val periodeAvsluttetHendelsesType = "bekreftelse.periode_avsluttet"

data class PeriodeAvsluttet(
Expand Down

0 comments on commit c19d3ac

Please sign in to comment.