diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index c9906fb1..7c79d7b3 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -15,36 +15,36 @@ import OpenTelemetryApi /// exports the spans to wake up and start a new export cycle. /// This batchSpanProcessor can cause high contention in a very high traffic service. public struct BatchSpanProcessor: SpanProcessor { - fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType" - fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped" - fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name - - fileprivate var worker: BatchWorker + fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType" + fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped" + fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name - public static var name: String { - String(describing: Self.self) - } - - public init( - spanExporter: SpanExporter, - meterProvider: StableMeterProvider, - scheduleDelay: TimeInterval = 5, - exportTimeout: TimeInterval = 30, - maxQueueSize: Int = 2048, - maxExportBatchSize: Int = 512, - willExportCallback: ((inout [SpanData]) -> Void)? = nil - ) { - worker = BatchWorker( - spanExporter: spanExporter, - meterProvider: meterProvider, - scheduleDelay: scheduleDelay, - exportTimeout: exportTimeout, - maxQueueSize: maxQueueSize, - maxExportBatchSize: maxExportBatchSize, - willExportCallback: willExportCallback - ) - worker.start() - } + fileprivate var worker: BatchWorker + + public static var name: String { + String(describing: Self.self) + } + + public init( + spanExporter: SpanExporter, + meterProvider: StableMeterProvider? = nil, + scheduleDelay: TimeInterval = 5, + exportTimeout: TimeInterval = 30, + maxQueueSize: Int = 2048, + maxExportBatchSize: Int = 512, + willExportCallback: ((inout [SpanData]) -> Void)? = nil + ) { + worker = BatchWorker( + spanExporter: spanExporter, + meterProvider: meterProvider, + scheduleDelay: scheduleDelay, + exportTimeout: exportTimeout, + maxQueueSize: maxQueueSize, + maxExportBatchSize: maxExportBatchSize, + willExportCallback: willExportCallback + ) + worker.start() + } public let isStartRequired = false public let isEndRequired = true @@ -72,105 +72,95 @@ public struct BatchSpanProcessor: SpanProcessor { /// the data. /// The list of batched data is protected by a NSCondition which ensures full concurrency. private class BatchWorker: Thread { - let spanExporter: SpanExporter - let meterProvider: StableMeterProvider - let scheduleDelay: TimeInterval - let maxQueueSize: Int - let exportTimeout: TimeInterval - let maxExportBatchSize: Int - let willExportCallback: ((inout [SpanData]) -> Void)? - let halfMaxQueueSize: Int - private let cond = NSCondition() - var spanList = [ReadableSpan]() - var queue: OperationQueue - - private var queueSizeGauge: ObservableLongGauge? - private var spanGaugeObserver: ObservableLongGauge? + let spanExporter: SpanExporter + let meterProvider: StableMeterProvider? + let scheduleDelay: TimeInterval + let maxQueueSize: Int + let exportTimeout: TimeInterval + let maxExportBatchSize: Int + let willExportCallback: ((inout [SpanData]) -> Void)? + let halfMaxQueueSize: Int + private let cond = NSCondition() + var spanList = [ReadableSpan]() + var queue: OperationQueue + + private var queueSizeGauge: ObservableLongGauge? + private var spanGaugeObserver: ObservableLongGauge? + private var processedSpansCounter: LongCounter? + + init( + spanExporter: SpanExporter, + meterProvider: StableMeterProvider? = nil, + scheduleDelay: TimeInterval, + exportTimeout: TimeInterval, + maxQueueSize: Int, + maxExportBatchSize: Int, + willExportCallback: ((inout [SpanData]) -> Void)? + ) { + self.spanExporter = spanExporter + self.meterProvider = meterProvider + self.scheduleDelay = scheduleDelay + self.exportTimeout = exportTimeout + self.maxQueueSize = maxQueueSize + halfMaxQueueSize = maxQueueSize >> 1 + self.maxExportBatchSize = maxExportBatchSize + self.willExportCallback = willExportCallback + queue = OperationQueue() + queue.name = "BatchWorker Queue" + queue.maxConcurrentOperationCount = 1 - private var processedSpansCounter: LongCounter? - private let droppedAttrs: [String: AttributeValue] - private let exportedAttrs: [String: AttributeValue] - private let spanGaugeBuilder: LongGaugeBuilder - init( - spanExporter: SpanExporter, - meterProvider: StableMeterProvider, - scheduleDelay: TimeInterval, - exportTimeout: TimeInterval, - maxQueueSize: Int, - maxExportBatchSize: Int, - willExportCallback: ((inout [SpanData]) -> Void)? - ) { - self.spanExporter = spanExporter - self.meterProvider = meterProvider - self.scheduleDelay = scheduleDelay - self.exportTimeout = exportTimeout - self.maxQueueSize = maxQueueSize - halfMaxQueueSize = maxQueueSize >> 1 - self.maxExportBatchSize = maxExportBatchSize - self.willExportCallback = willExportCallback - queue = OperationQueue() - queue.name = "BatchWorker Queue" - queue.maxConcurrentOperationCount = 1 - - let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build() - - var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk - longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued") - longGaugeSdk = longGaugeSdk?.setUnit("1") - self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in - result.record( - value: maxQueueSize, - attributes: [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) - ] - ) + if let meter = meterProvider?.meterBuilder(name: "io.opentelemetry.sdk.trace").build() { + + var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk + longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued") + longGaugeSdk = longGaugeSdk?.setUnit("1") + self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in + result.record( + value: maxQueueSize, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } + + var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk + longCounterSdk = longCounterSdk?.setUnit("1") + longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") + processedSpansCounter = longCounterSdk?.build() + + // Subscribe to new gauge observer + self.spanGaugeObserver = meter.gaugeBuilder(name: "spanSize") + .ofLongs() + .buildWithCallback { [count = spanList.count] result in + result.record( + value: count, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) } - - self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize") - .ofLongs() - - var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk - longCounterSdk = longCounterSdk?.setUnit("1") - longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") - processedSpansCounter = longCounterSdk?.build() - - droppedAttrs = [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), - BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true) - ] - exportedAttrs = [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), - BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false) - ] - - // Subscribe to new gauge observer - self.spanGaugeObserver = self.spanGaugeBuilder - .buildWithCallback { [count = spanList.count] result in - result.record( - value: count, - attributes: [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) - ] - ) - } } + } + + deinit { + // Cleanup all gauge observer + self.queueSizeGauge?.close() + self.spanGaugeObserver?.close() + } - deinit { - // Cleanup all gauge observer - self.queueSizeGauge?.close() - self.spanGaugeObserver?.close() - } - func addSpan(span: ReadableSpan) { cond.lock() defer { cond.unlock() } if spanList.count == maxQueueSize { - processedSpansCounter?.add(value: 1, attribute: droppedAttrs) + processedSpansCounter?.add(value: 1, attribute: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true) + ]) return } spanList.append(span) - + // Notify the worker thread that at half of the queue is available. It will take // time anyway for the thread to wake up. if spanList.count >= halfMaxQueueSize { @@ -180,18 +170,18 @@ private class BatchWorker: Thread { override func main() { repeat { - autoreleasepool { - var spansCopy: [ReadableSpan] - cond.lock() - if spanList.count < maxExportBatchSize { - repeat { - cond.wait(until: Date().addingTimeInterval(scheduleDelay)) - } while spanList.isEmpty && !self.isCancelled - } - spansCopy = spanList - spanList.removeAll() - cond.unlock() - self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout) + autoreleasepool { + var spansCopy: [ReadableSpan] + cond.lock() + if spanList.count < maxExportBatchSize { + repeat { + cond.wait(until: Date().addingTimeInterval(scheduleDelay)) + } while spanList.isEmpty && !self.isCancelled + } + spansCopy = spanList + spanList.removeAll() + cond.unlock() + self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout) } } while !self.isCancelled } @@ -228,16 +218,18 @@ private class BatchWorker: Thread { timeoutTimer.cancel() } - private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { - stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { - var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } - willExportCallback?(&spansToExport) - let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) - if result == .success { - cond.lock() - processedSpansCounter?.add(value: spanList.count, attribute: exportedAttrs) - cond.unlock() - } - } + private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { + stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { + var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } + willExportCallback?(&spansToExport) + let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) + if result == .success { + cond.lock() + processedSpansCounter?.add(value: spanList.count, attribute: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)]) + cond.unlock(); + } } + } }