Skip to content

Commit

Permalink
added metrics for drop and export Span processor
Browse files Browse the repository at this point in the history
  • Loading branch information
mamunto committed Nov 11, 2024
1 parent 0cd4a4c commit 43e5b42
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 56 deletions.
20 changes: 18 additions & 2 deletions Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@ public class DefaultStableMeter : StableMeter {
}
}

private class NoopLongGaugeBuilder : LongGaugeBuilder {
private class NoopLongGaugeBuilder : LongGaugeBuilder {
func setUnit(_ unit: String) -> any LongGaugeBuilder {
self
}

func setDescription(_ description: String) -> any LongGaugeBuilder {
self
}

func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge {
NoopObservableLongGauge()
}
Expand Down Expand Up @@ -123,7 +131,15 @@ public class DefaultStableMeter : StableMeter {
func add(value: Int, attribute: [String : AttributeValue]) {}
}

private class NoopLongCounterBuilder : LongCounterBuilder {
private class NoopLongCounterBuilder : LongCounterBuilder {
func setUnit(_ unit: String) -> any LongCounterBuilder {
self
}

func setDescription(_ description: String) -> any LongCounterBuilder {
self
}

func ofDoubles() -> DoubleCounterBuilder {
NoopDoubleCounterBuilder()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import Foundation
public class DefaultStableMeterProvider: StableMeterProvider {
static let noopMeterBuilder = NoopMeterBuilder()

public init() {}

public static func noop() -> MeterBuilder {
noopMeterBuilder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import Foundation

public protocol LongCounterBuilder : AnyObject {
func setUnit(_ unit: String) -> LongCounterBuilder
func setDescription(_ description: String) -> LongCounterBuilder
func ofDoubles() -> DoubleCounterBuilder
func build() -> LongCounter
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@
import Foundation

public protocol LongGaugeBuilder : AnyObject {
func setUnit(_ unit: String) -> LongGaugeBuilder
func setDescription(_ description: String) -> LongGaugeBuilder
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,14 @@ public class LongCounterMeterBuilderSdk: LongCounterBuilder, InstrumentBuilder {
-> OpenTelemetryApi.ObservableLongCounter {
registerLongAsynchronousInstrument(type: .observableCounter, updater: callback)
}

public func setDescription(_ description: String) -> LongCounterBuilder {
self.description = description
return self
}

public func setUnit(_ unit: String) -> LongCounterBuilder {
self.unit = unit
return self
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ public class LongGaugeBuilderSdk : LongGaugeBuilder, InstrumentBuilder {
registerLongAsynchronousInstrument(type: type, updater: callback)
}



public func setDescription(_ description: String) -> LongGaugeBuilder {
self.description = description
return self
}

public func setUnit(_ unit: String) -> LongGaugeBuilder {
self.unit = unit
return self
}
}
149 changes: 105 additions & 44 deletions Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,36 @@ import OpenTelemetryApi
/// exports the spans to wake up and start a new export cycle.
/// This batchSpanProcessor can cause high contention in a very high traffic service.
public struct BatchSpanProcessor: SpanProcessor {
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

fileprivate var worker: BatchWorker


fileprivate var worker: BatchWorker

public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil)
{
worker = BatchWorker(spanExporter: spanExporter,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback)
worker.start()
}
public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}

public let isStartRequired = false
public let isEndRequired = true
Expand Down Expand Up @@ -57,36 +72,77 @@ public struct BatchSpanProcessor: SpanProcessor {
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
let spanExporter: SpanExporter
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) {
self.spanExporter = spanExporter
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1
}
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var processedSpansCounter: LongCounter
private let droppedAttrs: [String: AttributeValue]
private let exportedAttrs: [String: AttributeValue]
init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build()
self.queueSizeGauge = meter.gaugeBuilder(name: "queueSize")
.ofLongs()
.setDescription("The number of items queued")
.setUnit("1")
.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

processedSpansCounter = meter.counterBuilder(name: "processedSpans")
.setUnit("1")
.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
.build()
droppedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
]
exportedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)
]
}

func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
// TODO: Record a counter for dropped spans.
processedSpansCounter.add(value: 1, attribute: droppedAttrs)
return
}
// TODO: Record a gauge for referenced spans.
Expand Down Expand Up @@ -148,11 +204,16 @@ private class BatchWorker: Thread {
timeoutTimer.cancel()
}

private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter.add(value: spanList.count, attribute: exportedAttrs)
cond.unlock()
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,22 @@ class BatchSpansProcessorTests: XCTestCase {
}

func testStartEndRequirements() {
let spansProcessor = BatchSpanProcessor(spanExporter: WaitingSpanExporter(numberToWaitFor: 0))
let spansProcessor = BatchSpanProcessor(
spanExporter: WaitingSpanExporter(numberToWaitFor: 0),
meterProvider: DefaultStableMeterProvider()
)
XCTAssertFalse(spansProcessor.isStartRequired)
XCTAssertTrue(spansProcessor.isEndRequired)
}

func testExportDifferentSampledSpans() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: maxScheduleDelay)
)
let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName2)
let exported = waitingSpanExporter.waitForExport()
Expand All @@ -63,7 +70,12 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportMoreSpansThanTheBufferSize() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 6)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay, maxQueueSize: 6, maxExportBatchSize: 2))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: maxScheduleDelay,
maxQueueSize: 6, maxExportBatchSize: 2)
)

let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName1)
Expand All @@ -82,7 +94,13 @@ class BatchSpansProcessorTests: XCTestCase {

func testForceExport() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)
let batchSpansProcessor = BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 10, maxQueueSize: 10000, maxExportBatchSize: 2000)
let batchSpansProcessor = BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: 10,
maxQueueSize: 10000,
maxExportBatchSize: 2000
)
tracerSdkFactory.addSpanProcessor(batchSpansProcessor)

for _ in 0 ..< 100 {
Expand All @@ -96,7 +114,10 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportSpansToMultipleServices() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2)
let waitingSpanExporter2 = WaitingSpanExporter(numberToWaitFor: 2)
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]),
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: maxScheduleDelay))

let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName2)
Expand All @@ -110,7 +131,13 @@ class BatchSpansProcessorTests: XCTestCase {
let maxQueuedSpans = 8
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: maxQueuedSpans)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), scheduleDelay: maxScheduleDelay, maxQueueSize: maxQueuedSpans, maxExportBatchSize: maxQueuedSpans / 2))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]),
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: maxScheduleDelay,
maxQueueSize: maxQueuedSpans,
maxExportBatchSize: maxQueuedSpans / 2)
)

var spansToExport = [SpanData]()
// Wait to block the worker thread in the BatchSampledSpansProcessor. This ensures that no items
Expand Down Expand Up @@ -162,7 +189,11 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportNotSampledSpans() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: maxScheduleDelay)
)

createNotSampledEndedSpan(spanName: spanName1)
createNotSampledEndedSpan(spanName: spanName2)
Expand All @@ -181,7 +212,11 @@ class BatchSpansProcessorTests: XCTestCase {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)

// Set the export delay to zero, for no timeout, in order to confirm the #flush() below works
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 0.1))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: 0.1)
)

let span2 = createSampledEndedSpan(spanName: spanName2)

Expand Down

0 comments on commit 43e5b42

Please sign in to comment.