From 217fbe4d0097435bc99a1629da0781de2ed25e32 Mon Sep 17 00:00:00 2001
From: Maksym Ochenashko
Date: Sun, 7 Jan 2024 11:44:15 +0200
Subject: [PATCH] sdk-trace: add `BatchSpanProcessor`

---
 .../trace/processor/BatchSpanProcessor.scala  | 358 ++++++++++++++++++
 .../processor/BatchSpanProcessorSuite.scala   | 111 ++++++
 2 files changed, 469 insertions(+)
 create mode 100644 sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala
 create mode 100644 sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessorSuite.scala

diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala
new file mode 100644
index 000000000..31484ccaf
--- /dev/null
+++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala
@@ -0,0 +1,358 @@
+/*
+ * Copyright 2023 Typelevel
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.typelevel.otel4s.sdk
+package trace
+package processor
+
+import cats.effect.Ref
+import cats.effect.Resource
+import cats.effect.Temporal
+import cats.effect.std.Console
+import cats.effect.std.CountDownLatch
+import cats.effect.std.Queue
+import cats.effect.syntax.monadCancel._
+import cats.effect.syntax.spawn._
+import cats.effect.syntax.temporal._
+import cats.syntax.all._
+import org.typelevel.otel4s.sdk.trace.data.SpanData
+import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter
+import org.typelevel.otel4s.trace.SpanContext
+
+import scala.concurrent.duration._
+
+/** Implementation of the [[SpanProcessor]] that batches spans exported by the
+  * SDK then pushes them to the exporter pipeline.
+  *
+  * All spans reported by the SDK implementation are first added to a queue. If
+  * the queue is full (with a `config.maxQueueSize` maximum size), the incoming
+  * spans are dropped.
+  *
+  * Spans are exported either when there are `config.maxExportBatchSize` pending
+  * spans or `config.scheduleDelay` has passed since the last export attempt.
+  *
+  * @see
+  *   [[https://opentelemetry.io/docs/specs/otel/trace/sdk/#batching-processor]]
+  *
+  * @tparam F
+  *   the higher-kinded type of a polymorphic effect
+  */
+final class BatchSpanProcessor[F[_]: Temporal: Console] private (
+    queue: Queue[F, SpanData],
+    signal: Ref[F, CountDownLatch[F]],
+    state: Ref[F, BatchSpanProcessor.State],
+    exporter: SpanExporter[F],
+    config: BatchSpanProcessor.Config
+) extends SpanProcessor[F] {
+  import BatchSpanProcessor.State
+
+  /** Human-readable description: the exporter name and the configuration. */
+  val name: String = "BatchSpanProcessor{" +
+    s"exporter=${exporter.name}, " +
+    s"scheduleDelay=${config.scheduleDelay}, " +
+    s"exporterTimeout=${config.exporterTimeout}, " +
+    s"maxQueueSize=${config.maxQueueSize}, " +
+    s"maxExportBatchSize=${config.maxExportBatchSize}}"
+
+  val isStartRequired: Boolean = false
+  val isEndRequired: Boolean = true
+
+  /** Does nothing: only ended spans are of interest to this processor. */
+  def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] =
+    Temporal[F].unit
+
+  /** Offers a sampled span to the queue and wakes the worker up once the queue
+    * holds enough spans to complete the current batch.
+    *
+    * Spans that are not sampled are ignored, and so are spans rejected by the
+    * queue.
+    */
+  def onEnd(span: SpanData): F[Unit] = {
+    val canExport = span.spanContext.isSampled
+
+    // if 'spansNeeded' is defined, it means the worker is waiting for a certain number of spans
+    // and it waits for the current 'signal'-latch to be released
+    // hence, if the queue size is >= than the number of needed spans, the latch can be released
+    def notifyWorker: F[Unit] =
+      (queue.size, state.get)
+        .mapN { (queueSize, state) =>
+          state.spansNeeded.exists(needed => queueSize >= needed)
+        }
+        .ifM(signal.get.flatMap(_.release), Temporal[F].unit)
+
+    def enqueue =
+      for {
+        offered <- queue.tryOffer(span)
+        _ <- notifyWorker.whenA(offered)
+      } yield ()
+
+    enqueue.whenA(canExport)
+  }
+
+  /** Exports the pending batch and everything that is currently queued. */
+  def forceFlush: F[Unit] =
+    exportAll
+
+  // repeats 'run' until the fiber is canceled
+  private def worker: F[Unit] =
+    run.foreverM[Unit]
+
+  // a single iteration of the worker: move queued spans to the batch, then
+  // either export the batch or wait for more spans
+  private def run: F[Unit] =
+    Temporal[F].uncancelable { poll =>
+      // export the current batch and reset the state
+      def doExport(now: FiniteDuration, batch: Vector[SpanData]): F[Unit] =
+        poll(exportBatch(batch)).guarantee(
+          state.set(State(now + config.scheduleDelay, Vector.empty, None))
+        )
+
+      // wait either for:
+      // 1) the signal - it means the queue has enough spans
+      // 2) the timeout - it means we can export all remaining spans
+      def pollMore(
+          now: FiniteDuration,
+          nextExportTime: FiniteDuration,
+          currentBatchSize: Int
+      ): F[Unit] = {
+        val pollWaitTime = nextExportTime - now
+        val spansNeeded = config.maxExportBatchSize - currentBatchSize
+
+        val request =
+          for {
+            // 'CountDownLatch' is one-shot: once released, 'await' never
+            // blocks again. Waiting on a shared latch would turn the worker
+            // into a busy loop after the first notification, so every wait
+            // gets a fresh latch
+            latch <- CountDownLatch[F](1)
+            _ <- signal.set(latch)
+            _ <- state.update(_.copy(spansNeeded = Some(spansNeeded)))
+            // spans offered before 'spansNeeded' was published did not
+            // trigger a notification, so check the queue before waiting
+            enough <- queue.size.map(_ >= spansNeeded)
+            _ <- latch.await
+              .timeoutTo(pollWaitTime, Temporal[F].unit)
+              .unlessA(enough)
+          } yield ()
+
+        poll(request)
+          .guarantee(state.update(_.copy(spansNeeded = None)))
+          .whenA(pollWaitTime > Duration.Zero)
+      }
+
+      for {
+        st <- poll(state.get)
+
+        // try to take enough spans to fill up the current batch
+        spans <- queue.tryTakeN(Some(config.maxExportBatchSize - st.batch.size))
+
+        // append to the batch that is in the state right now rather than to
+        // the snapshot taken above: a concurrent 'forceFlush' may have
+        // exported and cleared it in the meantime
+        nextState <- state.updateAndGet(s => s.copy(batch = s.batch ++ spans))
+
+        // the current timestamp
+        now <- Temporal[F].monotonic
+
+        _ <- {
+          val batch = nextState.batch
+          val nextExportTime = nextState.nextExportTime
+
+          // two reasons to export:
+          // 1) the current batch size exceeds the limit
+          // 2) the worker is behind the scheduled export time
+          val canExport =
+            batch.size >= config.maxExportBatchSize || now >= nextExportTime
+
+          if (canExport) doExport(now, batch)
+          else pollMore(now, nextExportTime, batch.size)
+        }
+      } yield ()
+    }
+
+  // export all available data
+  private def exportAll: F[Unit] =
+    for {
+      // detach the pending batch atomically: clearing it after the export
+      // would also discard the spans the worker has appended in the meantime
+      pending <- state.modify(s => (s.copy(batch = Vector.empty), s.batch))
+      queued <- queue.tryTakeN(None)
+      all = pending ++ queued
+      _ <- all.grouped(config.maxExportBatchSize).toList.traverse_(exportBatch)
+    } yield ()
+
+  // export a single batch; a timeout or a failure is logged and swallowed
+  private def exportBatch(batch: Vector[SpanData]): F[Unit] =
+    exporter
+      .exportSpans(batch)
+      .timeoutTo(
+        config.exporterTimeout,
+        Console[F].error(
+          s"BatchSpanProcessor: the export attempt has been canceled after [${config.exporterTimeout}]"
+        )
+      )
+      .handleErrorWith { e =>
+        Console[F].error(
+          s"BatchSpanProcessor: the export has failed: ${e.getMessage}\n${e.getStackTrace.mkString("\n")}\n"
+        )
+      }
+
+}
+
+object BatchSpanProcessor {
+
+  /** Builder for [[BatchSpanProcessor]]. */
+  trait Builder[F[_]] {
+
+    /** Sets the delay interval between two consecutive exports.
+      *
+      * Default value is `5 seconds`.
+      */
+    def withScheduleDelay(delay: FiniteDuration): Builder[F]
+
+    /** Sets the maximum time an export will be allowed to run before being
+      * cancelled.
+      *
+      * Default value is `30 seconds`.
+      */
+    def withExporterTimeout(timeout: FiniteDuration): Builder[F]
+
+    /** Sets the maximum number of Spans that are kept in the queue before start
+      * dropping. More memory than this value may be allocated to optimize queue
+      * access.
+      *
+      * Default value is `2048`.
+      */
+    def withMaxQueueSize(maxQueueSize: Int): Builder[F]
+
+    /** Sets the maximum batch size for every export. This must be smaller or
+      * equal to `maxQueueSize`.
+      *
+      * Default value is `512`.
+      */
+    def withMaxExportBatchSize(maxExportBatchSize: Int): Builder[F]
+
+    /** Creates a [[BatchSpanProcessor]] with the configuration of this builder.
+      */
+    def build: Resource[F, BatchSpanProcessor[F]]
+  }
+
+  /** Create a [[Builder]] for [[BatchSpanProcessor]].
+    *
+    * @param exporter
+    *   the [[exporter.SpanExporter SpanExporter]] to which the spans are pushed
+    */
+  def builder[F[_]: Temporal: Console](exporter: SpanExporter[F]): Builder[F] =
+    new BuilderImpl[F](
+      exporter = exporter,
+      scheduleDelay = Defaults.ScheduleDelay,
+      exporterTimeout = Defaults.ExportTimeout,
+      maxQueueSize = Defaults.MaxQueueSize,
+      maxExportBatchSize = Defaults.MaxExportBatchSize
+    )
+
+  private object Defaults {
+    val ScheduleDelay: FiniteDuration = 5.seconds
+    val ExportTimeout: FiniteDuration = 30.seconds
+    val MaxQueueSize: Int = 2048
+    val MaxExportBatchSize: Int = 512
+  }
+
+  /** The configuration of the [[BatchSpanProcessor]].
+    *
+    * @param scheduleDelay
+    *   the maximum delay interval between two consecutive exports
+    *
+    * @param exporterTimeout
+    *   how long the export can run before it is cancelled
+    *
+    * @param maxQueueSize
+    *   the maximum queue size. Once the limit is reached new spans will be
+    *   dropped
+    *
+    * @param maxExportBatchSize
+    *   the maximum batch size of every export
+    */
+  private final case class Config(
+      scheduleDelay: FiniteDuration,
+      exporterTimeout: FiniteDuration,
+      maxQueueSize: Int,
+      maxExportBatchSize: Int
+  )
+
+  // the mutable state of the worker:
+  // - nextExportTime: the monotonic time of the next scheduled export
+  // - batch: the spans taken from the queue that are not exported yet
+  // - spansNeeded: how many spans the waiting worker needs to fill the batch
+  private final case class State(
+      nextExportTime: FiniteDuration,
+      batch: Vector[SpanData],
+      spansNeeded: Option[Int]
+  )
+
+  private final case class BuilderImpl[F[_]: Temporal: Console](
+      exporter: SpanExporter[F],
+      scheduleDelay: FiniteDuration,
+      exporterTimeout: FiniteDuration,
+      maxQueueSize: Int,
+      maxExportBatchSize: Int
+  ) extends Builder[F] {
+
+    def withScheduleDelay(delay: FiniteDuration): Builder[F] =
+      copy(scheduleDelay = delay)
+
+    def withExporterTimeout(timeout: FiniteDuration): Builder[F] =
+      copy(exporterTimeout = timeout)
+
+    def withMaxQueueSize(maxQueueSize: Int): Builder[F] =
+      copy(maxQueueSize = maxQueueSize)
+
+    def withMaxExportBatchSize(maxExportBatchSize: Int): Builder[F] =
+      copy(maxExportBatchSize = maxExportBatchSize)
+
+    def build: Resource[F, BatchSpanProcessor[F]] = {
+      val config = Config(
+        scheduleDelay,
+        exporterTimeout,
+        maxQueueSize,
+        maxExportBatchSize
+      )
+
+      def create: F[BatchSpanProcessor[F]] =
+        for {
+          queue <- Queue.dropping[F, SpanData](maxQueueSize)
+          now <- Temporal[F].monotonic
+          state <- Ref.of(State(now + config.scheduleDelay, Vector.empty, None))
+          latch <- CountDownLatch[F](1)
+          signal <- Ref.of[F, CountDownLatch[F]](latch)
+        } yield new BatchSpanProcessor[F](
+          queue = queue,
+          signal = signal,
+          state = state,
+          exporter = exporter,
+          config = config
+        )
+
+      // finalizers run in the reverse order of acquisition: the worker is
+      // canceled first, then the remaining spans are exported
+      for {
+        processor <- Resource.eval(create)
+        _ <- Resource.make(Temporal[F].unit)(_ => processor.exportAll)
+        _ <- processor.worker.background
+      } yield processor
+    }
+  }
+}
diff --git a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessorSuite.scala b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessorSuite.scala
new file mode 100644
index 000000000..0aef6bacf
--- /dev/null
+++ b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessorSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2023 Typelevel
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.typelevel.otel4s.sdk.trace
+package processor
+
+import cats.Foldable
+import cats.effect.IO
+import cats.syntax.foldable._
+import cats.syntax.traverse._
+import munit.CatsEffectSuite
+import munit.ScalaCheckEffectSuite
+import org.scalacheck.Arbitrary
+import org.scalacheck.Test
+import org.scalacheck.effect.PropF
+import org.typelevel.otel4s.sdk.trace.data.SpanData
+import org.typelevel.otel4s.sdk.trace.exporter.InMemorySpanExporter
+import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter
+
+class BatchSpanProcessorSuite
+    extends CatsEffectSuite
+    with ScalaCheckEffectSuite {
+
+  private implicit val spanDataArbitrary: Arbitrary[SpanData] =
+    Arbitrary(Gens.spanData)
+
+  test("show details in the name") {
+    val failure = new RuntimeException("something went wrong")
+    val builder =
+      BatchSpanProcessor.builder(new FailingExporter("error-prone", failure))
+
+    val expected =
+      "BatchSpanProcessor{exporter=error-prone, scheduleDelay=5 seconds, exporterTimeout=30 seconds, maxQueueSize=2048, maxExportBatchSize=512}"
+
+    builder.build.use(p => IO(assertEquals(p.name, expected)))
+  }
+
+  test("do nothing on start") {
+    PropF.forAllF { (spans: List[SpanData]) =>
+      InMemorySpanExporter.create[IO](None).flatMap { exporter =>
+        val startAll = BatchSpanProcessor.builder(exporter).build.use { p =>
+          spans.traverse_(_ => p.onStart(None, null))
+        }
+
+        (startAll *> exporter.finishedSpans).map(xs => assert(xs.isEmpty))
+      }
+    }
+  }
+
+  test("export only sampled spans on end") {
+    PropF.forAllF { (spans: List[SpanData]) =>
+      InMemorySpanExporter.create[IO](None).flatMap { exporter =>
+        val endAll = BatchSpanProcessor.builder(exporter).build.use { p =>
+          spans.traverse_(s => p.onEnd(s))
+        }
+
+        (endAll *> exporter.finishedSpans).map { exported =>
+          assertEquals(exported, spans.filter(_.spanContext.isSampled))
+        }
+      }
+    }
+  }
+
+  test("do not rethrow export errors") {
+    PropF.forAllF { (spans: List[SpanData]) =>
+      val exporter = new FailingExporter(
+        "error-prone",
+        new RuntimeException("something went wrong")
+      )
+      val allSucceeded = List.fill(spans.size)(Right(()))
+
+      BatchSpanProcessor
+        .builder(exporter)
+        .build
+        .use(p => spans.traverse(s => p.onEnd(s).attempt))
+        .map(attempts => assertEquals(attempts, allSucceeded))
+    }
+  }
+
+  override protected def scalaCheckTestParameters: Test.Parameters = {
+    val defaults = super.scalaCheckTestParameters
+    defaults.withMinSuccessfulTests(10).withMaxSize(10)
+  }
+
+  // an exporter whose every export attempt fails with the given error
+  private class FailingExporter(
+      exporterName: String,
+      onExport: Throwable
+  ) extends SpanExporter[IO] {
+    def name: String = exporterName
+
+    def exportSpans[G[_]: Foldable](spans: G[SpanData]): IO[Unit] =
+      IO.raiseError(onExport)
+
+    def flush: IO[Unit] = IO.unit
+  }
+
+}