diff --git a/apps/bekreftelse-tjeneste/README.md b/apps/bekreftelse-tjeneste/README.md new file mode 100644 index 00000000..9b9dcd3b --- /dev/null +++ b/apps/bekreftelse-tjeneste/README.md @@ -0,0 +1,75 @@ +```mermaid +sequenceDiagram + Registeret->>PeriodeTopic: Periode startet + PeriodeTopic-->>MeldepliktTjeneste: Periode startet + MeldepliktTjeneste-->>EndringerTopic: Melding X dager før forfall + MeldepliktTjeneste-->>EndringerTopic: Melding OK ved innsending + MeldepliktTjeneste-->>EndringerTopic: Frist utløpt + MeldepliktTjeneste-->>EndringerTopic: Graceperiode utløpt + PeriodeTopic-->>MeldepliktTjeneste: Periode avsluttet + MeldepliktTjeneste-->>EndringerTopic: Periode avsluttet +``` + +```mermaid +sequenceDiagram + Registeret->>PeriodeTopic: Periode startet + Dagpenger-->>AnsvarsTopic: Dagpenger startet + AnsvarsTopic-->>MeldepliktTjeneste: Dagpenger tar over + MeldepliktTjeneste-->>EndringerTopic: +``` + +```mermaid +graph LR + periodeTopic((PeriodeTopic)) -- Startet --> Ansvar[Ansvar] +``` +``` + periode topic: + startet: lagre initiell tilstand + avsluttet: send ok(avsluttet) + :slett tilstand + + ansvar topic: + tar ansvar: lagre info om ansvar + :sett tidspunkt for siste melding til record ts for ansvars endring + avslutter ansvar: slett info om ansvar + + melding topic: + mottatt: lagre tidspunkt for siste melding + :send OK(mottatt) + dersom ikke ønsker å fortsette: + :send VilAvsluttePerioden + + + hver x time: + for alle perioder ingen har ansvar for: + dersom tid siden siste melding (eller periode start) > Y dager: + send melding om frist nærmer seg + dersom tid siden siste melding (eller periode start) > Z dager: + send melding om frist utløpt + dersom tid siden siste melding (eller periode start) > Z+Grace dager: + send melding om graceperiode utløpt + for alle perioder andre har ansvar for: + dersom tid siden siste melding (eller periode start) > Z+G+1dag dager: + send melding om graceperiode utløpt + :slett ansvar, vi overtar +``` + +Modul: endringer til frontend +``` + meldings topic: + - OK(mottatt) -> sett oppgave fullført + - OK(avsluttet) -> slett oppgave kansellert + - OK(ansvar) -> sett oppgave fullført + - frist nærmer seg -> opprett oppgave + - frist utløpt -> opprett oppgave + - graceperiode utløpt -> ingenting +``` + +Modul: endringer til eventlogg +``` + meldings topic: + - VilAvsluttePerioden -> send avslutt hendelse til eventlogg + +``` +* OK (grunn: RapportMottatt, AnsvarFlyttet, PeriodeAvsluttet) +* \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/build.gradle.kts b/apps/bekreftelse-tjeneste/build.gradle.kts new file mode 100644 index 00000000..e39cc88a --- /dev/null +++ b/apps/bekreftelse-tjeneste/build.gradle.kts @@ -0,0 +1,35 @@ +import org.jetbrains.kotlin.gradle.tasks.KotlinCompile + +plugins { + kotlin("jvm") + id("com.google.cloud.tools.jib") +} + +dependencies { + implementation(project(":lib:hoplite-config")) + implementation(project(":lib:kafka-streams")) + implementation(project(":lib:kafka-key-generator-client")) + implementation(project(":domain:main-avro-schema")) + implementation(project(":domain:rapportering-interne-hendelser")) + implementation(project(":domain:rapporteringsansvar-schema")) + implementation(project(":domain:rapporteringsmelding-schema")) + implementation(orgApacheKafka.kafkaStreams) + implementation(jackson.datatypeJsr310) + implementation(jackson.kotlin) + implementation(apacheAvro.kafkaStreamsAvroSerde) + + testImplementation(orgApacheKafka.streamsTest) + testImplementation(testLibs.runnerJunit5) + testImplementation(testLibs.assertionsCore) +} + +//enable context receiver +tasks.withType().configureEach { + compilerOptions { + freeCompilerArgs.add("-Xcontext-receivers") + } +} + +tasks.withType().configureEach { + useJUnitPlatform() +} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/AnsvarTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/AnsvarTopology.kt new file mode 100644 index 00000000..c40d7760 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/AnsvarTopology.kt @@ -0,0 +1,15 @@ +package no.nav.paw.meldeplikttjeneste + +import no.nav.paw.config.kafka.streams.genericProcess +import no.nav.paw.rapportering.ansvar.v1.AnsvarEndret +import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse +import org.apache.kafka.streams.StreamsBuilder + +context(ApplicationConfiguration, ApplicationContext) +fun StreamsBuilder.processAnsvarTopic() { + stream(ansvarsTopic) + .genericProcess("ansvarEndret", statStoreName) { record -> + + } +} + diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/ApplicationConfiguration.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/ApplicationConfiguration.kt new file mode 100644 index 00000000..a0626248 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/ApplicationConfiguration.kt @@ -0,0 +1,12 @@ +package no.nav.paw.meldeplikttjeneste + +import java.time.Duration + +data class ApplicationConfiguration( + val periodeTopic: String, + val ansvarsTopic: String, + val rapporteringsTopic: String, + val rapporteringsHendelsesloggTopic: String, + val statStoreName: String, + val punctuateInterval: Duration +) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/ApplicationContext.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/ApplicationContext.kt new file mode 100644 index 00000000..014e1598 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/ApplicationContext.kt @@ -0,0 +1,9 @@ +package no.nav.paw.meldeplikttjeneste + +import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstandSerde +import no.nav.paw.rapportering.internehendelser.RapporteringsHendelseSerde + +class ApplicationContext( + val internTilstandSerde: InternTilstandSerde, + val rapporteringsHendelseSerde: RapporteringsHendelseSerde +) \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/KontrolerFrister.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/KontrolerFrister.kt new file mode 100644 index 00000000..e68d87c4 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/KontrolerFrister.kt @@ -0,0 +1,22 @@ +package no.nav.paw.meldeplikttjeneste + +import no.nav.paw.config.kafka.streams.Punctuation +import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstand +import no.nav.paw.meldeplikttjeneste.tilstand.RapporteringsKonfigurasjon +import org.apache.kafka.streams.KeyValue +import org.apache.kafka.streams.processor.PunctuationType +import org.apache.kafka.streams.state.KeyValueStore +import java.time.Instant +import java.util.* + + +context(ApplicationConfiguration, RapporteringsKonfigurasjon) +fun kontrollerFrister(): Punctuation = TODO() + +operator fun KeyValue.component1(): K = key +operator fun KeyValue.component2(): V = value + +context(Instant, RapporteringsKonfigurasjon) +fun rapporteringSkalTilgjengeliggjoeres(tilstand: InternTilstand): Boolean { + TODO() +} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/PeriodeTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/PeriodeTopology.kt new file mode 100644 index 00000000..17422674 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/PeriodeTopology.kt @@ -0,0 +1,67 @@ +package no.nav.paw.meldeplikttjeneste + +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import no.nav.paw.config.kafka.streams.genericProcess +import no.nav.paw.config.kafka.streams.mapWithContext +import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse +import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstand +import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstandSerde +import no.nav.paw.meldeplikttjeneste.tilstand.initTilstand +import no.nav.paw.rapportering.internehendelser.PeriodeAvsluttet +import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.kstream.Produced +import org.apache.kafka.streams.state.KeyValueStore +import java.util.* + + +context(ApplicationConfiguration, ApplicationContext) +fun StreamsBuilder.processPeriodeTopic(kafkaKeyFunction: (String) -> KafkaKeysResponse) { + stream(periodeTopic) + .mapWithContext("lagreEllerSlettPeriode", statStoreName) { periode -> + val keyValueStore: KeyValueStore = getStateStore(statStoreName) + val currentState = keyValueStore[periode.id] + val (id, key) = currentState?.let { it.periode.kafkaKeysId to it.periode.recordKey } ?: + kafkaKeyFunction(periode.identitetsnummer).let { it.id to it.key} + when { + currentState == null && periode.avsluttet() -> Action.DoNothing + periode.avsluttet() -> Action.DeleteStateAndEmit(id, periode) + currentState == null -> Action.UpdateState(initTilstand(id = id, key = key, periode = periode)) + else -> Action.DoNothing + } + } + .genericProcess( + name = "executeAction", + punctuation = null, + stateStoreNames = arrayOf(statStoreName) + ) { record -> + val keyValueStore: KeyValueStore = getStateStore(statStoreName) + when (val action = record.value()) { + is Action.DeleteStateAndEmit -> { + keyValueStore.delete(action.periode.id) + forward( + record.withValue( + PeriodeAvsluttet( + UUID.randomUUID(), + action.periode.id, + action.periode.identitetsnummer, + action.arbeidsoekerId + ) as RapporteringsHendelse + ) + ) + } + + Action.DoNothing -> {} + is Action.UpdateState -> keyValueStore.put(action.state.periode.periodeId, action.state) + } + }.to(rapporteringsHendelsesloggTopic, Produced.with(Serdes.Long(), rapporteringsHendelseSerde)) +} + +fun Periode.avsluttet(): Boolean = avsluttet != null + +sealed interface Action { + data object DoNothing : Action + data class DeleteStateAndEmit(val arbeidsoekerId: Long, val periode: Periode) : Action + data class UpdateState(val state: InternTilstand) : Action +} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/RapportingsMeldingTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/RapportingsMeldingTopology.kt new file mode 100644 index 00000000..b132e7d4 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/RapportingsMeldingTopology.kt @@ -0,0 +1,34 @@ +package no.nav.paw.meldeplikttjeneste + +import no.nav.paw.config.kafka.streams.genericProcess +import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstand +import no.nav.paw.rapportering.internehendelser.BaOmAaAvsluttePeriode +import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse +import no.nav.paw.rapportering.internehendelser.RapporteringsMeldingMottatt +import no.nav.paw.rapportering.melding.v1.Melding +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.state.KeyValueStore +import org.slf4j.LoggerFactory +import java.util.* + +context(ApplicationConfiguration, ApplicationContext) +fun StreamsBuilder.processRapporteringsMeldingTopic() { + stream(rapporteringsTopic) + .genericProcess( + name = "meldingMottatt", + statStoreName + ) { record -> + val gjeldeneTilstand: InternTilstand? = getStateStore(statStoreName)[record.value().periodeId] + if (gjeldeneTilstand == null) { + meldingsLogger.warn("Melding mottatt for periode som ikke er aktiv/eksisterer") + } else { + TODO() + } + + } + +} + +private val meldingsLogger = LoggerFactory.getLogger("meldingsLogger") + + diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/Startup.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/Startup.kt new file mode 100644 index 00000000..bd6af218 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/Startup.kt @@ -0,0 +1,30 @@ +package no.nav.paw.meldeplikttjeneste + +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde +import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration +import no.nav.paw.config.kafka.KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG +import no.nav.paw.config.kafka.KafkaConfig +import no.nav.paw.config.kafka.streams.KafkaStreamsFactory +import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstandSerde +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.state.Stores + +const val STATE_STORE = "state-store" +const val APPLICATION_ID_SUFFIX = "beta" + +fun main() { + val kafkaConfig = loadNaisOrLocalConfiguration(KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG) + val streamsFactory = KafkaStreamsFactory(APPLICATION_ID_SUFFIX, kafkaConfig) + .withDefaultKeySerde(Serdes.Long()::class) + .withDefaultValueSerde(SpecificAvroSerde::class) + val steamsBuilder = StreamsBuilder() + .addStateStore( + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(STATE_STORE), + Serdes.UUID(), + InternTilstandSerde() + ) + ) + +} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/Topology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/Topology.kt new file mode 100644 index 00000000..8ae6a93f --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/Topology.kt @@ -0,0 +1,22 @@ +package no.nav.paw.meldeplikttjeneste + +import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse +import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstand +import no.nav.paw.rapportering.ansvar.v1.AnsvarEndret +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.state.KeyValueStore +import java.util.* + +typealias StateStore = KeyValueStore + +context(ApplicationConfiguration, ApplicationContext) +fun StreamsBuilder.appTopology( + kafkaKeyFunction: (String) -> KafkaKeysResponse +): Topology { + processPeriodeTopic(kafkaKeyFunction) + processAnsvarTopic() + processRapporteringsMeldingTopic() + + return build() +} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/tilstand/InternTilstand.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/tilstand/InternTilstand.kt new file mode 100644 index 00000000..0db83e99 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/tilstand/InternTilstand.kt @@ -0,0 +1,48 @@ +package no.nav.paw.meldeplikttjeneste.tilstand + +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse +import java.time.Duration +import java.time.Instant +import java.util.UUID +import kotlin.reflect.KClass + +@JvmRecord +data class InternTilstand( + val periode: PeriodeInfo, + val utestaaende: List +) + +@JvmRecord +data class Rapportering( + val sisteHandling: KClass, + val rapporteringsId: UUID, + val gjelderFra: Instant, + val gjelderTil: Instant +) + + +@JvmRecord +data class PeriodeInfo( + val periodeId: UUID, + val identitetsnummer: String, + val kafkaKeysId: Long, + val recordKey: Long, + val avsluttet: Boolean +) + +fun initTilstand( + id: Long, + key: Long, + periode: Periode +): InternTilstand = + InternTilstand( + periode = PeriodeInfo( + periodeId = periode.id, + identitetsnummer = periode.identitetsnummer, + kafkaKeysId = id, + recordKey = key, + avsluttet = periode.avsluttet != null + ), + utestaaende = emptyList() + ) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/tilstand/InternTilstandSerde.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/tilstand/InternTilstandSerde.kt new file mode 100644 index 00000000..3af06599 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/tilstand/InternTilstandSerde.kt @@ -0,0 +1,35 @@ +package no.nav.paw.meldeplikttjeneste.tilstand + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serializer + +class InternTilstandSerde : Serde { + override fun serializer(): Serializer { + return InternTilstandSerializer + } + + override fun deserializer(): Deserializer { + return InternTilstandDeserializer + } +} + +object InternTilstandSerializer : Serializer { + override fun serialize(topic: String?, data: InternTilstand?): ByteArray { + return internTilstandObjectMapper.writeValueAsBytes(data) + } +} + +object InternTilstandDeserializer : Deserializer { + override fun deserialize(topic: String?, data: ByteArray?): InternTilstand { + return internTilstandObjectMapper.readValue(data, InternTilstand::class.java) + } +} + +private val internTilstandObjectMapper = ObjectMapper() + .registerKotlinModule() + .registerModules(JavaTimeModule()) + diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/tilstand/RegisterInstillinger.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/tilstand/RegisterInstillinger.kt new file mode 100644 index 00000000..90ec05f6 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/meldeplikttjeneste/tilstand/RegisterInstillinger.kt @@ -0,0 +1,46 @@ +package no.nav.paw.meldeplikttjeneste.tilstand + +import java.time.DayOfWeek +import java.time.Duration +import java.time.Instant +import java.time.LocalDate +import java.time.ZoneId +import java.time.temporal.TemporalAdjuster +import java.time.temporal.TemporalAdjusters + +data class RapporteringsKonfigurasjon( + val rapporteringTilgjengligOffset: Duration, + val varselFoerUtloepAvGracePeriode: Duration +) + +fun fristForNesteRapportering(forrige: Instant, interval: Duration): Instant { + val magicMondayAdjuster = MagicMondayAdjuster() + val zoneId = ZoneId.of("Europe/Oslo") + return forrige + .plus(interval) + .let { LocalDate.ofInstant(it, ZoneId.systemDefault()) } + .with(magicMondayAdjuster) + .plus(Duration.ofDays(1)) + .atStartOfDay(zoneId).toInstant() +} + +class MagicMondayAdjuster: TemporalAdjuster { + override fun adjustInto(temporal: java.time.temporal.Temporal): java.time.temporal.Temporal { + val internalAdjuster = TemporalAdjusters.next(DayOfWeek.MONDAY) + val internalTemporal = when (temporal) { + is LocalDate -> temporal + is Instant -> LocalDate.ofInstant(temporal, ZoneId.systemDefault()) + else -> LocalDate.from(temporal) + } + return internalTemporal + .with(internalAdjuster) + .skipForwardIfNotMagicMonday() + .plus(Duration.ofDays(1)) + .atStartOfDay(ZoneId.systemDefault()) + .toInstant() + } +} + +fun LocalDate.skipForwardIfNotMagicMonday(): LocalDate { + return this +} diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/meldeplikttjeneste/ApplicationTestContext.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/meldeplikttjeneste/ApplicationTestContext.kt new file mode 100644 index 00000000..0fd5591d --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/meldeplikttjeneste/ApplicationTestContext.kt @@ -0,0 +1,123 @@ +package no.nav.paw.meldeplikttjeneste + +import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry +import io.confluent.kafka.serializers.KafkaAvroSerializerConfig +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse +import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstandSerde +import no.nav.paw.rapportering.ansvar.v1.AnsvarEndret +import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse +import no.nav.paw.rapportering.internehendelser.RapporteringsHendelseSerde +import no.nav.paw.rapportering.melding.v1.Melding +import org.apache.avro.specific.SpecificRecord +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.common.utils.Time +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.StreamsConfig +import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.TopologyTestDriver +import org.apache.kafka.streams.state.Stores +import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier +import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder +import java.time.Duration +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +class ApplicationTestContext { + val ansvarsTopicSerde: Serde = opprettSerde() + val rapporteringMeldingSerde: Serde = opprettSerde() + val periodeTopicSerde: Serde = opprettSerde() + val hendelseLoggSerde: Serde = RapporteringsHendelseSerde() + val applicationConfiguration = ApplicationConfiguration( + periodeTopic = "periodeTopic", + ansvarsTopic = "ansvarsTopic", + rapporteringsTopic = "rapporteringsTopic", + rapporteringsHendelsesloggTopic = "rapporteringsHendelsesloggTopic", + statStoreName = "statStoreName", + punctuateInterval = Duration.ofSeconds(1) + ) + val applicationContext = ApplicationContext( + internTilstandSerde = InternTilstandSerde(), + rapporteringsHendelseSerde = RapporteringsHendelseSerde() + ) + + val kafkaKeysService = kafkaKeyInstance + + val testDriver: TopologyTestDriver = + with(applicationContext) { + with(applicationConfiguration) { + StreamsBuilder() + .addStateStore( + KeyValueStoreBuilder( + InMemoryKeyValueBytesStoreSupplier(applicationConfiguration.statStoreName), + Serdes.UUID(), + applicationContext.internTilstandSerde, + Time.SYSTEM + ) + ).appTopology(kafkaKeysService) + } + }.let { TopologyTestDriver(it, kafkaStreamProperties) } + + val periodeTopic = testDriver.createInputTopic( + applicationConfiguration.periodeTopic, + Serdes.Long().serializer(), + periodeTopicSerde.serializer() + ) + + val ansvarsTopic = testDriver.createInputTopic( + applicationConfiguration.ansvarsTopic, + Serdes.Long().serializer(), + ansvarsTopicSerde.serializer() + ) + + val rapporteringsTopic = testDriver.createInputTopic( + applicationConfiguration.rapporteringsTopic, + Serdes.Long().serializer(), + rapporteringMeldingSerde.serializer() + ) + + val hendelseLoggTopic = testDriver.createOutputTopic( + applicationConfiguration.rapporteringsHendelsesloggTopic, + Serdes.Long().deserializer(), + hendelseLoggSerde.deserializer() + ) +} + +val kafkaKeyInstance: (String) -> KafkaKeysResponse + get() { + val map = ConcurrentHashMap() + val sequence = AtomicLong(0) + return { key -> + map.computeIfAbsent(key) { + val id = sequence.getAndIncrement() + KafkaKeysResponse(sequence.getAndIncrement(), id % 2) + } + } + } + +const val SCHEMA_REGISTRY_SCOPE = "juni-registry" + +fun opprettSerde(): Serde { + val schemaRegistryClient = MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE) + val serde: Serde = SpecificAvroSerde(schemaRegistryClient) + serde.configure( + mapOf( + KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS to "true", + KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to "mock://$SCHEMA_REGISTRY_SCOPE" + ), + false + ) + return serde +} + +val kafkaStreamProperties = Properties().apply { + this[StreamsConfig.APPLICATION_ID_CONFIG] = "test" + this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234" + this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.Long().javaClass + this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = SpecificAvroSerde().javaClass + this[KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS] = "true" + this[KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = "mock://$SCHEMA_REGISTRY_SCOPE" +} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/meldeplikttjeneste/IngenAndreTarAnsvarTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/meldeplikttjeneste/IngenAndreTarAnsvarTest.kt new file mode 100644 index 00000000..328de0b5 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/meldeplikttjeneste/IngenAndreTarAnsvarTest.kt @@ -0,0 +1,79 @@ +package no.nav.paw.meldeplikttjeneste + +import io.kotest.core.annotation.Ignored +import io.kotest.core.spec.style.FreeSpec +import io.kotest.matchers.shouldBe +import io.kotest.matchers.types.shouldBeInstanceOf +import no.nav.paw.arbeidssokerregisteret.api.v1.Bruker +import no.nav.paw.arbeidssokerregisteret.api.v1.BrukerType +import no.nav.paw.arbeidssokerregisteret.api.v1.Metadata +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import no.nav.paw.rapportering.internehendelser.LeveringsfristUtloept +import no.nav.paw.rapportering.internehendelser.RapporteringTilgjengelig +import no.nav.paw.rapportering.melding.v1.Melding +import java.time.Duration +import java.time.Instant +import java.util.* + +@Ignored("Midlertidig disablet av Thomas") +class IngenAndreTarAnsvarTest: FreeSpec({ + with(ApplicationTestContext()) { + "Applikasjons test hvor ingen andre tar ansvar" - { + "Bruker avslutter via rapportering" - { + val (periode, kafkaKeyResponse) = periode(identitetsnummer = "12345678901") + periodeTopic.pipeInput(kafkaKeyResponse.key, periode) + "Nå perioden opprettes skal det ikke skje noe" { + hendelseLoggTopic.isEmpty shouldBe true + } + "Etter 13 dager skal en rapportering være tilgjengelig" { + testDriver.advanceWallClockTime(Duration.ofDays(13)) + hendelseLoggTopic.isEmpty shouldBe false + val kv = hendelseLoggTopic.readKeyValue() + kv.key shouldBe kafkaKeyResponse.key + with(kv.value.shouldBeInstanceOf()) { + periodeId shouldBe periode.id + identitetsnummer shouldBe periode.identitetsnummer + arbeidssoekerId shouldBe kafkaKeyResponse.id + gjelderFra shouldBe periode.startet.tidspunkt + } + } + "Når rapporteringen ikke blir besvart innen fristen sendes det ut en melding" { + testDriver.advanceWallClockTime(Duration.ofDays(4)) + hendelseLoggTopic.isEmpty shouldBe false + val kv = hendelseLoggTopic.readKeyValue() + kv.key shouldBe kafkaKeyResponse.key + with(kv.value.shouldBeInstanceOf()) { + periodeId shouldBe periode.id + identitetsnummer shouldBe periode.identitetsnummer + arbeidssoekerId shouldBe kafkaKeyResponse.id + } + } + } + } +}}) + +context(ApplicationTestContext) +fun periode( + id: UUID = UUID.randomUUID(), + identitetsnummer: String = "12345678901", + startet: Instant = Instant.now(), + avsluttet: Instant? = null +) = Periode( + id, + identitetsnummer, + Metadata( + startet, + Bruker(BrukerType.SLUTTBRUKER, identitetsnummer), + "junit", + "tester", + null + ), + avsluttet?.let { Metadata( + avsluttet, + Bruker(BrukerType.SLUTTBRUKER, identitetsnummer), + "junit", + "tester", + null) + } +) to kafkaKeysService(identitetsnummer) + diff --git a/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt b/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt new file mode 100644 index 00000000..8dbabb79 --- /dev/null +++ b/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/GenericProcessor.kt @@ -0,0 +1,109 @@ +package no.nav.paw.config.kafka.streams + + +import org.apache.kafka.streams.kstream.KStream +import org.apache.kafka.streams.kstream.Named +import org.apache.kafka.streams.processor.PunctuationType +import org.apache.kafka.streams.processor.api.Processor +import org.apache.kafka.streams.processor.api.ProcessorContext +import org.apache.kafka.streams.processor.api.Record +import java.time.Duration +import java.time.Instant + +fun KStream.filterWithContext( + name: String, + vararg stateStoreNames: String, + function: ProcessorContext.(V) -> Boolean +): KStream { + val processor = { + GenericProcessor { record -> + if (function(record.value())) forward(record) + } + } + return process(processor, Named.`as`(name), *stateStoreNames) +} + +fun KStream.mapNonNull( + name: String, + vararg stateStoreNames: String, + function: ProcessorContext.(V_IN) -> V_OUT? +): KStream { + val processor = { + GenericProcessor { record -> + val result = function(record.value()) + if (result != null) forward(record.withValue(result)) + } + } + return process(processor, Named.`as`(name), *stateStoreNames) +} + +fun KStream.mapWithContext( + name: String, + vararg stateStoreNames: String, + function: ProcessorContext.(V_IN) -> V_OUT +): KStream { + val processor = { + GenericProcessor { record -> + val result = function(record.value()) + forward(record.withValue(result)) + } + } + return process(processor, Named.`as`(name), *stateStoreNames) +} + +fun KStream.mapKeyAndValue( + name: String, + vararg stateStoreNames: String, + function: ProcessorContext.(K_IN, V_IN) -> Pair? +): KStream { + val processor = { + GenericProcessor { record -> + val result = function(record.key(), record.value()) + if (result != null) { + forward(record.withKey(result.first).withValue(result.second)) + } + } + } + return process(processor, Named.`as`(name), *stateStoreNames) +} + +fun KStream.genericProcess( + name: String, + vararg stateStoreNames: String, + punctuation: Punctuation? = null, + function: ProcessorContext.(Record) -> Unit +): KStream { + val processor = { + GenericProcessor(function = function, punctuation = punctuation) + } + return process(processor, Named.`as`(name), *stateStoreNames) +} + +class GenericProcessor( + private val punctuation: Punctuation? = null, + private val function: ProcessorContext.(Record) -> Unit +) : Processor { + private var context: ProcessorContext? = null + + override fun init(context: ProcessorContext?) { + super.init(context) + this.context = context + if (punctuation != null) { + context?.schedule(punctuation.interval, punctuation.type) { time -> + punctuation.function(Instant.ofEpochMilli(time), context) + } + } + } + + override fun process(record: Record?) { + if (record == null) return + val ctx = requireNotNull(context) { "Context is not initialized" } + with(ctx) { function(record) } + } +} + +data class Punctuation( + val interval: Duration, + val type: PunctuationType, + val function: (Instant, ProcessorContext) -> Unit +) diff --git a/settings.gradle.kts b/settings.gradle.kts index 8c0a6b31..e9c82b63 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -25,7 +25,8 @@ include( "apps:utgang-formidlingsgruppe", "apps:hendelselogg-backup", "apps:utgang-pdl", - "apps:kafka-key-generator" + "apps:kafka-key-generator", + "apps:bekreftelse-tjeneste" ) dependencyResolutionManagement {