diff --git a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitKafkaStreams.kt b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitKafkaStreams.kt index f04e1f71..6fea2e49 100644 --- a/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitKafkaStreams.kt +++ b/apps/kafka-key-maintenance/src/main/kotlin/no/nav/paw/kafkakeymaintenance/InitKafkaStreams.kt @@ -77,6 +77,12 @@ fun initTopology( Serdes.StringSerde(), aktorSerde ) + ).addStateStore( + Stores.keyValueStoreBuilder( + stateStoreBuilderFactory("${aktorTopologyConfig.stateStoreName}_trace_id"), + Serdes.String(), + Serdes.String() + ) ) streamsBuilder.buildAktorTopology( meterRegistry = meterRegistry, diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 19b710da..e3e1124c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -65,6 +65,7 @@ ktor-serialization-kotlinx-json = { group = "io.ktor", name = "ktor-serializatio opentelemetry-api = { group = "io.opentelemetry", name = "opentelemetry-api", version.ref = "otelTargetSdkVersion" } opentelemetry-ktor = { group = "io.opentelemetry.instrumentation", name = "opentelemetry-ktor-2.0", version.ref = "otelInstrumentationKtorVersion" } opentelemetry-annotations = { group = "io.opentelemetry.instrumentation", name = "opentelemetry-instrumentation-annotations", version.ref = "otelInstrumentationVersion" } +opentelemetry-sdk = { group = "io.opentelemetry", name = "opentelemetry-sdk", version.ref = "otelTargetSdkVersion" } micrometerCore = { group = "io.micrometer", name = "micrometer-core", version.ref = "micrometerVersion" } micrometer-registryPrometheus = { group = "io.micrometer", name = "micrometer-registry-prometheus", version.ref = "micrometerVersion" } kafka-clients = { group = "org.apache.kafka", name = "kafka-clients", version.ref = "orgApacheKafkaVersion" } diff --git a/lib/kafka-streams/build.gradle.kts b/lib/kafka-streams/build.gradle.kts index d40c317a..2d753646 100644 --- a/lib/kafka-streams/build.gradle.kts +++ b/lib/kafka-streams/build.gradle.kts @@ -7,6 +7,8 @@ dependencies { implementation(libs.kafka.clients) implementation(libs.kafka.streams.core) implementation(libs.avro.kafkaStreamsSerde) + implementation(libs.opentelemetry.api) + implementation(libs.opentelemetry.sdk) // Test testImplementation(libs.bundles.testLibsWithUnitTesting) diff --git a/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/SuppressByWallClock.kt b/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/SuppressByWallClock.kt index 77eb5da2..a15ac1c8 100644 --- a/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/SuppressByWallClock.kt +++ b/lib/kafka-streams/src/main/kotlin/no/nav/paw/config/kafka/streams/SuppressByWallClock.kt @@ -1,14 +1,24 @@ package no.nav.paw.config.kafka.streams +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.Span +import io.opentelemetry.context.Context +import io.opentelemetry.context.propagation.TextMapGetter +import io.opentelemetry.context.propagation.TextMapPropagator +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.kstream.KStream import org.apache.kafka.streams.processor.PunctuationType import org.apache.kafka.streams.processor.api.Record +import org.apache.kafka.streams.state.KeyValueStore import org.apache.kafka.streams.state.TimestampedKeyValueStore import org.apache.kafka.streams.state.ValueAndTimestamp import java.time.Duration import java.time.Instant +private const val TRACE_PARENT = "traceparent" + fun KStream.supressByWallClock( name: String, duration: Duration, @@ -19,17 +29,37 @@ fun KStream.supressByWallClock( punctuation = Punctuation( interval = checkInterval, type = PunctuationType.WALL_CLOCK_TIME, - function = { wallclock , context -> + function = { wallclock, context -> val store = context.getStateStore>(name) + val storeTraceId = context.getStateStore>("${name}_trace_id") store.all() .use { iterator -> val limit = wallclock - duration iterator.forEach { (key, timestamp, value) -> if (timestamp.isBefore(limit)) { - context.forward( - Record(key, value, timestamp.toEpochMilli()) - ) - store.delete(key) + val traceparent = storeTraceId.get(key) + val span = traceparent + ?.let { traceParent -> + createSpanFromTraceparent(traceParent) + } ?: Span.current() + try { + context.forward( + Record(key, value, timestamp.toEpochMilli()) + .let { record -> + traceparent?.let { tp -> + record.withHeaders( + RecordHeaders( + arrayOf(RecordHeader(TRACE_PARENT, tp.toByteArray())) + ) + ) + } ?: record + } + ) + store.delete(key) + storeTraceId.delete(key) + } finally { + span.end() + } } } } @@ -37,15 +67,31 @@ fun KStream.supressByWallClock( ), function = { record -> val store = getStateStore>(name) + val storeTraceId = getStateStore>("${name}_trace_id") val timestamp = store.get(record.key())?.timestamp() ?: record.timestamp() store.put( record.key(), ValueAndTimestamp.make(record.value(), timestamp) ) + storeTraceId.put(record.key(), record.headers().lastHeader(TRACE_PARENT)?.value()?.toString(Charsets.UTF_8)) }, - stateStoreNames = listOf(name).toTypedArray() + stateStoreNames = listOf(name, "${name}_trace_id").toTypedArray() ) operator fun KeyValue>.component1(): K = this.key operator fun KeyValue>.component2(): Instant = Instant.ofEpochMilli(value.timestamp()) operator fun KeyValue>.component3(): V = value.value() + +fun createSpanFromTraceparent(traceparent: String): Span { + val propagator: TextMapPropagator = GlobalOpenTelemetry.getPropagators().textMapPropagator + val context: Context = propagator.extract(Context.current(), traceparent, object : TextMapGetter { + override fun keys(carrier: String): Iterable = listOf(TRACE_PARENT) + override fun get(carrier: String?, key: String): String? = carrier + }) + + val spanContext = Span.fromContext(context).spanContext + val tracer = GlobalOpenTelemetry.getTracer("custom-tracer") + return tracer.spanBuilder("forward aktor_v2") + .setParent(Context.current().with(Span.wrap(spanContext))) + .startSpan() +}