Skip to content

Commit

Permalink
La in støtte for tracing i 'supressByWallClock'
Browse files Browse the repository at this point in the history
  • Loading branch information
nilsmsa committed Nov 14, 2024
1 parent 522a686 commit 44b41ab
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ fun initTopology(
Serdes.StringSerde(),
aktorSerde
)
).addStateStore(
Stores.keyValueStoreBuilder(
stateStoreBuilderFactory("${aktorTopologyConfig.stateStoreName}_trace_id"),
Serdes.String(),
Serdes.String()
)
)
streamsBuilder.buildAktorTopology(
meterRegistry = meterRegistry,
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ ktor-serialization-kotlinx-json = { group = "io.ktor", name = "ktor-serializatio
opentelemetry-api = { group = "io.opentelemetry", name = "opentelemetry-api", version.ref = "otelTargetSdkVersion" }
opentelemetry-ktor = { group = "io.opentelemetry.instrumentation", name = "opentelemetry-ktor-2.0", version.ref = "otelInstrumentationKtorVersion" }
opentelemetry-annotations = { group = "io.opentelemetry.instrumentation", name = "opentelemetry-instrumentation-annotations", version.ref = "otelInstrumentationVersion" }
opentelemetry-sdk = { group = "io.opentelemetry", name = "opentelemetry-sdk", version.ref = "otelTargetSdkVersion" }
micrometerCore = { group = "io.micrometer", name = "micrometer-core", version.ref = "micrometerVersion" }
micrometer-registryPrometheus = { group = "io.micrometer", name = "micrometer-registry-prometheus", version.ref = "micrometerVersion" }
kafka-clients = { group = "org.apache.kafka", name = "kafka-clients", version.ref = "orgApacheKafkaVersion" }
Expand Down
2 changes: 2 additions & 0 deletions lib/kafka-streams/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dependencies {
implementation(libs.kafka.clients)
implementation(libs.kafka.streams.core)
implementation(libs.avro.kafkaStreamsSerde)
implementation(libs.opentelemetry.api)
implementation(libs.opentelemetry.sdk)

// Test
testImplementation(libs.bundles.testLibsWithUnitTesting)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package no.nav.paw.config.kafka.streams

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapGetter
import io.opentelemetry.context.propagation.TextMapPropagator
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.TimestampedKeyValueStore
import org.apache.kafka.streams.state.ValueAndTimestamp
import java.time.Duration
import java.time.Instant

private const val TRACE_PARENT = "traceparent"

fun <K, V> KStream<K, V>.supressByWallClock(
name: String,
duration: Duration,
Expand All @@ -19,33 +29,69 @@ fun <K, V> KStream<K, V>.supressByWallClock(
punctuation = Punctuation(
interval = checkInterval,
type = PunctuationType.WALL_CLOCK_TIME,
function = { wallclock , context ->
function = { wallclock, context ->
val store = context.getStateStore<TimestampedKeyValueStore<K, V>>(name)
val storeTraceId = context.getStateStore<KeyValueStore<K, String>>("${name}_trace_id")
store.all()
.use { iterator ->
val limit = wallclock - duration
iterator.forEach { (key, timestamp, value) ->
if (timestamp.isBefore(limit)) {
context.forward(
Record(key, value, timestamp.toEpochMilli())
)
store.delete(key)
val traceparent = storeTraceId.get(key)
val span = traceparent
?.let { traceParent ->
createSpanFromTraceparent(traceParent)
} ?: Span.current()
try {
context.forward(
Record(key, value, timestamp.toEpochMilli())
.let { record ->
traceparent?.let { tp ->
record.withHeaders(
RecordHeaders(
arrayOf(RecordHeader(TRACE_PARENT, tp.toByteArray()))
)
)
} ?: record
}
)
store.delete(key)
storeTraceId.delete(key)
} finally {
span.end()
}
}
}
}
}
),
function = { record ->
val store = getStateStore<TimestampedKeyValueStore<K, V>>(name)
val storeTraceId = getStateStore<KeyValueStore<K, String>>("${name}_trace_id")
val timestamp = store.get(record.key())?.timestamp() ?: record.timestamp()
store.put(
record.key(),
ValueAndTimestamp.make(record.value(), timestamp)
)
storeTraceId.put(record.key(), record.headers().lastHeader(TRACE_PARENT)?.value()?.toString(Charsets.UTF_8))
},
stateStoreNames = listOf(name).toTypedArray()
stateStoreNames = listOf(name, "${name}_trace_id").toTypedArray()
)

operator fun <K, V> KeyValue<K, ValueAndTimestamp<V>>.component1(): K = this.key
operator fun <K, V> KeyValue<K, ValueAndTimestamp<V>>.component2(): Instant = Instant.ofEpochMilli(value.timestamp())
operator fun <K, V> KeyValue<K, ValueAndTimestamp<V>>.component3(): V = value.value()

fun createSpanFromTraceparent(traceparent: String): Span {
val propagator: TextMapPropagator = GlobalOpenTelemetry.getPropagators().textMapPropagator
val context: Context = propagator.extract(Context.current(), traceparent, object : TextMapGetter<String> {
override fun keys(carrier: String): Iterable<String> = listOf(TRACE_PARENT)
override fun get(carrier: String?, key: String): String? = carrier
})

val spanContext = Span.fromContext(context).spanContext
val tracer = GlobalOpenTelemetry.getTracer("custom-tracer")
return tracer.spanBuilder("forward aktor_v2")
.setParent(Context.current().with(Span.wrap(spanContext)))
.startSpan()
}

0 comments on commit 44b41ab

Please sign in to comment.