diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift index f13a20ae..ca56fda6 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift @@ -60,7 +60,15 @@ public class DefaultStableMeter : StableMeter { } } - private class NoopLongGaugeBuilder : LongGaugeBuilder { + private class NoopLongGaugeBuilder : LongGaugeBuilder { + func setUnit(_ unit: String) -> any LongGaugeBuilder { + self + } + + func setDescription(_ description: String) -> any LongGaugeBuilder { + self + } + func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge { NoopObservableLongGauge() } @@ -123,7 +131,15 @@ public class DefaultStableMeter : StableMeter { func add(value: Int, attribute: [String : AttributeValue]) {} } - private class NoopLongCounterBuilder : LongCounterBuilder { + private class NoopLongCounterBuilder : LongCounterBuilder { + func setUnit(_ unit: String) -> any LongCounterBuilder { + self + } + + func setDescription(_ description: String) -> any LongCounterBuilder { + self + } + func ofDoubles() -> DoubleCounterBuilder { NoopDoubleCounterBuilder() } diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift index 9cca201f..14634a3e 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift @@ -8,6 +8,8 @@ import Foundation public class DefaultStableMeterProvider: StableMeterProvider { static let noopMeterBuilder = NoopMeterBuilder() + public init() {} + public static func noop() -> MeterBuilder { noopMeterBuilder } diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift b/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift index f6181dd1..fcb7995b 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift @@ -6,6 +6,8 @@ import Foundation public protocol LongCounterBuilder : AnyObject { + func setUnit(_ unit: String) -> LongCounterBuilder + func setDescription(_ description: String) -> LongCounterBuilder func ofDoubles() -> DoubleCounterBuilder func build() -> LongCounter func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift b/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift index 42bf644c..b61cc7b1 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift @@ -6,5 +6,7 @@ import Foundation public protocol LongGaugeBuilder : AnyObject { + func setUnit(_ unit: String) -> LongGaugeBuilder + func setDescription(_ description: String) -> LongGaugeBuilder func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge } diff --git a/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift b/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift index 92dde2d7..39a19cb9 100644 --- a/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift +++ b/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift @@ -41,4 +41,14 @@ public class LongCounterMeterBuilderSdk: LongCounterBuilder, InstrumentBuilder { -> OpenTelemetryApi.ObservableLongCounter { registerLongAsynchronousInstrument(type: .observableCounter, updater: callback) } + + public func setDescription(_ description: String) -> LongCounterBuilder { + self.description = description + return self + } + + public func setUnit(_ unit: String) -> LongCounterBuilder { + self.unit = unit + return self + } } diff --git a/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift b/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift index fa9c0e96..9d2087da 100644 --- a/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift +++ b/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift @@ -34,7 +34,13 @@ public class LongGaugeBuilderSdk : LongGaugeBuilder, InstrumentBuilder { registerLongAsynchronousInstrument(type: type, updater: callback) } - - + public func setDescription(_ description: String) -> LongGaugeBuilder { + self.description = description + return self + } + public func setUnit(_ unit: String) -> LongGaugeBuilder { + self.unit = unit + return self + } } diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index a393e3d9..b87824db 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -15,21 +15,36 @@ import OpenTelemetryApi /// exports the spans to wake up and start a new export cycle. /// This batchSpanProcessor can cause high contention in a very high traffic service. public struct BatchSpanProcessor: SpanProcessor { + fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType" + fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped" + fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name + + fileprivate var worker: BatchWorker - - fileprivate var worker: BatchWorker - - public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30, - maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil) - { - worker = BatchWorker(spanExporter: spanExporter, - scheduleDelay: scheduleDelay, - exportTimeout: exportTimeout, - maxQueueSize: maxQueueSize, - maxExportBatchSize: maxExportBatchSize, - willExportCallback: willExportCallback) - worker.start() - } + public static var name: String { + String(describing: Self.self) + } + + public init( + spanExporter: SpanExporter, + meterProvider: StableMeterProvider, + scheduleDelay: TimeInterval = 5, + exportTimeout: TimeInterval = 30, + maxQueueSize: Int = 2048, + maxExportBatchSize: Int = 512, + willExportCallback: ((inout [SpanData]) -> Void)? = nil + ) { + worker = BatchWorker( + spanExporter: spanExporter, + meterProvider: meterProvider, + scheduleDelay: scheduleDelay, + exportTimeout: exportTimeout, + maxQueueSize: maxQueueSize, + maxExportBatchSize: maxExportBatchSize, + willExportCallback: willExportCallback + ) + worker.start() + } public let isStartRequired = false public let isEndRequired = true @@ -57,36 +72,77 @@ public struct BatchSpanProcessor: SpanProcessor { /// the data. /// The list of batched data is protected by a NSCondition which ensures full concurrency. private class BatchWorker: Thread { - let spanExporter: SpanExporter - let scheduleDelay: TimeInterval - let maxQueueSize: Int - let exportTimeout: TimeInterval - let maxExportBatchSize: Int - let willExportCallback: ((inout [SpanData]) -> Void)? - let halfMaxQueueSize: Int - private let cond = NSCondition() - var spanList = [ReadableSpan]() - var queue: OperationQueue - - init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) { - self.spanExporter = spanExporter - self.scheduleDelay = scheduleDelay - self.exportTimeout = exportTimeout - self.maxQueueSize = maxQueueSize - halfMaxQueueSize = maxQueueSize >> 1 - self.maxExportBatchSize = maxExportBatchSize - self.willExportCallback = willExportCallback - queue = OperationQueue() - queue.name = "BatchWorker Queue" - queue.maxConcurrentOperationCount = 1 - } + let spanExporter: SpanExporter + let meterProvider: StableMeterProvider + let scheduleDelay: TimeInterval + let maxQueueSize: Int + let exportTimeout: TimeInterval + let maxExportBatchSize: Int + let willExportCallback: ((inout [SpanData]) -> Void)? + let halfMaxQueueSize: Int + private let cond = NSCondition() + var spanList = [ReadableSpan]() + var queue: OperationQueue + + private var queueSizeGauge: ObservableLongGauge? + private var processedSpansCounter: LongCounter + private let droppedAttrs: [String: AttributeValue] + private let exportedAttrs: [String: AttributeValue] + init( + spanExporter: SpanExporter, + meterProvider: StableMeterProvider, + scheduleDelay: TimeInterval, + exportTimeout: TimeInterval, + maxQueueSize: Int, + maxExportBatchSize: Int, + willExportCallback: ((inout [SpanData]) -> Void)? + ) { + self.spanExporter = spanExporter + self.meterProvider = meterProvider + self.scheduleDelay = scheduleDelay + self.exportTimeout = exportTimeout + self.maxQueueSize = maxQueueSize + halfMaxQueueSize = maxQueueSize >> 1 + self.maxExportBatchSize = maxExportBatchSize + self.willExportCallback = willExportCallback + queue = OperationQueue() + queue.name = "BatchWorker Queue" + queue.maxConcurrentOperationCount = 1 + + let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build() + self.queueSizeGauge = meter.gaugeBuilder(name: "queueSize") + .ofLongs() + .setDescription("The number of items queued") + .setUnit("1") + .buildWithCallback { result in + result.record( + value: maxQueueSize, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } + + processedSpansCounter = meter.counterBuilder(name: "processedSpans") + .setUnit("1") + .setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") + .build() + droppedAttrs = [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true) + ] + exportedAttrs = [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false) + ] + } func addSpan(span: ReadableSpan) { cond.lock() defer { cond.unlock() } if spanList.count == maxQueueSize { - // TODO: Record a counter for dropped spans. + processedSpansCounter.add(value: 1, attribute: droppedAttrs) return } // TODO: Record a gauge for referenced spans. @@ -148,11 +204,16 @@ private class BatchWorker: Thread { timeoutTimer.cancel() } - private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { - stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { - var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } - willExportCallback?(&spansToExport) - spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) + private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { + stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { + var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } + willExportCallback?(&spansToExport) + let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) + if result == .success { + cond.lock() + processedSpansCounter.add(value: spanList.count, attribute: exportedAttrs) + cond.unlock() + } + } } - } } diff --git a/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift b/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift index 47507f0e..68d5355e 100644 --- a/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift +++ b/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift @@ -44,7 +44,10 @@ class BatchSpansProcessorTests: XCTestCase { } func testStartEndRequirements() { - let spansProcessor = BatchSpanProcessor(spanExporter: WaitingSpanExporter(numberToWaitFor: 0)) + let spansProcessor = BatchSpanProcessor( + spanExporter: WaitingSpanExporter(numberToWaitFor: 0), + meterProvider: DefaultStableMeterProvider() + ) XCTAssertFalse(spansProcessor.isStartRequired) XCTAssertTrue(spansProcessor.isEndRequired) } @@ -52,7 +55,11 @@ class BatchSpansProcessorTests: XCTestCase { func testExportDifferentSampledSpans() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: maxScheduleDelay) + ) let span1 = createSampledEndedSpan(spanName: spanName1) let span2 = createSampledEndedSpan(spanName: spanName2) let exported = waitingSpanExporter.waitForExport() @@ -63,7 +70,12 @@ class BatchSpansProcessorTests: XCTestCase { func testExportMoreSpansThanTheBufferSize() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 6) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay, maxQueueSize: 6, maxExportBatchSize: 2)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: maxScheduleDelay, + maxQueueSize: 6, maxExportBatchSize: 2) + ) let span1 = createSampledEndedSpan(spanName: spanName1) let span2 = createSampledEndedSpan(spanName: spanName1) @@ -82,7 +94,13 @@ class BatchSpansProcessorTests: XCTestCase { func testForceExport() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1) - let batchSpansProcessor = BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 10, maxQueueSize: 10000, maxExportBatchSize: 2000) + let batchSpansProcessor = BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: 10, + maxQueueSize: 10000, + maxExportBatchSize: 2000 + ) tracerSdkFactory.addSpanProcessor(batchSpansProcessor) for _ in 0 ..< 100 { @@ -96,7 +114,10 @@ class BatchSpansProcessorTests: XCTestCase { func testExportSpansToMultipleServices() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2) let waitingSpanExporter2 = WaitingSpanExporter(numberToWaitFor: 2) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), scheduleDelay: maxScheduleDelay)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: maxScheduleDelay)) let span1 = createSampledEndedSpan(spanName: spanName1) let span2 = createSampledEndedSpan(spanName: spanName2) @@ -110,7 +131,13 @@ class BatchSpansProcessorTests: XCTestCase { let maxQueuedSpans = 8 let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: maxQueuedSpans) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), scheduleDelay: maxScheduleDelay, maxQueueSize: maxQueuedSpans, maxExportBatchSize: maxQueuedSpans / 2)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: maxScheduleDelay, + maxQueueSize: maxQueuedSpans, + maxExportBatchSize: maxQueuedSpans / 2) + ) var spansToExport = [SpanData]() // Wait to block the worker thread in the BatchSampledSpansProcessor. This ensures that no items @@ -162,7 +189,11 @@ class BatchSpansProcessorTests: XCTestCase { func testExportNotSampledSpans() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: maxScheduleDelay) + ) createNotSampledEndedSpan(spanName: spanName1) createNotSampledEndedSpan(spanName: spanName2) @@ -181,7 +212,11 @@ class BatchSpansProcessorTests: XCTestCase { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1) // Set the export delay to zero, for no timeout, in order to confirm the #flush() below works - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 0.1)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: 0.1) + ) let span2 = createSampledEndedSpan(spanName: spanName2)