Skip to content

Commit

Permalink
Merge pull request #796 from iRevive/benchmarks/trace
Browse files Browse the repository at this point in the history
sdk-trace: use `MapRef` in `SpanStorage`
  • Loading branch information
iRevive authored Oct 8, 2024
2 parents 0ac353d + 189580c commit 2171738
Show file tree
Hide file tree
Showing 14 changed files with 137 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,84 +17,124 @@
package org.typelevel.otel4s.benchmarks

import cats.effect.IO
import cats.effect.Resource
import cats.effect.unsafe.implicits.global
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor
import org.openjdk.jmh.annotations._
import org.typelevel.otel4s.oteljava.OtelJava
import org.typelevel.otel4s.trace.Tracer

import java.util.concurrent.TimeUnit

// benchmarks/Jmh/run org.typelevel.otel4s.benchmarks.TraceBenchmark -prof gc
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(2)
@Measurement(iterations = 40, time = 1)
@Warmup(iterations = 5, time = 1)
class TraceBenchmark {

import TraceBenchmark._

@Param(Array("noop", "oteljava", "sdk"))
var backend: String = _
var tracer: Tracer[IO] = _
var finalizer: IO[Unit] = _

@Benchmark
def pure(): Unit =
IO.unit.unsafeRunSync()

@Benchmark
def noop(ctx: NoopTracer): Unit = {
import ctx._
def span(): Unit =
tracer.span("span").use_.unsafeRunSync()
}

@Benchmark
def inMemoryDisabled(ctx: InMemoryTracer): Unit = {
import ctx._
tracer
.noopScope(
tracer.span("span").use_
)
.unsafeRunSync()
}
def noopScope(): Unit =
tracer.noopScope(tracer.span("span").use_).unsafeRunSync()

@Benchmark
def inMemoryEnabled(ctx: InMemoryTracer): Unit = {
import ctx._
tracer
.rootScope(
tracer.span("span").use_
)
.unsafeRunSync()
}
def rootScope(): Unit =
tracer.rootScope(tracer.span("span").use_).unsafeRunSync()

@Setup(Level.Trial)
def setup(): Unit =
backend match {
case "noop" =>
tracer = noopTracer
finalizer = IO.unit

case "oteljava" =>
val (t, release) = otelJavaTracer.allocated.unsafeRunSync()

tracer = t
finalizer = release

case "sdk" =>
val (t, release) = sdkTracer.allocated.unsafeRunSync()

tracer = t
finalizer = release

case other =>
sys.error(s"unknown backend [$other]")
}

@TearDown(Level.Trial)
def cleanup(): Unit =
finalizer.unsafeRunSync()
}

