Skip to content

Commit

Permalink
Merge pull request #825 from iRevive/benchmarks/metrics
Browse files Browse the repository at this point in the history
benchmarks: add metrics benchmark
  • Loading branch information
iRevive authored Oct 30, 2024
2 parents aa5639c + 846cd5b commit bb56bff
Show file tree
Hide file tree
Showing 2 changed files with 302 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* Copyright 2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.benchmarks

import cats.effect.IO
import cats.effect.Resource
import cats.effect.unsafe.implicits.global
import cats.mtl.Ask
import cats.syntax.foldable._
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.ThreadParams
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.Attributes
import org.typelevel.otel4s.metrics.Meter
import org.typelevel.otel4s.metrics.MeterProvider

import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
import scala.util.chaining._

// benchmarks/Jmh/run org.typelevel.otel4s.benchmarks.MetricsBenchmark -prof gc
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@Fork(1)
class MetricsBenchmark {
import MetricsBenchmark._

@Benchmark
@Threads(1)
def recordToMultipleAttributes(state: BenchThreadState): Unit =
AttributesList.traverse_(attributes => state.op.run(attributes)).unsafeRunSync()

@Benchmark
@Threads(1)
def oneThread(state: BenchThreadState): Unit =
state.op.run(SharedAttributes).unsafeRunSync()

@Benchmark
@Threads(8)
def eightThreadsCommonLabelSet(state: BenchThreadState): Unit =
state.op.run(SharedAttributes).unsafeRunSync()

@Benchmark
@Threads(8)
def eightThreadsSeparateLabelSets(state: BenchThreadState): Unit =
state.op.run(state.threadUniqueLabelSet).unsafeRunSync()

}

