Skip to content

Commit

Permalink
Forenklet trace håndtering i suppresByWallClock
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Nov 14, 2024
1 parent 49556b5 commit 54794b2
Showing 1 changed file with 23 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package no.nav.paw.config.kafka.streams

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapGetter
import io.opentelemetry.context.propagation.TextMapPropagator
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.streams.KeyValue
Expand All @@ -14,10 +10,13 @@ import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.TimestampedKeyValueStore
import org.apache.kafka.streams.state.ValueAndTimestamp
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant

private const val TRACE_PARENT = "traceparent"
const val TRACE_PARENT = "traceparent"

private val suppressByWallBlockLogger = LoggerFactory.getLogger("suppressByWallBlockLogger")

fun <K, V> KStream<K, V>.supressByWallClock(
name: String,
Expand All @@ -38,28 +37,21 @@ fun <K, V> KStream<K, V>.supressByWallClock(
iterator.forEach { (key, timestamp, value) ->
if (timestamp.isBefore(limit)) {
val traceparent = storeTraceId.get(key)
val span = traceparent
?.let { traceParent ->
createSpanFromTraceparent(traceParent)
} ?: Span.current()
try {
context.forward(
Record(key, value, timestamp.toEpochMilli())
.let { record ->
traceparent?.let { tp ->
record.withHeaders(
RecordHeaders(
arrayOf(RecordHeader(TRACE_PARENT, tp.toByteArray()))
)
suppressByWallBlockLogger.info("Loaded traceparent: {}", traceparent)
context.forward(
Record(key, value, timestamp.toEpochMilli())
.let { record ->
traceparent?.let { tp ->
record.withHeaders(
RecordHeaders(
arrayOf(RecordHeader(TRACE_PARENT, tp.toByteArray()))
)
} ?: record
}
)
store.delete(key)
storeTraceId.delete(key)
} finally {
span.end()
}
)
} ?: record
}
)
store.delete(key)
storeTraceId.delete(key)
}
}
}
Expand All @@ -69,13 +61,17 @@ fun <K, V> KStream<K, V>.supressByWallClock(
val store = getStateStore<TimestampedKeyValueStore<K, V>>(name)
val storeTraceId = getStateStore<KeyValueStore<K, String>>("${name}_trace_id")
val timestamp = store.get(record.key())?.timestamp() ?: record.timestamp()
val traceparent = Span.current().spanContext.let { ctx ->
"00-${ctx.traceId}-${ctx.spanId}-${ctx.traceFlags.asHex()}"
}
store.put(
record.key(),
ValueAndTimestamp.make(record.value(), timestamp)
)
suppressByWallBlockLogger.info("Storing traceparent: {}", traceparent)
storeTraceId.put(
record.key(),
record.headers().lastHeader(TRACE_PARENT)?.value()?.toString(Charsets.UTF_8)
traceparent
)
},
stateStoreNames = listOf(name, "${name}_trace_id").toTypedArray()
Expand All @@ -84,17 +80,3 @@ fun <K, V> KStream<K, V>.supressByWallClock(
operator fun <K, V> KeyValue<K, ValueAndTimestamp<V>>.component1(): K = this.key
operator fun <K, V> KeyValue<K, ValueAndTimestamp<V>>.component2(): Instant = Instant.ofEpochMilli(value.timestamp())
operator fun <K, V> KeyValue<K, ValueAndTimestamp<V>>.component3(): V = value.value()

fun createSpanFromTraceparent(traceparent: String): Span {
val propagator: TextMapPropagator = GlobalOpenTelemetry.getPropagators().textMapPropagator
val context: Context = propagator.extract(Context.current(), traceparent, object : TextMapGetter<String> {
override fun keys(carrier: String): Iterable<String> = listOf(TRACE_PARENT)
override fun get(carrier: String?, key: String): String? = carrier
})

val spanContext = Span.fromContext(context).spanContext
val tracer = GlobalOpenTelemetry.getTracer("custom-tracer")
return tracer.spanBuilder("forward aktor_v2")
.setParent(Context.current().with(Span.wrap(spanContext)))
.startSpan()
}

0 comments on commit 54794b2

Please sign in to comment.