object TraceBenchmark {
@State(Scope.Benchmark)
class NoopTracer {
implicit val tracer: Tracer[IO] = Tracer.noop
}

@State(Scope.Benchmark)
class InMemoryTracer {
private def makeTracer: IO[Tracer[IO]] = {
val exporter = InMemorySpanExporter.create()
private def otelJavaTracer: Resource[IO, Tracer[IO]] = {
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.`export`.BatchSpanProcessor
import org.typelevel.otel4s.oteljava.OtelJava

val builder = SdkTracerProvider
.builder()
.addSpanProcessor(SimpleSpanProcessor.create(exporter))
def exporter = InMemorySpanExporter.create()

val tracerProvider: SdkTracerProvider =
builder.build()
def builder = SdkTracerProvider
.builder()
.addSpanProcessor(BatchSpanProcessor.builder(exporter).build())

val otel = OpenTelemetrySdk
.builder()
.setTracerProvider(tracerProvider)
.build()
def tracerProvider: SdkTracerProvider =
builder.build()

OtelJava.forAsync[IO](otel).flatMap {
_.tracerProvider.tracer("trace-benchmark").get
def otel = OpenTelemetrySdk
.builder()
.setTracerProvider(tracerProvider)
.build()

OtelJava
.resource[IO](IO(otel))
.evalMap(_.tracerProvider.tracer("trace-benchmark").get)
}

private def sdkTracer: Resource[IO, Tracer[IO]] = {
import cats.effect.std.Random
import org.typelevel.otel4s.context.LocalProvider
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.testkit.trace.InMemorySpanExporter
import org.typelevel.otel4s.sdk.trace.SdkTracerProvider
import org.typelevel.otel4s.sdk.trace.processor.BatchSpanProcessor

Resource.eval(InMemorySpanExporter.create[IO](None)).flatMap { exporter =>
BatchSpanProcessor.builder(exporter).build.evalMap { processor =>
Random.scalaUtilRandom[IO].flatMap { implicit random =>
LocalProvider[IO, Context].local.flatMap { implicit local =>
for {
tracerProvider <- SdkTracerProvider.builder[IO].addSpanProcessor(processor).build
tracer <- tracerProvider.get("trace-benchmark")
} yield tracer
}
}
}
}

implicit val tracer: Tracer[IO] =
makeTracer.unsafeRunSync()
}

private def noopTracer: Tracer[IO] =
Tracer.noop

}
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ThisBuild / tlBaseVersion := "0.10"
ThisBuild / tlBaseVersion := "0.11"

ThisBuild / organization := "org.typelevel"
ThisBuild / organizationName := "Typelevel"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,7 @@ class OpenTelemetrySdkSuite extends CatsEffectSuite {
s"OpenTelemetrySdk{meterProvider=$meterProvider, " +
"tracerProvider=" +
s"SdkTracerProvider{resource=$resource, spanLimits=$spanLimits, sampler=$sampler, " +
"spanProcessor=SpanProcessor.Multi(" +
s"BatchSpanProcessor{exporter=$exporter, scheduleDelay=5 seconds, exporterTimeout=30 seconds, maxQueueSize=2048, maxExportBatchSize=512}, " +
"SpanStorage)}, " +
s"spanProcessor=BatchSpanProcessor{exporter=$exporter, scheduleDelay=5 seconds, exporterTimeout=30 seconds, maxQueueSize=2048, maxExportBatchSize=512}}, " +
s"propagators=$propagators}, resource=$resource}"

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import scala.concurrent.duration.FiniteDuration
private final class SdkSpanBackend[F[_]: Monad: Clock: Console] private (
spanLimits: SpanLimits,
spanProcessor: SpanProcessor[F],
spanStorage: SpanStorage[F],
immutableState: SdkSpanBackend.ImmutableState,
mutableState: Ref[F, SdkSpanBackend.MutableState]
) extends Span.Backend[F]
Expand Down Expand Up @@ -174,6 +175,7 @@ private final class SdkSpanBackend[F[_]: Monad: Clock: Console] private (
def end(timestamp: FiniteDuration): F[Unit] =
for {
updated <- updateState("end")(s => s.copy(endTimestamp = Some(timestamp)))
_ <- spanStorage.remove(this)
_ <- toSpanData.flatMap(span => spanProcessor.onEnd(span)).whenA(updated)
} yield ()

Expand Down Expand Up @@ -270,6 +272,9 @@ private object SdkSpanBackend {
* @param processor
* the [[SpanProcessor]] to call on span's start and end
*
* @param spanStorage
* the `SpanStorage` to store the span at
*
* @param attributes
* the [[Attributes]] of the span
*
Expand All @@ -289,6 +294,7 @@ private object SdkSpanBackend {
parentContext: Option[SpanContext],
spanLimits: SpanLimits,
processor: SpanProcessor[F],
spanStorage: SpanStorage[F],
attributes: LimitedData[Attribute[_], Attributes],
links: LimitedData[LinkData, Vector[LinkData]],
userStartTimestamp: Option[FiniteDuration]
Expand All @@ -315,12 +321,8 @@ private object SdkSpanBackend {
for {
start <- userStartTimestamp.fold(Clock[F].realTime)(_.pure)
state <- Ref[F].of(mutableState)
backend = new SdkSpanBackend[F](
spanLimits,
processor,
immutableState(start),
state
)
backend = new SdkSpanBackend[F](spanLimits, processor, spanStorage, immutableState(start), state)
_ <- spanStorage.add(backend)
_ <- processor.onStart(parentContext, backend)
} yield backend
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ private final case class SdkSpanBuilder[F[_]: Temporal: Console] private (
parentContext = parentSpanContext,
spanLimits = tracerSharedState.spanLimits,
processor = tracerSharedState.spanProcessor,
spanStorage = tracerSharedState.spanStorage,
attributes = attributes.appendAll(samplingResult.attributes),
links = links,
userStartTimestamp = state.startTimestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.typelevel.otel4s.context.propagation.TextMapUpdater
import org.typelevel.otel4s.meta.InstrumentMeta
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.trace.processor.SpanStorage
import org.typelevel.otel4s.trace.Span
import org.typelevel.otel4s.trace.SpanBuilder
import org.typelevel.otel4s.trace.SpanContext
Expand All @@ -39,8 +38,7 @@ private final class SdkTracer[F[_]: Temporal: Console] private[trace] (
scopeInfo: InstrumentationScope,
propagators: ContextPropagators[Context],
sharedState: TracerSharedState[F],
traceScope: TraceScope[F, Context],
storage: SpanStorage[F]
traceScope: TraceScope[F, Context]
) extends Tracer[F] {

val meta: InstrumentMeta[F] = InstrumentMeta.enabled[F]
Expand All @@ -49,10 +47,9 @@ private final class SdkTracer[F[_]: Temporal: Console] private[trace] (
traceScope.current.map(current => current.filter(_.isValid))

private[this] def currentBackend: OptionT[F, Span.Backend[F]] =
OptionT(traceScope.current)
.semiflatMap { ctx =>
OptionT(storage.get(ctx)).getOrElse(Span.Backend.propagating(ctx))
}
OptionT(traceScope.current).semiflatMap { ctx =>
OptionT(sharedState.spanStorage.get(ctx)).getOrElse(Span.Backend.propagating(ctx))
}

def currentSpanOrNoop: F[Span[F]] =
currentBackend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.typelevel.otel4s.Attributes
import org.typelevel.otel4s.context.propagation.ContextPropagators
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.trace.processor.SpanStorage
import org.typelevel.otel4s.trace.TraceScope
import org.typelevel.otel4s.trace.Tracer
import org.typelevel.otel4s.trace.TracerBuilder
Expand All @@ -32,7 +31,6 @@ private final case class SdkTracerBuilder[F[_]: Temporal: Console](
propagators: ContextPropagators[Context],
traceScope: TraceScope[F, Context],
sharedState: TracerSharedState[F],
storage: SpanStorage[F],
name: String,
version: Option[String] = None,
schemaUrl: Option[String] = None
Expand All @@ -50,8 +48,7 @@ private final case class SdkTracerBuilder[F[_]: Temporal: Console](
InstrumentationScope(name, version, schemaUrl, Attributes.empty),
propagators,
sharedState,
traceScope,
storage
traceScope
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.typelevel.otel4s.context.propagation.TextMapPropagator
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.context.LocalContext
import org.typelevel.otel4s.sdk.trace.processor.SpanProcessor
import org.typelevel.otel4s.sdk.trace.processor.SpanStorage
import org.typelevel.otel4s.sdk.trace.samplers.Sampler
import org.typelevel.otel4s.trace.TraceScope
import org.typelevel.otel4s.trace.TracerBuilder
Expand All @@ -50,11 +49,12 @@ private class SdkTracerProvider[F[_]: Temporal: Parallel: Console](
resource,
spanLimits,
sampler,
SpanProcessor.of(spanProcessors: _*)
SpanProcessor.of(spanProcessors: _*),
storage
)

def tracer(name: String): TracerBuilder[F] =
new SdkTracerBuilder[F](propagators, traceScope, sharedState, storage, name)
new SdkTracerBuilder[F](propagators, traceScope, sharedState, name)

override def toString: String =
"SdkTracerProvider{" +
Expand Down Expand Up @@ -212,7 +212,7 @@ object SdkTracerProvider {
spanLimits,
sampler,
ContextPropagators.of(propagators: _*),
spanProcessors :+ storage,
spanProcessors,
SdkTraceScope.fromLocal[F],
storage
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
*/

package org.typelevel.otel4s.sdk.trace
package processor

import cats.Applicative
import cats.effect.Concurrent
import cats.effect.Ref
import cats.effect.std.MapRef
import cats.syntax.functor._
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.trace.SpanContext

/** The span storage is used to keep the references of the active SpanRefs.
Expand All @@ -32,30 +29,23 @@ import org.typelevel.otel4s.trace.SpanContext
* @tparam F
* the higher-kinded type of a polymorphic effect
*/
private[trace] class SpanStorage[F[_]: Applicative] private (
storage: Ref[F, Map[SpanContext, SpanRef[F]]]
) extends SpanProcessor[F] {
val name: String = "SpanStorage"
private class SpanStorage[F[_]] private (
storage: MapRef[F, SpanContext, Option[SpanRef[F]]]
) {

def isStartRequired: Boolean = true
def isEndRequired: Boolean = true
def add(span: SpanRef[F]): F[Unit] =
storage(span.context).set(Some(span))

def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] =
storage.update(_.updated(span.context, span))

def onEnd(span: SpanData): F[Unit] =
storage.update(_.removed(span.spanContext))
def remove(span: SpanRef[F]): F[Unit] =
storage(span.context).set(None)

def get(context: SpanContext): F[Option[SpanRef[F]]] =
storage.get.map(_.get(context))

def forceFlush: F[Unit] =
Applicative[F].unit
storage(context).get
}

private[trace] object SpanStorage {
private object SpanStorage {
def create[F[_]: Concurrent]: F[SpanStorage[F]] =
for {
storage <- Ref[F].of(Map.empty[SpanContext, SpanRef[F]])
storage <- MapRef.ofShardedImmutableMap[F, SpanContext, SpanRef[F]](Runtime.getRuntime.availableProcessors())
} yield new SpanStorage[F](storage)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ private final case class TracerSharedState[F[_]](
resource: TelemetryResource,
spanLimits: SpanLimits,
sampler: Sampler[F],
spanProcessor: SpanProcessor[F]
spanProcessor: SpanProcessor[F],
spanStorage: SpanStorage[F]
)
Loading

0 comments on commit 2171738

Please sign in to comment.