Skip to content

Commit

Permalink
Ryddet litt i bekreftelse varsel topology
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Oct 3, 2024
1 parent c19d3ac commit 8bcc38e
Showing 1 changed file with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import java.util.*

typealias InternalStateStore = KeyValueStore<UUID, InternTilstand>

fun ProcessorContext<*, *>.getStateStore(stateStoreName: StateStoreName ): InternalStateStore = getStateStore(stateStoreName.value)
fun ProcessorContext<*, *>.getStateStore(stateStoreName: StateStoreName): InternalStateStore =
getStateStore(stateStoreName.value)

private val logger = LoggerFactory.getLogger("bekreftelse.varsler.topology")

Expand All @@ -46,7 +47,7 @@ fun StreamsBuilder.applicationTopology(
stream(
kafkaTopics.bekreftelseHendelseTopic, Consumed.with(Serdes.Long(), BekreftelseHendelseSerde())
).mapWithContext("bekreftelse-hendelse-mottatt", stateStoreName.value) { hendelse ->
val store: KeyValueStore<UUID, InternTilstand> = getStateStore(stateStoreName.value)
val store = getStateStore(stateStoreName)
val tilstand = store[hendelse.periodeId]
if (tilstand == null) {
logger.warn(
Expand All @@ -64,9 +65,11 @@ fun StreamsBuilder.applicationTopology(
}
meldinger
}
}.flatMapValues { _, meldinger -> meldinger }.mapKeyAndValue("map_til_utgaaende") { _, melding ->
melding.varselId.toString() to melding.value
}.to(kafkaTopics.tmsOppgaveTopic, Produced.with(Serdes.String(), Serdes.String()))
}
.flatMapValues { _, meldinger -> meldinger }
.mapKeyAndValue("map_til_utgaaende") { _, melding ->
melding.varselId.toString() to melding.value
}.to(kafkaTopics.tmsOppgaveTopic, Produced.with(Serdes.String(), Serdes.String()))

return build()
}

0 comments on commit 8bcc38e

Please sign in to comment.