Skip to content

Commit

Permalink
Merge pull request #457 from iRevive/metrics/generic-observable-up-do…
Browse files Browse the repository at this point in the history
…wn-counter

metrics: make `Meter[F].observableUpDownCounter` generic
  • Loading branch information
iRevive authored Jan 28, 2024
2 parents 99c6ce5 + b9725f6 commit 069df5f
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Choose the type of an instrument explicitly, for example:
3) `.histogram[Long](...)` or `.histogram[Double](...)`
4) `.observableGauge[Long](...)` or `.observableGauge[Double](...)`
5) `.observableCounter[Long](...)` or `.observableCounter[Double](...)`
6) `.observableUpDownCounter[Long](...)` or `.observableUpDownCounter[Double](...)`
"""
)
implicit val longMeasurementValue: MeasurementValue[Long] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,48 @@ trait Meter[F[_]] {
): ObservableCounter.Builder[F, A]

/** Creates a builder of [[ObservableUpDownCounter]] instrument that collects
* [[scala.Long]] values from the given callback.
* values of type `A` from the given callback.
*
* The [[ObservableUpDownCounter]] is non-monotonic. This means the
* aggregated value can increase and decrease.
*
* @note
* the `A` type must be provided explicitly, for example
* `meter.observableUpDownCounter[Long]` or
* `meter.observableUpDownCounter[Double]`
*
* @example
* {{{
* val meter: Meter[F] = ???
*
* val doubleObservableUpDownCounter: Resource[F, ObservableUpDownCounter] =
* meter
* .observableUpDownCounter[Double]("double-up-down-counter")
* .create(Sync[F].delay(List(Measurement(1.0))))
*
* val longObservableUpDownCounter: Resource[F, ObservableUpDownCounter] =
* meter
* .observableUpDownCounter[Long]("long-up-down-counter")
* .create(Sync[F].delay(List(Measurement(1L))))
* }}}
*
* @note
* the `A` type must be provided explicitly, for example
* `meter.observableCounter[Long]` or `meter.observableCounter[Double]`
*
* @see
* See [[observableCounter]] for monotonic alternative
*
* @param name
* the name of the instrument
*
* @tparam A
* the type of the measurement. [[scala.Long]] and [[scala.Double]] are
* supported out of the box
*/
def observableUpDownCounter(
def observableUpDownCounter[A: MeasurementValue](
name: String
): ObservableInstrumentBuilder[F, Long, ObservableUpDownCounter]
): ObservableUpDownCounter.Builder[F, A]

}

Expand Down Expand Up @@ -308,20 +336,21 @@ object Meter {
Resource.pure(new ObservableCounter {})
}

def observableUpDownCounter(
def observableUpDownCounter[A: MeasurementValue](
name: String
): ObservableInstrumentBuilder[F, Long, ObservableUpDownCounter] =
new ObservableInstrumentBuilder[F, Long, ObservableUpDownCounter] {
type Self = this.type

def withUnit(unit: String): Self = this
def withDescription(description: String): Self = this
def create(
measurements: F[List[Measurement[Long]]]
): ObservableUpDownCounter.Builder[F, A] =
new ObservableUpDownCounter.Builder[F, A] {
def withUnit(unit: String): ObservableUpDownCounter.Builder[F, A] =
this
def withDescription(
description: String
): ObservableUpDownCounter.Builder[F, A] = this
def createWithCallback(
cb: ObservableMeasurement[F, A] => F[Unit]
): Resource[F, ObservableUpDownCounter] =
Resource.pure(new ObservableUpDownCounter {})
def createWithCallback(
cb: ObservableMeasurement[F, Long] => F[Unit]
def create(
measurements: F[Iterable[Measurement[A]]]
): Resource[F, ObservableUpDownCounter] =
Resource.pure(new ObservableUpDownCounter {})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,79 @@

package org.typelevel.otel4s.metrics

import cats.effect.Resource

trait ObservableUpDownCounter

object ObservableUpDownCounter {

/** A builder of [[ObservableUpDownCounter]].
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*
* @tparam A
* the type of the values to record. The type must have an instance of
* [[MeasurementValue]]. [[scala.Long]] and [[scala.Double]] are supported
* out of the box.
*/
trait Builder[F[_], A] {

/** Sets the unit of measure for this instrument.
*
* @see
* [[https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-unit Instrument unit]]
*
* @param unit
* the measurement unit. Must be 63 or fewer ASCII characters.
*/
def withUnit(unit: String): Builder[F, A]

/** Sets the description for this instrument.
*
* @see
* [[https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-description Instrument Description]]
*
* @param description
* the description
*/
def withDescription(description: String): Builder[F, A]

/** Creates an instrument with the given callback, using `unit` and
* `description` (if any).
*
* The callback will be called when the instrument is being observed.
*
* The callback is expected to abide by the following restrictions:
* - Short-living and (ideally) non-blocking
* - Run in a finite amount of time
* - Safe to call repeatedly, across multiple threads
*
* @param cb
* the callback which observes measurements when invoked
*/
def createWithCallback(
cb: ObservableMeasurement[F, A] => F[Unit]
): Resource[F, ObservableUpDownCounter]

/** Creates an asynchronous instrument based on an effect that produces a
* number of measurements.
*
* The measurement effect will be evaluated when the instrument is being
* observed.
*
* The measurement effect is expected to abide by the following
* restrictions:
* - Short-living and (ideally) non-blocking
* - Run in a finite amount of time
* - Safe to call repeatedly, across multiple threads
*
* @param measurements
* effect that produces a number of measurements
*/
def create(
measurements: F[Iterable[Measurement[A]]]
): Resource[F, ObservableUpDownCounter]
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ private[oteljava] class MeterImpl[F[_]: Async](jMeter: JMeter)
): ObservableCounter.Builder[F, A] =
ObservableCounterBuilderImpl(jMeter, name)

def observableUpDownCounter(
def observableUpDownCounter[A: MeasurementValue](
name: String
): ObservableInstrumentBuilder[F, Long, ObservableUpDownCounter] =
new ObservableUpDownCounterBuilderImpl(jMeter, name)
): ObservableUpDownCounter.Builder[F, A] =
ObservableUpDownCounterBuilderImpl(jMeter, name)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,55 +23,162 @@ import cats.effect.kernel.Resource
import cats.effect.std.Dispatcher
import cats.syntax.all._
import io.opentelemetry.api.metrics.{Meter => JMeter}
import io.opentelemetry.api.metrics.{
ObservableMeasurement => JObservableMeasurement
}
import io.opentelemetry.api.metrics.LongUpDownCounterBuilder
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
import io.opentelemetry.api.metrics.ObservableLongMeasurement
import org.typelevel.otel4s.metrics._

private[oteljava] case class ObservableUpDownCounterBuilderImpl[F[_]](
jMeter: JMeter,
private[oteljava] case class ObservableUpDownCounterBuilderImpl[F[_], A](
factory: ObservableUpDownCounterBuilderImpl.Factory[F, A],
name: String,
unit: Option[String] = None,
description: Option[String] = None
)(implicit F: Async[F])
extends ObservableInstrumentBuilder[F, Long, ObservableUpDownCounter] {
) extends ObservableUpDownCounter.Builder[F, A] {

type Self = ObservableUpDownCounterBuilderImpl[F]
def withUnit(unit: String): ObservableUpDownCounter.Builder[F, A] =
copy(unit = Option(unit))

def withUnit(unit: String): Self = copy(unit = Option(unit))
def withDescription(description: String): Self =
def withDescription(
description: String
): ObservableUpDownCounter.Builder[F, A] =
copy(description = Option(description))

private def createInternal(
cb: ObservableLongMeasurement => F[Unit]
): Resource[F, ObservableUpDownCounter] =
Dispatcher.sequential.flatMap(dispatcher =>
Resource
.fromAutoCloseable(F.delay {
val b = jMeter.upDownCounterBuilder(name)
unit.foreach(b.setUnit)
description.foreach(b.setDescription)
b.buildWithCallback { olm =>
dispatcher.unsafeRunSync(cb(olm))
}
})
.as(new ObservableUpDownCounter {})
)

def createWithCallback(
cb: ObservableMeasurement[F, Long] => F[Unit]
cb: ObservableMeasurement[F, A] => F[Unit]
): Resource[F, ObservableUpDownCounter] =
createInternal(olm => cb(new ObservableLongImpl(olm)))
factory.createWithCallback(name, unit, description, cb)

def create(
measurements: F[List[Measurement[Long]]]
measurements: F[Iterable[Measurement[A]]]
): Resource[F, ObservableUpDownCounter] =
createInternal(olm =>
measurements.flatMap(ms =>
F.delay(
ms.foreach(m =>
olm.record(m.value, Conversions.toJAttributes(m.attributes))
)
factory.create(name, unit, description, measurements)
}

private[oteljava] object ObservableUpDownCounterBuilderImpl {

def apply[F[_]: Async, A: MeasurementValue](
jMeter: JMeter,
name: String
): ObservableUpDownCounter.Builder[F, A] =
MeasurementValue[A] match {
case MeasurementValue.LongMeasurementValue(cast) =>
ObservableUpDownCounterBuilderImpl(longFactory(jMeter, cast), name)

case MeasurementValue.DoubleMeasurementValue(cast) =>
ObservableUpDownCounterBuilderImpl(doubleFactory(jMeter, cast), name)
}

private[oteljava] sealed abstract class Factory[F[_]: Async, A](
jMeter: JMeter
) {
type JMeasurement <: JObservableMeasurement

final def create(
name: String,
unit: Option[String],
description: Option[String],
measurements: F[Iterable[Measurement[A]]]
): Resource[F, ObservableUpDownCounter] =
createInternal(name, unit, description) { om =>
measurements.flatMap { ms =>
Async[F].delay(ms.foreach(m => doRecord(om, m.value, m.attributes)))
}
}

final def createWithCallback(
name: String,
unit: Option[String],
description: Option[String],
cb: ObservableMeasurement[F, A] => F[Unit]
): Resource[F, ObservableUpDownCounter] =
createInternal(name, unit, description) { om =>
cb(
new ObservableMeasurement[F, A] {
def record(value: A, attributes: Attributes): F[Unit] =
Async[F].delay(
doRecord(om, value, attributes)
)
}
)
)
)
}

protected def create(
builder: LongUpDownCounterBuilder,
dispatcher: Dispatcher[F],
cb: JMeasurement => F[Unit]
): AutoCloseable

protected def doRecord(
measurement: JMeasurement,
value: A,
attributes: Attributes
): Unit

private final def createInternal(
name: String,
unit: Option[String],
description: Option[String]
)(cb: JMeasurement => F[Unit]): Resource[F, ObservableUpDownCounter] =
Dispatcher.sequential.flatMap { dispatcher =>
Resource
.fromAutoCloseable(Async[F].delay {
val b = jMeter.upDownCounterBuilder(name)
unit.foreach(b.setUnit)
description.foreach(b.setDescription)
create(b, dispatcher, cb)
})
.as(new ObservableUpDownCounter {})
}

}

private def longFactory[F[_]: Async, A](
jMeter: JMeter,
cast: A => Long
): Factory[F, A] =
new Factory[F, A](jMeter) {
type JMeasurement = ObservableLongMeasurement

protected def create(
builder: LongUpDownCounterBuilder,
dispatcher: Dispatcher[F],
cb: ObservableLongMeasurement => F[Unit]
): AutoCloseable =
builder.buildWithCallback(om => dispatcher.unsafeRunSync(cb(om)))

protected def doRecord(
om: ObservableLongMeasurement,
value: A,
attributes: Attributes
): Unit =
om.record(cast(value), Conversions.toJAttributes(attributes))
}

private def doubleFactory[F[_]: Async, A](
jMeter: JMeter,
cast: A => Double
): Factory[F, A] =
new Factory[F, A](jMeter) {
type JMeasurement = ObservableDoubleMeasurement

protected def create(
builder: LongUpDownCounterBuilder,
dispatcher: Dispatcher[F],
cb: ObservableDoubleMeasurement => F[Unit]
): AutoCloseable =
builder
.ofDoubles()
.buildWithCallback(om => dispatcher.unsafeRunSync(cb(om)))

protected def doRecord(
om: ObservableDoubleMeasurement,
value: A,
attributes: Attributes
): Unit =
om.record(cast(value), Conversions.toJAttributes(attributes))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class ObservableSuite extends CatsEffectSuite {
.get

_ <- meter
.observableUpDownCounter("updowncounter")
.observableUpDownCounter[Long]("updowncounter")
.withUnit("unit")
.withDescription("description")
.createWithCallback(
Expand All @@ -152,7 +152,7 @@ class ObservableSuite extends CatsEffectSuite {
)

_ <- meter
.observableUpDownCounter("updowncounter")
.observableUpDownCounter[Long]("updowncounter")
.withUnit("unit")
.withDescription("description")
.create(
Expand Down

0 comments on commit 069df5f

Please sign in to comment.