object MetricsBenchmark {

private val AttributesList: List[Attributes] = {
val keys = 5
val valuePerKey = 5

List.tabulate(keys) { key =>
List
.tabulate(valuePerKey) { value =>
Attribute(s"key_$key", s"value_$value")
}
.to(Attributes)
}
}

private val SharedAttributes: Attributes =
Attributes(Attribute("key", "value"))

trait BenchOperation {
def run(attributes: Attributes): IO[Unit]
}

@State(Scope.Benchmark)
class BenchThreadState {

@Param(Array("oteljava", "sdk", "noop"))
var backend: String = _

@Param(Array("noop", "sdk_cumulative", "sdk_delta"))
var variation: String = _

@Param(Array("long_counter", "double_counter", "double_histogram", "long_histogram"))
var operation: String = _

var op: BenchOperation = _
var threadUniqueLabelSet: Attributes = _

private var finalizer: IO[Unit] = _

@Setup()
def setup(params: ThreadParams): Unit = {
threadUniqueLabelSet = Attributes(Attribute("key", params.getThreadIndex.toString))

backend match {
case "sdk" =>
val (o, release) = sdkMeter(variation).evalMap(bench(operation, _)).allocated.unsafeRunSync()

op = o
finalizer = release

case "oteljava" =>
val (o, release) = otelJavaMeter(variation).evalMap(bench(operation, _)).allocated.unsafeRunSync()

op = o
finalizer = release

case "noop" =>
op = bench(operation, noopMeter).unsafeRunSync()
finalizer = IO.unit

case other =>
sys.error(s"unknown backend [$other]")
}
}

@TearDown()
def cleanup(): Unit =
finalizer.unsafeRunSync()

}

private def bench(operation: String, meter: Meter[IO]): IO[BenchOperation] = {
operation match {
case "long_counter" =>
for {
counter <- meter.counter[Long]("counter.long").create
} yield new BenchOperation {
def run(attributes: Attributes): IO[Unit] = counter.add(5L, attributes)
}

case "double_counter" =>
for {
counter <- meter.counter[Double]("counter.double").create
} yield new BenchOperation {
def run(attributes: Attributes): IO[Unit] = counter.add(5.0, attributes)
}

case "long_histogram" =>
for {
histogram <- meter.histogram[Long]("histogram.long").create
} yield new BenchOperation {
def run(attributes: Attributes): IO[Unit] =
histogram.record(ThreadLocalRandom.current().nextLong(0, 20000), attributes)
}

case "double_histogram" =>
for {
histogram <- meter.histogram[Double]("histogram.double").create
} yield new BenchOperation {
def run(attributes: Attributes): IO[Unit] =
histogram.record(ThreadLocalRandom.current().nextDouble(0, 20000.0), attributes)
}

case other =>
sys.error(s"unknown operation [$other]")
}
}

private def otelJavaMeter(variation: String): Resource[IO, Meter[IO]] = {
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder
import org.typelevel.otel4s.oteljava.OtelJava

val namespace = "otel4s.sdk.metrics"

def create(
reader: InMemoryMetricReader,
customize: SdkMeterProviderBuilder => SdkMeterProviderBuilder = identity,
): Resource[IO, Meter[IO]] = {

def meterProvider = SdkMeterProvider
.builder()
.registerMetricReader(reader)
.pipe(customize)
.build()

def otel = OpenTelemetrySdk
.builder()
.setMeterProvider(meterProvider)
.build()

OtelJava
.resource[IO](IO(otel))
.evalMap(_.meterProvider.meter("otel4s.sdk.metrics").get)
}

variation match {
case "noop" =>
Resource.eval(MeterProvider.noop[IO].get(namespace))

case "sdk_cumulative" =>
create(InMemoryMetricReader.create())

case "sdk_delta" =>
create(InMemoryMetricReader.createDelta())

case other =>
sys.error(s"unknown variation [$other]")
}
}

private def sdkMeter(variation: String): Resource[IO, Meter[IO]] = {
import cats.effect.std.Random
import org.typelevel.otel4s.sdk.context.AskContext
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.metrics.SdkMeterProvider
import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality
import org.typelevel.otel4s.sdk.metrics.exporter.AggregationTemporalitySelector
import org.typelevel.otel4s.sdk.testkit.metrics.InMemoryMetricReader

val namespace = "otel4s.sdk.metrics"

def create(
customize: SdkMeterProvider.Builder[IO] => SdkMeterProvider.Builder[IO] = identity,
aggregationTemporality: AggregationTemporalitySelector = AggregationTemporalitySelector.alwaysCumulative
): Resource[IO, Meter[IO]] =
Resource.eval {
Random.scalaUtilRandom[IO].flatMap { implicit random =>
InMemoryMetricReader.create[IO](aggregationTemporality).flatMap { reader =>
implicit val askContext: AskContext[IO] = Ask.const(Context.root)
SdkMeterProvider
.builder[IO]
.registerMetricReader(reader)
.pipe(customize)
.build
.flatMap(_.get(namespace))
}
}
}

variation match {
case "noop" =>
Resource.eval(MeterProvider.noop[IO].get(namespace))

case "sdk_cumulative" =>
create()

case "sdk_delta" =>
create(aggregationTemporality = _ => AggregationTemporality.Delta)

case other =>
sys.error(s"unknown variation [$other]")
}
}

private def noopMeter: Meter[IO] =
Meter.noop

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import cats.data.NonEmptyVector
import cats.effect.Concurrent
import cats.effect.std.AtomicCell
import cats.effect.std.Console
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.traverse._
Expand All @@ -45,19 +44,13 @@ import org.typelevel.otel4s.sdk.metrics.view.View

/** Stores aggregated metrics for synchronous instruments.
*/
private final class SynchronousStorage[
F[_]: Monad: Console,
A: MeasurementValue
](
private final class SynchronousStorage[F[_]: Monad: Console, A: MeasurementValue](
reader: RegisteredReader[F],
val metricDescriptor: MetricDescriptor,
aggregator: SynchronousStorage.SynchronousAggregator[F, A],
attributesProcessor: AttributesProcessor,
maxCardinality: Int,
accumulators: AtomicCell[
F,
Map[Attributes, Aggregator.Accumulator[F, A, PointData]]
]
accumulators: AtomicCell[F, Map[Attributes, Aggregator.Accumulator[F, A, PointData]]]
) extends MetricStorage.Synchronous[F, A] {

private val aggregationTemporality =
Expand All @@ -73,21 +66,17 @@ private final class SynchronousStorage[
v => !cast(v).isNaN
}

def record(
value: A,
attributes: Attributes,
context: Context
): F[Unit] =
(for {
handle <- getHandle(attributes, context)
_ <- handle.record(value, attributes, context)
} yield ()).whenA(isValid(value))

def collect(
resource: TelemetryResource,
scope: InstrumentationScope,
window: TimeWindow
): F[Option[MetricData]] = {
def record(value: A, attributes: Attributes, context: Context): F[Unit] =
if (isValid(value)) {
for {
handle <- getHandle(attributes, context)
_ <- handle.record(value, attributes, context)
} yield ()
} else {
Monad[F].unit
}

def collect(resource: TelemetryResource, scope: InstrumentationScope, window: TimeWindow): F[Option[MetricData]] = {
val isDelta = aggregationTemporality == AggregationTemporality.Delta

def toMetricData(points: Vector[PointData]): F[Option[MetricData]] =
Expand Down Expand Up @@ -131,30 +120,34 @@ private final class SynchronousStorage[
}
} yield points.flatten

private def getHandle(
attributes: Attributes,
context: Context
): F[Aggregator.Accumulator[F, A, PointData]] =
accumulators.evalModify { map =>
private def getHandle(attributes: Attributes, context: Context): F[Aggregator.Accumulator[F, A, PointData]] =
accumulators.get.flatMap { map =>
val processed = attributesProcessor.process(attributes, context)

def create(attrs: Attributes) =
for {
accumulator <- aggregator.createAccumulator
} yield (map.updated(attrs, accumulator), accumulator)

map.get(processed) match {
case Some(handle) =>
Monad[F].pure((map, handle))

case None if map.sizeIs >= maxCardinality =>
val overflowed = attributes.added(MetricStorage.OverflowAttribute)
cardinalityWarning >> map
.get(overflowed)
.fold(create(overflowed))(a => Monad[F].pure((map, a)))
Monad[F].pure(handle)

case None =>
create(processed)
accumulators.evalModify { map =>
def create(attrs: Attributes) =
for {
accumulator <- aggregator.createAccumulator
} yield (map.updated(attrs, accumulator), accumulator)

map.get(processed) match {
case Some(handle) =>
Monad[F].pure((map, handle))

case None if map.sizeIs >= maxCardinality =>
val overflowed = attributes.added(MetricStorage.OverflowAttribute)
cardinalityWarning >> map
.get(overflowed)
.fold(create(overflowed))(a => Monad[F].pure((map, a)))

case None =>
create(processed)
}
}
}
}

Expand Down

0 comments on commit bb56bff

Please sign in to comment.