Skip to content

Commit

Permalink
made meter provider optional in BatchSpanProcessor (#645)
Browse files Browse the repository at this point in the history
* made meter provider optional in BatchSpanProcessor

* re-indented BatchSpanProcessor file.
  • Loading branch information
bryce-b authored Nov 23, 2024
1 parent 95092cc commit 95742b0
Showing 1 changed file with 131 additions and 139 deletions.
270 changes: 131 additions & 139 deletions Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,36 @@ import OpenTelemetryApi
/// exports the spans to wake up and start a new export cycle.
/// This batchSpanProcessor can cause high contention in a very high traffic service.
public struct BatchSpanProcessor: SpanProcessor {
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

fileprivate var worker: BatchWorker
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}
fileprivate var worker: BatchWorker

public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider? = nil,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}

public let isStartRequired = false
public let isEndRequired = true
Expand Down Expand Up @@ -72,105 +72,95 @@ public struct BatchSpanProcessor: SpanProcessor {
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var spanGaugeObserver: ObservableLongGauge?
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider?
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var spanGaugeObserver: ObservableLongGauge?
private var processedSpansCounter: LongCounter?

init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider? = nil,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

private var processedSpansCounter: LongCounter?
private let droppedAttrs: [String: AttributeValue]
private let exportedAttrs: [String: AttributeValue]
private let spanGaugeBuilder: LongGaugeBuilder
init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build()

var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk
longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued")
longGaugeSdk = longGaugeSdk?.setUnit("1")
self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
if let meter = meterProvider?.meterBuilder(name: "io.opentelemetry.sdk.trace").build() {

var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk
longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued")
longGaugeSdk = longGaugeSdk?.setUnit("1")
self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk
longCounterSdk = longCounterSdk?.setUnit("1")
longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
processedSpansCounter = longCounterSdk?.build()

// Subscribe to new gauge observer
self.spanGaugeObserver = meter.gaugeBuilder(name: "spanSize")
.ofLongs()
.buildWithCallback { [count = spanList.count] result in
result.record(
value: count,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize")
.ofLongs()

var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk
longCounterSdk = longCounterSdk?.setUnit("1")
longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
processedSpansCounter = longCounterSdk?.build()

droppedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
]
exportedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)
]

// Subscribe to new gauge observer
self.spanGaugeObserver = self.spanGaugeBuilder
.buildWithCallback { [count = spanList.count] result in
result.record(
value: count,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}
}
}

deinit {
// Cleanup all gauge observer
self.queueSizeGauge?.close()
self.spanGaugeObserver?.close()
}

deinit {
// Cleanup all gauge observer
self.queueSizeGauge?.close()
self.spanGaugeObserver?.close()
}

func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
processedSpansCounter?.add(value: 1, attribute: droppedAttrs)
processedSpansCounter?.add(value: 1, attribute: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
])
return
}
spanList.append(span)

// Notify the worker thread that at half of the queue is available. It will take
// time anyway for the thread to wake up.
if spanList.count >= halfMaxQueueSize {
Expand All @@ -180,18 +170,18 @@ private class BatchWorker: Thread {

override func main() {
repeat {
autoreleasepool {
var spansCopy: [ReadableSpan]
cond.lock()
if spanList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while spanList.isEmpty && !self.isCancelled
}
spansCopy = spanList
spanList.removeAll()
cond.unlock()
self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout)
autoreleasepool {
var spansCopy: [ReadableSpan]
cond.lock()
if spanList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while spanList.isEmpty && !self.isCancelled
}
spansCopy = spanList
spanList.removeAll()
cond.unlock()
self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout)
}
} while !self.isCancelled
}
Expand Down Expand Up @@ -228,16 +218,18 @@ private class BatchWorker: Thread {
timeoutTimer.cancel()
}

private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter?.add(value: spanList.count, attribute: exportedAttrs)
cond.unlock()
}
}
private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter?.add(value: spanList.count, attribute: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)])
cond.unlock();
}
}
}
}

0 comments on commit 95742b0

Please sign in to comment.