diff --git a/apps/bekreftelse-utgang/nais/nais-dev.yaml b/apps/bekreftelse-utgang/nais/nais-dev.yaml index 21fec5f5..caacc3cf 100644 --- a/apps/bekreftelse-utgang/nais/nais-dev.yaml +++ b/apps/bekreftelse-utgang/nais/nais-dev.yaml @@ -11,12 +11,14 @@ spec: env: - name: KAFKA_STREAMS_ID_SUFFIX value: "v1" + - name: KAFKA_PAW_ARBEIDSSOKERPERIODER_TOPIC + value: "paw.arbeidssokerperioder-v1" - name: KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_HENDELSELOGG_TOPIC value: "paw.arbeidssoker-bekreftelse-hendelseslogg-beta-v1" - name: KAFKA_PAW_ARBEIDSSOKER_HENDELSELOGG_TOPIC value: "paw.arbeidssoker-hendelseslogg-v1" - name: KAFKA_PUNCTUATOR_INTERVAL - value: "PT1M" + value: "PT5M" azure: application: enabled: true diff --git a/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/config/ApplicationConfig.kt b/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/config/ApplicationConfig.kt index 896a44f3..7e2924f8 100644 --- a/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/config/ApplicationConfig.kt +++ b/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/config/ApplicationConfig.kt @@ -23,6 +23,7 @@ data class ApplicationConfig( data class KafkaTopologyConfig( val applicationIdSuffix: String, val internStateStoreName: String, + val periodeTopic: String, val hendelseloggTopic: String, val bekreftelseHendelseloggTopic: String, val punctuationInterval: Duration, diff --git a/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/BekreftelseUtgangStream.kt b/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/BekreftelseUtgangStream.kt index f2062052..be9bbf8e 100644 --- a/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/BekreftelseUtgangStream.kt +++ b/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/BekreftelseUtgangStream.kt @@ -1,32 +1,78 @@ package no.nav.paw.bekreftelseutgang.topology +import no.nav.paw.arbeidssokerregisteret.intern.v1.Avsluttet import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerde +import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Bruker +import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.BrukerType +import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.Metadata +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse +import no.nav.paw.bekreftelse.internehendelser.baOmAaAvsluttePeriodeHendelsesType +import no.nav.paw.bekreftelse.internehendelser.registerGracePeriodeUtloeptHendelseType import no.nav.paw.bekreftelseutgang.config.ApplicationConfig -import no.nav.paw.config.kafka.streams.Punctuation +import no.nav.paw.config.env.appImageOrDefaultForLocal import no.nav.paw.config.kafka.streams.genericProcess import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.kstream.Produced -import org.apache.kafka.streams.processor.PunctuationType -import org.slf4j.LoggerFactory +import org.apache.kafka.streams.state.KeyValueStore +import java.time.Instant +import java.util.* fun StreamsBuilder.buildBekreftelseUtgangStream(applicationConfig: ApplicationConfig) { with(applicationConfig.kafkaTopology) { - stream(bekreftelseHendelseloggTopic) - .genericProcess( + stream(bekreftelseHendelseloggTopic) + .genericProcess( name = "bekreftelseUtgangStream", internStateStoreName, - punctuation = Punctuation( - punctuationInterval, - PunctuationType.WALL_CLOCK_TIME, - { _, _ -> true }, // TODO() - ), + punctuation = null, ) { record -> - // TODO() - } - .to(hendelseloggTopic, Produced.with(Serdes.Long(), HendelseSerde())) + val stateStore = getStateStore>(internStateStoreName) + // TODO: Må jeg ta høyde for at periodeStream ikke er ajoure og har lagt inn identitetsnummer i state? + val identitetsnummer = stateStore[record.value().periodeId] ?: return@genericProcess + + when(record.value().hendelseType) { + registerGracePeriodeUtloeptHendelseType -> avsluttetHendelse( + identitetsnummer = identitetsnummer, + periodeId = record.value().periodeId, + arbeidssoekerId = record.value().arbeidssoekerId, + utfoertAv = Bruker( + type = BrukerType.SYSTEM, + id = applicationConfig.getAppImage() + ), + aarsak = "Graceperiode utløpt" + ) + baOmAaAvsluttePeriodeHendelsesType -> avsluttetHendelse( + identitetsnummer = identitetsnummer, + periodeId = record.value().periodeId, + arbeidssoekerId = record.value().arbeidssoekerId, + utfoertAv = Bruker( + type = BrukerType.SLUTTBRUKER, + id = identitetsnummer + ), + aarsak = "Svarte NEI på spørsmål 'Vil du fortsatt være registrert som arbeidssøker?'" + ) + else -> { + return@genericProcess + } + } + }.to(hendelseloggTopic, Produced.with(Serdes.Long(), HendelseSerde())) } } -private val bekreftelseUtgangLogger = LoggerFactory.getLogger("bekreftelseUtgangLogger") \ No newline at end of file +fun ApplicationConfig.getAppImage() = runtimeEnvironment.appImageOrDefaultForLocal("paw-arbeidssoekerregisteret-bekreftelse-utgang:LOCAL") + +fun avsluttetHendelse(identitetsnummer: String, periodeId: UUID, arbeidssoekerId: Long, utfoertAv: Bruker, aarsak: String) = Avsluttet( + hendelseId = UUID.randomUUID(), + id = arbeidssoekerId, + identitetsnummer = identitetsnummer, + metadata = metadata(utfoertAv, aarsak), + periodeId = periodeId, +) + +fun metadata(utfoertAv: Bruker, aarsak: String) = Metadata( + tidspunkt = Instant.now(), + utfoertAv = utfoertAv, + kilde = "paw.arbeidssoekerregisteret.bekreftelse-utgang", + aarsak = aarsak, +) diff --git a/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/PeriodeStream.kt b/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/PeriodeStream.kt new file mode 100644 index 00000000..0ee80328 --- /dev/null +++ b/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/PeriodeStream.kt @@ -0,0 +1,25 @@ +package no.nav.paw.bekreftelseutgang.topology + +import no.nav.paw.bekreftelseutgang.config.ApplicationConfig +import org.apache.kafka.streams.StreamsBuilder +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import no.nav.paw.config.kafka.streams.mapNonNull +import org.apache.kafka.streams.state.KeyValueStore +import java.util.* + +fun StreamsBuilder.buildPeriodeStream(applicationConfig: ApplicationConfig){ + with(applicationConfig.kafkaTopology){ + stream(periodeTopic) + .mapNonNull( + "lagreEllerSlettPeriode", + internStateStoreName + ) { periode -> + val stateStore: KeyValueStore = getStateStore(internStateStoreName) + if(periode.avsluttet == null) { + stateStore.put(periode.id, periode.identitetsnummer) + } else { + stateStore.delete(periode.id) + } + } + } +} \ No newline at end of file diff --git a/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/Topology.kt b/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/Topology.kt index 7e16979b..86ca831b 100644 --- a/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/Topology.kt +++ b/apps/bekreftelse-utgang/src/main/kotlin/no/nav/paw/bekreftelseutgang/topology/Topology.kt @@ -1,31 +1,28 @@ package no.nav.paw.bekreftelseutgang.topology -import kotlinx.coroutines.runBlocking +import no.nav.paw.bekreftelseutgang.config.ApplicationConfig import no.nav.paw.bekreftelseutgang.context.ApplicationContext -import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient -import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse +import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.state.Stores fun buildTopology( applicationContext: ApplicationContext ): Topology = StreamsBuilder().apply { - //buildInternStateStore(applicationContext.applicationConfig) + buildInternStateStore(applicationContext.applicationConfig) + buildPeriodeStream(applicationContext.applicationConfig) buildBekreftelseUtgangStream(applicationContext.applicationConfig) }.build() -/*fun StreamsBuilder.buildInternStateStore(applicationConfig: ApplicationConfig) { +fun StreamsBuilder.buildInternStateStore(applicationConfig: ApplicationConfig) { with(applicationConfig.kafkaTopology) { addStateStore( Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore(internStateStoreName), Serdes.UUID(), - InternTilstandSerde() + Serdes.String() ) ) } -}*/ - -fun KafkaKeysClient.getIdAndKeyBlocking(identitetsnummer: String): KafkaKeysResponse = runBlocking { - getIdAndKey(identitetsnummer) } diff --git a/apps/bekreftelse-utgang/src/main/resources/local/application_config.toml b/apps/bekreftelse-utgang/src/main/resources/local/application_config.toml index 0586019a..d7e28f59 100644 --- a/apps/bekreftelse-utgang/src/main/resources/local/application_config.toml +++ b/apps/bekreftelse-utgang/src/main/resources/local/application_config.toml @@ -1,6 +1,7 @@ [kafkaTopology] applicationIdSuffix = "v1" internStateStoreName = "intern-tilstand" +periodeTopic = "paw.arbeidssokerperioder-v1 hendelseloggTopic = "paw.arbeidssoker-hendelseslogg-v1" bekreftelseHendelseloggTopic = "paw.arbeidssoker-bekreftelse-hendelseslogg-v1" punctuationInterval = "PT5S" diff --git a/apps/bekreftelse-utgang/src/main/resources/nais/application_config.toml b/apps/bekreftelse-utgang/src/main/resources/nais/application_config.toml index 9d36b884..fe63f964 100644 --- a/apps/bekreftelse-utgang/src/main/resources/nais/application_config.toml +++ b/apps/bekreftelse-utgang/src/main/resources/nais/application_config.toml @@ -1,6 +1,7 @@ [kafkaTopology] applicationIdSuffix = "${KAFKA_STREAMS_ID_SUFFIX}" internStateStoreName = "intern-tilstand" +periodeTopic = "${KAFKA_PAW_ARBEIDSSOKERPERIODER_TOPIC}" bekreftelseHendelseloggTopic = "${KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_HENDELSELOGG_TOPIC}" hendelseloggTopic = "${KAFKA_PAW_ARBEIDSSOKER_HENDELSELOGG_TOPIC}" punctuationInterval = "${KAFKA_PUNCTUATOR_INTERVAL}" diff --git a/apps/bekreftelse-utgang/src/test/kotlin/no/nav/paw/bekreftelseutgang/ApplicationTestContext.kt b/apps/bekreftelse-utgang/src/test/kotlin/no/nav/paw/bekreftelseutgang/ApplicationTestContext.kt index c3b065e3..dc4d819d 100644 --- a/apps/bekreftelse-utgang/src/test/kotlin/no/nav/paw/bekreftelseutgang/ApplicationTestContext.kt +++ b/apps/bekreftelse-utgang/src/test/kotlin/no/nav/paw/bekreftelseutgang/ApplicationTestContext.kt @@ -1,9 +1,11 @@ package no.nav.paw.bekreftelseutgang +import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry import io.confluent.kafka.serializers.KafkaAvroSerializerConfig import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde import io.micrometer.prometheusmetrics.PrometheusMeterRegistry import io.mockk.mockk +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerde import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse @@ -17,21 +19,26 @@ import no.nav.paw.kafkakeygenerator.client.inMemoryKafkaKeysMock import org.apache.avro.specific.SpecificRecord import org.apache.kafka.common.serialization.Serde import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.common.utils.Time import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.TestInputTopic import org.apache.kafka.streams.TestOutputTopic import org.apache.kafka.streams.TopologyTestDriver +import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier +import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder import org.slf4j.Logger import org.slf4j.LoggerFactory import java.time.Instant import java.util.* class ApplicationTestContext(initialWallClockTime: Instant = Instant.now()) { + val periodeTopicSerde: Serde = opprettSerde() val bekreftelseHendelseLoggSerde: Serde = BekreftelseHendelseSerde() val hendelseLoggSerde: Serde = HendelseSerde() val applicationConfig = loadNaisOrLocalConfiguration(APPLICATION_CONFIG_FILE_NAME) val kafkaKeysClient = inMemoryKafkaKeysMock() + val applicationContext = ApplicationContext( applicationConfig = applicationConfig, prometheusMeterRegistry = mockk(), @@ -42,19 +49,24 @@ class ApplicationTestContext(initialWallClockTime: Instant = Instant.now()) { val logger: Logger = LoggerFactory.getLogger(ApplicationTestContext::class.java) val topology = StreamsBuilder().apply { - /*addStateStore( + addStateStore( KeyValueStoreBuilder( InMemoryKeyValueBytesStoreSupplier(applicationConfig.kafkaTopology.internStateStoreName), Serdes.UUID(), - InternTilstandSerde(), + Serdes.String(), Time.SYSTEM ) - )*/ - // TODO() + ) }.build() val testDriver: TopologyTestDriver = TopologyTestDriver(topology, kafkaStreamProperties, initialWallClockTime) + val periodeTopic: TestInputTopic = testDriver.createInputTopic( + applicationConfig.kafkaTopology.periodeTopic, + Serdes.Long().serializer(), + periodeTopicSerde.serializer() + ) + val bekreftelseHendelseLoggTopic: TestInputTopic = testDriver.createInputTopic( applicationConfig.kafkaTopology.bekreftelseHendelseloggTopic, Serdes.Long().serializer(), @@ -76,6 +88,19 @@ class ApplicationTestContext(initialWallClockTime: Instant = Instant.now()) { const val SCHEMA_REGISTRY_SCOPE = "juni-registry" +fun opprettSerde(): Serde { + val schemaRegistryClient = MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE) + val serde: Serde = SpecificAvroSerde(schemaRegistryClient) + serde.configure( + mapOf( + KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS to "true", + KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to "mock://$SCHEMA_REGISTRY_SCOPE" + ), + false + ) + return serde +} + val kafkaStreamProperties = Properties().apply { this[StreamsConfig.APPLICATION_ID_CONFIG] = "test" this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234" diff --git a/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/PeriodeAvsluttet.kt b/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/PeriodeAvsluttet.kt index 2aedab47..8e59215e 100644 --- a/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/PeriodeAvsluttet.kt +++ b/domain/bekreftelse-interne-hendelser/src/main/kotlin/no/nav/paw/bekreftelse/internehendelser/PeriodeAvsluttet.kt @@ -3,7 +3,6 @@ package no.nav.paw.bekreftelse.internehendelser import java.time.Instant import java.util.* - const val periodeAvsluttetHendelsesType = "bekreftelse.periode_avsluttet" data class PeriodeAvsluttet(