Skip to content

Commit

Permalink
Flyttet lagring av state of videresending av hendelser ut av 'process…
Browse files Browse the repository at this point in the history
…PawNameSpace'
  • Loading branch information
nilsmsa committed Oct 9, 2024
1 parent b8b9c17 commit d177360
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,10 @@ fun initTilstand(
),
bekreftelser = emptyList()
)

fun InternTilstand.oppdaterBekreftelse(ny: Bekreftelse): InternTilstand {
val nyBekreftelser = bekreftelser.map {
if (it.bekreftelseId == ny.bekreftelseId) ny else it
}
return copy(bekreftelser = nyBekreftelser)
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ fun StreamsBuilder.buildBekreftelseStream(applicationConfig: ApplicationConfig)
}

if (record.value().namespace == "paw") {
processPawNamespace(record, stateStore, gjeldendeTilstand, ::forward)
val (nyTilstand, hendelser) = processPawNamespace(record.value(), gjeldendeTilstand)
if (nyTilstand != gjeldendeTilstand) {
stateStore.put(gjeldendeTilstand.periode.periodeId, nyTilstand)
}
forwardHendelser(record, hendelser, this::forward)
}
}
.to(bekreftelseHendelseloggTopic, Produced.with(Serdes.Long(), BekreftelseHendelseSerde()))
Expand All @@ -75,36 +79,32 @@ fun retrieveState(
kind = SpanKind.INTERNAL
)
fun processPawNamespace(
record: Record<Long, no.nav.paw.bekreftelse.melding.v1.Bekreftelse>,
stateStore: StateStore,
gjeldeneTilstand: InternTilstand,
forward: (Record<Long, BekreftelseHendelse>) -> Unit
) {
val bekreftelse = gjeldeneTilstand.findBekreftelse(record.value().id)
hendelse: no.nav.paw.bekreftelse.melding.v1.Bekreftelse,
gjeldeneTilstand: InternTilstand
): Pair<InternTilstand, List<BekreftelseHendelse>> {
val bekreftelse = gjeldeneTilstand.findBekreftelse(hendelse.id)

if (bekreftelse == null) {
Span.current().addEvent("Bekreftelse not found for message")
meldingsLogger.warn("Melding {} har ingen matchene bekreftelse", record.value().id)
return
meldingsLogger.warn("Melding {} har ingen matchene bekreftelse", hendelse.id)
return gjeldeneTilstand to emptyList()
}

when (val sisteTilstand = bekreftelse.sisteTilstand()) {
return when (val sisteTilstand = bekreftelse.sisteTilstand()) {
is VenterSvar, is KlarForUtfylling -> {
val (hendelser, oppdatertBekreftelse) = behandleGyldigSvar(gjeldeneTilstand, record, bekreftelse)
oppdaterStateStore(stateStore, gjeldeneTilstand, oppdatertBekreftelse)

val (hendelser, oppdatertBekreftelse) = behandleGyldigSvar(gjeldeneTilstand, hendelse, bekreftelse)
Span.current().setAttribute("bekreftelse.tilstand", sisteTilstand.toString())

forwardHendelser(record, hendelser, forward)
gjeldeneTilstand.oppdaterBekreftelse(oppdatertBekreftelse) to hendelser
}

else -> {
Span.current().setAttribute("unexpected_tilstand", sisteTilstand.toString())
meldingsLogger.warn(
"Melding {} har ikke forventet tilstand, tilstand={}",
record.value().id,
hendelse.id,
sisteTilstand
)
gjeldeneTilstand to emptyList()
}
}
}
Expand All @@ -117,26 +117,26 @@ fun InternTilstand.findBekreftelse(id: UUID): Bekreftelse? = bekreftelser.find {
)
fun behandleGyldigSvar(
gjeldeneTilstand: InternTilstand,
record: Record<Long, no.nav.paw.bekreftelse.melding.v1.Bekreftelse>,
record: no.nav.paw.bekreftelse.melding.v1.Bekreftelse,
bekreftelse: Bekreftelse
): Pair<List<BekreftelseHendelse>, Bekreftelse> {
val arbeidssoekerId = gjeldeneTilstand.periode.arbeidsoekerId
val sisteTilstand = bekreftelse.sisteTilstand()
Span.current().setAttribute("arbeidsoekerId", arbeidssoekerId.toString())
Span.current().setAttribute("bekreftelseId", bekreftelse.bekreftelseId.toString())
Span.current().setAttribute("periodeId", record.value().periodeId.toString())
Span.current().setAttribute("periodeId", record.periodeId.toString())
Span.current().setAttribute("bekreftelse.oldTilstand", sisteTilstand.toString())

val oppdatertBekreftelse = bekreftelse + Levert(Instant.now())
Span.current().setAttribute("bekreftelse.newTilstand", oppdatertBekreftelse.sisteTilstand().toString())

val vilFortsette = record.value().svar.vilFortsetteSomArbeidssoeker
val vilFortsette = record.svar.vilFortsetteSomArbeidssoeker
Span.current().setAttribute("vilFortsetteSomArbeidssoeker", vilFortsette.toString())

val baOmAaAvslutte = if (!vilFortsette) {
val baOmAaAvslutteHendelse = BaOmAaAvsluttePeriode(
hendelseId = UUID.randomUUID(),
periodeId = record.value().periodeId,
periodeId = record.periodeId,
arbeidssoekerId = arbeidssoekerId,
hendelseTidspunkt = Instant.now()
)
Expand All @@ -146,7 +146,7 @@ fun behandleGyldigSvar(

val meldingMottatt = BekreftelseMeldingMottatt(
hendelseId = UUID.randomUUID(),
periodeId = record.value().periodeId,
periodeId = record.periodeId,
arbeidssoekerId = arbeidssoekerId,
bekreftelseId = bekreftelse.bekreftelseId,
hendelseTidspunkt = Instant.now()
Expand Down

0 comments on commit d177360

Please sign in to comment.