diff --git a/benchmarks/src/main/scala/org/typelevel/otel4s/benchmarks/BatchSpanProcessorBenchmark.scala b/benchmarks/src/main/scala/org/typelevel/otel4s/benchmarks/BatchSpanProcessorBenchmark.scala new file mode 100644 index 000000000..aa96ce17e --- /dev/null +++ b/benchmarks/src/main/scala/org/typelevel/otel4s/benchmarks/BatchSpanProcessorBenchmark.scala @@ -0,0 +1,199 @@ +/* + * Copyright 2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.benchmarks + +import cats.Foldable +import cats.effect.IO +import cats.effect.Resource +import cats.effect.std.Random +import cats.effect.unsafe.implicits.global +import cats.syntax.foldable._ +import org.openjdk.jmh.annotations._ + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration._ + +// benchmarks/Jmh/run org.typelevel.otel4s.benchmarks.BatchSpanExporterBenchmark -prof gc +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@OutputTimeUnit(TimeUnit.SECONDS) +@Threads(5) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 10, time = 1) +class BatchSpanProcessorBenchmark { + + import BatchSpanProcessorBenchmark._ + + @Param(Array("oteljava", "sdk")) + var backend: String = _ + + @Param(Array("0", "1", "5")) + var delayMs: Int = _ + + @Param(Array("1000", "2000", "5000")) + var spanCount: Int = _ + + private var processor: Processor = _ + private var finalizer: IO[Unit] = _ + + @Benchmark + def doExport(): Unit = + processor.doExport() + + @Setup(Level.Trial) + def setup(): Unit = + backend match { + case "oteljava" => + val (proc, release) = Processor.otelJava(delayMs.millis, spanCount).allocated.unsafeRunSync() + + processor = proc + finalizer = release + + case "sdk" => + val (proc, release) = Processor.sdk(delayMs.millis, spanCount).allocated.unsafeRunSync() + + processor = proc + finalizer = release + + case other => + sys.error(s"unknown backend [$other]") + } + + @TearDown(Level.Trial) + def cleanup(): Unit = + finalizer.unsafeRunSync() +} + +object BatchSpanProcessorBenchmark { + + trait Processor { + def doExport(): Unit + } + + object Processor { + + def otelJava(delay: FiniteDuration, spanCount: Int): Resource[IO, Processor] = { + import io.opentelemetry.api.trace.Span + import io.opentelemetry.sdk.common.CompletableResultCode + import io.opentelemetry.sdk.trace.{ReadableSpan, SdkTracerProvider} + import io.opentelemetry.sdk.trace.data.SpanData + import io.opentelemetry.sdk.trace.`export`.{BatchSpanProcessor, SpanExporter} + import java.util.concurrent.Executors + import java.util.concurrent.ScheduledExecutorService + + def toIO(codeIO: IO[CompletableResultCode]): IO[Unit] = + codeIO.flatMap { code => + IO.async[Unit] { cb => + IO.delay { + code.whenComplete { () => + cb(Either.cond(code.isSuccess, (), new RuntimeException("OpenTelemetry SDK async operation failed"))) + } + None + } + } + } + + def exporter(executor: ScheduledExecutorService): SpanExporter = new SpanExporter { + def `export`(spans: java.util.Collection[SpanData]): CompletableResultCode = { + val result = new CompletableResultCode() + executor.schedule(() => result.succeed(), delay.toMillis, TimeUnit.MILLISECONDS) + result + } + + def flush(): CompletableResultCode = + CompletableResultCode.ofSuccess() + + def shutdown(): CompletableResultCode = + CompletableResultCode.ofSuccess() + } + + val tracer = SdkTracerProvider.builder().build().get("benchmarkTracer") + + val spans: Vector[Span] = + Vector.fill(spanCount)(tracer.spanBuilder("span").startSpan()) + + def makeBsp(executor: ScheduledExecutorService) = + BatchSpanProcessor + .builder(exporter(executor)) + .setMaxQueueSize(spanCount * 2) + .build + + for { + executor <- Resource.make(IO.delay(Executors.newScheduledThreadPool(5)))(e => IO.delay(e.shutdown())) + bsp <- Resource.make(IO.delay(makeBsp(executor)))(r => toIO(IO.delay(r.shutdown()))) + } yield new Processor { + def doExport(): Unit = { + spans.foreach(span => bsp.onEnd(span.asInstanceOf[ReadableSpan])) + val _ = bsp.forceFlush().join(10, TimeUnit.MINUTES) + () + } + } + } + + def sdk(delay: FiniteDuration, spanCount: Int): Resource[IO, Processor] = { + import org.typelevel.otel4s.trace.{TraceFlags, TraceState} + import org.typelevel.otel4s.trace.{SpanContext, SpanKind} + import org.typelevel.otel4s.sdk.TelemetryResource + import org.typelevel.otel4s.sdk.common.InstrumentationScope + import org.typelevel.otel4s.sdk.trace.IdGenerator + import org.typelevel.otel4s.sdk.trace.data.{LimitedData, SpanData, StatusData} + import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter + import org.typelevel.otel4s.sdk.trace.processor.BatchSpanProcessor + + val exporter: SpanExporter[IO] = new SpanExporter[IO] { + def name: String = s"DelayExporter($delay)" + def exportSpans[G[_]: Foldable](spans: G[SpanData]): IO[Unit] = IO.sleep(delay) + def flush: IO[Unit] = IO.unit + } + + def mkSpanData(idGenerator: IdGenerator[IO], random: Random[IO]): IO[SpanData] = + for { + name <- random.nextString(20) + traceId <- idGenerator.generateTraceId + spanId <- idGenerator.generateSpanId + } yield SpanData( + name = name, + spanContext = SpanContext(traceId, spanId, TraceFlags.Sampled, TraceState.empty, remote = false), + parentSpanContext = None, + kind = SpanKind.Internal, + startTimestamp = Duration.Zero, + endTimestamp = None, + status = StatusData.Ok, + attributes = LimitedData.attributes(Int.MaxValue, 1024), + events = LimitedData.events(Int.MaxValue), + links = LimitedData.links(Int.MaxValue), + instrumentationScope = InstrumentationScope.empty, + resource = TelemetryResource.empty + ) + + for { + bsp <- BatchSpanProcessor.builder[IO](exporter).withMaxQueueSize(spanCount * 2).build + spans <- Resource.eval( + Random.scalaUtilRandom[IO].flatMap { implicit random => + val generator = IdGenerator.random[IO] + mkSpanData(generator, random).replicateA(spanCount) + } + ) + } yield new Processor { + def doExport(): Unit = + (spans.traverse_(span => bsp.onEnd(span)) >> bsp.forceFlush).unsafeRunSync() + } + } + + } + +} diff --git a/build.sbt b/build.sbt index 40e009dae..8b2b721fe 100644 --- a/build.sbt +++ b/build.sbt @@ -710,7 +710,7 @@ lazy val benchmarks = project .enablePlugins(NoPublishPlugin) .enablePlugins(JmhPlugin) .in(file("benchmarks")) - .dependsOn(core.jvm, sdk.jvm, oteljava) + .dependsOn(core.jvm, sdk.jvm, `sdk-testkit`.jvm, oteljava) .settings( name := "otel4s-benchmarks", libraryDependencies ++= Seq( diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala index b661aee8d..7f3ece7a5 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala @@ -58,6 +58,8 @@ private final class BatchSpanProcessor[F[_]: Temporal: Console] private ( ) extends SpanProcessor[F] { import BatchSpanProcessor.State + private val unit = Temporal[F].unit + val name: String = "BatchSpanProcessor{" + s"exporter=${exporter.name}, " + s"scheduleDelay=${config.scheduleDelay}, " + @@ -69,29 +71,27 @@ private final class BatchSpanProcessor[F[_]: Temporal: Console] private ( val isEndRequired: Boolean = true def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] = - Temporal[F].unit - - def onEnd(span: SpanData): F[Unit] = { - val canExport = span.spanContext.isSampled - - // if 'spansNeeded' is defined, it means the worker is waiting for a certain number of spans - // and it waits for the 'signal'-latch to be released - // hence, if the queue size is >= than the number of needed spans, the latch can be released - def notifyWorker: F[Unit] = - (queue.size, state.get) - .mapN { (queueSize, state) => - state.spansNeeded.exists(needed => queueSize >= needed) - } - .ifM(signal.release, Temporal[F].unit) + unit + + def onEnd(span: SpanData): F[Unit] = + if (span.spanContext.isSampled) { + // if 'spansNeeded' is defined, it means the worker is waiting for a certain number of spans + // and it waits for the 'signal'-latch to be released + // hence, if the queue size is >= than the number of needed spans, the latch can be released + def notifyWorker: F[Unit] = + for { + queueSize <- queue.size + state <- state.get + _ <- if (state.spansNeeded.exists(needed => queueSize >= needed)) signal.release else unit + } yield () - def enqueue = for { offered <- queue.tryOffer(span) - _ <- notifyWorker.whenA(offered) + _ <- if (offered) notifyWorker else unit } yield () - - enqueue.whenA(canExport) - } + } else { + unit + } def forceFlush: F[Unit] = exportAll @@ -117,12 +117,14 @@ private final class BatchSpanProcessor[F[_]: Temporal: Console] private ( val request = for { _ <- state.update(_.copy(spansNeeded = Some(spansNeeded))) - _ <- signal.await.timeoutTo(pollWaitTime, Temporal[F].unit) + _ <- signal.await.timeoutTo(pollWaitTime, unit) } yield () - poll(request) - .guarantee(state.update(_.copy(spansNeeded = None))) - .whenA(pollWaitTime > Duration.Zero) + if (pollWaitTime > Duration.Zero) { + poll(request).guarantee(state.update(_.copy(spansNeeded = None))) + } else { + unit + } } for {