From bfdd002ba4c6d7addde6674d362ab67d3789be8f Mon Sep 17 00:00:00 2001 From: verdie-g Date: Mon, 16 Sep 2024 19:48:36 -0400 Subject: [PATCH 1/4] Migrate BatchExporterProcessor to async --- src/OpenTelemetry/BaseExporter.cs | 10 + src/OpenTelemetry/BatchExportProcessor.cs | 212 +++++++++++----------- 2 files changed, 120 insertions(+), 102 deletions(-) diff --git a/src/OpenTelemetry/BaseExporter.cs b/src/OpenTelemetry/BaseExporter.cs index 9fdf396a38c..5b49fe0a1b0 100644 --- a/src/OpenTelemetry/BaseExporter.cs +++ b/src/OpenTelemetry/BaseExporter.cs @@ -42,6 +42,16 @@ public abstract class BaseExporter : IDisposable /// Result of the export operation. public abstract ExportResult Export(in Batch batch); + /// + /// Exports a batch of telemetry objects. + /// + /// Batch of telemetry objects to export. + /// Result of the export operation. + public virtual Task ExportAsync(Batch batch) + { + return Task.FromResult(this.Export(batch)); + } + /// /// Flushes the exporter, blocks the current thread until flush /// completed, shutdown signaled or timed out. diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index b377d5e89e5..6d2942c9447 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -24,10 +24,9 @@ public abstract class BatchExportProcessor : BaseExportProcessor internal readonly int ExporterTimeoutMilliseconds; private readonly CircularBuffer circularBuffer; - private readonly Thread exporterThread; - private readonly AutoResetEvent exportTrigger = new(false); - private readonly ManualResetEvent dataExportedNotification = new(false); - private readonly ManualResetEvent shutdownTrigger = new(false); + private readonly CancellationTokenSource exporterTaskCancellation; + private readonly Task exporterTask; + private Task exportTask; private long shutdownDrainTarget = long.MaxValue; private long droppedCount; private bool disposed; @@ -57,12 +56,9 @@ protected BatchExportProcessor( this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds; this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds; this.MaxExportBatchSize = maxExportBatchSize; - this.exporterThread = new Thread(this.ExporterProc) - { - IsBackground = true, - Name = $"OpenTelemetry-{nameof(BatchExportProcessor)}-{exporter.GetType().Name}", - }; - this.exporterThread.Start(); + this.exportTask = Task.CompletedTask; + this.exporterTaskCancellation = new CancellationTokenSource(); + this.exporterTask = Task.Run(this.ExporterProc); } /// @@ -87,13 +83,7 @@ internal bool TryExport(T data) { if (this.circularBuffer.Count >= this.MaxExportBatchSize) { - try - { - this.exportTrigger.Set(); - } - catch (ObjectDisposedException) - { - } + _ = this.ExportAsync(); } return true; // enqueue succeeded @@ -113,6 +103,34 @@ protected override void OnExport(T data) /// protected override bool OnForceFlush(int timeoutMilliseconds) + { + return this.FlushAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult(); + } + + /// + protected override bool OnShutdown(int timeoutMilliseconds) + { + return this.ShutdownAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult(); + } + + /// + protected override void Dispose(bool disposing) + { + if (!this.disposed) + { + if (disposing) + { + this.exporterTaskCancellation.Cancel(); + this.exporterTaskCancellation.Dispose(); + } + + this.disposed = true; + } + + base.Dispose(disposing); + } + + private async Task FlushAsync(TimeSpan timeout) { var tail = this.circularBuffer.RemovedCount; var head = this.circularBuffer.AddedCount; @@ -122,60 +140,35 @@ protected override bool OnForceFlush(int timeoutMilliseconds) return true; // nothing to flush } - try - { - this.exportTrigger.Set(); - } - catch (ObjectDisposedException) + _ = this.ExportAsync(); + + if (timeout == TimeSpan.Zero) { return false; } - if (timeoutMilliseconds == 0) + CancellationTokenSource timeoutCancellation; + try + { + timeoutCancellation = CancellationTokenSource.CreateLinkedTokenSource(this.exporterTaskCancellation.Token); + } + catch (ObjectDisposedException) { return false; } - var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger }; - - var sw = timeoutMilliseconds == Timeout.Infinite - ? null - : Stopwatch.StartNew(); - - // There is a chance that the export thread finished processing all the data from the queue, - // and signaled before we enter wait here, use polling to prevent being blocked indefinitely. - const int pollingMilliseconds = 1000; + var timeoutTask = Task.Delay(timeout, timeoutCancellation.Token); while (true) { - if (sw == null) + Task completedTask; + try { - try - { - WaitHandle.WaitAny(triggers, pollingMilliseconds); - } - catch (ObjectDisposedException) - { - return false; - } + completedTask = await Task.WhenAny(timeoutTask, this.ExportAsync()).ConfigureAwait(false); } - else + catch (ObjectDisposedException) { - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - - if (timeout <= 0) - { - return this.circularBuffer.RemovedCount >= head; - } - - try - { - WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds)); - } - catch (ObjectDisposedException) - { - return false; - } + return false; } if (this.circularBuffer.RemovedCount >= head) @@ -183,6 +176,11 @@ protected override bool OnForceFlush(int timeoutMilliseconds) return true; } + if (completedTask == timeoutTask) + { + return false; + } + if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue) { return false; @@ -190,14 +188,13 @@ protected override bool OnForceFlush(int timeoutMilliseconds) } } - /// - protected override bool OnShutdown(int timeoutMilliseconds) + private async Task ShutdownAsync(TimeSpan timeout) { Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount); try { - this.shutdownTrigger.Set(); + this.exporterTaskCancellation.Cancel(); } catch (ObjectDisposedException) { @@ -206,45 +203,25 @@ protected override bool OnShutdown(int timeoutMilliseconds) OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount); - if (timeoutMilliseconds == Timeout.Infinite) + if (timeout == Timeout.InfiniteTimeSpan) { - this.exporterThread.Join(); + await this.exporterTask.ConfigureAwait(false); return this.exporter.Shutdown(); } - if (timeoutMilliseconds == 0) + if (timeout == TimeSpan.Zero) { return this.exporter.Shutdown(0); } var sw = Stopwatch.StartNew(); - this.exporterThread.Join(timeoutMilliseconds); - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - return this.exporter.Shutdown((int)Math.Max(timeout, 0)); - } - - /// - protected override void Dispose(bool disposing) - { - if (!this.disposed) - { - if (disposing) - { - this.exportTrigger.Dispose(); - this.dataExportedNotification.Dispose(); - this.shutdownTrigger.Dispose(); - } - - this.disposed = true; - } - - base.Dispose(disposing); + await Task.WhenAny(this.exporterTask, Task.Delay(timeout)).ConfigureAwait(false); + var remainingTimeout = timeout.TotalMilliseconds - sw.ElapsedMilliseconds; + return this.exporter.Shutdown((int)Math.Max(remainingTimeout, 0)); } - private void ExporterProc() + private async Task ExporterProc() { - var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; - while (true) { // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously @@ -252,7 +229,11 @@ private void ExporterProc() { try { - WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds); + await Task.Delay(this.ScheduledDelayMilliseconds, this.exporterTaskCancellation.Token).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // The delay was canceled for the exporter to shut down. } catch (ObjectDisposedException) { @@ -262,27 +243,54 @@ private void ExporterProc() } if (this.circularBuffer.Count > 0) + { + await this.ExportAsync().ConfigureAwait(false); + } + + if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget)) + { + return; + } + } + } + + private Task ExportAsync() + { + var optimisticExportTask = this.exportTask; + if (!optimisticExportTask.IsCompleted) + { + // An export is currently being processed. + return optimisticExportTask; + } + + TaskCompletionSource newCurrentExportTaskCompletion = new(TaskCreationOptions.RunContinuationsAsynchronously); + var localExportTask = Interlocked.CompareExchange( + ref this.exportTask, + newCurrentExportTaskCompletion.Task, + optimisticExportTask); + if (!localExportTask.IsCompleted) + { + // An export is currently being processed. + return localExportTask; + } + + // Use Task.Run to yield the execution as soon as possible. + return Task.Run(CoreAsync); + + async Task CoreAsync() + { + try { using (var batch = new Batch(this.circularBuffer, this.MaxExportBatchSize)) { - this.exporter.Export(batch); + await this.exporter.ExportAsync(batch).ConfigureAwait(false); } - try - { - this.dataExportedNotification.Set(); - this.dataExportedNotification.Reset(); - } - catch (ObjectDisposedException) - { - // the exporter is somehow disposed before the worker thread could finish its job - return; - } + newCurrentExportTaskCompletion.SetResult(null); } - - if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget)) + catch (Exception e) { - return; + newCurrentExportTaskCompletion.SetException(e); } } } From b658977e41ea01d2dd070183fef1ad62342ed682 Mon Sep 17 00:00:00 2001 From: verdie-g Date: Mon, 16 Sep 2024 19:58:49 -0400 Subject: [PATCH 2/4] Throw exception in ExportAsync --- src/OpenTelemetry/BatchExportProcessor.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index 6d2942c9447..6bbe3784406 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -291,6 +291,7 @@ async Task CoreAsync() catch (Exception e) { newCurrentExportTaskCompletion.SetException(e); + throw; } } } From 039b07e88122f5923a82c41b74bd156221af09e2 Mon Sep 17 00:00:00 2001 From: verdie-g Date: Wed, 18 Sep 2024 17:37:02 -0400 Subject: [PATCH 3/4] Add CancellationToken to ExportAsync --- .../.publicApi/Experimental/PublicAPI.Unshipped.txt | 1 + src/OpenTelemetry/BaseExporter.cs | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/OpenTelemetry/.publicApi/Experimental/PublicAPI.Unshipped.txt b/src/OpenTelemetry/.publicApi/Experimental/PublicAPI.Unshipped.txt index a6594de1a9c..4ef744702d3 100644 --- a/src/OpenTelemetry/.publicApi/Experimental/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry/.publicApi/Experimental/PublicAPI.Unshipped.txt @@ -26,4 +26,5 @@ static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.UseOpenTeleme static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.UseOpenTelemetry(this Microsoft.Extensions.Logging.ILoggingBuilder! builder, System.Action! configure) -> Microsoft.Extensions.Logging.ILoggingBuilder! static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.UseOpenTelemetry(this Microsoft.Extensions.Logging.ILoggingBuilder! builder, System.Action? configureBuilder, System.Action? configureOptions) -> Microsoft.Extensions.Logging.ILoggingBuilder! static OpenTelemetry.Sdk.CreateLoggerProviderBuilder() -> OpenTelemetry.Logs.LoggerProviderBuilder! +virtual OpenTelemetry.BaseExporter.ExportAsync(OpenTelemetry.Batch batch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! virtual OpenTelemetry.Metrics.FixedSizeExemplarReservoir.OnCollected() -> void diff --git a/src/OpenTelemetry/BaseExporter.cs b/src/OpenTelemetry/BaseExporter.cs index 5b49fe0a1b0..62076ad73bf 100644 --- a/src/OpenTelemetry/BaseExporter.cs +++ b/src/OpenTelemetry/BaseExporter.cs @@ -46,8 +46,9 @@ public abstract class BaseExporter : IDisposable /// Exports a batch of telemetry objects. /// /// Batch of telemetry objects to export. + /// The cancellation token to cancel the export. /// Result of the export operation. - public virtual Task ExportAsync(Batch batch) + public virtual Task ExportAsync(Batch batch, CancellationToken cancellationToken = default) { return Task.FromResult(this.Export(batch)); } From 4368fc58b0da715f1fee8811c9175574bdbc6d99 Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Sat, 23 Nov 2024 10:07:32 -0500 Subject: [PATCH 4/4] Add flag to switch between sync/async implementations --- src/OpenTelemetry/BatchExportProcessor.cs | 287 ++++++++++++++++++++-- 1 file changed, 268 insertions(+), 19 deletions(-) diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index 6bbe3784406..c49df71c82c 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -24,9 +24,19 @@ public abstract class BatchExportProcessor : BaseExportProcessor internal readonly int ExporterTimeoutMilliseconds; private readonly CircularBuffer circularBuffer; + private readonly bool useDedicatedThread; + + // Sync implementation fields. + private readonly AutoResetEvent exportTrigger; + private readonly ManualResetEvent dataExportedNotification; + private readonly ManualResetEvent shutdownTrigger; + private readonly Thread exporterThread; + + // Async implementation fields. private readonly CancellationTokenSource exporterTaskCancellation; private readonly Task exporterTask; private Task exportTask; + private long shutdownDrainTarget = long.MaxValue; private long droppedCount; private bool disposed; @@ -45,6 +55,30 @@ protected BatchExportProcessor( int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, int maxExportBatchSize = DefaultMaxExportBatchSize) + : this(exporter, false, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, maxExportBatchSize) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Exporter instance. + /// + /// True if the processor should use the synchronous in a dedicated thread for + /// each instance. Otherwise, is + /// used on the default thread pool. + /// + /// The maximum queue size. After the size is reached data are dropped. The default value is 2048. + /// The delay interval in milliseconds between two consecutive exports. The default value is 5000. + /// How long the export can run before it is cancelled. The default value is 30000. + /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. + protected BatchExportProcessor( + BaseExporter exporter, + bool useDedicatedThread, + int maxQueueSize = DefaultMaxQueueSize, + int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, + int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, + int maxExportBatchSize = DefaultMaxExportBatchSize) : base(exporter) { Guard.ThrowIfOutOfRange(maxQueueSize, min: 1); @@ -52,13 +86,31 @@ protected BatchExportProcessor( Guard.ThrowIfOutOfRange(scheduledDelayMilliseconds, min: 1); Guard.ThrowIfOutOfRange(exporterTimeoutMilliseconds, min: 0); - this.circularBuffer = new CircularBuffer(maxQueueSize); + this.MaxExportBatchSize = maxExportBatchSize; this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds; this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds; - this.MaxExportBatchSize = maxExportBatchSize; - this.exportTask = Task.CompletedTask; - this.exporterTaskCancellation = new CancellationTokenSource(); - this.exporterTask = Task.Run(this.ExporterProc); + + this.circularBuffer = new CircularBuffer(maxQueueSize); + this.useDedicatedThread = useDedicatedThread; + + if (useDedicatedThread) + { + this.exportTrigger = new AutoResetEvent(false); + this.dataExportedNotification = new ManualResetEvent(false); + this.shutdownTrigger = new ManualResetEvent(false); + this.exporterThread = new Thread(this.ExporterProcSync) + { + IsBackground = true, + Name = $"OpenTelemetry-{nameof(BatchExportProcessor)}-{exporter.GetType().Name}", + }; + this.exporterThread.Start(); + } + else + { + this.exportTask = Task.CompletedTask; + this.exporterTaskCancellation = new CancellationTokenSource(); + this.exporterTask = Task.Run(this.ExporterProcAsync); + } } /// @@ -79,20 +131,34 @@ protected BatchExportProcessor( [MethodImpl(MethodImplOptions.AggressiveInlining)] internal bool TryExport(T data) { - if (this.circularBuffer.TryAdd(data, maxSpinCount: 50000)) + if (!this.circularBuffer.TryAdd(data, maxSpinCount: 50000)) { - if (this.circularBuffer.Count >= this.MaxExportBatchSize) - { - _ = this.ExportAsync(); - } - - return true; // enqueue succeeded + Interlocked.Increment(ref this.droppedCount); + return false; } // either the queue is full or exceeded the spin limit, drop the item on the floor - Interlocked.Increment(ref this.droppedCount); + if (this.circularBuffer.Count < this.MaxExportBatchSize) + { + return true; + } - return false; + if (this.useDedicatedThread) + { + try + { + this.exportTrigger.Set(); + } + catch (ObjectDisposedException) + { + } + } + else + { + _ = this.ExportAsync(); + } + + return true; // enqueue succeeded } /// @@ -104,13 +170,17 @@ protected override void OnExport(T data) /// protected override bool OnForceFlush(int timeoutMilliseconds) { - return this.FlushAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult(); + return this.useDedicatedThread + ? this.FlushSync(timeoutMilliseconds) + : this.FlushAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult(); } /// protected override bool OnShutdown(int timeoutMilliseconds) { - return this.ShutdownAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult(); + return this.useDedicatedThread + ? this.ShutdownSync(timeoutMilliseconds) + : this.ShutdownAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult(); } /// @@ -120,8 +190,17 @@ protected override void Dispose(bool disposing) { if (disposing) { - this.exporterTaskCancellation.Cancel(); - this.exporterTaskCancellation.Dispose(); + if (this.useDedicatedThread) + { + this.exportTrigger.Dispose(); + this.dataExportedNotification.Dispose(); + this.shutdownTrigger.Dispose(); + } + else + { + this.exporterTaskCancellation.Cancel(); + this.exporterTaskCancellation.Dispose(); + } } this.disposed = true; @@ -130,8 +209,90 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } + private bool FlushSync(int timeoutMilliseconds) + { + Debug.Assert(this.useDedicatedThread, $"{nameof(this.FlushSync)} used in the async implementation"); + + var tail = this.circularBuffer.RemovedCount; + var head = this.circularBuffer.AddedCount; + + if (head == tail) + { + return true; // nothing to flush + } + + try + { + this.exportTrigger.Set(); + } + catch (ObjectDisposedException) + { + return false; + } + + if (timeoutMilliseconds == 0) + { + return false; + } + + var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger }; + + var sw = timeoutMilliseconds == Timeout.Infinite + ? null + : Stopwatch.StartNew(); + + // There is a chance that the export thread finished processing all the data from the queue, + // and signaled before we enter wait here, use polling to prevent being blocked indefinitely. + const int pollingMilliseconds = 1000; + + while (true) + { + if (sw == null) + { + try + { + WaitHandle.WaitAny(triggers, pollingMilliseconds); + } + catch (ObjectDisposedException) + { + return false; + } + } + else + { + var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; + + if (timeout <= 0) + { + return this.circularBuffer.RemovedCount >= head; + } + + try + { + WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds)); + } + catch (ObjectDisposedException) + { + return false; + } + } + + if (this.circularBuffer.RemovedCount >= head) + { + return true; + } + + if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue) + { + return false; + } + } + } + private async Task FlushAsync(TimeSpan timeout) { + Debug.Assert(!this.useDedicatedThread, $"{nameof(this.FlushAsync)} used in the sync implementation"); + var tail = this.circularBuffer.RemovedCount; var head = this.circularBuffer.AddedCount; @@ -188,8 +349,44 @@ private async Task FlushAsync(TimeSpan timeout) } } + private bool ShutdownSync(int timeoutMilliseconds) + { + Debug.Assert(this.useDedicatedThread, $"{nameof(this.ShutdownSync)} used in the async implementation"); + + Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount); + + try + { + this.shutdownTrigger.Set(); + } + catch (ObjectDisposedException) + { + return false; + } + + OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount); + + if (timeoutMilliseconds == Timeout.Infinite) + { + this.exporterThread.Join(); + return this.exporter.Shutdown(); + } + + if (timeoutMilliseconds == 0) + { + return this.exporter.Shutdown(0); + } + + var sw = Stopwatch.StartNew(); + this.exporterThread.Join(timeoutMilliseconds); + var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; + return this.exporter.Shutdown((int)Math.Max(timeout, 0)); + } + private async Task ShutdownAsync(TimeSpan timeout) { + Debug.Assert(!this.useDedicatedThread, $"{nameof(this.ShutdownAsync)} used in the sync implementation"); + Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount); try @@ -220,8 +417,58 @@ private async Task ShutdownAsync(TimeSpan timeout) return this.exporter.Shutdown((int)Math.Max(remainingTimeout, 0)); } - private async Task ExporterProc() + private void ExporterProcSync() { + Debug.Assert(this.useDedicatedThread, $"{nameof(this.ExporterProcSync)} used in the async implementation"); + + var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; + + while (true) + { + // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously + if (this.circularBuffer.Count < this.MaxExportBatchSize) + { + try + { + WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds); + } + catch (ObjectDisposedException) + { + // the exporter is somehow disposed before the worker thread could finish its job + return; + } + } + + if (this.circularBuffer.Count > 0) + { + using (var batch = new Batch(this.circularBuffer, this.MaxExportBatchSize)) + { + this.exporter.Export(batch); + } + + try + { + this.dataExportedNotification.Set(); + this.dataExportedNotification.Reset(); + } + catch (ObjectDisposedException) + { + // the exporter is somehow disposed before the worker thread could finish its job + return; + } + } + + if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget)) + { + return; + } + } + } + + private async Task ExporterProcAsync() + { + Debug.Assert(!this.useDedicatedThread, $"{nameof(this.ExporterProcAsync)} used in the sync implementation"); + while (true) { // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously @@ -256,6 +503,8 @@ private async Task ExporterProc() private Task ExportAsync() { + Debug.Assert(!this.useDedicatedThread, $"{nameof(this.ExportAsync)} used in the sync implementation"); + var optimisticExportTask = this.exportTask; if (!optimisticExportTask.IsCompleted) {