diff --git a/.github/workflows/bekreftelse-tjeneste.yaml b/.github/workflows/bekreftelse-tjeneste.yaml new file mode 100644 index 00000000..b7692590 --- /dev/null +++ b/.github/workflows/bekreftelse-tjeneste.yaml @@ -0,0 +1,102 @@ +name: Bekreftelse Tjeneste + +on: + push: + branches: + - main + - dev/* + paths: + - 'apps/bekreftelse-tjeneste/**' + - 'lib/**' + - 'domain/**' + - '.github/workflows/bekreftelse-tjeneste.yaml' + - 'gradle/**' + - 'settings.gradle.kts' + - 'gradle.properties' + - 'gradlew' + - 'gradlew.bat' + +env: + MODULE: bekreftelse-tjeneste + IMAGE: europe-north1-docker.pkg.dev/${{ vars.NAIS_MANAGEMENT_PROJECT_ID }}/paw/paw-arbeidssoekerregisteret-bekreftelse-tjeneste +jobs: + build: + name: Build + permissions: + contents: read + id-token: write + packages: write + runs-on: ubuntu-latest + outputs: + image: ${{ steps.docker-build-push.outputs.image }} + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Setup Java + uses: actions/setup-java@v4 + with: + java-version: 21 + distribution: temurin + cache: gradle + - name: Set version + run: echo "VERSION=$(date +'%y.%m.%d').${{ github.run_number }}-${{ github.run_attempt }}" >> $GITHUB_ENV + - name: Login GAR + uses: nais/login@v0 + with: + project_id: ${{ vars.NAIS_MANAGEMENT_PROJECT_ID }} + identity_provider: ${{ secrets.NAIS_WORKLOAD_IDENTITY_PROVIDER }} + team: paw + - name: Build and push image with Gradle + id: docker-build-push + working-directory: ./ + env: + ORG_GRADLE_PROJECT_githubPassword: ${{ secrets.GITHUB_TOKEN }} + run: | + echo "image=${{ env.IMAGE }}:${{ env.VERSION }}" >> $GITHUB_OUTPUT + echo -Pversion=${{ env.VERSION }} -Pimage=${{ env.IMAGE }} :apps:${{ env.MODULE }}:build :apps:${{ env.MODULE }}:jib + ./gradlew -Pversion=${{ env.VERSION }} -Pimage=${{ env.IMAGE }} :apps:${{ env.MODULE }}:build :apps:${{ env.MODULE }}:jib + echo "DIGEST=$(cat ./apps/${{ env.MODULE }}/build/jib-image.digest)" >> $GITHUB_ENV + - name: Attest and sign image + uses: nais/attest-sign@v1.3.4 + with: + image_ref: ${{ env.IMAGE }}@${{ env.DIGEST }} + + deploy-dev: + if: github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/heads/dev') + name: Deploy to dev-gcp + needs: + - build + permissions: + contents: read + id-token: write + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Deploy to GCP + uses: nais/deploy/actions/deploy@v2 + env: + CLUSTER: dev-gcp + RESOURCE: ./apps/${{ env.MODULE }}/nais/nais-dev.yaml + VAR: image=${{ needs.build.outputs.image }} + +# deploy-prod: +# if: github.ref == 'refs/heads/main' +# name: Deploy to prod-gcp +# needs: +# - build +# - deploy-dev +# permissions: +# contents: read +# id-token: write +# runs-on: ubuntu-latest +# steps: +# - name: Checkout +# uses: actions/checkout@v4 +# - name: Deploy to GCP +# uses: nais/deploy/actions/deploy@v2 +# env: +# TEAM: paw +# CLUSTER: prod-gcp +# RESOURCE: ./apps/${{ env.MODULE }}/nais/nais-prod.yaml +# VAR: image=${{ needs.build.outputs.image }} diff --git a/apps/bekreftelse-tjeneste/build.gradle.kts b/apps/bekreftelse-tjeneste/build.gradle.kts index 4d8c1887..14d1ed6a 100644 --- a/apps/bekreftelse-tjeneste/build.gradle.kts +++ b/apps/bekreftelse-tjeneste/build.gradle.kts @@ -1,36 +1,78 @@ -import org.jetbrains.kotlin.gradle.tasks.KotlinCompile plugins { kotlin("jvm") id("com.google.cloud.tools.jib") + application } +val jvmMajorVersion: String by project +val baseImage: String by project +val image: String? by project + + dependencies { + // Project implementation(project(":lib:hoplite-config")) + implementation(project(":lib:error-handling")) implementation(project(":lib:kafka-streams")) implementation(project(":lib:kafka-key-generator-client")) implementation(project(":domain:main-avro-schema")) implementation(project(":domain:bekreftelse-interne-hendelser")) implementation(project(":domain:bekreftelsesansvar-avro-schema")) implementation(project(":domain:bekreftelsesmelding-avro-schema")) - implementation(libs.kafkaStreams) + + // Server + implementation(libs.bundles.ktorServerWithNettyAndMicrometer) + + // Serialization + implementation(libs.ktorSerializationJackson) + implementation(libs.ktorSerializationJson) implementation(libs.jacksonDatatypeJsr310) - implementation(libs.jacksonKotlin) - implementation(libs.kafkaStreamsAvroSerde) + + // Tooling implementation(libs.arrowCore) - implementation(libs.bundles.ktorServerWithNettyAndMicrometer) + + // Logging + implementation(libs.logbackClassic) + implementation(libs.logstashLogbackEncoder) + implementation(libs.log) + implementation(libs.auditLog) + + // Instrumentation + implementation(libs.micrometerRegistryPrometheus) + + // Kafka + implementation(libs.kafkaStreams) + implementation(libs.kafkaStreamsAvroSerde) + + // Testing + testImplementation(libs.ktorServerTestsJvm) testImplementation(libs.streamsTest) - testImplementation(libs.runnerJunit5) - testImplementation(libs.assertionsCore) + testImplementation(libs.bundles.testLibsWithUnitTesting) + testImplementation(project(":test:test-data-lib")) } -//enable context receiver -tasks.withType().configureEach { - compilerOptions { - freeCompilerArgs.add("-Xcontext-receivers") +java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(jvmMajorVersion)) } } +application { + mainClass.set("no.nav.paw.bekreftelsetjeneste.ApplicationKt") +} + tasks.withType().configureEach { useJUnitPlatform() } + +jib { + from.image = "$baseImage:$jvmMajorVersion" + to.image = "${image ?: project.name}:${project.version}" + container { + environment = mapOf( + "IMAGE_WITH_VERSION" to "${image ?: project.name}:${project.version}" + ) + jvmFlags = listOf("-XX:ActiveProcessorCount=4", "-XX:+UseZGC", "-XX:+ZGenerational") + } +} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/nais/nais-dev.yaml b/apps/bekreftelse-tjeneste/nais/nais-dev.yaml new file mode 100644 index 00000000..467439b2 --- /dev/null +++ b/apps/bekreftelse-tjeneste/nais/nais-dev.yaml @@ -0,0 +1,58 @@ +apiVersion: nais.io/v1alpha1 +kind: Application +metadata: + name: paw-arbeidssoekerregisteret-bekreftelse-tjeneste + namespace: paw + labels: + team: paw +spec: + image: {{ image }} + port: 8080 + env: + - name: KAFKA_STREAMS_ID_SUFFIX + value: "v1" + - name: KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_TOPIC + value: "paw.arbeidssoker-bekreftelse-beta-v1" + - name: KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_HENDELSELOGG_TOPIC + value: "paw.arbeidssoker-bekreftelse-hendelseslogg-beta-v1" + - name: KAFKA_PAW_ARBEIDSSOKERPERIODER_TOPIC + value: "paw.arbeidssokerperioder-v1" + - name: KAFKA_PUNCTUATOR_INTERVAL + value: "PT1M" + - name: BEKREFTELSE_INTERVAL + value: "PT28M" + - name: BEKREFTELSE_GRACEPERIODE + value: "PT14M" + - name: BEKREFTELSE_TILGJENGELIG_OFFSET + value: "PT6M" + + replicas: + min: 1 + max: 1 + resources: + limits: + memory: 1024Mi + requests: + cpu: 200m + memory: 256Mi + liveness: + path: /internal/isAlive + initialDelay: 10 + readiness: + path: /internal/isReady + initialDelay: 10 + prometheus: + enabled: true + path: /internal/metrics + observability: + autoInstrumentation: + enabled: true + runtime: java + kafka: + pool: nav-dev + streams: true + accessPolicy: + outbound: + rules: + - application: paw-kafka-key-generator + namespace: paw diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/AnsvarTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/AnsvarTopology.kt deleted file mode 100644 index aaf63e3c..00000000 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/AnsvarTopology.kt +++ /dev/null @@ -1,15 +0,0 @@ -package no.nav.paw.bekreftelsetjeneste - -import no.nav.paw.config.kafka.streams.genericProcess -import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret -import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse -import org.apache.kafka.streams.StreamsBuilder - -context(ApplicationConfiguration, ApplicationContext) -fun StreamsBuilder.processAnsvarTopic() { - stream(ansvarsTopic) - .genericProcess("ansvarEndret", stateStoreName) { record -> - - } -} - diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Application.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Application.kt new file mode 100644 index 00000000..ad0b1401 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Application.kt @@ -0,0 +1,55 @@ +package no.nav.paw.bekreftelsetjeneste + +import io.ktor.server.application.Application +import io.ktor.server.engine.addShutdownHook +import io.ktor.server.engine.embeddedServer +import io.ktor.server.netty.Netty +import io.ktor.server.routing.routing +import no.nav.paw.bekreftelsetjeneste.config.APPLICATION_CONFIG_FILE_NAME +import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig +import no.nav.paw.bekreftelsetjeneste.config.SERVER_CONFIG_FILE_NAME +import no.nav.paw.bekreftelsetjeneste.config.ServerConfig +import no.nav.paw.bekreftelsetjeneste.context.ApplicationContext +import no.nav.paw.bekreftelsetjeneste.plugins.buildKafkaStreams +import no.nav.paw.bekreftelsetjeneste.plugins.configureKafka +import no.nav.paw.bekreftelsetjeneste.plugins.configureMetrics +import no.nav.paw.bekreftelsetjeneste.routes.metricsRoutes +import no.nav.paw.bekreftelsetjeneste.topology.buildTopology +import no.nav.paw.config.env.appNameOrDefaultForLocal +import no.nav.paw.config.env.currentRuntimeEnvironment +import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration +import no.nav.paw.health.route.healthRoutes +import org.slf4j.LoggerFactory + +fun main() { + val logger = LoggerFactory.getLogger("no.nav.paw.logger.application") + + val serverConfig = loadNaisOrLocalConfiguration(SERVER_CONFIG_FILE_NAME) + val applicationConfig = loadNaisOrLocalConfiguration(APPLICATION_CONFIG_FILE_NAME) + + logger.info("Starter: ${currentRuntimeEnvironment.appNameOrDefaultForLocal()}") + + with(serverConfig) { + embeddedServer(Netty, port = port) { + module(applicationConfig) + }.apply { + addShutdownHook { stop(gracePeriodMillis, timeoutMillis) } + start(wait = true) + } + } +} + +fun Application.module(applicationConfig: ApplicationConfig) { + val applicationContext = ApplicationContext.create(applicationConfig) + + val kafkaTopology = buildTopology(applicationContext) + val kafkaStreams = buildKafkaStreams(applicationContext, kafkaTopology) + + configureMetrics(applicationContext) + configureKafka(applicationContext, kafkaStreams) + + routing { + healthRoutes(applicationContext.healthIndicatorRepository) + metricsRoutes(applicationContext) + } +} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationConfiguration.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationConfiguration.kt deleted file mode 100644 index 6d166753..00000000 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationConfiguration.kt +++ /dev/null @@ -1,14 +0,0 @@ -package no.nav.paw.bekreftelsetjeneste - -import java.time.Duration - -data class ApplicationConfiguration( - val periodeTopic: String, - val ansvarsTopic: String, - val bekreftelseTopic: String, - val bekreftelseHendelseloggTopic: String, - val stateStoreName: String, - val punctuateInterval: Duration -) { - val pawNamespace = "paw" -} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationContext.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationContext.kt deleted file mode 100644 index 40c13bac..00000000 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationContext.kt +++ /dev/null @@ -1,17 +0,0 @@ -package no.nav.paw.bekreftelsetjeneste - -import kotlinx.coroutines.runBlocking -import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient -import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse -import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde -import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde - -class ApplicationContext( - val internTilstandSerde: InternTilstandSerde, - val bekreftelseHendelseSerde: BekreftelseHendelseSerde, - val kafkaKeysClient: KafkaKeysClient -) { - val kafkaKeyFunction: (String) -> KafkaKeysResponse = { - runBlocking { kafkaKeysClient.getIdAndKey(it) } - } -} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopology.kt deleted file mode 100644 index 4baa9332..00000000 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopology.kt +++ /dev/null @@ -1,84 +0,0 @@ -package no.nav.paw.bekreftelsetjeneste - -import arrow.core.partially1 -import no.nav.paw.bekreftelse.internehendelser.BaOmAaAvsluttePeriode -import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse -import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt -import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse -import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.KlarForUtfylling -import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.VenterSvar -import no.nav.paw.config.kafka.streams.Punctuation -import no.nav.paw.config.kafka.streams.genericProcess -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.StreamsBuilder -import org.apache.kafka.streams.kstream.Produced -import org.apache.kafka.streams.processor.PunctuationType -import org.apache.kafka.streams.processor.api.Record -import org.slf4j.LoggerFactory -import java.time.Instant -import java.util.* - -context(ApplicationConfiguration, ApplicationContext) -fun StreamsBuilder.processBekreftelseMeldingTopic() { - stream(bekreftelseTopic) - .genericProcess( - name = "meldingMottatt", - stateStoreName, - punctuation = Punctuation(punctuateInterval, PunctuationType.WALL_CLOCK_TIME, ::bekreftelsePunctuator.partially1(stateStoreName)), - ) { record -> - val stateStore = getStateStore(stateStoreName) - val gjeldeneTilstand: InternTilstand? = stateStore[record.value().periodeId] - if (gjeldeneTilstand == null) { - meldingsLogger.warn("Melding mottatt for periode som ikke er aktiv/eksisterer") - return@genericProcess - } - if (record.value().namespace == pawNamespace) { - val bekreftelse = gjeldeneTilstand.bekreftelser.find { bekreftelse -> bekreftelse.bekreftelseId == record.value().id } - when { - bekreftelse == null -> { - meldingsLogger.warn("Melding {} har ingen matchene bekreftelse", record.value().id) - } - bekreftelse.tilstand is VenterSvar || bekreftelse.tilstand is KlarForUtfylling -> { - val (hendelser, oppdatertBekreftelse) = behandleGyldigSvar(gjeldeneTilstand.periode.arbeidsoekerId, record, bekreftelse) - val oppdatertBekreftelser = gjeldeneTilstand.bekreftelser - .filterNot { t -> t.bekreftelseId == oppdatertBekreftelse.bekreftelseId } + oppdatertBekreftelse - val oppdatertTilstand = gjeldeneTilstand.copy(bekreftelser = oppdatertBekreftelser) - stateStore.put(oppdatertTilstand.periode.periodeId, oppdatertTilstand) - hendelser - .map (record::withValue) - .forEach (::forward) - } - else -> { - meldingsLogger.warn("Melding {} har ikke forventet tilstand, tilstand={}", record.value().id, bekreftelse.tilstand) - } - } - } - }.to(bekreftelseHendelseloggTopic, Produced.with(Serdes.Long(), bekreftelseHendelseSerde)) -} - -fun behandleGyldigSvar(arbeidssoekerId: Long, record: Record, bekreftelse: Bekreftelse): Pair, Bekreftelse> { - val oppdatertBekreftelse = bekreftelse.copy(tilstand = Tilstand.Levert) - val baOmAaAvslutte = if (!record.value().svar.vilFortsetteSomArbeidssoeker) { - BaOmAaAvsluttePeriode( - hendelseId = UUID.randomUUID(), - periodeId = record.value().periodeId, - arbeidssoekerId = arbeidssoekerId, - hendelseTidspunkt = Instant.now() - ) - } else null - val meldingMottatt = BekreftelseMeldingMottatt( - hendelseId = UUID.randomUUID(), - periodeId = record.value().periodeId, - arbeidssoekerId = arbeidssoekerId, - bekreftelseId = bekreftelse.bekreftelseId, - hendelseTidspunkt = Instant.now() - ) - return listOfNotNull(meldingMottatt, baOmAaAvslutte) to oppdatertBekreftelse -} - - -private val meldingsLogger = LoggerFactory.getLogger("meldingsLogger") - - diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt deleted file mode 100644 index b4bf72d1..00000000 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/KontrolerFrister.kt +++ /dev/null @@ -1,30 +0,0 @@ -package no.nav.paw.bekreftelsetjeneste - -import arrow.core.NonEmptyList -import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse -import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig -import java.time.Instant - -fun Bekreftelse.erKlarForUtfylling(now: Instant): Boolean = - now.isAfter(gjelderTil.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset)) - -fun Bekreftelse.harFristUtloept(now: Instant): Boolean = - now.isAfter(tilgjengeliggjort?.plus(BekreftelseConfig.bekreftelseTilgjengeligOffset) ?: gjelderTil) - -fun Bekreftelse.erSisteVarselOmGjenstaaendeGraceTid(now: Instant): Boolean = - sisteVarselOmGjenstaaendeGraceTid == null && now.isAfter(fristUtloept?.plus(BekreftelseConfig.varselFoerGracePeriodeUtloept) ?: gjelderTil.plus( - BekreftelseConfig.varselFoerGracePeriodeUtloept - )) - -fun Bekreftelse.harGracePeriodeUtloept(now: Instant): Boolean = - now.isAfter(fristUtloept?.plus(BekreftelseConfig.gracePeriode) ?: gjelderTil.plus(BekreftelseConfig.gracePeriode)) - -fun NonEmptyList.shouldCreateNewBekreftelse(now: Instant): Boolean = - maxBy { it.gjelderTil } - .let { - now.isAfter(it.gjelderTil.plus(BekreftelseConfig.bekreftelseInterval.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset))) - } - - - - diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/PeriodeTopology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/PeriodeTopology.kt deleted file mode 100644 index 7452c9a9..00000000 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/PeriodeTopology.kt +++ /dev/null @@ -1,65 +0,0 @@ -package no.nav.paw.bekreftelsetjeneste - -import no.nav.paw.arbeidssokerregisteret.api.v1.Periode -import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse -import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet -import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.initTilstand -import no.nav.paw.config.kafka.streams.genericProcess -import no.nav.paw.config.kafka.streams.mapWithContext -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.StreamsBuilder -import org.apache.kafka.streams.kstream.Produced -import org.apache.kafka.streams.state.KeyValueStore -import java.time.Instant -import java.util.* - -context(ApplicationConfiguration, ApplicationContext) -fun StreamsBuilder.processPeriodeTopic() { - stream(periodeTopic) - .mapWithContext("lagreEllerSlettPeriode", stateStoreName) { periode -> - val keyValueStore: KeyValueStore = getStateStore(stateStoreName) - val currentState = keyValueStore[periode.id] - val (arbeidsoekerId, kafkaKey) = currentState?.let { it.periode.arbeidsoekerId to it.periode.recordKey } ?: - kafkaKeyFunction(periode.identitetsnummer).let { it.id to it.key} - when { - currentState == null && periode.avsluttet() -> Action.DoNothing - periode.avsluttet() -> Action.DeleteStateAndEmit(arbeidsoekerId, periode) - currentState == null -> Action.UpdateState(initTilstand(id = arbeidsoekerId, key = kafkaKey, periode = periode)) - else -> Action.DoNothing - } - } - .genericProcess( - name = "executeAction", - punctuation = null, - stateStoreNames = arrayOf(stateStoreName) - ) { record -> - val keyValueStore: KeyValueStore = getStateStore(stateStoreName) - when (val action = record.value()) { - is Action.DeleteStateAndEmit -> { - forward( - record.withValue( - PeriodeAvsluttet( - hendelseId = UUID.randomUUID(), - periodeId = action.periode.id, - arbeidssoekerId = action.arbeidsoekerId, - hendelseTidspunkt = Instant.now() - ) as BekreftelseHendelse - ) - ) - keyValueStore.delete(action.periode.id) - } - - Action.DoNothing -> {} - is Action.UpdateState -> keyValueStore.put(action.state.periode.periodeId, action.state) - } - }.to(bekreftelseHendelseloggTopic, Produced.with(Serdes.Long(), bekreftelseHendelseSerde)) -} - -fun Periode.avsluttet(): Boolean = avsluttet != null - -sealed interface Action { - data object DoNothing : Action - data class DeleteStateAndEmit(val arbeidsoekerId: Long, val periode: Periode) : Action - data class UpdateState(val state: InternTilstand) : Action -} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Startup.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Startup.kt deleted file mode 100644 index c7b26513..00000000 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Startup.kt +++ /dev/null @@ -1,52 +0,0 @@ -package no.nav.paw.bekreftelsetjeneste - -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde -import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde -import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration -import no.nav.paw.config.kafka.KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG -import no.nav.paw.config.kafka.KafkaConfig -import no.nav.paw.config.kafka.streams.KafkaStreamsFactory -import no.nav.paw.kafkakeygenerator.client.createKafkaKeyGeneratorClient -import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.KafkaStreams -import org.apache.kafka.streams.StreamsBuilder -import org.apache.kafka.streams.state.Stores -import java.time.Duration - -const val APPLICATION_ID_SUFFIX = "beta" - -fun main() { - val applicationConfiguration = ApplicationConfiguration( - periodeTopic = "paw.arbeidssokerperioder-v1", - ansvarsTopic = "paw.arbeidssoker-bekreftelse-ansvar-beta-v1", - bekreftelseTopic = "paw.arbeidssoker-bekreftelse-beta-v1", - bekreftelseHendelseloggTopic = "paw.arbeidssoker-bekreftelse-hendelseslogg-beta-v1", - stateStoreName = "bekreftelse", - punctuateInterval = Duration.ofMinutes(5) - ) - val applicationContext: ApplicationContext = ApplicationContext( - InternTilstandSerde(), - BekreftelseHendelseSerde(), - createKafkaKeyGeneratorClient() - ) - val streamsBuilder = StreamsBuilder() - .addStateStore( - Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(applicationConfiguration.stateStoreName), - Serdes.UUID(), - InternTilstandSerde() - ) - ) - val topology = with(applicationContext) { - with(applicationConfiguration) { - streamsBuilder.appTopology() - } - } - val kafkaConfig = loadNaisOrLocalConfiguration(KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG) - val streamsFactory = KafkaStreamsFactory(APPLICATION_ID_SUFFIX, kafkaConfig) - .withDefaultKeySerde(Serdes.Long()::class) - .withDefaultValueSerde(SpecificAvroSerde::class) - val streams = KafkaStreams(topology, streamsFactory.properties) - // TODO: start streams -} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Topology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Topology.kt deleted file mode 100644 index 4bdbeb06..00000000 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/Topology.kt +++ /dev/null @@ -1,18 +0,0 @@ -package no.nav.paw.bekreftelsetjeneste - -import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand -import org.apache.kafka.streams.StreamsBuilder -import org.apache.kafka.streams.Topology -import org.apache.kafka.streams.state.KeyValueStore -import java.util.* - -typealias StateStore = KeyValueStore - -context(ApplicationConfiguration, ApplicationContext) -fun StreamsBuilder.appTopology(): Topology { - processPeriodeTopic() - processAnsvarTopic() - processBekreftelseMeldingTopic() - - return build() -} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/config/ApplicationConfig.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/config/ApplicationConfig.kt new file mode 100644 index 00000000..ea527ca3 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/config/ApplicationConfig.kt @@ -0,0 +1,39 @@ +package no.nav.paw.bekreftelsetjeneste.config + +import no.nav.paw.config.env.RuntimeEnvironment +import no.nav.paw.config.env.currentRuntimeEnvironment +import no.nav.paw.config.kafka.KafkaConfig +import no.nav.paw.kafkakeygenerator.auth.AzureM2MConfig +import no.nav.paw.kafkakeygenerator.client.KafkaKeyConfig +import java.net.InetAddress +import java.time.Duration + +const val APPLICATION_CONFIG_FILE_NAME = "application_config.toml" + +data class ApplicationConfig( + val bekreftelseIntervals: BekreftelseIntervals, + val kafkaTopology: KafkaTopologyConfig, + val kafkaStreams: KafkaConfig, + val azureM2M: AzureM2MConfig, + val kafkaKeysClient: KafkaKeyConfig, + // Env + val runtimeEnvironment: RuntimeEnvironment = currentRuntimeEnvironment, + val hostname: String = InetAddress.getLocalHost().hostName +) + +data class BekreftelseIntervals( + val interval: Duration, + val graceperiode: Duration, + val tilgjengeligOffset: Duration, + val varselFoerGraceperiodeUtloept: Duration = graceperiode.dividedBy(2) +) + +data class KafkaTopologyConfig( + val applicationIdSuffix: String, + val internStateStoreName: String, + val periodeTopic: String, + val bekreftelseTopic: String, + val bekreftelseHendelseloggTopic: String, + val punctuationInterval: Duration, + val shutdownTimeout: Duration = Duration.ofMinutes(5), +) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/config/ServerConfig.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/config/ServerConfig.kt new file mode 100644 index 00000000..8c8f1226 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/config/ServerConfig.kt @@ -0,0 +1,12 @@ +package no.nav.paw.bekreftelsetjeneste.config + +const val SERVER_CONFIG_FILE_NAME = "server_config.toml" + +data class ServerConfig( + val port: Int, + val callGroupSize: Int, + val workerGroupSize: Int, + val connectionGroupSize: Int, + val gracePeriodMillis: Long, + val timeoutMillis: Long +) diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/context/ApplicationContext.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/context/ApplicationContext.kt new file mode 100644 index 00000000..7e8203a1 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/context/ApplicationContext.kt @@ -0,0 +1,39 @@ +package no.nav.paw.bekreftelsetjeneste.context + +import io.micrometer.prometheusmetrics.PrometheusConfig +import io.micrometer.prometheusmetrics.PrometheusMeterRegistry +import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig +import no.nav.paw.health.repository.HealthIndicatorRepository +import no.nav.paw.kafkakeygenerator.auth.azureAdM2MTokenClient +import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient +import no.nav.paw.kafkakeygenerator.client.kafkaKeysClient + +class ApplicationContext( + val applicationConfig: ApplicationConfig, + val prometheusMeterRegistry: PrometheusMeterRegistry, + val healthIndicatorRepository: HealthIndicatorRepository, + val kafkaKeysClient: KafkaKeysClient +) { + companion object { + fun create(applicationConfig: ApplicationConfig): ApplicationContext { + val prometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) + + val healthIndicatorRepository = HealthIndicatorRepository() + + val azureM2MTokenClient = azureAdM2MTokenClient( + applicationConfig.runtimeEnvironment, applicationConfig.azureM2M + ) + + val kafkaKeysClient = kafkaKeysClient(applicationConfig.kafkaKeysClient) { + azureM2MTokenClient.createMachineToMachineToken(applicationConfig.kafkaKeysClient.scope) + } + + return ApplicationContext( + applicationConfig, + prometheusMeterRegistry, + healthIndicatorRepository, + kafkaKeysClient + ) + } + } +} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/plugins/Kafka.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/plugins/Kafka.kt new file mode 100644 index 00000000..fef5fbd4 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/plugins/Kafka.kt @@ -0,0 +1,17 @@ +package no.nav.paw.bekreftelsetjeneste.plugins + + +import io.ktor.server.application.Application +import io.ktor.server.application.install +import no.nav.paw.bekreftelsetjeneste.context.ApplicationContext +import org.apache.kafka.streams.KafkaStreams + +fun Application.configureKafka( + applicationContext: ApplicationContext, + kafkaStreams: KafkaStreams +) { + install(KafkaStreamsPlugin) { + shutDownTimeout = applicationContext.applicationConfig.kafkaTopology.shutdownTimeout + kafkaStreamsList = listOf(kafkaStreams) + } +} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/plugins/KafkaStreams.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/plugins/KafkaStreams.kt new file mode 100644 index 00000000..b3d4f8ba --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/plugins/KafkaStreams.kt @@ -0,0 +1,70 @@ +package no.nav.paw.bekreftelsetjeneste.plugins + +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde +import io.ktor.server.application.ApplicationPlugin +import io.ktor.server.application.ApplicationStarted +import io.ktor.server.application.ApplicationStopping +import io.ktor.server.application.createApplicationPlugin +import io.ktor.server.application.hooks.MonitoringEvent +import io.ktor.server.application.log +import io.ktor.util.KtorDsl +import no.nav.paw.bekreftelsetjeneste.context.ApplicationContext +import no.nav.paw.config.kafka.streams.KafkaStreamsFactory +import no.nav.paw.error.handler.withApplicationTerminatingExceptionHandler +import no.nav.paw.health.listener.withHealthIndicatorStateListener +import no.nav.paw.health.model.LivenessHealthIndicator +import no.nav.paw.health.model.ReadinessHealthIndicator +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.KafkaStreams +import org.apache.kafka.streams.StreamsConfig +import org.apache.kafka.streams.Topology +import java.time.Duration + +@KtorDsl +class KafkaStreamsPluginConfig { + var shutDownTimeout: Duration? = null + var kafkaStreamsList: List? = null +} + +val KafkaStreamsPlugin: ApplicationPlugin = + createApplicationPlugin("KafkaStreams", ::KafkaStreamsPluginConfig) { + val shutDownTimeout = requireNotNull(pluginConfig.shutDownTimeout) { "ShutDownTimeout er null" } + val kafkaStreamsList = requireNotNull(pluginConfig.kafkaStreamsList) { "KafkaStreams er null" } + + on(MonitoringEvent(ApplicationStarted)) { application -> + application.log.info("Starter Kafka Streams") + kafkaStreamsList.forEach { stream -> stream.start() } + } + + on(MonitoringEvent(ApplicationStopping)) { application -> + application.log.info("Stopper Kafka Streams") + kafkaStreamsList.forEach { stream -> stream.close(shutDownTimeout) } + } + } + +fun buildKafkaStreams( + applicationContext: ApplicationContext, + topology: Topology +): KafkaStreams { + val applicationConfig = applicationContext.applicationConfig + val healthIndicatorRepository = applicationContext.healthIndicatorRepository + + val livenessIndicator = healthIndicatorRepository.addLivenessIndicator(LivenessHealthIndicator()) + val readinessIndicator = healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator()) + + val streamsFactory = KafkaStreamsFactory( + applicationConfig.kafkaTopology.applicationIdSuffix, + applicationConfig.kafkaStreams + ) + .withDefaultKeySerde(Serdes.Long()::class) + .withDefaultValueSerde(SpecificAvroSerde::class) + .apply { properties["application.server"] = applicationConfig.hostname } + + val kafkaStreams = KafkaStreams( + topology, + StreamsConfig(streamsFactory.properties) + ) + kafkaStreams.withHealthIndicatorStateListener(livenessIndicator, readinessIndicator) + kafkaStreams.withApplicationTerminatingExceptionHandler() + return kafkaStreams +} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/plugins/Metrics.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/plugins/Metrics.kt new file mode 100644 index 00000000..65c79280 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/plugins/Metrics.kt @@ -0,0 +1,32 @@ +package no.nav.paw.bekreftelsetjeneste.plugins + +import io.ktor.server.application.Application +import io.ktor.server.application.install +import io.ktor.server.metrics.micrometer.MicrometerMetrics +import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics +import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics +import io.micrometer.core.instrument.binder.system.ProcessorMetrics +import io.micrometer.core.instrument.distribution.DistributionStatisticConfig +import no.nav.paw.bekreftelsetjeneste.context.ApplicationContext +import java.time.Duration + +fun Application.configureMetrics(applicationContext: ApplicationContext) { + install(MicrometerMetrics) { + registry = applicationContext.prometheusMeterRegistry + meterBinders = listOf( + JvmMemoryMetrics(), + JvmGcMetrics(), + ProcessorMetrics() + ) + distributionStatisticConfig = + DistributionStatisticConfig.builder() + .percentilesHistogram(true) + .maximumExpectedValue(Duration.ofSeconds(1).toNanos().toDouble()) + .minimumExpectedValue(Duration.ofMillis(20).toNanos().toDouble()) + .serviceLevelObjectives( + Duration.ofMillis(150).toNanos().toDouble(), + Duration.ofMillis(500).toNanos().toDouble() + ) + .build() + } +} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/routes/MetricsRoutes.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/routes/MetricsRoutes.kt new file mode 100644 index 00000000..3ca7aa39 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/routes/MetricsRoutes.kt @@ -0,0 +1,13 @@ +package no.nav.paw.bekreftelsetjeneste.routes + +import io.ktor.server.application.call +import io.ktor.server.response.respond +import io.ktor.server.routing.Routing +import io.ktor.server.routing.get +import no.nav.paw.bekreftelsetjeneste.context.ApplicationContext + +fun Routing.metricsRoutes(applicationContext: ApplicationContext) { + get("/internal/metrics") { + call.respond(applicationContext.prometheusMeterRegistry.scrape()) + } +} \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt index e9daeac2..8ae1cd57 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/InternTilstand.kt @@ -3,6 +3,7 @@ package no.nav.paw.bekreftelsetjeneste.tilstand import com.fasterxml.jackson.annotation.JsonSubTypes import com.fasterxml.jackson.annotation.JsonTypeInfo import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import java.time.Duration import java.time.Instant import java.util.* @@ -74,7 +75,8 @@ fun initTilstand( ) fun initBekreftelsePeriode( - periode: PeriodeInfo + periode: PeriodeInfo, + interval: Duration, ): Bekreftelse = Bekreftelse( tilstand = Tilstand.IkkeKlarForUtfylling, @@ -83,11 +85,12 @@ fun initBekreftelsePeriode( sisteVarselOmGjenstaaendeGraceTid = null, bekreftelseId = UUID.randomUUID(), gjelderFra = periode.startet, - gjelderTil = fristForNesteBekreftelse(periode.startet, BekreftelseConfig.bekreftelseInterval) + gjelderTil = fristForNesteBekreftelse(periode.startet, interval) ) fun List.initNewBekreftelse( tilgjengeliggjort: Instant, + interval: Duration ): Bekreftelse = maxBy { it.gjelderTil }.copy( tilstand = Tilstand.KlarForUtfylling, @@ -95,5 +98,5 @@ fun List.initNewBekreftelse( sisteVarselOmGjenstaaendeGraceTid = null, bekreftelseId = UUID.randomUUID(), gjelderFra = maxBy { it.gjelderTil }.gjelderTil, - gjelderTil = fristForNesteBekreftelse(maxBy { it.gjelderTil }.gjelderTil, BekreftelseConfig.bekreftelseInterval) + gjelderTil = fristForNesteBekreftelse(maxBy { it.gjelderTil }.gjelderTil, interval) ) \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/KontrolerFrister.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/KontrolerFrister.kt new file mode 100644 index 00000000..d7e84076 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/KontrolerFrister.kt @@ -0,0 +1,29 @@ +package no.nav.paw.bekreftelsetjeneste.tilstand + +import arrow.core.NonEmptyList +import java.time.Duration +import java.time.Instant + +fun Bekreftelse.erKlarForUtfylling(now: Instant, tilgjengeligOffset: Duration): Boolean = + now.isAfter(gjelderTil.minus(tilgjengeligOffset)) + +fun Bekreftelse.harFristUtloept(now: Instant, tilgjengeligOffset: Duration): Boolean = + now.isAfter(tilgjengeliggjort?.plus(tilgjengeligOffset) ?: gjelderTil) + +fun Bekreftelse.erSisteVarselOmGjenstaaendeGraceTid(now: Instant, varselFoerGraceperiodeUtloept: Duration): Boolean = + sisteVarselOmGjenstaaendeGraceTid == null && now.isAfter(fristUtloept?.plus(varselFoerGraceperiodeUtloept) ?: gjelderTil.plus( + varselFoerGraceperiodeUtloept + )) + +fun Bekreftelse.harGraceperiodeUtloept(now: Instant, graceperiode: Duration): Boolean = + now.isAfter(fristUtloept?.plus(graceperiode) ?: gjelderTil.plus(graceperiode)) + +fun NonEmptyList.shouldCreateNewBekreftelse(now: Instant, interval: Duration, tilgjengeligOffset: Duration): Boolean = + maxBy { it.gjelderTil } + .let { + now.isAfter(it.gjelderTil.plus(interval.minus(tilgjengeligOffset))) + } + + + + diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt index 6b29bd04..46c1f255 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/tilstand/RegisterInstillinger.kt @@ -8,22 +8,12 @@ import java.time.ZoneId import java.time.temporal.TemporalAdjuster import java.time.temporal.TemporalAdjusters -// Felles verdier for bekreftelse. -// TODO: Endre intervaller til dev-verdier -data object BekreftelseConfig { - val bekreftelseInterval:Duration = Duration.ofDays(14) - val gracePeriode: Duration = Duration.ofDays(7) - val bekreftelseTilgjengeligOffset: Duration = Duration.ofDays(3) - val varselFoerGracePeriodeUtloept: Duration = gracePeriode.dividedBy(2) -} - fun fristForNesteBekreftelse(forrige: Instant, interval: Duration): Instant { return forrige.plus(interval) } -fun Bekreftelse.gjenstaendeGracePeriode(timestamp: Instant): Duration { - val gracePeriode = BekreftelseConfig.gracePeriode - val utvidetGjelderTil = fristUtloept?.plus(gracePeriode) ?: gjelderTil.plus(gracePeriode) +fun Bekreftelse.gjenstaendeGraceperiode(timestamp: Instant, graceperiode: Duration): Duration { + val utvidetGjelderTil = fristUtloept?.plus(graceperiode) ?: gjelderTil.plus(graceperiode) return if (timestamp.isAfter(utvidetGjelderTil)) { Duration.ZERO diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuator.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt similarity index 75% rename from apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuator.kt rename to apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt index 2acf5f0f..4a58c5a3 100644 --- a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuator.kt +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelsePunctuator.kt @@ -1,4 +1,4 @@ -package no.nav.paw.bekreftelsetjeneste +package no.nav.paw.bekreftelsetjeneste.topology import arrow.core.toNonEmptyListOrNull import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse @@ -6,20 +6,28 @@ import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig import no.nav.paw.bekreftelse.internehendelser.LeveringsfristUtloept import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeGjenstaaendeTid import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeUtloept +import no.nav.paw.bekreftelsetjeneste.config.BekreftelseIntervals +import no.nav.paw.bekreftelsetjeneste.tilstand.erKlarForUtfylling +import no.nav.paw.bekreftelsetjeneste.tilstand.erSisteVarselOmGjenstaaendeGraceTid +import no.nav.paw.bekreftelsetjeneste.tilstand.harFristUtloept +import no.nav.paw.bekreftelsetjeneste.tilstand.harGraceperiodeUtloept +import no.nav.paw.bekreftelsetjeneste.tilstand.shouldCreateNewBekreftelse import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand -import no.nav.paw.bekreftelsetjeneste.tilstand.gjenstaendeGracePeriode +import no.nav.paw.bekreftelsetjeneste.tilstand.gjenstaendeGraceperiode import no.nav.paw.bekreftelsetjeneste.tilstand.initBekreftelsePeriode import no.nav.paw.bekreftelsetjeneste.tilstand.initNewBekreftelse import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.processor.api.ProcessorContext import org.apache.kafka.streams.processor.api.Record +import java.time.Duration import java.time.Instant import java.util.* fun bekreftelsePunctuator( stateStoreName: String, + bekreftelseIntervals: BekreftelseIntervals, timestamp: Instant, ctx: ProcessorContext ) { @@ -27,7 +35,7 @@ fun bekreftelsePunctuator( stateStore.all().use { states -> states.forEach { (key, value) -> - val (updatedState, bekreftelseHendelser) = processBekreftelser(value, timestamp) + val (updatedState, bekreftelseHendelser) = processBekreftelser(bekreftelseIntervals, value, timestamp) bekreftelseHendelser.forEach { ctx.forward(Record(value.periode.recordKey, it, Instant.now().toEpochMilli())) @@ -38,32 +46,34 @@ fun bekreftelsePunctuator( } private fun processBekreftelser( + bekreftelseIntervals: BekreftelseIntervals, currentState: InternTilstand, timestamp: Instant, ): Pair> { val existingBekreftelse = currentState.bekreftelser.firstOrNull() val (tilstand, hendelse) = if (existingBekreftelse == null) { - currentState.createInitialBekreftelse() to null + currentState.createInitialBekreftelse(bekreftelseIntervals.interval) to null } else { - currentState.checkAndCreateNewBekreftelse(timestamp) + currentState.checkAndCreateNewBekreftelse(timestamp, bekreftelseIntervals) } - val (updatedTilstand, additionalHendelse) = tilstand.handleUpdateBekreftelser(timestamp) + val (updatedTilstand, additionalHendelse) = tilstand.handleUpdateBekreftelser(timestamp, bekreftelseIntervals) return updatedTilstand to listOfNotNull(hendelse, additionalHendelse) } -private fun InternTilstand.createInitialBekreftelse(): InternTilstand = - copy(bekreftelser = listOf(initBekreftelsePeriode(periode))) +private fun InternTilstand.createInitialBekreftelse(interval: Duration): InternTilstand = + copy(bekreftelser = listOf(initBekreftelsePeriode(periode, interval))) private fun InternTilstand.checkAndCreateNewBekreftelse( - timestamp: Instant + timestamp: Instant, + bekreftelseIntervals: BekreftelseIntervals, ): Pair { val nonEmptyBekreftelser = bekreftelser.toNonEmptyListOrNull() ?: return this to null - return if (nonEmptyBekreftelser.shouldCreateNewBekreftelse(timestamp)) { - val newBekreftelse = bekreftelser.initNewBekreftelse(tilgjengeliggjort = timestamp) + return if (nonEmptyBekreftelser.shouldCreateNewBekreftelse(timestamp, bekreftelseIntervals.interval, bekreftelseIntervals.tilgjengeligOffset)) { + val newBekreftelse = bekreftelser.initNewBekreftelse(tilgjengeliggjort = timestamp, interval = bekreftelseIntervals.interval) copy(bekreftelser = nonEmptyBekreftelser + newBekreftelse) to createNewBekreftelseTilgjengelig(newBekreftelse) } else { this to null @@ -72,16 +82,17 @@ private fun InternTilstand.checkAndCreateNewBekreftelse( private fun InternTilstand.handleUpdateBekreftelser( timestamp: Instant, + bekreftelseIntervals: BekreftelseIntervals, ): Pair { val updatedBekreftelser = bekreftelser.map { bekreftelse -> generateSequence(bekreftelse to null as BekreftelseHendelse?) { (currentBekreftelse, _) -> - getProcessedBekreftelseTilstandAndHendelse(currentBekreftelse, timestamp).takeIf { it.second != null } + getProcessedBekreftelseTilstandAndHendelse(currentBekreftelse, timestamp, bekreftelseIntervals).takeIf { it.second != null } }.last().first } val hendelse: BekreftelseHendelse? = bekreftelser.flatMap { bekreftelse -> generateSequence(bekreftelse to null as BekreftelseHendelse?) { (currentBekreftelse, _) -> - getProcessedBekreftelseTilstandAndHendelse(currentBekreftelse, timestamp).takeIf { it.second != null } + getProcessedBekreftelseTilstandAndHendelse(currentBekreftelse, timestamp, bekreftelseIntervals).takeIf { it.second != null } }.mapNotNull { it.second } }.lastOrNull() @@ -90,10 +101,11 @@ private fun InternTilstand.handleUpdateBekreftelser( private fun InternTilstand.getProcessedBekreftelseTilstandAndHendelse( bekreftelse: Bekreftelse, - timestamp: Instant + timestamp: Instant, + bekreftelseIntervals: BekreftelseIntervals, ): Pair { return when { - bekreftelse.tilstand is Tilstand.IkkeKlarForUtfylling && bekreftelse.erKlarForUtfylling(timestamp) -> { + bekreftelse.tilstand is Tilstand.IkkeKlarForUtfylling && bekreftelse.erKlarForUtfylling(timestamp, bekreftelseIntervals.tilgjengeligOffset) -> { val updatedBekreftelse = bekreftelse.copy(tilstand = Tilstand.KlarForUtfylling, tilgjengeliggjort = timestamp) val hendelse = BekreftelseTilgjengelig( hendelseId = UUID.randomUUID(), @@ -106,7 +118,7 @@ private fun InternTilstand.getProcessedBekreftelseTilstandAndHendelse( updatedBekreftelse to hendelse } - bekreftelse.tilstand is Tilstand.KlarForUtfylling && bekreftelse.harFristUtloept(timestamp) -> { + bekreftelse.tilstand is Tilstand.KlarForUtfylling && bekreftelse.harFristUtloept(timestamp, bekreftelseIntervals.tilgjengeligOffset) -> { val updatedBekreftelse = bekreftelse.copy(tilstand = Tilstand.VenterSvar, fristUtloept = timestamp) val hendelse = LeveringsfristUtloept( hendelseId = UUID.randomUUID(), @@ -118,19 +130,19 @@ private fun InternTilstand.getProcessedBekreftelseTilstandAndHendelse( updatedBekreftelse to hendelse } - bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.erSisteVarselOmGjenstaaendeGraceTid(timestamp) -> { + bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.erSisteVarselOmGjenstaaendeGraceTid(timestamp, bekreftelseIntervals.varselFoerGraceperiodeUtloept) -> { val updatedBekreftelse = bekreftelse.copy(sisteVarselOmGjenstaaendeGraceTid = timestamp) val hendelse = RegisterGracePeriodeGjenstaaendeTid( hendelseId = UUID.randomUUID(), periodeId = periode.periodeId, arbeidssoekerId = periode.arbeidsoekerId, bekreftelseId = bekreftelse.bekreftelseId, - gjenstaandeTid = bekreftelse.gjenstaendeGracePeriode(timestamp), + gjenstaandeTid = bekreftelse.gjenstaendeGraceperiode(timestamp, bekreftelseIntervals.graceperiode), hendelseTidspunkt = Instant.now()) updatedBekreftelse to hendelse } - bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.harGracePeriodeUtloept(timestamp) -> { + bekreftelse.tilstand == Tilstand.VenterSvar && bekreftelse.harGraceperiodeUtloept(timestamp, bekreftelseIntervals.graceperiode) -> { val updatedBekreftelse = bekreftelse.copy(tilstand = Tilstand.GracePeriodeUtloept) val hendelse = RegisterGracePeriodeUtloept( hendelseId = UUID.randomUUID(), diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt new file mode 100644 index 00000000..29583d21 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/BekreftelseStream.kt @@ -0,0 +1,107 @@ +package no.nav.paw.bekreftelsetjeneste.topology + +import arrow.core.partially1 +import no.nav.paw.bekreftelse.internehendelser.BaOmAaAvsluttePeriode +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde +import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt +import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig +import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand +import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand +import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.KlarForUtfylling +import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand.VenterSvar +import no.nav.paw.config.kafka.streams.Punctuation +import no.nav.paw.config.kafka.streams.genericProcess +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.kstream.Produced +import org.apache.kafka.streams.processor.PunctuationType +import org.apache.kafka.streams.processor.api.Record +import org.slf4j.LoggerFactory +import java.time.Instant +import java.util.* + +fun StreamsBuilder.buildBekreftelseStream(applicationConfig: ApplicationConfig) { + with(applicationConfig.kafkaTopology) { + stream(bekreftelseTopic) + .genericProcess( + name = "meldingMottatt", + internStateStoreName, + punctuation = Punctuation( + punctuationInterval, + PunctuationType.WALL_CLOCK_TIME, + ::bekreftelsePunctuator.partially1(internStateStoreName).partially1(applicationConfig.bekreftelseIntervals) + ), + ) { record -> + val stateStore = getStateStore(internStateStoreName) + val gjeldeneTilstand: InternTilstand? = stateStore[record.value().periodeId] + if (gjeldeneTilstand == null) { + // TODO: håndtere potensiell tom tilstand når vi starter med ansvarsTopic + meldingsLogger.warn("Melding mottatt for periode som ikke er aktiv/eksisterer") + return@genericProcess + } + if (record.value().namespace == "paw") { + val bekreftelse = + gjeldeneTilstand.bekreftelser.find { bekreftelse -> bekreftelse.bekreftelseId == record.value().id } + when { + bekreftelse == null -> { + meldingsLogger.warn("Melding {} har ingen matchene bekreftelse", record.value().id) + } + + bekreftelse.tilstand is VenterSvar || bekreftelse.tilstand is KlarForUtfylling -> { + val (hendelser, oppdatertBekreftelse) = behandleGyldigSvar( + gjeldeneTilstand.periode.arbeidsoekerId, + record, + bekreftelse + ) + val oppdatertBekreftelser = gjeldeneTilstand.bekreftelser + .filterNot { t -> t.bekreftelseId == oppdatertBekreftelse.bekreftelseId } + oppdatertBekreftelse + val oppdatertTilstand = gjeldeneTilstand.copy(bekreftelser = oppdatertBekreftelser) + stateStore.put(oppdatertTilstand.periode.periodeId, oppdatertTilstand) + hendelser + .map(record::withValue) + .forEach(::forward) + } + + else -> { + meldingsLogger.warn( + "Melding {} har ikke forventet tilstand, tilstand={}", + record.value().id, + bekreftelse.tilstand + ) + } + } + } + }.to(bekreftelseHendelseloggTopic, Produced.with(Serdes.Long(), BekreftelseHendelseSerde())) + } +} + +fun behandleGyldigSvar( + arbeidssoekerId: Long, + record: Record, + bekreftelse: Bekreftelse +): Pair, Bekreftelse> { + val oppdatertBekreftelse = bekreftelse.copy(tilstand = Tilstand.Levert) + val baOmAaAvslutte = if (!record.value().svar.vilFortsetteSomArbeidssoeker) { + BaOmAaAvsluttePeriode( + hendelseId = UUID.randomUUID(), + periodeId = record.value().periodeId, + arbeidssoekerId = arbeidssoekerId, + hendelseTidspunkt = Instant.now() + ) + } else null + val meldingMottatt = BekreftelseMeldingMottatt( + hendelseId = UUID.randomUUID(), + periodeId = record.value().periodeId, + arbeidssoekerId = arbeidssoekerId, + bekreftelseId = bekreftelse.bekreftelseId, + hendelseTidspunkt = Instant.now() + ) + return listOfNotNull(meldingMottatt, baOmAaAvslutte) to oppdatertBekreftelse +} + + +private val meldingsLogger = LoggerFactory.getLogger("meldingsLogger") + + diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/PeriodeStream.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/PeriodeStream.kt new file mode 100644 index 00000000..dc072b90 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/PeriodeStream.kt @@ -0,0 +1,84 @@ +package no.nav.paw.bekreftelsetjeneste.topology + +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse +import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde +import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet +import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand +import no.nav.paw.bekreftelsetjeneste.tilstand.initTilstand +import no.nav.paw.config.kafka.streams.genericProcess +import no.nav.paw.config.kafka.streams.mapWithContext +import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.kstream.Produced +import org.apache.kafka.streams.state.KeyValueStore +import java.time.Instant +import java.util.* + +fun StreamsBuilder.buildPeriodeStream( + applicationConfig: ApplicationConfig, + kafaKeysClient: KafkaKeysClient +) { + with(applicationConfig.kafkaTopology) { + stream(periodeTopic) + .mapWithContext( + "lagreEllerSlettPeriode", + internStateStoreName + ) { periode -> + val keyValueStore: KeyValueStore = + getStateStore(internStateStoreName) + val currentState = keyValueStore[periode.id] + val (arbeidsoekerId, kafkaKey) = currentState?.let { it.periode.arbeidsoekerId to it.periode.recordKey } + ?: kafaKeysClient.getIdAndKeyBlocking(periode.identitetsnummer).let { it.id to it.key } + when { + currentState == null && periode.avsluttet() -> Action.DoNothing + periode.avsluttet() -> Action.DeleteStateAndEmit(arbeidsoekerId, periode) + currentState == null -> Action.UpdateState( + initTilstand( + id = arbeidsoekerId, + key = kafkaKey, + periode = periode + ) + ) + + else -> Action.DoNothing + } + } + .genericProcess( + name = "executeAction", + punctuation = null, + stateStoreNames = arrayOf(internStateStoreName) + ) { record -> + val keyValueStore: KeyValueStore = + getStateStore(internStateStoreName) + when (val action = record.value()) { + is Action.DeleteStateAndEmit -> { + forward( + record.withValue( + PeriodeAvsluttet( + hendelseId = UUID.randomUUID(), + periodeId = action.periode.id, + arbeidssoekerId = action.arbeidsoekerId, + hendelseTidspunkt = Instant.now() + ) as BekreftelseHendelse + ) + ) + keyValueStore.delete(action.periode.id) + } + + Action.DoNothing -> {} + is Action.UpdateState -> keyValueStore.put(action.state.periode.periodeId, action.state) + } + }.to(bekreftelseHendelseloggTopic, Produced.with(Serdes.Long(), BekreftelseHendelseSerde())) + } +} + +fun Periode.avsluttet(): Boolean = avsluttet != null + +sealed interface Action { + data object DoNothing : Action + data class DeleteStateAndEmit(val arbeidsoekerId: Long, val periode: Periode) : Action + data class UpdateState(val state: InternTilstand) : Action +} diff --git a/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/Topology.kt b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/Topology.kt new file mode 100644 index 00000000..cfc578aa --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/Topology.kt @@ -0,0 +1,41 @@ +package no.nav.paw.bekreftelsetjeneste.topology + +import kotlinx.coroutines.runBlocking +import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig +import no.nav.paw.bekreftelsetjeneste.context.ApplicationContext +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde +import no.nav.paw.kafkakeygenerator.client.KafkaKeysClient +import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.state.KeyValueStore +import org.apache.kafka.streams.state.Stores +import java.util.* + +typealias StateStore = KeyValueStore + +fun buildTopology( + applicationContext: ApplicationContext +): Topology = StreamsBuilder().apply { + buildInternStateStore(applicationContext.applicationConfig) + buildPeriodeStream(applicationContext.applicationConfig, applicationContext.kafkaKeysClient) + buildBekreftelseStream(applicationContext.applicationConfig) +}.build() + +fun StreamsBuilder.buildInternStateStore(applicationConfig: ApplicationConfig) { + with(applicationConfig.kafkaTopology) { + addStateStore( + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(internStateStoreName), + Serdes.UUID(), + InternTilstandSerde() + ) + ) + } +} + +fun KafkaKeysClient.getIdAndKeyBlocking(identitetsnummer: String): KafkaKeysResponse = runBlocking { + getIdAndKey(identitetsnummer) +} diff --git a/apps/bekreftelse-tjeneste/src/main/resources/local/application_config.toml b/apps/bekreftelse-tjeneste/src/main/resources/local/application_config.toml new file mode 100644 index 00000000..5d729881 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/resources/local/application_config.toml @@ -0,0 +1,27 @@ +[bekreftelseIntervals] +interval = "P14D" +graceperiode = "P7D" +tilgjengeligOffset = "P3D" + +[kafkaTopology] +applicationIdSuffix = "v1" +internStateStoreName = "intern-tilstand" +periodeTopic = "paw.arbeidssokerperioder-v1" +bekreftelseTopic = "paw.arbeidssoker-bekreftelse-v1" +bekreftelseHendelseloggTopic = "paw.arbeidssoker-bekreftelse-hendelseslogg-v1" +punctuationInterval = "PT5S" + +[azureM2M] +tokenEndpointUrl = "http://localhost:8081/azure/token" +clientId = "paw-arbeidssoekerregisteret-bekreftelse-tjeneste" + +[kafkaKeysClient] +url = "http://localhost:8081/api/v2/hentEllerOpprett" +scope = "api://local.paw.paw-kafka-key-generator/.default" + +[kafkaStreams] +brokers = "localhost:9092" +applicationIdPrefix = "paw-arbeidssoekerregisteret-bekreftelse-tjeneste" + +[kafkaStreams.schemaRegistry] +url = "http://localhost:8082" diff --git a/apps/bekreftelse-tjeneste/src/main/resources/local/server_config.toml b/apps/bekreftelse-tjeneste/src/main/resources/local/server_config.toml new file mode 100644 index 00000000..5f9044d0 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/resources/local/server_config.toml @@ -0,0 +1,6 @@ +port = 8080 +callGroupSize = 16 +workerGroupSize = 8 +connectionGroupSize = 8 +gracePeriodMillis = 300 +timeoutMillis = 300 \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/resources/logback.xml b/apps/bekreftelse-tjeneste/src/main/resources/logback.xml new file mode 100644 index 00000000..bf60ec99 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/resources/logback.xml @@ -0,0 +1,33 @@ + + + + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %5p %c{1}:%L - %m%n + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/main/resources/nais/application_config.toml b/apps/bekreftelse-tjeneste/src/main/resources/nais/application_config.toml new file mode 100644 index 00000000..d95897c2 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/resources/nais/application_config.toml @@ -0,0 +1,34 @@ +[bekreftelseIntervals] +interval = "${BEKREFTELSE_INTERVAL}" +graceperiode = "${BEKREFTELSE_GRACEPERIODE}" +tilgjengeligOffset = "${BEKREFTELSE_GRACEPERIODE}" + +[kafkaTopology] +applicationIdSuffix = "${KAFKA_STREAMS_ID_SUFFIX}" +internStateStoreName = "intern-tilstand" +periodeTopic = "${KAFKA_PAW_ARBEIDSSOKERPERIODER_TOPIC}" +bekreftelseTopic = "${KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_TOPIC}" +bekreftelseHendelseloggTopic = "${KAFKA_PAW_ARBEIDSSOKER_BEKREFTELSE_HENDELSELOGG_TOPIC}" +punctuationInterval = "${KAFKA_PUNCTUATOR_INTERVAL}" + +[azureM2M] +tokenEndpointUrl = "${AZURE_OPENID_CONFIG_TOKEN_ENDPOINT}" +clientId = "${AZURE_APP_CLIENT_ID}" + +[kafkaKeysClient] +url = "http://paw-kafka-key-generator/api/v2/hentEllerOpprett" +scope = "api://${NAIS_CLUSTER_NAME}.paw.paw-kafka-key-generator/.default" + +[kafkaStreams] +brokers = "${KAFKA_BROKERS}" +applicationIdPrefix = "${KAFKA_STREAMS_APPLICATION_ID}" + +[kafkaStreams.authentication] +keystorePath = "${KAFKA_KEYSTORE_PATH}" +truststorePath = "${KAFKA_TRUSTSTORE_PATH}" +credstorePassword = "${KAFKA_CREDSTORE_PASSWORD}" + +[kafkaStreams.schemaRegistry] +url = "${KAFKA_SCHEMA_REGISTRY}" +username = "${KAFKA_SCHEMA_REGISTRY_USER}" +password = "${KAFKA_SCHEMA_REGISTRY_PASSWORD}" diff --git a/apps/bekreftelse-tjeneste/src/main/resources/nais/server_config.toml b/apps/bekreftelse-tjeneste/src/main/resources/nais/server_config.toml new file mode 100644 index 00000000..5f9044d0 --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/main/resources/nais/server_config.toml @@ -0,0 +1,6 @@ +port = 8080 +callGroupSize = 16 +workerGroupSize = 8 +connectionGroupSize = 8 +gracePeriodMillis = 300 +timeoutMillis = 300 \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt index d0ba7470..8987a8e6 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/ApplicationTestContext.kt @@ -3,13 +3,21 @@ package no.nav.paw.bekreftelsetjeneste import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry import io.confluent.kafka.serializers.KafkaAvroSerializerConfig import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde +import io.micrometer.prometheusmetrics.PrometheusMeterRegistry +import io.mockk.mockk import no.nav.paw.arbeidssokerregisteret.api.v1.Periode import no.nav.paw.bekreftelse.ansvar.v1.AnsvarEndret import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde import no.nav.paw.bekreftelse.melding.v1.Bekreftelse +import no.nav.paw.bekreftelsetjeneste.config.APPLICATION_CONFIG_FILE_NAME +import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig +import no.nav.paw.bekreftelsetjeneste.context.ApplicationContext import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstandSerde -import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse +import no.nav.paw.bekreftelsetjeneste.topology.buildBekreftelseStream +import no.nav.paw.bekreftelsetjeneste.topology.buildPeriodeStream +import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration +import no.nav.paw.health.repository.HealthIndicatorRepository import no.nav.paw.kafkakeygenerator.client.inMemoryKafkaKeysMock import org.apache.avro.specific.SpecificRecord import org.apache.kafka.common.serialization.Serde @@ -17,12 +25,13 @@ import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.common.utils.Time import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsConfig +import org.apache.kafka.streams.TestInputTopic +import org.apache.kafka.streams.TestOutputTopic import org.apache.kafka.streams.TopologyTestDriver import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.time.Duration import java.time.Instant import java.util.* @@ -31,62 +40,49 @@ class ApplicationTestContext(initialWallClockTime: Instant = Instant.now()) { val bekreftelseSerde: Serde = opprettSerde() val periodeTopicSerde: Serde = opprettSerde() val hendelseLoggSerde: Serde = BekreftelseHendelseSerde() - val applicationConfiguration = ApplicationConfiguration( - periodeTopic = "periodeTopic", - ansvarsTopic = "ansvarsTopic", - bekreftelseTopic = "bekreftelseTopic", - bekreftelseHendelseloggTopic = "bekreftelseHendelsesloggTopic", - stateStoreName = "stateStoreName", - punctuateInterval = Duration.ofSeconds(1) - ) + val applicationConfig = loadNaisOrLocalConfiguration(APPLICATION_CONFIG_FILE_NAME) + val kafkaKeysClient = inMemoryKafkaKeysMock() val applicationContext = ApplicationContext( - internTilstandSerde = InternTilstandSerde(), - bekreftelseHendelseSerde = BekreftelseHendelseSerde(), - kafkaKeysClient = inMemoryKafkaKeysMock() + applicationConfig = applicationConfig, + prometheusMeterRegistry = mockk(), + healthIndicatorRepository = HealthIndicatorRepository(), + kafkaKeysClient = kafkaKeysClient ) val logger: Logger = LoggerFactory.getLogger(ApplicationTestContext::class.java) - val testDriver: TopologyTestDriver = - with(applicationContext) { - with(applicationConfiguration) { - StreamsBuilder() - .addStateStore( - KeyValueStoreBuilder( - InMemoryKeyValueBytesStoreSupplier(applicationConfiguration.stateStoreName), - Serdes.UUID(), - applicationContext.internTilstandSerde, - Time.SYSTEM - ) - ).appTopology() - } - }.let { TopologyTestDriver(it, kafkaStreamProperties, initialWallClockTime) } + val topology = StreamsBuilder().apply { + addStateStore( + KeyValueStoreBuilder( + InMemoryKeyValueBytesStoreSupplier(applicationConfig.kafkaTopology.internStateStoreName), + Serdes.UUID(), + InternTilstandSerde(), + Time.SYSTEM + ) + ) + buildPeriodeStream(applicationConfig, kafkaKeysClient) + buildBekreftelseStream(applicationConfig) + }.build() - val periodeTopic = testDriver.createInputTopic( - applicationConfiguration.periodeTopic, - Serdes.Long().serializer(), - periodeTopicSerde.serializer() - ) + val testDriver: TopologyTestDriver = TopologyTestDriver(topology, kafkaStreamProperties, initialWallClockTime) - val ansvarsTopic = testDriver.createInputTopic( - applicationConfiguration.ansvarsTopic, + val periodeTopic: TestInputTopic = testDriver.createInputTopic( + applicationConfig.kafkaTopology.periodeTopic, Serdes.Long().serializer(), - ansvarsTopicSerde.serializer() + periodeTopicSerde.serializer() ) - val bekreftelseTopic = testDriver.createInputTopic( - applicationConfiguration.bekreftelseTopic, + val bekreftelseTopic: TestInputTopic = testDriver.createInputTopic( + applicationConfig.kafkaTopology.bekreftelseTopic, Serdes.Long().serializer(), bekreftelseSerde.serializer() ) - val hendelseLoggTopicOut = testDriver.createOutputTopic( - applicationConfiguration.bekreftelseHendelseloggTopic, + val hendelseLoggTopicOut: TestOutputTopic = testDriver.createOutputTopic( + applicationConfig.kafkaTopology.bekreftelseHendelseloggTopic, Serdes.Long().deserializer(), hendelseLoggSerde.deserializer() ) - - fun kafkaKeyFunction(id: String): KafkaKeysResponse = applicationContext.kafkaKeyFunction(id) } const val SCHEMA_REGISTRY_SCOPE = "juni-registry" diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopologyTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopologyTest.kt new file mode 100644 index 00000000..03a5216b --- /dev/null +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelseMeldingTopologyTest.kt @@ -0,0 +1,153 @@ +package no.nav.paw.bekreftelsetjeneste + +import io.kotest.core.spec.style.FreeSpec +import io.kotest.matchers.shouldBe +import no.nav.paw.arbeidssoekerregisteret.testdata.kafkaKeyContext +import no.nav.paw.arbeidssoekerregisteret.testdata.mainavro.metadata +import no.nav.paw.arbeidssoekerregisteret.testdata.mainavro.periode +import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt +import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig +import no.nav.paw.bekreftelse.melding.v1.Bekreftelse +import no.nav.paw.bekreftelse.melding.v1.vo.Bruker +import no.nav.paw.bekreftelse.melding.v1.vo.BrukerType +import no.nav.paw.bekreftelse.melding.v1.vo.Metadata +import no.nav.paw.bekreftelse.melding.v1.vo.Svar +import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand +import no.nav.paw.bekreftelsetjeneste.tilstand.PeriodeInfo +import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand +import java.time.Instant +import java.util.* + + +class BekreftelseMeldingTopologyTest : FreeSpec({ + + val identitetsnummer = "12345678901" + val startTime = Instant.ofEpochMilli(1704185347000) + + "For melding mottatt uten en tilhørende tilstand skal tilstand være uendret og hendelselogg skal være tom" { + with(ApplicationTestContext(initialWallClockTime = startTime)) { + val bekreftelseMelding = bekreftelseMelding( + periodeId = UUID.randomUUID(), + namespace = "tullball", + gjelderFra = Instant.now(), + gjelderTil = Instant.now(), + harJobbetIDennePerioden = true, + vilFortsetteSomArbeidssoeker = true + ) + + bekreftelseTopic.pipeInput(1234L, bekreftelseMelding) + val stateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + stateStore.all().asSequence().count() shouldBe 0 + + hendelseLoggTopicOut.isEmpty shouldBe true + } + } + + "Mottatt melding med tilhørende tilstand av typen VenterSvar skal oppdatere tilstand til Levert og sende BekreftelseMeldingMottatt hendelse" { + with(ApplicationTestContext(initialWallClockTime = startTime)) { + with(kafkaKeyContext()) { + val (interval, _, tilgjengeligOffset, _) = applicationConfig.bekreftelseIntervals + val (id, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) + + periodeTopic.pipeInput(key, periode) + testDriver.advanceWallClockTime( + interval.minus(tilgjengeligOffset).plusSeconds(5) + ) + + val bekreftelseId = (hendelseLoggTopicOut.readValue() as BekreftelseTilgjengelig).bekreftelseId + val bekreftelseMelding = bekreftelseMelding( + id = bekreftelseId, + periodeId = periode.id, + namespace = "paw", + gjelderFra = startTime, + gjelderTil = startTime.plus(interval), + harJobbetIDennePerioden = true, + vilFortsetteSomArbeidssoeker = true + ) + bekreftelseTopic.pipeInput(key, bekreftelseMelding) + + val stateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + val internTilstand = stateStore[periode.id] + + internTilstand shouldBe InternTilstand( + periode = PeriodeInfo( + periodeId = periode.id, + identitetsnummer = periode.identitetsnummer, + arbeidsoekerId = id, + recordKey = key, + startet = periode.startet.tidspunkt, + avsluttet = periode.avsluttet?.tidspunkt + ), + bekreftelser = listOf( + no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse( + tilstand = Tilstand.Levert, + tilgjengeliggjort = startTime.plus( + interval.minus(tilgjengeligOffset) + .plusSeconds(5) + ), + fristUtloept = null, + sisteVarselOmGjenstaaendeGraceTid = null, + bekreftelseId = bekreftelseMelding.id, + gjelderFra = bekreftelseMelding.svar.gjelderFra, + gjelderTil = bekreftelseMelding.svar.gjelderTil + ) + ) + ) + + hendelseLoggTopicOut.isEmpty shouldBe false + val hendelse = hendelseLoggTopicOut.readKeyValue() + hendelse.key shouldBe key + hendelse.value shouldBe BekreftelseMeldingMottatt( + hendelseId = hendelse.value.hendelseId, + periodeId = periode.id, + arbeidssoekerId = id, + bekreftelseId = bekreftelseMelding.id, + hendelseTidspunkt = hendelse.value.hendelseTidspunkt + ) + } + } + } +}) + +// TODO: flytt denne til test-data-lib +fun bekreftelseMelding( + id: UUID = UUID.randomUUID(), + periodeId: UUID = UUID.randomUUID(), + namespace: String = "paw", + gjelderFra: Instant = Instant.now(), + gjelderTil: Instant = Instant.now(), + harJobbetIDennePerioden: Boolean = true, + vilFortsetteSomArbeidssoeker: Boolean = true +) = + Bekreftelse + .newBuilder() + .setPeriodeId(periodeId) + .setNamespace(namespace) + .setId(id) + .setSvar( + Svar + .newBuilder() + .setSendtInn( + Metadata + .newBuilder() + .setTidspunkt(Instant.now()) + .setUtfoertAv( + Bruker + .newBuilder() + .setId("test") + .setType(BrukerType.SLUTTBRUKER) + .build() + ).setKilde("test") + .setAarsak("test") + .build() + ) + .setGjelderFra(gjelderFra) + .setGjelderTil(gjelderTil) + .setHarJobbetIDennePerioden(harJobbetIDennePerioden) + .setVilFortsetteSomArbeidssoeker(vilFortsetteSomArbeidssoeker) + .build() + + ) + .build() \ No newline at end of file diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt index bcf5e00c..fc1e6816 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/BekreftelsePunctuatorTest.kt @@ -3,6 +3,9 @@ package no.nav.paw.bekreftelsetjeneste import io.kotest.core.spec.style.FreeSpec import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf +import no.nav.paw.arbeidssoekerregisteret.testdata.kafkaKeyContext +import no.nav.paw.arbeidssoekerregisteret.testdata.mainavro.metadata +import no.nav.paw.arbeidssoekerregisteret.testdata.mainavro.periode import no.nav.paw.bekreftelse.internehendelser.BaOmAaAvsluttePeriode import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig @@ -13,11 +16,11 @@ import no.nav.paw.bekreftelse.melding.v1.vo.BrukerType import no.nav.paw.bekreftelse.melding.v1.vo.Metadata import no.nav.paw.bekreftelse.melding.v1.vo.Svar import no.nav.paw.bekreftelsetjeneste.tilstand.Bekreftelse -import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseConfig import no.nav.paw.bekreftelsetjeneste.tilstand.InternTilstand import no.nav.paw.bekreftelsetjeneste.tilstand.PeriodeInfo import no.nav.paw.bekreftelsetjeneste.tilstand.Tilstand import no.nav.paw.bekreftelsetjeneste.tilstand.fristForNesteBekreftelse +import no.nav.paw.bekreftelsetjeneste.topology.StateStore import java.time.Duration import java.time.Instant @@ -27,415 +30,488 @@ class BekreftelsePunctuatorTest : FreeSpec({ val identitetsnummer = "12345678901" "BekreftelsePunctuator sender riktig hendelser i rekkefølge" - { - with(ApplicationTestContext(initialWallClockTime = startTime)){ - val (periode, kafkaKeyResponse) = periode(identitetsnummer = identitetsnummer, startet = startTime) - periodeTopic.pipeInput(kafkaKeyResponse.key, periode) - "Når perioden opprettes skal det opprettes en intern tilstand med en bekreftelse" { + with(ApplicationTestContext(initialWallClockTime = startTime)) { + with(kafkaKeyContext()){ + val (interval, graceperiode, tilgjengeligOffset, varselFoerGraceperiodeUtloept) = applicationConfig.bekreftelseIntervals + val (id, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) + periodeTopic.pipeInput(key, periode) testDriver.advanceWallClockTime(Duration.ofSeconds(5)) - hendelseLoggTopicOut.isEmpty shouldBe true - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( - periodeId = periode.id, - identitetsnummer = periode.identitetsnummer, - arbeidsoekerId = kafkaKeyResponse.id, - recordKey = kafkaKeyResponse.key, - startet = periode.startet.tidspunkt, - avsluttet = periode.avsluttet?.tidspunkt - ), - bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.IkkeKlarForUtfylling, - tilgjengeliggjort = null, - fristUtloept = null, - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, BekreftelseConfig.bekreftelseInterval) + + "Når perioden opprettes skal det opprettes en intern tilstand med en bekreftelse" { + hendelseLoggTopicOut.isEmpty shouldBe true + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + val currentState = stateStore.get(periode.id) + currentState shouldBe InternTilstand( + periode = PeriodeInfo( + periodeId = periode.id, + identitetsnummer = periode.identitetsnummer, + arbeidsoekerId = id, + recordKey = key, + startet = periode.startet.tidspunkt, + avsluttet = periode.avsluttet?.tidspunkt + ), bekreftelser = listOf( + Bekreftelse( + tilstand = Tilstand.IkkeKlarForUtfylling, + tilgjengeliggjort = null, + fristUtloept = null, + sisteVarselOmGjenstaaendeGraceTid = null, + bekreftelseId = currentState.bekreftelser.first().bekreftelseId, + gjelderFra = periode.startet.tidspunkt, + gjelderTil = fristForNesteBekreftelse( + periode.startet.tidspunkt, interval + ) + ) ) ) - ) + } - } - "Etter 11 dager skal det ha blitt sendt en BekreftelseTilgjengelig hendelse" { - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseInterval.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset)) - val stateStore:StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - stateStore.all().use { - it.forEach { - logger.info("key: ${it.key}, value: ${it.value}") + "Etter 11 dager skal det ha blitt sendt en BekreftelseTilgjengelig hendelse" { + testDriver.advanceWallClockTime(interval.minus(tilgjengeligOffset)) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + stateStore.all().use { + it.forEach { + logger.info("key: ${it.key}, value: ${it.value}") + } } + hendelseLoggTopicOut.isEmpty shouldBe false + val hendelser = hendelseLoggTopicOut.readKeyValuesToList() + logger.info("hendelser: $hendelser") + hendelser.size shouldBe 1 + val kv = hendelser.last() + kv.key shouldBe key + kv.value.shouldBeInstanceOf() } - hendelseLoggTopicOut.isEmpty shouldBe false - val hendelser = hendelseLoggTopicOut.readKeyValuesToList() - logger.info("hendelser: $hendelser") - hendelser.size shouldBe 1 - val kv = hendelser.last() - kv.key shouldBe kafkaKeyResponse.key - kv.value.shouldBeInstanceOf() - } - "Etter 14 dager skal det ha blitt sendt en LeveringsFristUtloept hendelse" { - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseTilgjengeligOffset.plusSeconds(5)) - val stateStore:StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - stateStore.all().use { - it.forEach { - logger.info("key: ${it.key}, value: ${it.value}") + + "Etter 14 dager skal det ha blitt sendt en LeveringsFristUtloept hendelse" { + testDriver.advanceWallClockTime(tilgjengeligOffset.plusSeconds(5)) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + stateStore.all().use { + it.forEach { + logger.info("key: ${it.key}, value: ${it.value}") + } } + hendelseLoggTopicOut.isEmpty shouldBe false + val hendelser = hendelseLoggTopicOut.readKeyValuesToList() + logger.info("hendelser: $hendelser") + hendelser.size shouldBe 1 + val hendelseLast = hendelser.last() + hendelseLast.key shouldBe key + hendelseLast.value.shouldBeInstanceOf() } - hendelseLoggTopicOut.isEmpty shouldBe false - val hendelser = hendelseLoggTopicOut.readKeyValuesToList() - logger.info("hendelser: $hendelser") - hendelser.size shouldBe 1 - val hendelseLast = hendelser.last() - hendelseLast.key shouldBe kafkaKeyResponse.key - hendelseLast.value.shouldBeInstanceOf() - } - "Etter 17,5 dager uten svar skal det ha blitt sendt en RegisterGracePeriodeGjenstaaendeTid hendelse" { - testDriver.advanceWallClockTime(BekreftelseConfig.varselFoerGracePeriodeUtloept.plusSeconds(5)) - val stateStore:StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - stateStore.all().use { - it.forEach { - logger.info("key: ${it.key}, value: ${it.value}") + "Etter 17,5 dager uten svar skal det ha blitt sendt en RegisterGracePeriodeGjenstaaendeTid hendelse" { + testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + stateStore.all().use { + it.forEach { + logger.info("key: ${it.key}, value: ${it.value}") + } } + hendelseLoggTopicOut.isEmpty shouldBe false + val hendelser = hendelseLoggTopicOut.readKeyValuesToList() + logger.info("hendelser: $hendelser") + hendelser.size shouldBe 1 + val kv = hendelser.last() + kv.key shouldBe key + kv.value.shouldBeInstanceOf() } - hendelseLoggTopicOut.isEmpty shouldBe false - val hendelser = hendelseLoggTopicOut.readKeyValuesToList() - logger.info("hendelser: $hendelser") - hendelser.size shouldBe 1 - val kv = hendelser.last() - kv.key shouldBe kafkaKeyResponse.key - kv.value.shouldBeInstanceOf() - } - "Etter 21 dager uten svar skal det ha blitt sendt en RegisterGracePeriodeUtloept hendelse" { - testDriver.advanceWallClockTime(BekreftelseConfig.varselFoerGracePeriodeUtloept.plusSeconds(5)) - val stateStore:StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - stateStore.all().use { - it.forEach { - logger.info("key: ${it.key}, value: ${it.value}") + "Etter 21 dager uten svar skal det ha blitt sendt en RegisterGracePeriodeUtloept hendelse" { + testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + stateStore.all().use { + it.forEach { + logger.info("key: ${it.key}, value: ${it.value}") + } } + hendelseLoggTopicOut.isEmpty shouldBe false + val hendelser = hendelseLoggTopicOut.readKeyValuesToList() + logger.info("hendelser: $hendelser") + hendelser.size shouldBe 1 + val kv = hendelser.last() + kv.key shouldBe key + kv.value.shouldBeInstanceOf() } - hendelseLoggTopicOut.isEmpty shouldBe false - val hendelser = hendelseLoggTopicOut.readKeyValuesToList() - logger.info("hendelser: $hendelser") - hendelser.size shouldBe 1 - val kv = hendelser.last() - kv.key shouldBe kafkaKeyResponse.key - kv.value.shouldBeInstanceOf() - } - "Etter 25 dager skal det ha blitt sendt en BekreftelseTilgjengelig hendelse" { - testDriver.advanceWallClockTime( - Duration.between( - startTime.plus(BekreftelseConfig.bekreftelseInterval).plus(BekreftelseConfig.gracePeriode), - startTime.plus(BekreftelseConfig.bekreftelseInterval).plus(BekreftelseConfig.bekreftelseInterval.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset)) + "Etter 25 dager skal det ha blitt sendt en BekreftelseTilgjengelig hendelse" { + testDriver.advanceWallClockTime( + Duration.between( + startTime.plus(interval).plus(graceperiode), + startTime.plus(interval) + .plus(interval.minus(tilgjengeligOffset)) + ) ) - ) - val stateStore:StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - stateStore.all().use { - it.forEach { - logger.info("key: ${it.key}, value: ${it.value}") + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + stateStore.all().use { + it.forEach { + logger.info("key: ${it.key}, value: ${it.value}") + } } + hendelseLoggTopicOut.isEmpty shouldBe false + val hendelser = hendelseLoggTopicOut.readKeyValuesToList() + logger.info("hendelser: $hendelser") + hendelser.size shouldBe 1 + val kv = hendelser.last() + kv.key shouldBe key + kv.value.shouldBeInstanceOf() } - hendelseLoggTopicOut.isEmpty shouldBe false - val hendelser = hendelseLoggTopicOut.readKeyValuesToList() - logger.info("hendelser: $hendelser") - hendelser.size shouldBe 1 - val kv = hendelser.last() - kv.key shouldBe kafkaKeyResponse.key - kv.value.shouldBeInstanceOf() } } } - "BekreftelsePunctuator håndterer BekreftelseMeldingMotatt hendelse" { - with(ApplicationTestContext(initialWallClockTime = startTime)){ - val (periode, kafkaKeyResponse) = periode(identitetsnummer = identitetsnummer, startet = startTime) + "BekreftelsePunctuator håndterer BekreftelseMeldingMottatt hendelse" { + with(ApplicationTestContext(initialWallClockTime = startTime)) { + with(kafkaKeyContext()){ + val (interval, graceperiode, tilgjengeligOffset, _) = applicationConfig.bekreftelseIntervals + val (_, key, periode) = periode(identitetsnummer = identitetsnummer, startetMetadata = metadata(tidspunkt = startTime)) - periodeTopic.pipeInput(kafkaKeyResponse.key, periode) - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseInterval.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusSeconds(5)) - val stateStore:StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - val currentState = stateStore.get(periode.id) - bekreftelseTopic.pipeInput(kafkaKeyResponse.key, - no.nav.paw.bekreftelse.melding.v1.Bekreftelse( - periode.id, - "paw", - currentState.bekreftelser.first().bekreftelseId, - Svar( - Metadata( - Instant.now(), - no.nav.paw.bekreftelse.melding.v1.vo.Bruker( - BrukerType.SLUTTBRUKER, - "12345678901" + periodeTopic.pipeInput(key, periode) + testDriver.advanceWallClockTime( + interval.minus(tilgjengeligOffset) + .plusSeconds(5) + ) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + val currentState = stateStore.get(periode.id) + bekreftelseTopic.pipeInput( + key, no.nav.paw.bekreftelse.melding.v1.Bekreftelse( + periode.id, "paw", currentState.bekreftelser.first().bekreftelseId, Svar( + Metadata( + Instant.now(), no.nav.paw.bekreftelse.melding.v1.vo.Bruker( + BrukerType.SLUTTBRUKER, "12345678901" + ), "test", "test" ), - "test", - "test" - ), - periode.startet.tidspunkt, - fristForNesteBekreftelse(periode.startet.tidspunkt, BekreftelseConfig.bekreftelseInterval), - true, - true + periode.startet.tidspunkt, + fristForNesteBekreftelse(periode.startet.tidspunkt, interval), + true, + true + ) ) ) - ) - testDriver.advanceWallClockTime(Duration.ofSeconds(5)) - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseTilgjengeligOffset.plus(BekreftelseConfig.gracePeriode)) - val hendelseLoggOutput = hendelseLoggTopicOut.readKeyValuesToList() - logger.info("hendelseOutput: $hendelseLoggOutput") - stateStore.all().use { - it.forEach { - logger.info("key: ${it.key}, value: ${it.value}") + testDriver.advanceWallClockTime(Duration.ofSeconds(5)) + testDriver.advanceWallClockTime(tilgjengeligOffset.plus(graceperiode)) + val hendelseLoggOutput = hendelseLoggTopicOut.readKeyValuesToList() + logger.info("hendelseOutput: $hendelseLoggOutput") + stateStore.all().use { + it.forEach { + logger.info("key: ${it.key}, value: ${it.value}") + } } + hendelseLoggOutput.size shouldBe 2 + hendelseLoggOutput.filter { it.value is BekreftelseTilgjengelig }.size shouldBe 1 + hendelseLoggOutput.filter { it.value is BekreftelseMeldingMottatt }.size shouldBe 1 } - hendelseLoggOutput.size shouldBe 2 - hendelseLoggOutput.filter { it.value is BekreftelseTilgjengelig }.size shouldBe 1 - hendelseLoggOutput.filter { it.value is BekreftelseMeldingMottatt }.size shouldBe 1 + } } "BekreftelsePunctuator håndterer BaOmAaAvslutePeriode hendelse" { with(ApplicationTestContext(initialWallClockTime = startTime)) { - val (periode, kafkaKeyResponse) = periode(identitetsnummer = identitetsnummer, startet = startTime) + with(kafkaKeyContext()) { + val (interval, graceperiode, tilgjengeligOffset, _) = applicationConfig.bekreftelseIntervals + val (_, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) - periodeTopic.pipeInput(kafkaKeyResponse.key, periode) - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseInterval.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusSeconds(5)) - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - val currentState = stateStore.get(periode.id) - bekreftelseTopic.pipeInput( - kafkaKeyResponse.key, - no.nav.paw.bekreftelse.melding.v1.Bekreftelse( - periode.id, - "paw", - currentState.bekreftelser.first().bekreftelseId, - Svar( - Metadata( - Instant.now(), - no.nav.paw.bekreftelse.melding.v1.vo.Bruker( - BrukerType.SLUTTBRUKER, - "12345678901" + periodeTopic.pipeInput(key, periode) + testDriver.advanceWallClockTime( + interval.minus(tilgjengeligOffset) + .plusSeconds(5) + ) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + val currentState = stateStore.get(periode.id) + bekreftelseTopic.pipeInput( + key, no.nav.paw.bekreftelse.melding.v1.Bekreftelse( + periode.id, "paw", currentState.bekreftelser.first().bekreftelseId, Svar( + Metadata( + Instant.now(), no.nav.paw.bekreftelse.melding.v1.vo.Bruker( + BrukerType.SLUTTBRUKER, "12345678901" + ), "test", "test" ), - "test", - "test" - ), - periode.startet.tidspunkt, - fristForNesteBekreftelse(periode.startet.tidspunkt, BekreftelseConfig.bekreftelseInterval), - true, - false + periode.startet.tidspunkt, + fristForNesteBekreftelse(periode.startet.tidspunkt, interval), + true, + false + ) ) ) - ) - testDriver.advanceWallClockTime(Duration.ofSeconds(5)) - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseTilgjengeligOffset.plus(BekreftelseConfig.gracePeriode)) - val hendelseLoggOutput = hendelseLoggTopicOut.readKeyValuesToList() - logger.info("hendelseOutput: $hendelseLoggOutput") - stateStore.all().use { - it.forEach { - logger.info("key: ${it.key}, value: ${it.value}") + testDriver.advanceWallClockTime(Duration.ofSeconds(5)) + testDriver.advanceWallClockTime(tilgjengeligOffset.plus(graceperiode)) + val hendelseLoggOutput = hendelseLoggTopicOut.readKeyValuesToList() + logger.info("hendelseOutput: $hendelseLoggOutput") + stateStore.all().use { + it.forEach { + logger.info("key: ${it.key}, value: ${it.value}") + } } - } - hendelseLoggOutput.size shouldBe 3 + hendelseLoggOutput.size shouldBe 3 - hendelseLoggOutput.filter { it.value is BekreftelseTilgjengelig }.size shouldBe 1 - hendelseLoggOutput.filter { it.value is BekreftelseMeldingMottatt }.size shouldBe 1 - hendelseLoggOutput.filter { it.value is BaOmAaAvsluttePeriode }.size shouldBe 1 + hendelseLoggOutput.filter { it.value is BekreftelseTilgjengelig }.size shouldBe 1 + hendelseLoggOutput.filter { it.value is BekreftelseMeldingMottatt }.size shouldBe 1 + hendelseLoggOutput.filter { it.value is BaOmAaAvsluttePeriode }.size shouldBe 1 + } } } "BekreftelsePunctuator setter riktig tilstand og sender riktig hendelse for: " - { "IkkeKlarForUtfylling" { with(ApplicationTestContext(initialWallClockTime = startTime)) { - val (periode, kafkaKeyResponse) = periode(identitetsnummer = identitetsnummer, startet = startTime) - periodeTopic.pipeInput(kafkaKeyResponse.key, periode) - testDriver.advanceWallClockTime(Duration.ofSeconds(5)) - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( - periodeId = periode.id, - identitetsnummer = periode.identitetsnummer, - arbeidsoekerId = kafkaKeyResponse.id, - recordKey = kafkaKeyResponse.key, - startet = periode.startet.tidspunkt, - avsluttet = periode.avsluttet?.tidspunkt - ), - bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.IkkeKlarForUtfylling, - tilgjengeliggjort = null, - fristUtloept = null, - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, BekreftelseConfig.bekreftelseInterval) + with(kafkaKeyContext()) { + val (interval, _, _, _) = applicationConfig.bekreftelseIntervals + val (id, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) + periodeTopic.pipeInput(key, periode) + testDriver.advanceWallClockTime(Duration.ofSeconds(5)) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + val currentState = stateStore.get(periode.id) + currentState shouldBe InternTilstand( + periode = PeriodeInfo( + periodeId = periode.id, + identitetsnummer = periode.identitetsnummer, + arbeidsoekerId = id, + recordKey = key, + startet = periode.startet.tidspunkt, + avsluttet = periode.avsluttet?.tidspunkt + ), bekreftelser = listOf( + Bekreftelse( + tilstand = Tilstand.IkkeKlarForUtfylling, + tilgjengeliggjort = null, + fristUtloept = null, + sisteVarselOmGjenstaaendeGraceTid = null, + bekreftelseId = currentState.bekreftelser.first().bekreftelseId, + gjelderFra = periode.startet.tidspunkt, + gjelderTil = fristForNesteBekreftelse( + periode.startet.tidspunkt, interval + ) + ) ) ) - ) - hendelseLoggTopicOut.isEmpty shouldBe true + hendelseLoggTopicOut.isEmpty shouldBe true + } } } "KlarForUtfylling og BekreftelseTilgjengelig" { with(ApplicationTestContext(initialWallClockTime = startTime)) { - val (periode, kafkaKeyResponse) = periode(identitetsnummer = identitetsnummer, startet = startTime) - periodeTopic.pipeInput(kafkaKeyResponse.key, periode) - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseInterval.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusSeconds(5)) - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( - periodeId = periode.id, - identitetsnummer = periode.identitetsnummer, - arbeidsoekerId = kafkaKeyResponse.id, - recordKey = kafkaKeyResponse.key, - startet = periode.startet.tidspunkt, - avsluttet = periode.avsluttet?.tidspunkt - ), - bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.KlarForUtfylling, - tilgjengeliggjort = startTime.plus(BekreftelseConfig.bekreftelseInterval).minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusSeconds(5), - fristUtloept = null, - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, BekreftelseConfig.bekreftelseInterval) + with(kafkaKeyContext()) { + val (interval, _, tilgjengeligOffset, _) = applicationConfig.bekreftelseIntervals + val (id, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) + + periodeTopic.pipeInput(key, periode) + testDriver.advanceWallClockTime( + interval.minus(tilgjengeligOffset) + .plusSeconds(5) + ) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + val currentState = stateStore.get(periode.id) + currentState shouldBe InternTilstand( + periode = PeriodeInfo( + periodeId = periode.id, + identitetsnummer = periode.identitetsnummer, + arbeidsoekerId = id, + recordKey = key, + startet = periode.startet.tidspunkt, + avsluttet = periode.avsluttet?.tidspunkt + ), bekreftelser = listOf( + Bekreftelse( + tilstand = Tilstand.KlarForUtfylling, + tilgjengeliggjort = startTime.plus(interval) + .minus(tilgjengeligOffset).plusSeconds(5), + fristUtloept = null, + sisteVarselOmGjenstaaendeGraceTid = null, + bekreftelseId = currentState.bekreftelser.first().bekreftelseId, + gjelderFra = periode.startet.tidspunkt, + gjelderTil = fristForNesteBekreftelse( + periode.startet.tidspunkt, interval + ) + ) ) ) - ) - hendelseLoggTopicOut.isEmpty shouldBe false - val hendelser = hendelseLoggTopicOut.readKeyValuesToList() - hendelser.size shouldBe 1 - val kv = hendelser.last() - kv.key shouldBe kafkaKeyResponse.key - kv.value.shouldBeInstanceOf() + hendelseLoggTopicOut.isEmpty shouldBe false + val hendelser = hendelseLoggTopicOut.readKeyValuesToList() + hendelser.size shouldBe 1 + val kv = hendelser.last() + kv.key shouldBe key + kv.value.shouldBeInstanceOf() + } } } "VenterSvar og LeveringsfristUtloept" { with(ApplicationTestContext(initialWallClockTime = startTime)) { - val (periode, kafkaKeyResponse) = periode(identitetsnummer = identitetsnummer, startet = startTime) - periodeTopic.pipeInput(kafkaKeyResponse.key, periode) - // Spoler frem til BekreftelseTilgjengelig - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseInterval.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusSeconds(5)) - // Spoler frem til LeveringsfristUtloept - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseTilgjengeligOffset.plusSeconds(5)) - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - stateStore.all().use { - it.forEach { - logger.info("key: ${it.key}, value: ${it.value}") + with(kafkaKeyContext()) { + val (interval, _, tilgjengeligOffset, _) = applicationConfig.bekreftelseIntervals + val (id, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) + periodeTopic.pipeInput(key, periode) + // Spoler frem til BekreftelseTilgjengelig + testDriver.advanceWallClockTime( + interval.minus(tilgjengeligOffset) + .plusSeconds(5) + ) + // Spoler frem til LeveringsfristUtloept + testDriver.advanceWallClockTime(tilgjengeligOffset.plusSeconds(5)) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + stateStore.all().use { + it.forEach { + logger.info("key: ${it.key}, value: ${it.value}") + } } - } - val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( - periodeId = periode.id, - identitetsnummer = periode.identitetsnummer, - arbeidsoekerId = kafkaKeyResponse.id, - recordKey = kafkaKeyResponse.key, - startet = periode.startet.tidspunkt, - avsluttet = periode.avsluttet?.tidspunkt - ), - bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.VenterSvar, - tilgjengeliggjort = startTime.plus(BekreftelseConfig.bekreftelseInterval).minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusSeconds(5), - fristUtloept = startTime.plus(BekreftelseConfig.bekreftelseInterval).plusSeconds(10), - sisteVarselOmGjenstaaendeGraceTid = null, - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, BekreftelseConfig.bekreftelseInterval) + val currentState = stateStore.get(periode.id) + currentState shouldBe InternTilstand( + periode = PeriodeInfo( + periodeId = periode.id, + identitetsnummer = periode.identitetsnummer, + arbeidsoekerId = id, + recordKey = key, + startet = periode.startet.tidspunkt, + avsluttet = periode.avsluttet?.tidspunkt + ), bekreftelser = listOf( + Bekreftelse( + tilstand = Tilstand.VenterSvar, + tilgjengeliggjort = startTime.plus(interval) + .minus(tilgjengeligOffset).plusSeconds(5), + fristUtloept = startTime.plus(interval).plusSeconds(10), + sisteVarselOmGjenstaaendeGraceTid = null, + bekreftelseId = currentState.bekreftelser.first().bekreftelseId, + gjelderFra = periode.startet.tidspunkt, + gjelderTil = fristForNesteBekreftelse( + periode.startet.tidspunkt, interval + ) + ) ) ) - ) - hendelseLoggTopicOut.isEmpty shouldBe false - val hendelser = hendelseLoggTopicOut.readKeyValuesToList() - logger.info("hendelser: $hendelser") - hendelser.size shouldBe 2 - val kv = hendelser.last() - kv.key shouldBe kafkaKeyResponse.key - kv.value.shouldBeInstanceOf() + hendelseLoggTopicOut.isEmpty shouldBe false + val hendelser = hendelseLoggTopicOut.readKeyValuesToList() + logger.info("hendelser: $hendelser") + hendelser.size shouldBe 2 + val kv = hendelser.last() + kv.key shouldBe key + kv.value.shouldBeInstanceOf() + } } } "VenterSvar og RegisterGracePeriodeGjenstaaende" { with(ApplicationTestContext(initialWallClockTime = startTime)) { - val (periode, kafkaKeyResponse) = periode(identitetsnummer = identitetsnummer, startet = startTime) - periodeTopic.pipeInput(kafkaKeyResponse.key, periode) - // Spoler frem til BekreftelseTilgjengelig - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseInterval.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusSeconds(5)) - // Spoler frem til LeveringsfristUtloept - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseTilgjengeligOffset.plusSeconds(5)) - // Spoler frem til RegisterGracePeriodeGjenstaaende - testDriver.advanceWallClockTime(BekreftelseConfig.varselFoerGracePeriodeUtloept.plusSeconds(5)) - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( - periodeId = periode.id, - identitetsnummer = periode.identitetsnummer, - arbeidsoekerId = kafkaKeyResponse.id, - recordKey = kafkaKeyResponse.key, - startet = periode.startet.tidspunkt, - avsluttet = periode.avsluttet?.tidspunkt - ), - bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.VenterSvar, - tilgjengeliggjort = startTime.plus(BekreftelseConfig.bekreftelseInterval).minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusSeconds(5), - fristUtloept = startTime.plus(BekreftelseConfig.bekreftelseInterval).plusSeconds(10), - sisteVarselOmGjenstaaendeGraceTid = startTime.plus(BekreftelseConfig.bekreftelseInterval).plus(BekreftelseConfig.varselFoerGracePeriodeUtloept).plusSeconds(15), - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, BekreftelseConfig.bekreftelseInterval) + with(kafkaKeyContext()) { + val (interval, _, tilgjengeligOffset, varselFoerGraceperiodeUtloept) = applicationConfig.bekreftelseIntervals + val (id, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) + periodeTopic.pipeInput(key, periode) + // Spoler frem til BekreftelseTilgjengelig + testDriver.advanceWallClockTime( + interval.minus(tilgjengeligOffset) + .plusSeconds(5) + ) + // Spoler frem til LeveringsfristUtloept + testDriver.advanceWallClockTime(tilgjengeligOffset.plusSeconds(5)) + // Spoler frem til RegisterGracePeriodeGjenstaaende + testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + val currentState = stateStore.get(periode.id) + currentState shouldBe InternTilstand( + periode = PeriodeInfo( + periodeId = periode.id, + identitetsnummer = periode.identitetsnummer, + arbeidsoekerId = id, + recordKey = key, + startet = periode.startet.tidspunkt, + avsluttet = periode.avsluttet?.tidspunkt + ), bekreftelser = listOf( + Bekreftelse( + tilstand = Tilstand.VenterSvar, + tilgjengeliggjort = startTime.plus(interval) + .minus(tilgjengeligOffset).plusSeconds(5), + fristUtloept = startTime.plus(interval).plusSeconds(10), + sisteVarselOmGjenstaaendeGraceTid = startTime.plus(interval) + .plus(varselFoerGraceperiodeUtloept).plusSeconds(15), + bekreftelseId = currentState.bekreftelser.first().bekreftelseId, + gjelderFra = periode.startet.tidspunkt, + gjelderTil = fristForNesteBekreftelse( + periode.startet.tidspunkt, interval + ) + ) ) ) - ) - hendelseLoggTopicOut.isEmpty shouldBe false - val hendelser = hendelseLoggTopicOut.readKeyValuesToList() - logger.info("hendelser: $hendelser") - hendelser.size shouldBe 3 - val kv = hendelser.last() - kv.key shouldBe kafkaKeyResponse.key - kv.value.shouldBeInstanceOf() + hendelseLoggTopicOut.isEmpty shouldBe false + val hendelser = hendelseLoggTopicOut.readKeyValuesToList() + logger.info("hendelser: $hendelser") + hendelser.size shouldBe 3 + val kv = hendelser.last() + kv.key shouldBe key + kv.value.shouldBeInstanceOf() + } } } "GracePeriodeUtloept og RegisterGracePeriodeUtloept" { with(ApplicationTestContext(initialWallClockTime = startTime)) { - val (periode, kafkaKeyResponse) = periode(identitetsnummer = identitetsnummer, startet = startTime) - periodeTopic.pipeInput(kafkaKeyResponse.key, periode) - // Spoler frem til BekreftelseTilgjengelig - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseInterval.minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusSeconds(5)) - // Spoler frem til LeveringsfristUtloept - testDriver.advanceWallClockTime(BekreftelseConfig.bekreftelseTilgjengeligOffset.plusSeconds(5)) - // Spoler frem til RegisterGracePeriodeGjenstaaendeTid - testDriver.advanceWallClockTime(BekreftelseConfig.varselFoerGracePeriodeUtloept.plusSeconds(5)) - // Spoler frem til RegisterGracePeriodeUtloept - testDriver.advanceWallClockTime(BekreftelseConfig.varselFoerGracePeriodeUtloept.plusSeconds(5)) - val stateStore: StateStore = testDriver.getKeyValueStore(applicationConfiguration.stateStoreName) - val currentState = stateStore.get(periode.id) - currentState shouldBe InternTilstand( - periode = PeriodeInfo( - periodeId = periode.id, - identitetsnummer = periode.identitetsnummer, - arbeidsoekerId = kafkaKeyResponse.id, - recordKey = kafkaKeyResponse.key, - startet = periode.startet.tidspunkt, - avsluttet = periode.avsluttet?.tidspunkt - ), - bekreftelser = listOf( - Bekreftelse( - tilstand = Tilstand.GracePeriodeUtloept, - tilgjengeliggjort = startTime.plus(BekreftelseConfig.bekreftelseInterval).minus(BekreftelseConfig.bekreftelseTilgjengeligOffset).plusSeconds(5), - fristUtloept = startTime.plus(BekreftelseConfig.bekreftelseInterval).plusSeconds(10), - sisteVarselOmGjenstaaendeGraceTid = startTime.plus(BekreftelseConfig.bekreftelseInterval).plus(BekreftelseConfig.varselFoerGracePeriodeUtloept).plusSeconds(15), - bekreftelseId = currentState.bekreftelser.first().bekreftelseId, - gjelderFra = periode.startet.tidspunkt, - gjelderTil = fristForNesteBekreftelse(periode.startet.tidspunkt, BekreftelseConfig.bekreftelseInterval) + with(kafkaKeyContext()) { + val (interval, _, tilgjengeligOffset, varselFoerGraceperiodeUtloept) = applicationConfig.bekreftelseIntervals + val (id, key, periode) = periode( + identitetsnummer = identitetsnummer, + startetMetadata = metadata(tidspunkt = startTime) + ) + periodeTopic.pipeInput(key, periode) + // Spoler frem til BekreftelseTilgjengelig + testDriver.advanceWallClockTime( + interval.minus(tilgjengeligOffset) + .plusSeconds(5) + ) + // Spoler frem til LeveringsfristUtloept + testDriver.advanceWallClockTime(tilgjengeligOffset.plusSeconds(5)) + // Spoler frem til RegisterGracePeriodeGjenstaaendeTid + testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) + // Spoler frem til RegisterGracePeriodeUtloept + testDriver.advanceWallClockTime(varselFoerGraceperiodeUtloept.plusSeconds(5)) + val stateStore: StateStore = + testDriver.getKeyValueStore(applicationConfig.kafkaTopology.internStateStoreName) + val currentState = stateStore.get(periode.id) + currentState shouldBe InternTilstand( + periode = PeriodeInfo( + periodeId = periode.id, + identitetsnummer = periode.identitetsnummer, + arbeidsoekerId = id, + recordKey = key, + startet = periode.startet.tidspunkt, + avsluttet = periode.avsluttet?.tidspunkt + ), bekreftelser = listOf( + Bekreftelse( + tilstand = Tilstand.GracePeriodeUtloept, + tilgjengeliggjort = startTime.plus(interval) + .minus(tilgjengeligOffset).plusSeconds(5), + fristUtloept = startTime.plus(interval).plusSeconds(10), + sisteVarselOmGjenstaaendeGraceTid = startTime.plus(interval) + .plus(varselFoerGraceperiodeUtloept).plusSeconds(15), + bekreftelseId = currentState.bekreftelser.first().bekreftelseId, + gjelderFra = periode.startet.tidspunkt, + gjelderTil = fristForNesteBekreftelse( + periode.startet.tidspunkt, interval + ) + ) ) ) - ) - hendelseLoggTopicOut.isEmpty shouldBe false - val hendelser = hendelseLoggTopicOut.readKeyValuesToList() - logger.info("hendelser: $hendelser") - hendelser.size shouldBe 4 - val kv = hendelser.last() - kv.key shouldBe kafkaKeyResponse.key - kv.value.shouldBeInstanceOf() + hendelseLoggTopicOut.isEmpty shouldBe false + val hendelser = hendelseLoggTopicOut.readKeyValuesToList() + logger.info("hendelser: $hendelser") + hendelser.size shouldBe 4 + val kv = hendelser.last() + kv.key shouldBe key + kv.value.shouldBeInstanceOf() + } } } } }) - - diff --git a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/IngenAndreTarAnsvarTest.kt b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/IngenAndreTarAnsvarTest.kt index cf3798c7..7f7fc667 100644 --- a/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/IngenAndreTarAnsvarTest.kt +++ b/apps/bekreftelse-tjeneste/src/test/kotlin/no/nav/paw/bekreftelsetjeneste/IngenAndreTarAnsvarTest.kt @@ -4,73 +4,46 @@ import io.kotest.core.annotation.Ignored import io.kotest.core.spec.style.FreeSpec import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf -import no.nav.paw.arbeidssokerregisteret.api.v1.Bruker -import no.nav.paw.arbeidssokerregisteret.api.v1.BrukerType -import no.nav.paw.arbeidssokerregisteret.api.v1.Metadata -import no.nav.paw.arbeidssokerregisteret.api.v1.Periode -import no.nav.paw.bekreftelse.internehendelser.LeveringsfristUtloept +import no.nav.paw.arbeidssoekerregisteret.testdata.kafkaKeyContext +import no.nav.paw.arbeidssoekerregisteret.testdata.mainavro.periode import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig +import no.nav.paw.bekreftelse.internehendelser.LeveringsfristUtloept import java.time.Duration -import java.time.Instant -import java.util.* @Ignored("Midlertidig disablet av Thomas") -class IngenAndreTarAnsvarTest: FreeSpec({ +class IngenAndreTarAnsvarTest : FreeSpec({ with(ApplicationTestContext()) { - "Applikasjons test hvor ingen andre tar ansvar" - { - "Bruker avslutter via rapportering" - { - val (periode, kafkaKeyResponse) = periode(identitetsnummer = "12345678901") - periodeTopic.pipeInput(kafkaKeyResponse.key, periode) - "Nå perioden opprettes skal det ikke skje noe" { - hendelseLoggTopicOut.isEmpty shouldBe true - } - "Etter 13 dager skal en rapportering være tilgjengelig" { - testDriver.advanceWallClockTime(Duration.ofDays(13)) - hendelseLoggTopicOut.isEmpty shouldBe false - val kv = hendelseLoggTopicOut.readKeyValue() - kv.key shouldBe kafkaKeyResponse.key - with(kv.value.shouldBeInstanceOf()) { - periodeId shouldBe periode.id - arbeidssoekerId shouldBe kafkaKeyResponse.id - gjelderFra shouldBe periode.startet.tidspunkt + with(kafkaKeyContext()) { + "Applikasjons test hvor ingen andre tar ansvar" - { + "Bruker avslutter via rapportering" - { + val (id, key, periode) = periode(identitetsnummer = "12345678901") + periodeTopic.pipeInput(key, periode) + "Nå perioden opprettes skal det ikke skje noe" { + hendelseLoggTopicOut.isEmpty shouldBe true } - } - "Når rapporteringen ikke blir besvart innen fristen sendes det ut en melding" { - testDriver.advanceWallClockTime(Duration.ofDays(4)) - hendelseLoggTopicOut.isEmpty shouldBe false - val kv = hendelseLoggTopicOut.readKeyValue() - kv.key shouldBe kafkaKeyResponse.key - with(kv.value.shouldBeInstanceOf()) { - periodeId shouldBe periode.id - arbeidssoekerId shouldBe kafkaKeyResponse.id + "Etter 13 dager skal en rapportering være tilgjengelig" { + testDriver.advanceWallClockTime(Duration.ofDays(13)) + hendelseLoggTopicOut.isEmpty shouldBe false + val kv = hendelseLoggTopicOut.readKeyValue() + kv.key shouldBe key + with(kv.value.shouldBeInstanceOf()) { + periodeId shouldBe periode.id + arbeidssoekerId shouldBe id + gjelderFra shouldBe periode.startet.tidspunkt + } + } + "Når rapporteringen ikke blir besvart innen fristen sendes det ut en melding" { + testDriver.advanceWallClockTime(Duration.ofDays(4)) + hendelseLoggTopicOut.isEmpty shouldBe false + val kv = hendelseLoggTopicOut.readKeyValue() + kv.key shouldBe key + with(kv.value.shouldBeInstanceOf()) { + periodeId shouldBe periode.id + arbeidssoekerId shouldBe id + } } } } + } } -}}) - -context(ApplicationTestContext) -fun periode( - id: UUID = UUID.randomUUID(), - identitetsnummer: String = "12345678901", - startet: Instant = Instant.now(), - avsluttet: Instant? = null -) = Periode( - id, - identitetsnummer, - Metadata( - startet, - Bruker(BrukerType.SLUTTBRUKER, identitetsnummer), - "junit", - "tester", - null - ), - avsluttet?.let { Metadata( - avsluttet, - Bruker(BrukerType.SLUTTBRUKER, identitetsnummer), - "junit", - "tester", - null) - } -) to (kafkaKeyFunction(identitetsnummer)) - +}) diff --git a/apps/kafka-key-generator/nais/nais-dev.yaml b/apps/kafka-key-generator/nais/nais-dev.yaml index ca013ff7..c7bd9f45 100644 --- a/apps/kafka-key-generator/nais/nais-dev.yaml +++ b/apps/kafka-key-generator/nais/nais-dev.yaml @@ -53,4 +53,5 @@ spec: - application: paw-microfrontend-toggler - application: paw-arbeidssoekerregisteret-hendelselogg-backup - application: paw-arbeidssoekerregisteret-api-bekreftelse + - application: paw-arbeidssoekerregisteret-bekreftelse-tjeneste - application: paw-arbeidssoekerregisteret-bekreftelse-min-side-varsler