diff --git a/benchmarks/src/main/scala/org/typelevel/otel4s/benchmarks/MetricsBenchmark.scala b/benchmarks/src/main/scala/org/typelevel/otel4s/benchmarks/MetricsBenchmark.scala
new file mode 100644
index 000000000..54cf52fe9
--- /dev/null
+++ b/benchmarks/src/main/scala/org/typelevel/otel4s/benchmarks/MetricsBenchmark.scala
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2022 Typelevel
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.typelevel.otel4s.benchmarks
+
+import cats.effect.IO
+import cats.effect.Resource
+import cats.effect.unsafe.implicits.global
+import cats.mtl.Ask
+import cats.syntax.foldable._
+import org.openjdk.jmh.annotations._
+import org.openjdk.jmh.infra.ThreadParams
+import org.typelevel.otel4s.Attribute
+import org.typelevel.otel4s.Attributes
+import org.typelevel.otel4s.metrics.Meter
+import org.typelevel.otel4s.metrics.MeterProvider
+
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.TimeUnit
+import scala.util.chaining._
+
+// benchmarks/Jmh/run org.typelevel.otel4s.benchmarks.MetricsBenchmark -prof gc
+@BenchmarkMode(Array(Mode.AverageTime))
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+@Fork(1)
+class MetricsBenchmark {
+  import MetricsBenchmark._
+
+  @Benchmark
+  @Threads(1)
+  def recordToMultipleAttributes(state: BenchThreadState): Unit =
+    AttributesList.traverse_(attributes => state.op.run(attributes)).unsafeRunSync()
+
+  @Benchmark
+  @Threads(1)
+  def oneThread(state: BenchThreadState): Unit =
+    state.op.run(SharedAttributes).unsafeRunSync()
+
+  @Benchmark
+  @Threads(8)
+  def eightThreadsCommonLabelSet(state: BenchThreadState): Unit =
+    state.op.run(SharedAttributes).unsafeRunSync()
+
+  @Benchmark
+  @Threads(8)
+  def eightThreadsSeparateLabelSets(state: BenchThreadState): Unit =
+    state.op.run(state.threadUniqueLabelSet).unsafeRunSync()
+
+}
+
+object MetricsBenchmark {
+
+  private val AttributesList: List[Attributes] = {
+    val keys = 5
+    val valuePerKey = 5
+
+    List.tabulate(keys) { key =>
+      List
+        .tabulate(valuePerKey) { value =>
+          Attribute(s"key_$key", s"value_$value")
+        }
+        .to(Attributes)
+    }
+  }
+
+  private val SharedAttributes: Attributes =
+    Attributes(Attribute("key", "value"))
+
+  trait BenchOperation {
+    def run(attributes: Attributes): IO[Unit]
+  }
+
+  @State(Scope.Benchmark)
+  class BenchThreadState {
+
+    @Param(Array("oteljava", "sdk", "noop"))
+    var backend: String = _
+
+    @Param(Array("noop", "sdk_cumulative", "sdk_delta"))
+    var variation: String = _
+
+    @Param(Array("long_counter", "double_counter", "double_histogram", "long_histogram"))
+    var operation: String = _
+
+    var op: BenchOperation = _
+    var threadUniqueLabelSet: Attributes = _
+
+    private var finalizer: IO[Unit] = _
+
+    @Setup()
+    def setup(params: ThreadParams): Unit = {
+      threadUniqueLabelSet = Attributes(Attribute("key", params.getThreadIndex.toString))
+
+      backend match {
+        case "sdk" =>
+          val (o, release) = sdkMeter(variation).evalMap(bench(operation, _)).allocated.unsafeRunSync()
+
+          op = o
+          finalizer = release
+
+        case "oteljava" =>
+          val (o, release) = otelJavaMeter(variation).evalMap(bench(operation, _)).allocated.unsafeRunSync()
+
+          op = o
+          finalizer = release
+
+        case "noop" =>
+          op = bench(operation, noopMeter).unsafeRunSync()
+          finalizer = IO.unit
+
+        case other =>
+          sys.error(s"unknown backend [$other]")
+      }
+    }
+
+    @TearDown()
+    def cleanup(): Unit =
+      finalizer.unsafeRunSync()
+
+  }
+
+  private def bench(operation: String, meter: Meter[IO]): IO[BenchOperation] = {
+    operation match {
+      case "long_counter" =>
+        for {
+          counter <- meter.counter[Long]("counter.long").create
+        } yield new BenchOperation {
+          def run(attributes: Attributes): IO[Unit] = counter.add(5L, attributes)
+        }
+
+      case "double_counter" =>
+        for {
+          counter <- meter.counter[Double]("counter.double").create
+        } yield new BenchOperation {
+          def run(attributes: Attributes): IO[Unit] = counter.add(5.0, attributes)
+        }
+
+      case "long_histogram" =>
+        for {
+          histogram <- meter.histogram[Long]("histogram.long").create
+        } yield new BenchOperation {
+          def run(attributes: Attributes): IO[Unit] =
+            histogram.record(ThreadLocalRandom.current().nextLong(0, 20000), attributes)
+        }
+
+      case "double_histogram" =>
+        for {
+          histogram <- meter.histogram[Double]("histogram.double").create
+        } yield new BenchOperation {
+          def run(attributes: Attributes): IO[Unit] =
+            histogram.record(ThreadLocalRandom.current().nextDouble(0, 20000.0), attributes)
+        }
+
+      case other =>
+        sys.error(s"unknown operation [$other]")
+    }
+  }
+
+  private def otelJavaMeter(variation: String): Resource[IO, Meter[IO]] = {
+    import io.opentelemetry.sdk.OpenTelemetrySdk
+    import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader
+    import io.opentelemetry.sdk.metrics.SdkMeterProvider
+    import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder
+    import org.typelevel.otel4s.oteljava.OtelJava
+
+    val namespace = "otel4s.sdk.metrics"
+
+    def create(
+        reader: InMemoryMetricReader,
+        customize: SdkMeterProviderBuilder => SdkMeterProviderBuilder = identity,
+    ): Resource[IO, Meter[IO]] = {
+
+      def meterProvider = SdkMeterProvider
+        .builder()
+        .registerMetricReader(reader)
+        .pipe(customize)
+        .build()
+
+      def otel = OpenTelemetrySdk
+        .builder()
+        .setMeterProvider(meterProvider)
+        .build()
+
+      OtelJava
+        .resource[IO](IO(otel))
+        .evalMap(_.meterProvider.meter("otel4s.sdk.metrics").get)
+    }
+
+    variation match {
+      case "noop" =>
+        Resource.eval(MeterProvider.noop[IO].get(namespace))
+
+      case "sdk_cumulative" =>
+        create(InMemoryMetricReader.create())
+
+      case "sdk_delta" =>
+        create(InMemoryMetricReader.createDelta())
+
+      case other =>
+        sys.error(s"unknown variation [$other]")
+    }
+  }
+
+  private def sdkMeter(variation: String): Resource[IO, Meter[IO]] = {
+    import cats.effect.std.Random
+    import org.typelevel.otel4s.sdk.context.AskContext
+    import org.typelevel.otel4s.sdk.context.Context
+    import org.typelevel.otel4s.sdk.metrics.SdkMeterProvider
+    import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality
+    import org.typelevel.otel4s.sdk.metrics.exporter.AggregationTemporalitySelector
+    import org.typelevel.otel4s.sdk.testkit.metrics.InMemoryMetricReader
+
+    val namespace = "otel4s.sdk.metrics"
+
+    def create(
+        customize: SdkMeterProvider.Builder[IO] => SdkMeterProvider.Builder[IO] = identity,
+        aggregationTemporality: AggregationTemporalitySelector = AggregationTemporalitySelector.alwaysCumulative
+    ): Resource[IO, Meter[IO]] =
+      Resource.eval {
+        Random.scalaUtilRandom[IO].flatMap { implicit random =>
+          InMemoryMetricReader.create[IO](aggregationTemporality).flatMap { reader =>
+            implicit val askContext: AskContext[IO] = Ask.const(Context.root)
+            SdkMeterProvider
+              .builder[IO]
+              .registerMetricReader(reader)
+              .pipe(customize)
+              .build
+              .flatMap(_.get(namespace))
+          }
+        }
+      }
+
+    variation match {
+      case "noop" =>
+        Resource.eval(MeterProvider.noop[IO].get(namespace))
+
+      case "sdk_cumulative" =>
+        create()
+
+      case "sdk_delta" =>
+        create(aggregationTemporality = _ => AggregationTemporality.Delta)
+
+      case other =>
+        sys.error(s"unknown variation [$other]")
+    }
+  }
+
+  private def noopMeter: Meter[IO] =
+    Meter.noop
+
+}
diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/SynchronousStorage.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/SynchronousStorage.scala
index 0be966e24..d627d6a2d 100644
--- a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/SynchronousStorage.scala
+++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/SynchronousStorage.scala
@@ -21,7 +21,6 @@ import cats.data.NonEmptyVector
 import cats.effect.Concurrent
 import cats.effect.std.AtomicCell
 import cats.effect.std.Console
