diff --git a/src/OpenTelemetry/.publicApi/Experimental/PublicAPI.Unshipped.txt b/src/OpenTelemetry/.publicApi/Experimental/PublicAPI.Unshipped.txt index 7c9a6d046ae..3439878f788 100644 --- a/src/OpenTelemetry/.publicApi/Experimental/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry/.publicApi/Experimental/PublicAPI.Unshipped.txt @@ -24,4 +24,5 @@ static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.UseOpenTeleme static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.UseOpenTelemetry(this Microsoft.Extensions.Logging.ILoggingBuilder! builder, System.Action! configure) -> Microsoft.Extensions.Logging.ILoggingBuilder! static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.UseOpenTelemetry(this Microsoft.Extensions.Logging.ILoggingBuilder! builder, System.Action? configureBuilder, System.Action? configureOptions) -> Microsoft.Extensions.Logging.ILoggingBuilder! static OpenTelemetry.Sdk.CreateLoggerProviderBuilder() -> OpenTelemetry.Logs.LoggerProviderBuilder! +virtual OpenTelemetry.BaseExporter.ExportAsync(OpenTelemetry.Batch batch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! virtual OpenTelemetry.Metrics.FixedSizeExemplarReservoir.OnCollected() -> void diff --git a/src/OpenTelemetry/BaseExporter.cs b/src/OpenTelemetry/BaseExporter.cs index 9fdf396a38c..62076ad73bf 100644 --- a/src/OpenTelemetry/BaseExporter.cs +++ b/src/OpenTelemetry/BaseExporter.cs @@ -42,6 +42,17 @@ public abstract class BaseExporter : IDisposable /// Result of the export operation. public abstract ExportResult Export(in Batch batch); + /// + /// Exports a batch of telemetry objects. + /// + /// Batch of telemetry objects to export. + /// The cancellation token to cancel the export. + /// Result of the export operation. + public virtual Task ExportAsync(Batch batch, CancellationToken cancellationToken = default) + { + return Task.FromResult(this.Export(batch)); + } + /// /// Flushes the exporter, blocks the current thread until flush /// completed, shutdown signaled or timed out. diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index b377d5e89e5..c49df71c82c 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -24,10 +24,19 @@ public abstract class BatchExportProcessor : BaseExportProcessor internal readonly int ExporterTimeoutMilliseconds; private readonly CircularBuffer circularBuffer; + private readonly bool useDedicatedThread; + + // Sync implementation fields. + private readonly AutoResetEvent exportTrigger; + private readonly ManualResetEvent dataExportedNotification; + private readonly ManualResetEvent shutdownTrigger; private readonly Thread exporterThread; - private readonly AutoResetEvent exportTrigger = new(false); - private readonly ManualResetEvent dataExportedNotification = new(false); - private readonly ManualResetEvent shutdownTrigger = new(false); + + // Async implementation fields. + private readonly CancellationTokenSource exporterTaskCancellation; + private readonly Task exporterTask; + private Task exportTask; + private long shutdownDrainTarget = long.MaxValue; private long droppedCount; private bool disposed; @@ -46,6 +55,30 @@ protected BatchExportProcessor( int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, int maxExportBatchSize = DefaultMaxExportBatchSize) + : this(exporter, false, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, maxExportBatchSize) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Exporter instance. + /// + /// True if the processor should use the synchronous in a dedicated thread for + /// each instance. Otherwise, is + /// used on the default thread pool. + /// + /// The maximum queue size. After the size is reached data are dropped. The default value is 2048. + /// The delay interval in milliseconds between two consecutive exports. The default value is 5000. + /// How long the export can run before it is cancelled. The default value is 30000. + /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. + protected BatchExportProcessor( + BaseExporter exporter, + bool useDedicatedThread, + int maxQueueSize = DefaultMaxQueueSize, + int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, + int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, + int maxExportBatchSize = DefaultMaxExportBatchSize) : base(exporter) { Guard.ThrowIfOutOfRange(maxQueueSize, min: 1); @@ -53,16 +86,31 @@ protected BatchExportProcessor( Guard.ThrowIfOutOfRange(scheduledDelayMilliseconds, min: 1); Guard.ThrowIfOutOfRange(exporterTimeoutMilliseconds, min: 0); - this.circularBuffer = new CircularBuffer(maxQueueSize); + this.MaxExportBatchSize = maxExportBatchSize; this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds; this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds; - this.MaxExportBatchSize = maxExportBatchSize; - this.exporterThread = new Thread(this.ExporterProc) + + this.circularBuffer = new CircularBuffer(maxQueueSize); + this.useDedicatedThread = useDedicatedThread; + + if (useDedicatedThread) { - IsBackground = true, - Name = $"OpenTelemetry-{nameof(BatchExportProcessor)}-{exporter.GetType().Name}", - }; - this.exporterThread.Start(); + this.exportTrigger = new AutoResetEvent(false); + this.dataExportedNotification = new ManualResetEvent(false); + this.shutdownTrigger = new ManualResetEvent(false); + this.exporterThread = new Thread(this.ExporterProcSync) + { + IsBackground = true, + Name = $"OpenTelemetry-{nameof(BatchExportProcessor)}-{exporter.GetType().Name}", + }; + this.exporterThread.Start(); + } + else + { + this.exportTask = Task.CompletedTask; + this.exporterTaskCancellation = new CancellationTokenSource(); + this.exporterTask = Task.Run(this.ExporterProcAsync); + } } /// @@ -83,26 +131,34 @@ protected BatchExportProcessor( [MethodImpl(MethodImplOptions.AggressiveInlining)] internal bool TryExport(T data) { - if (this.circularBuffer.TryAdd(data, maxSpinCount: 50000)) + if (!this.circularBuffer.TryAdd(data, maxSpinCount: 50000)) { - if (this.circularBuffer.Count >= this.MaxExportBatchSize) - { - try - { - this.exportTrigger.Set(); - } - catch (ObjectDisposedException) - { - } - } - - return true; // enqueue succeeded + Interlocked.Increment(ref this.droppedCount); + return false; } // either the queue is full or exceeded the spin limit, drop the item on the floor - Interlocked.Increment(ref this.droppedCount); + if (this.circularBuffer.Count < this.MaxExportBatchSize) + { + return true; + } - return false; + if (this.useDedicatedThread) + { + try + { + this.exportTrigger.Set(); + } + catch (ObjectDisposedException) + { + } + } + else + { + _ = this.ExportAsync(); + } + + return true; // enqueue succeeded } /// @@ -114,6 +170,49 @@ protected override void OnExport(T data) /// protected override bool OnForceFlush(int timeoutMilliseconds) { + return this.useDedicatedThread + ? this.FlushSync(timeoutMilliseconds) + : this.FlushAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult(); + } + + /// + protected override bool OnShutdown(int timeoutMilliseconds) + { + return this.useDedicatedThread + ? this.ShutdownSync(timeoutMilliseconds) + : this.ShutdownAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult(); + } + + /// + protected override void Dispose(bool disposing) + { + if (!this.disposed) + { + if (disposing) + { + if (this.useDedicatedThread) + { + this.exportTrigger.Dispose(); + this.dataExportedNotification.Dispose(); + this.shutdownTrigger.Dispose(); + } + else + { + this.exporterTaskCancellation.Cancel(); + this.exporterTaskCancellation.Dispose(); + } + } + + this.disposed = true; + } + + base.Dispose(disposing); + } + + private bool FlushSync(int timeoutMilliseconds) + { + Debug.Assert(this.useDedicatedThread, $"{nameof(this.FlushSync)} used in the async implementation"); + var tail = this.circularBuffer.RemovedCount; var head = this.circularBuffer.AddedCount; @@ -190,9 +289,70 @@ protected override bool OnForceFlush(int timeoutMilliseconds) } } - /// - protected override bool OnShutdown(int timeoutMilliseconds) + private async Task FlushAsync(TimeSpan timeout) + { + Debug.Assert(!this.useDedicatedThread, $"{nameof(this.FlushAsync)} used in the sync implementation"); + + var tail = this.circularBuffer.RemovedCount; + var head = this.circularBuffer.AddedCount; + + if (head == tail) + { + return true; // nothing to flush + } + + _ = this.ExportAsync(); + + if (timeout == TimeSpan.Zero) + { + return false; + } + + CancellationTokenSource timeoutCancellation; + try + { + timeoutCancellation = CancellationTokenSource.CreateLinkedTokenSource(this.exporterTaskCancellation.Token); + } + catch (ObjectDisposedException) + { + return false; + } + + var timeoutTask = Task.Delay(timeout, timeoutCancellation.Token); + + while (true) + { + Task completedTask; + try + { + completedTask = await Task.WhenAny(timeoutTask, this.ExportAsync()).ConfigureAwait(false); + } + catch (ObjectDisposedException) + { + return false; + } + + if (this.circularBuffer.RemovedCount >= head) + { + return true; + } + + if (completedTask == timeoutTask) + { + return false; + } + + if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue) + { + return false; + } + } + } + + private bool ShutdownSync(int timeoutMilliseconds) { + Debug.Assert(this.useDedicatedThread, $"{nameof(this.ShutdownSync)} used in the async implementation"); + Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount); try @@ -223,26 +383,44 @@ protected override bool OnShutdown(int timeoutMilliseconds) return this.exporter.Shutdown((int)Math.Max(timeout, 0)); } - /// - protected override void Dispose(bool disposing) + private async Task ShutdownAsync(TimeSpan timeout) { - if (!this.disposed) + Debug.Assert(!this.useDedicatedThread, $"{nameof(this.ShutdownAsync)} used in the sync implementation"); + + Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount); + + try { - if (disposing) - { - this.exportTrigger.Dispose(); - this.dataExportedNotification.Dispose(); - this.shutdownTrigger.Dispose(); - } + this.exporterTaskCancellation.Cancel(); + } + catch (ObjectDisposedException) + { + return false; + } - this.disposed = true; + OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount); + + if (timeout == Timeout.InfiniteTimeSpan) + { + await this.exporterTask.ConfigureAwait(false); + return this.exporter.Shutdown(); } - base.Dispose(disposing); + if (timeout == TimeSpan.Zero) + { + return this.exporter.Shutdown(0); + } + + var sw = Stopwatch.StartNew(); + await Task.WhenAny(this.exporterTask, Task.Delay(timeout)).ConfigureAwait(false); + var remainingTimeout = timeout.TotalMilliseconds - sw.ElapsedMilliseconds; + return this.exporter.Shutdown((int)Math.Max(remainingTimeout, 0)); } - private void ExporterProc() + private void ExporterProcSync() { + Debug.Assert(this.useDedicatedThread, $"{nameof(this.ExporterProcSync)} used in the async implementation"); + var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; while (true) @@ -286,4 +464,84 @@ private void ExporterProc() } } } + + private async Task ExporterProcAsync() + { + Debug.Assert(!this.useDedicatedThread, $"{nameof(this.ExporterProcAsync)} used in the sync implementation"); + + while (true) + { + // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously + if (this.circularBuffer.Count < this.MaxExportBatchSize) + { + try + { + await Task.Delay(this.ScheduledDelayMilliseconds, this.exporterTaskCancellation.Token).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // The delay was canceled for the exporter to shut down. + } + catch (ObjectDisposedException) + { + // the exporter is somehow disposed before the worker thread could finish its job + return; + } + } + + if (this.circularBuffer.Count > 0) + { + await this.ExportAsync().ConfigureAwait(false); + } + + if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget)) + { + return; + } + } + } + + private Task ExportAsync() + { + Debug.Assert(!this.useDedicatedThread, $"{nameof(this.ExportAsync)} used in the sync implementation"); + + var optimisticExportTask = this.exportTask; + if (!optimisticExportTask.IsCompleted) + { + // An export is currently being processed. + return optimisticExportTask; + } + + TaskCompletionSource newCurrentExportTaskCompletion = new(TaskCreationOptions.RunContinuationsAsynchronously); + var localExportTask = Interlocked.CompareExchange( + ref this.exportTask, + newCurrentExportTaskCompletion.Task, + optimisticExportTask); + if (!localExportTask.IsCompleted) + { + // An export is currently being processed. + return localExportTask; + } + + // Use Task.Run to yield the execution as soon as possible. + return Task.Run(CoreAsync); + + async Task CoreAsync() + { + try + { + using (var batch = new Batch(this.circularBuffer, this.MaxExportBatchSize)) + { + await this.exporter.ExportAsync(batch).ConfigureAwait(false); + } + + newCurrentExportTaskCompletion.SetResult(null); + } + catch (Exception e) + { + newCurrentExportTaskCompletion.SetException(e); + throw; + } + } + } }