-import cats.syntax.applicative._
 import cats.syntax.flatMap._
 import cats.syntax.functor._
 import cats.syntax.traverse._
@@ -45,19 +44,13 @@ import org.typelevel.otel4s.sdk.metrics.view.View
 
 /** Stores aggregated metrics for synchronous instruments.
   */
-private final class SynchronousStorage[
-    F[_]: Monad: Console,
-    A: MeasurementValue
-](
+private final class SynchronousStorage[F[_]: Monad: Console, A: MeasurementValue](
     reader: RegisteredReader[F],
     val metricDescriptor: MetricDescriptor,
    aggregator: SynchronousStorage.SynchronousAggregator[F, A],
    attributesProcessor: AttributesProcessor,
    maxCardinality: Int,
-    accumulators: AtomicCell[
-      F,
-      Map[Attributes, Aggregator.Accumulator[F, A, PointData]]
-    ]
+    accumulators: AtomicCell[F, Map[Attributes, Aggregator.Accumulator[F, A, PointData]]]
 ) extends MetricStorage.Synchronous[F, A] {
 
   private val aggregationTemporality =
@@ -73,21 +66,17 @@ private final class SynchronousStorage[
         v => !cast(v).isNaN
     }
 
-  def record(
-      value: A,
-      attributes: Attributes,
-      context: Context
-  ): F[Unit] =
-    (for {
-      handle <- getHandle(attributes, context)
-      _ <- handle.record(value, attributes, context)
-    } yield ()).whenA(isValid(value))
-
-  def collect(
-      resource: TelemetryResource,
-      scope: InstrumentationScope,
-      window: TimeWindow
-  ): F[Option[MetricData]] = {
+  def record(value: A, attributes: Attributes, context: Context): F[Unit] =
+    if (isValid(value)) {
+      for {
+        handle <- getHandle(attributes, context)
+        _ <- handle.record(value, attributes, context)
+      } yield ()
+    } else {
+      Monad[F].unit
+    }
+
+  def collect(resource: TelemetryResource, scope: InstrumentationScope, window: TimeWindow): F[Option[MetricData]] = {
     val isDelta = aggregationTemporality == AggregationTemporality.Delta
 
     def toMetricData(points: Vector[PointData]): F[Option[MetricData]] =
@@ -131,30 +120,34 @@ private final class SynchronousStorage[
       }
     } yield points.flatten
 
-  private def getHandle(
-      attributes: Attributes,
-      context: Context
-  ): F[Aggregator.Accumulator[F, A, PointData]] =
-    accumulators.evalModify { map =>
+  private def getHandle(attributes: Attributes, context: Context): F[Aggregator.Accumulator[F, A, PointData]] =
+    accumulators.get.flatMap { map =>
       val processed = attributesProcessor.process(attributes, context)
-
-      def create(attrs: Attributes) =
-        for {
-          accumulator <- aggregator.createAccumulator
-        } yield (map.updated(attrs, accumulator), accumulator)
-
       map.get(processed) match {
         case Some(handle) =>
-          Monad[F].pure((map, handle))
-
-        case None if map.sizeIs >= maxCardinality =>
-          val overflowed = attributes.added(MetricStorage.OverflowAttribute)
-          cardinalityWarning >> map
-            .get(overflowed)
-            .fold(create(overflowed))(a => Monad[F].pure((map, a)))
+          Monad[F].pure(handle)
 
         case None =>
-          create(processed)
+          accumulators.evalModify { map =>
+            def create(attrs: Attributes) =
+              for {
+                accumulator <- aggregator.createAccumulator
+              } yield (map.updated(attrs, accumulator), accumulator)
+
+            map.get(processed) match {
+              case Some(handle) =>
+                Monad[F].pure((map, handle))
+
+              case None if map.sizeIs >= maxCardinality =>
+                val overflowed = attributes.added(MetricStorage.OverflowAttribute)
+                cardinalityWarning >> map
+                  .get(overflowed)
+                  .fold(create(overflowed))(a => Monad[F].pure((map, a)))
+
+              case None =>
+                create(processed)
+            }
+          }
       }
     }