From 41e8166c56a6b704b381a5c54e4b1af790460fa8 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Wed, 27 Dec 2023 15:27:04 +1000 Subject: [PATCH 01/16] Update everything, drop some obsolete targets --- global.json | 2 +- .../Properties/AssemblyInfo.cs | 2 -- .../Serilog.Sinks.PeriodicBatching.csproj | 26 +++++-------------- .../BoundedConcurrentQueue.cs | 5 ++-- .../Sinks/PeriodicBatching/PortableTimer.cs | 21 +-------------- ...s.PeriodicBatching.PerformanceTests.csproj | 25 +++++++----------- .../Support/SynchronizedQueue.cs | 8 +++--- ...erilog.Sinks.PeriodicBatching.Tests.csproj | 19 ++++++-------- 8 files changed, 35 insertions(+), 73 deletions(-) diff --git a/global.json b/global.json index ba75026..ec30e8e 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "6.0.300", + "version": "8.0.100", "allowPrerelease": false, "rollForward": "latestFeature" } diff --git a/src/Serilog.Sinks.PeriodicBatching/Properties/AssemblyInfo.cs b/src/Serilog.Sinks.PeriodicBatching/Properties/AssemblyInfo.cs index 505c096..499c93b 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Properties/AssemblyInfo.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Properties/AssemblyInfo.cs @@ -1,8 +1,6 @@ using System.Reflection; using System.Runtime.CompilerServices; -[assembly: AssemblyVersion("3.0.0.0")] - [assembly: CLSCompliant(true)] [assembly: InternalsVisibleTo("Serilog.Sinks.PeriodicBatching.Tests, PublicKey=" + diff --git a/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj b/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj index 57f79b9..7b7eb09 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj +++ b/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj @@ -2,9 +2,10 @@ Buffer batches of log events to be flushed asynchronously. - 3.1.1 + 4.0.0 Serilog Contributors - net45;netstandard1.1;netstandard1.2;netstandard2.0;netstandard2.1 + net462 + $(TargetFrameworks);netstandard2.0;netstandard2.1;net6.0 true Serilog serilog;batching;timer @@ -13,28 +14,15 @@ https://github.com/serilog/serilog-sinks-periodicbatching https://github.com/serilog/serilog-sinks-periodicbatching git - false + enable - - $(DefineConstants);FEATURE_THREADING_TIMER - - - - $(DefineConstants);FEATURE_THREADING_TIMER;FEATURE_EXECUTION_CONTEXT - - - - $(DefineConstants);FEATURE_THREADING_TIMER;FEATURE_EXECUTION_CONTEXT;FEATURE_ASYNCDISPOSABLE + + $(DefineConstants);FEATURE_ASYNCDISPOSABLE - - - - - - + diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BoundedConcurrentQueue.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BoundedConcurrentQueue.cs index b2c6600..8028490 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BoundedConcurrentQueue.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BoundedConcurrentQueue.cs @@ -13,6 +13,7 @@ // limitations under the License. using System.Collections.Concurrent; +using System.Diagnostics.CodeAnalysis; namespace Serilog.Sinks.PeriodicBatching; @@ -27,7 +28,7 @@ class BoundedConcurrentQueue public BoundedConcurrentQueue(int? queueLimit = null) { - if (queueLimit.HasValue && queueLimit <= 0) + if (queueLimit is <= 0) throw new ArgumentOutOfRangeException(nameof(queueLimit), "Queue limit must be positive, or `null` to indicate unbounded."); _queueLimit = queueLimit ?? Unbounded; @@ -35,7 +36,7 @@ public BoundedConcurrentQueue(int? queueLimit = null) public int Count => _queue.Count; - public bool TryDequeue(out T item) + public bool TryDequeue([MaybeNullWhen(false)] out T item) { if (_queueLimit == Unbounded) return _queue.TryDequeue(out item); diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PortableTimer.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PortableTimer.cs index 0759b81..5731581 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PortableTimer.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PortableTimer.cs @@ -22,10 +22,7 @@ class PortableTimer : IDisposable readonly Func _onTick; readonly CancellationTokenSource _cancel = new(); - -#if FEATURE_THREADING_TIMER readonly Timer _timer; -#endif bool _running; bool _disposed; @@ -34,12 +31,8 @@ public PortableTimer(Func onTick) { _onTick = onTick ?? throw new ArgumentNullException(nameof(onTick)); -#if FEATURE_THREADING_TIMER -#if FEATURE_EXECUTION_CONTEXT using (ExecutionContext.SuppressFlow()) -#endif _timer = new(_ => OnTick(), null, Timeout.Infinite, Timeout.Infinite); -#endif } public void Start(TimeSpan interval) @@ -51,16 +44,7 @@ public void Start(TimeSpan interval) if (_disposed) throw new ObjectDisposedException(nameof(PortableTimer)); -#if FEATURE_THREADING_TIMER _timer.Change(interval, Timeout.InfiniteTimeSpan); -#else - Task.Delay(interval, _cancel.Token) - .ContinueWith( - _ => OnTick(), - CancellationToken.None, - TaskContinuationOptions.DenyChildAttach, - TaskScheduler.Default); -#endif } } @@ -126,11 +110,8 @@ public void Dispose() Monitor.Wait(_stateLock); } -#if FEATURE_THREADING_TIMER _timer.Dispose(); -#endif - _disposed = true; } } -} \ No newline at end of file +} diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Serilog.Sinks.PeriodicBatching.PerformanceTests.csproj b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Serilog.Sinks.PeriodicBatching.PerformanceTests.csproj index 57704d7..e1605b7 100644 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Serilog.Sinks.PeriodicBatching.PerformanceTests.csproj +++ b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Serilog.Sinks.PeriodicBatching.PerformanceTests.csproj @@ -1,7 +1,8 @@  - net452;netcoreapp1.1 + net472 + $(TargetFrameworks);net8.0 ../../assets/Serilog.snk true true @@ -17,19 +18,13 @@ - - - - + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + - - - - - - - - - - + diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/SynchronizedQueue.cs b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/SynchronizedQueue.cs index 1ddda40..06d10c2 100644 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/SynchronizedQueue.cs +++ b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/SynchronizedQueue.cs @@ -1,4 +1,6 @@ -namespace Serilog.Sinks.PeriodicBatching.PerformanceTests.Support; +using System.Diagnostics.CodeAnalysis; + +namespace Serilog.Sinks.PeriodicBatching.PerformanceTests.Support; class SynchronizedQueue : Queue { @@ -16,9 +18,9 @@ public SynchronizedQueue(int queueLimit) _queueLimit = queueLimit; } - public bool TryDequeue(out T? item) + public new bool TryDequeue(out T item) { - item = default; + item = default!; lock (this) { diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj b/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj index e3ff328..032b402 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj @@ -1,11 +1,11 @@  - net452;netcoreapp1.1;net6.0 + net472 + $(TargetFrameworks);net8.0 ../../assets/Serilog.snk true true - false @@ -17,15 +17,12 @@ - - - - - - - - - + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + From 46149efab28eff026de7f602694419983b51b940 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Wed, 27 Dec 2023 17:34:38 +1000 Subject: [PATCH 02/16] Port to System.Threading.Channels --- .DS_Store | Bin 0 -> 6148 bytes serilog-sinks-periodicbatching.sln | 7 - .../Serilog.Sinks.PeriodicBatching.csproj | 1 + .../BatchedConnectionStatus.cs | 5 - .../BoundedConcurrentQueue.cs | 85 ----- .../PeriodicBatching/PeriodicBatchingSink.cs | 292 ++++-------------- .../Sinks/PeriodicBatching/PortableTimer.cs | 117 ------- test/.DS_Store | Bin 0 -> 6148 bytes .../BoundedQueue_Enqueue_Benchmark.cs | 73 ----- .../BoundedQueue_Enqueue_Dequeue_Benchmark.cs | 72 ----- .../Runner.cs | 19 -- ...s.PeriodicBatching.PerformanceTests.csproj | 30 -- .../Support/Some.cs | 82 ----- .../Support/SynchronizedQueue.cs | 50 --- ...edQueue_Enqueue_Benchmark-report-github.md | 60 ---- ...Enqueue_Dequeue_Benchmark-report-github.md | 36 --- ...edQueue_Enqueue_Benchmark-report-github.md | 61 ---- ...Enqueue_Dequeue_Benchmark-report-github.md | 37 --- .../xunit.runner.json | 3 - .../BackwardsCompatibilityTests.cs | 20 -- .../BoundedConcurrentQueueTests.cs | 26 -- .../PeriodicBatchingSinkTests.cs | 8 +- .../PortableTimerTests.cs | 146 --------- .../Support/LegacyDisposeTrackingSink.cs | 18 -- 24 files changed, 67 insertions(+), 1181 deletions(-) create mode 100644 .DS_Store delete mode 100644 src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BoundedConcurrentQueue.cs delete mode 100644 src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PortableTimer.cs create mode 100644 test/.DS_Store delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/BoundedQueue_Enqueue_Benchmark.cs delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/BoundedQueue_Enqueue_Dequeue_Benchmark.cs delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Runner.cs delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Serilog.Sinks.PeriodicBatching.PerformanceTests.csproj delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/Some.cs delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/SynchronizedQueue.cs delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/net452/BoundedQueue_Enqueue_Benchmark-report-github.md delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/net452/BoundedQueue_Enqueue_Dequeue_Benchmark-report-github.md delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/netcoreapp1.1/BoundedQueue_Enqueue_Benchmark-report-github.md delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/netcoreapp1.1/BoundedQueue_Enqueue_Dequeue_Benchmark-report-github.md delete mode 100644 test/Serilog.Sinks.PeriodicBatching.PerformanceTests/xunit.runner.json delete mode 100644 test/Serilog.Sinks.PeriodicBatching.Tests/BackwardsCompatibilityTests.cs delete mode 100644 test/Serilog.Sinks.PeriodicBatching.Tests/BoundedConcurrentQueueTests.cs delete mode 100644 test/Serilog.Sinks.PeriodicBatching.Tests/PortableTimerTests.cs delete mode 100644 test/Serilog.Sinks.PeriodicBatching.Tests/Support/LegacyDisposeTrackingSink.cs diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..a2eb533abf551d042629249e6fe5fb378b3b5ee0 GIT binary patch literal 6148 zcmeH~u?oUK42Bc!Ah>jNyu}Cb4Gz&K=nFU~E>c0O^F6wMazU^xC+1b{XuyJ79K1T}(S!u1*@b}wNMJ-@TJzTK|1JE}{6A`8N&+PC zX9Tp_belC^D(=>|*R%RAs + diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BatchedConnectionStatus.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BatchedConnectionStatus.cs index 50f1133..3298ca6 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BatchedConnectionStatus.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BatchedConnectionStatus.cs @@ -25,11 +25,6 @@ namespace Serilog.Sinks.PeriodicBatching; /// also overproduction (the second, queue-dropping response). In combination these should provide a /// reasonable delivery effort but ultimately protect the sender from memory exhaustion. /// -/// -/// Currently used only by , but may -/// provide the basis for a "smart" exponential backoff timer. There are other factors to consider -/// including the desire to send batches "when full" rather than continuing to buffer, and so-on. -/// class BatchedConnectionStatus { static readonly TimeSpan MinimumBackoffPeriod = TimeSpan.FromSeconds(5); diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BoundedConcurrentQueue.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BoundedConcurrentQueue.cs deleted file mode 100644 index 8028490..0000000 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BoundedConcurrentQueue.cs +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright © Serilog Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System.Collections.Concurrent; -using System.Diagnostics.CodeAnalysis; - -namespace Serilog.Sinks.PeriodicBatching; - -class BoundedConcurrentQueue -{ - const int Unbounded = -1; - - readonly ConcurrentQueue _queue = new(); - readonly int _queueLimit; - - int _counter; - - public BoundedConcurrentQueue(int? queueLimit = null) - { - if (queueLimit is <= 0) - throw new ArgumentOutOfRangeException(nameof(queueLimit), "Queue limit must be positive, or `null` to indicate unbounded."); - - _queueLimit = queueLimit ?? Unbounded; - } - - public int Count => _queue.Count; - - public bool TryDequeue([MaybeNullWhen(false)] out T item) - { - if (_queueLimit == Unbounded) - return _queue.TryDequeue(out item); - - var result = false; - try - { } - finally // prevent state corrupt while aborting - { - if (_queue.TryDequeue(out item)) - { - Interlocked.Decrement(ref _counter); - result = true; - } - } - - return result; - } - - public bool TryEnqueue(T item) - { - if (_queueLimit == Unbounded) - { - _queue.Enqueue(item); - return true; - } - - var result = true; - try - { } - finally - { - if (Interlocked.Increment(ref _counter) <= _queueLimit) - { - _queue.Enqueue(item); - } - else - { - Interlocked.Decrement(ref _counter); - result = false; - } - } - - return result; - } -} \ No newline at end of file diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index de5c419..01534b4 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System.Threading.Channels; using Serilog.Core; using Serilog.Debugging; using Serilog.Events; @@ -23,31 +24,26 @@ namespace Serilog.Sinks.PeriodicBatching; /// /// Buffers log events into batches for background flushing. /// -public class PeriodicBatchingSink : ILogEventSink, IDisposable, IBatchedLogEventSink +public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable #if FEATURE_ASYNCDISPOSABLE , IAsyncDisposable #endif { - /// - /// Constant used with legacy constructor to indicate that the internal queue shouldn't be limited. - /// - [Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")] - public const int NoQueueLimit = -1; + // Buffers events from the write- to the read side. + readonly Channel _queue; + // Used by the write side to signal shutdown. + readonly object _stateLock = new(); + readonly CancellationTokenSource _unloading = new(); + readonly Task _loop; + + // Used only by the read side readonly IBatchedLogEventSink _batchedLogEventSink; readonly int _batchSizeLimit; readonly bool _eagerlyEmitFirstEvent; - readonly BoundedConcurrentQueue _queue; readonly BatchedConnectionStatus _status; readonly Queue _waitingBatch = new(); - readonly object _stateLock = new(); - - readonly PortableTimer _timer; - - bool _unloading; - bool _started; - /// /// Construct a . /// @@ -56,71 +52,22 @@ public class PeriodicBatchingSink : ILogEventSink, IDisposable, IBatchedLogEvent /// it will dispose this object if possible. /// Options controlling behavior of the sink. public PeriodicBatchingSink(IBatchedLogEventSink batchedSink, PeriodicBatchingSinkOptions options) - : this(options) - { - _batchedLogEventSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink)); - } - - /// - /// Construct a . New code should avoid subclassing - /// and use - /// - /// instead. - /// - /// The maximum number of events to include in a single batch. - /// The time to wait between checking for event batches. - [Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")] - protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period) - : this(new() - { - BatchSizeLimit = batchSizeLimit, - Period = period, - EagerlyEmitFirstEvent = true, - QueueLimit = null - }) - { - _batchedLogEventSink = this; - } - - /// - /// Construct a . New code should avoid subclassing - /// and use - /// - /// instead. - /// - /// The maximum number of events to include in a single batch. - /// The time to wait between checking for event batches. - /// Maximum number of events in the queue - use for an unbounded queue. - [Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")] - protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period, int queueLimit) - : this(new() - { - BatchSizeLimit = batchSizeLimit, - Period = period, - EagerlyEmitFirstEvent = true, - QueueLimit = queueLimit == NoQueueLimit ? null : queueLimit - }) - { - _batchedLogEventSink = this; - } - - PeriodicBatchingSink(PeriodicBatchingSinkOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); - if (options.BatchSizeLimit <= 0) throw new ArgumentOutOfRangeException(nameof(options), "The batch size limit must be greater than zero."); if (options.Period <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(options), "The period must be greater than zero."); + _batchedLogEventSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink)); _batchSizeLimit = options.BatchSizeLimit; - _queue = new(options.QueueLimit); - _status = new(options.Period); + _queue = options.QueueLimit is { } limit + ? Channel.CreateBounded(new BoundedChannelOptions(limit) { SingleReader = true }) + : Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + _status = new BatchedConnectionStatus(options.Period); _eagerlyEmitFirstEvent = options.EagerlyEmitFirstEvent; - _timer = new(_ => OnTick()); - // Initialized by externally-callable constructors. - _batchedLogEventSink = null!; + _loop = Task.Factory.StartNew(LoopAsync, _unloading.Token, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); } /// @@ -129,32 +76,16 @@ protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period, int queueLim /// 2 public void Dispose() { - Dispose(true); - GC.SuppressFinalize(this); - } - - /// - /// Free resources held by the sink. - /// - /// If true, called because the object is being disposed; if false, - /// the object is being disposed from the finalizer. - protected virtual void Dispose(bool disposing) - { - if (!disposing) return; - lock (_stateLock) { - if (_unloading) - return; - - _unloading = true; + if (!_unloading.IsCancellationRequested) + { + _queue.Writer.Complete(); + _unloading.Cancel(); + } } - _timer.Dispose(); - - // This is the place where SynchronizationContext.Current is unknown and can be != null - // so we prevent possible deadlocks here for sync-over-async downstream implementations. - TaskUtil.ResetSyncContextAndWait(OnTick); + _loop.Wait(); (_batchedLogEventSink as IDisposable)?.Dispose(); } @@ -165,112 +96,78 @@ public async ValueTask DisposeAsync() { lock (_stateLock) { - if (_unloading) - return; - - _unloading = true; + if (!_unloading.IsCancellationRequested) + { + _queue.Writer.Complete(); + _unloading.Cancel(); + } } - _timer.Dispose(); - - await OnTick().ConfigureAwait(false); - - if (ReferenceEquals(_batchedLogEventSink, this)) - { - // The sink is being used in the obsolete inheritance-based mode. Old sinks won't - // override something like `DisposeAsyncCore()`; we just forward to the synchronous - // `Dispose()` method to ensure whatever cleanup they do still occurs. - Dispose(true); - return; - } + await _loop.ConfigureAwait(false); if (_batchedLogEventSink is IAsyncDisposable asyncDisposable) await asyncDisposable.DisposeAsync().ConfigureAwait(false); else (_batchedLogEventSink as IDisposable)?.Dispose(); - - GC.SuppressFinalize(this); } #endif - /// - /// Emit a batch of log events, running to completion synchronously. - /// - /// The events to emit. - /// Override either or , - /// not both. - protected virtual void EmitBatch(IEnumerable events) + async Task LoopAsync() { - } - - /// - /// Emit a batch of log events, running asynchronously. - /// - /// The events to emit. - /// Override either or , - /// not both. -#pragma warning disable 1998 - protected virtual async Task EmitBatchAsync(IEnumerable events) -#pragma warning restore 1998 - { - // ReSharper disable once MethodHasAsyncOverload - EmitBatch(events); - } - - async Task OnTick() - { - try + var isEagerBatch = _eagerlyEmitFirstEvent; + do { - bool batchWasFull; + using var fillBatch = Task.Delay(_status.NextInterval, _unloading.Token); + do { while (_waitingBatch.Count < _batchSizeLimit && - _queue.TryDequeue(out var next)) + !_unloading.IsCancellationRequested && + _queue.Reader.TryRead(out var next)) { _waitingBatch.Enqueue(next); } + } while ((_waitingBatch.Count < _batchSizeLimit || _waitingBatch.Count > 0 && isEagerBatch) && + !_unloading.IsCancellationRequested && + await Task.WhenAny(fillBatch, _queue.Reader.WaitToReadAsync(_unloading.Token).AsTask()) != fillBatch); + try + { if (_waitingBatch.Count == 0) { await _batchedLogEventSink.OnEmptyBatchAsync().ConfigureAwait(false); - return; + } + else + { + isEagerBatch = false; + await _batchedLogEventSink.EmitBatchAsync(_waitingBatch).ConfigureAwait(false); } - await _batchedLogEventSink.EmitBatchAsync(_waitingBatch).ConfigureAwait(false); - - batchWasFull = _waitingBatch.Count >= _batchSizeLimit; _waitingBatch.Clear(); _status.MarkSuccess(); } - while (batchWasFull); // Otherwise, allow the period to elapse - } - catch (Exception ex) - { - SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex); - _status.MarkFailure(); - } - finally - { - if (_status.ShouldDropBatch) - _waitingBatch.Clear(); - - if (_status.ShouldDropQueue) + catch (Exception ex) { - while (_queue.TryDequeue(out _)) { } + SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex); + _status.MarkFailure(); } - - lock (_stateLock) + finally { - if (!_unloading) - SetTimer(_status.NextInterval); + if (_status.ShouldDropBatch) + _waitingBatch.Clear(); + + if (_status.ShouldDropQueue) + { + // This is not ideal; the goal is to reduce memory pressure on the client if + // the server is offline for extended periods. May be worth reviewing and abandoning + // this. + while (_queue.Reader.TryRead(out _) && !_unloading.IsCancellationRequested) { } + } } } + while (!_unloading.IsCancellationRequested); } - void SetTimer(TimeSpan interval) - { - _timer.Start(interval); - } /// /// Emit the provided log event to the sink. If the sink is being disposed or @@ -287,74 +184,9 @@ public void Emit(LogEvent logEvent) { if (logEvent == null) throw new ArgumentNullException(nameof(logEvent)); - if (_unloading) + if (_unloading.IsCancellationRequested) return; - if (!_started) - { - lock (_stateLock) - { - if (_unloading) return; - if (!_started) - { - _queue.TryEnqueue(logEvent); - _started = true; - - if (_eagerlyEmitFirstEvent) - { - // Special handling to try to get the first event across as quickly - // as possible to show we're alive! - SetTimer(TimeSpan.Zero); - } - else - { - SetTimer(_status.NextInterval); - } - - return; - } - } - } - - _queue.TryEnqueue(logEvent); - } - - /// - /// Determine whether a queued log event should be included in the batch. If - /// an override returns false, the event will be dropped. - /// - /// An event to test for inclusion. - /// True if the event should be included in the batch; otherwise, false. - // ReSharper disable once UnusedParameter.Global - protected virtual bool CanInclude(LogEvent logEvent) - { - return true; - } - - /// - /// Allows derived sinks to perform periodic work without requiring additional threads - /// or timers (thus avoiding additional flush/shut-down complexity). - /// - /// Override either or , - /// not both. - protected virtual void OnEmptyBatch() - { - } - - /// - /// Allows derived sinks to perform periodic work without requiring additional threads - /// or timers (thus avoiding additional flush/shut-down complexity). - /// - /// Override either or , - /// not both. -#pragma warning disable 1998 - protected virtual async Task OnEmptyBatchAsync() -#pragma warning restore 1998 - { - // ReSharper disable once MethodHasAsyncOverload - OnEmptyBatch(); + _queue.Writer.TryWrite(logEvent); } - - Task IBatchedLogEventSink.EmitBatchAsync(IEnumerable batch) => EmitBatchAsync(batch); - Task IBatchedLogEventSink.OnEmptyBatchAsync() => OnEmptyBatchAsync(); } \ No newline at end of file diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PortableTimer.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PortableTimer.cs deleted file mode 100644 index 5731581..0000000 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PortableTimer.cs +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright © Serilog Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using Serilog.Debugging; - -namespace Serilog.Sinks.PeriodicBatching; - -class PortableTimer : IDisposable -{ - readonly object _stateLock = new(); - - readonly Func _onTick; - readonly CancellationTokenSource _cancel = new(); - readonly Timer _timer; - - bool _running; - bool _disposed; - - public PortableTimer(Func onTick) - { - _onTick = onTick ?? throw new ArgumentNullException(nameof(onTick)); - - using (ExecutionContext.SuppressFlow()) - _timer = new(_ => OnTick(), null, Timeout.Infinite, Timeout.Infinite); - } - - public void Start(TimeSpan interval) - { - if (interval < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(interval)); - - lock (_stateLock) - { - if (_disposed) - throw new ObjectDisposedException(nameof(PortableTimer)); - - _timer.Change(interval, Timeout.InfiniteTimeSpan); - } - } - - async void OnTick() - { - try - { - lock (_stateLock) - { - if (_disposed) - { - return; - } - - // There's a little bit of raciness here, but it's needed to support the - // current API, which allows the tick handler to reenter and set the next interval. - - if (_running) - { - Monitor.Wait(_stateLock); - - if (_disposed) - { - return; - } - } - - _running = true; - } - - if (!_cancel.Token.IsCancellationRequested) - { - await _onTick(_cancel.Token); - } - } - catch (OperationCanceledException tcx) - { - SelfLog.WriteLine("The timer was canceled during invocation: {0}", tcx); - } - finally - { - lock (_stateLock) - { - _running = false; - Monitor.PulseAll(_stateLock); - } - } - } - - public void Dispose() - { - _cancel.Cancel(); - - lock (_stateLock) - { - if (_disposed) - { - return; - } - - while (_running) - { - Monitor.Wait(_stateLock); - } - - _timer.Dispose(); - _disposed = true; - } - } -} diff --git a/test/.DS_Store b/test/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5efd228bfa2b3231208ff19ce8b8f9c89b90a095 GIT binary patch literal 6148 zcmeHKyH3ME5S)b+k!Vs<-WP!Q1Ir3MlKB7-2?3GVk`2`E_-$q%phONW1?)<@5h9g)Y|&wZ3p9A1WFL2E?IrWq=vlL0-{J)W z_qvH#hZQp(P~#r2#3!y=JIym5d3KK-qb;Lr+)@W-4)0ChZo78lZ+ZMR(wht<1Ia)# zkPQ4A2DJB5=4Xy+lYwL)8Te*E?}tKFtO7?zyEmEh#_81e~G**aCG!?NDLnmCreByVyAQeV(E~|F>NxC3>-3W>dS@p|7+?G^Z#K{ zu9AUd;7>6ivt_fa`AN}RCm*N1wovb=U&35Vr?FN{v{uZ8w&L@>yrR#HSAnCWmD6wK P#QYIZUQ&{Q-(cVqiB%%M literal 0 HcmV?d00001 diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/BoundedQueue_Enqueue_Benchmark.cs b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/BoundedQueue_Enqueue_Benchmark.cs deleted file mode 100644 index d4bec14..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/BoundedQueue_Enqueue_Benchmark.cs +++ /dev/null @@ -1,73 +0,0 @@ -using BenchmarkDotNet.Attributes; -using Serilog.Events; -using Serilog.Sinks.PeriodicBatching.PerformanceTests.Support; -using Serilog.Tests.Support; -using System.Collections.Concurrent; - -namespace Serilog.Sinks.PeriodicBatching.PerformanceTests; - -public class BoundedQueue_Enqueue_Benchmark -{ - const int NON_BOUNDED = -1; - - [Params(50, 100)] - public int Items { get; set; } - - [Params(NON_BOUNDED, 50, 100)] - public int Limit { get; set; } - - [Params(1, -1)] - public int ConcurrencyLevel { get; set; } - - readonly LogEvent _logEvent = Some.LogEvent(); - - Func> _concurrentQueueFactory = null!; - Func> _boundedConcurrentQueueFactory = null!; - Func> _blockingCollectionFactory = null!; - Func> _synchronizedQueueFactory = null!; - - [Setup] - public void Setup() - { - _concurrentQueueFactory = () => new(); - _boundedConcurrentQueueFactory = Limit != NON_BOUNDED ? () => new(Limit) - : new Func>(() => new()); - _blockingCollectionFactory = Limit != NON_BOUNDED ? () => new(Limit) - : new Func>(() => new()); - _synchronizedQueueFactory = Limit != NON_BOUNDED ? () => new(Limit) - : new Func>(() => new()); - } - - [Benchmark(Baseline = true)] - public void ConcurrentQueue() - { - var queue = _concurrentQueueFactory(); - EnqueueItems(evt => queue.Enqueue(evt)); - } - - [Benchmark] - public void BoundedConcurrentQueue() - { - var queue = _boundedConcurrentQueueFactory(); - EnqueueItems(evt => queue.TryEnqueue(evt)); - } - - [Benchmark] - public void BlockingCollection() - { - var queue = _blockingCollectionFactory(); - EnqueueItems(evt => queue.TryAdd(evt)); - } - - [Benchmark] - public void SynchronizedQueue() - { - var queue = _synchronizedQueueFactory(); - EnqueueItems(evt => queue.TryEnqueue(evt)); - } - - void EnqueueItems(Action enqueueAction) - { - Parallel.For(0, Items, new() { MaxDegreeOfParallelism = ConcurrencyLevel }, _ => enqueueAction(_logEvent)); - } -} \ No newline at end of file diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/BoundedQueue_Enqueue_Dequeue_Benchmark.cs b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/BoundedQueue_Enqueue_Dequeue_Benchmark.cs deleted file mode 100644 index 218b8bd..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/BoundedQueue_Enqueue_Dequeue_Benchmark.cs +++ /dev/null @@ -1,72 +0,0 @@ -using BenchmarkDotNet.Attributes; -using Serilog.Events; -using Serilog.Sinks.PeriodicBatching.PerformanceTests.Support; -using Serilog.Tests.Support; -using System.Collections.Concurrent; - -namespace Serilog.Sinks.PeriodicBatching.PerformanceTests; - -public class BoundedQueue_Enqueue_Dequeue_Benchmark -{ - const int NON_BOUNDED = -1; - - [Params(50, 100)] - public int Items { get; set; } - - [Params(NON_BOUNDED, 50, 100)] - public int Limit { get; set; } - - readonly LogEvent _logEvent = Some.LogEvent(); - - Func> _concurrentQueueFactory = null!; - Func> _boundedConcurrentQueueFactory = null!; - Func> _blockingCollectionFactory = null!; - Func> _synchronizedQueueFactory = null!; - - [Setup] - public void Setup() - { - _concurrentQueueFactory = () => new(); - _boundedConcurrentQueueFactory = Limit != NON_BOUNDED ? () => new(Limit) - : new Func>(() => new()); - _blockingCollectionFactory = Limit != NON_BOUNDED ? () => new(Limit) - : new Func>(() => new()); - _synchronizedQueueFactory = Limit != NON_BOUNDED ? () => new(Limit) - : new Func>(() => new()); - } - - [Benchmark(Baseline = true)] - public void ConcurrentQueue() - { - var queue = _concurrentQueueFactory(); - EnqueueDequeueItems(evt => queue.Enqueue(evt), evt => queue.TryDequeue(out evt)); - } - - [Benchmark] - public void BoundedConcurrentQueue() - { - var queue = _boundedConcurrentQueueFactory(); - EnqueueDequeueItems(evt => queue.TryEnqueue(evt), evt => queue.TryDequeue(out evt)); - } - - [Benchmark] - public void BlockingCollection() - { - var queue = _blockingCollectionFactory(); - EnqueueDequeueItems(evt => queue.TryAdd(evt), evt => queue.TryTake(out evt)); - } - - [Benchmark] - public void SynchronizedQueue() - { - var queue = _synchronizedQueueFactory(); - EnqueueDequeueItems(evt => queue.TryEnqueue(evt), evt => queue.TryDequeue(out evt)); - } - - void EnqueueDequeueItems(Action enqueueAction, Action dequeueAction) - { - Parallel.Invoke( - () => Parallel.For(0, Items, _ => enqueueAction(_logEvent)), - () => { for (var i = 0; i < Items; i++) dequeueAction(_logEvent); }); - } -} \ No newline at end of file diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Runner.cs b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Runner.cs deleted file mode 100644 index e77f3ec..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Runner.cs +++ /dev/null @@ -1,19 +0,0 @@ -using BenchmarkDotNet.Running; -using Xunit; - -namespace Serilog.Sinks.PeriodicBatching.PerformanceTests; - -public class Runner -{ - [Fact] - public void BoundedQueue_Enqueue() - { - BenchmarkRunner.Run(); - } - - [Fact] - public void BoundedQueue_Enqueue_Dequeue() - { - BenchmarkRunner.Run(); - } -} \ No newline at end of file diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Serilog.Sinks.PeriodicBatching.PerformanceTests.csproj b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Serilog.Sinks.PeriodicBatching.PerformanceTests.csproj deleted file mode 100644 index e1605b7..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Serilog.Sinks.PeriodicBatching.PerformanceTests.csproj +++ /dev/null @@ -1,30 +0,0 @@ - - - - net472 - $(TargetFrameworks);net8.0 - ../../assets/Serilog.snk - true - true - false - - - - - - - - - - - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/Some.cs b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/Some.cs deleted file mode 100644 index 7586453..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/Some.cs +++ /dev/null @@ -1,82 +0,0 @@ -using Serilog.Events; -using Serilog.Parsing; - -namespace Serilog.Tests.Support; - -static class Some -{ - static int Counter; - - public static int Int() - { - return Interlocked.Increment(ref Counter); - } - - public static decimal Decimal() - { - return Int() + 0.123m; - } - - public static string String(string? tag = null) - { - return (tag ?? "") + "__" + Int(); - } - - public static TimeSpan TimeSpan() - { - return System.TimeSpan.FromMinutes(Int()); - } - - public static DateTime Instant() - { - return new DateTime(2012, 10, 28) + TimeSpan(); - } - - public static DateTimeOffset OffsetInstant() - { - return new(Instant()); - } - - public static LogEvent LogEvent(DateTimeOffset? timestamp = null, LogEventLevel level = LogEventLevel.Information) - { - return new(timestamp ?? OffsetInstant(), level, - null, MessageTemplate(), Enumerable.Empty()); - } - - public static LogEvent InformationEvent(DateTimeOffset? timestamp = null) - { - return LogEvent(timestamp); - } - - public static LogEvent DebugEvent(DateTimeOffset? timestamp = null) - { - return LogEvent(timestamp, LogEventLevel.Debug); - } - - public static LogEventProperty LogEventProperty() - { - return new(String(), new ScalarValue(Int())); - } - - public static string NonexistentTempFilePath() - { - return Path.Combine(Path.GetTempPath(), Guid.NewGuid() + ".txt"); - } - - public static string TempFilePath() - { - return Path.GetTempFileName(); - } - - public static string TempFolderPath() - { - var dir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString()); - Directory.CreateDirectory(dir); - return dir; - } - - public static MessageTemplate MessageTemplate() - { - return new MessageTemplateParser().Parse(String()); - } -} \ No newline at end of file diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/SynchronizedQueue.cs b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/SynchronizedQueue.cs deleted file mode 100644 index 06d10c2..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/Support/SynchronizedQueue.cs +++ /dev/null @@ -1,50 +0,0 @@ -using System.Diagnostics.CodeAnalysis; - -namespace Serilog.Sinks.PeriodicBatching.PerformanceTests.Support; - -class SynchronizedQueue : Queue -{ - const int NON_BOUNDED = -1; - - readonly int _queueLimit; - - public SynchronizedQueue() - { - _queueLimit = NON_BOUNDED; - } - - public SynchronizedQueue(int queueLimit) - { - _queueLimit = queueLimit; - } - - public new bool TryDequeue(out T item) - { - item = default!; - - lock (this) - { - if (base.Count > 0) - { - item = base.Dequeue(); - return true; - } - - return false; - } - } - - public bool TryEnqueue(T item) - { - lock (this) - { - if (base.Count < _queueLimit || _queueLimit == NON_BOUNDED) - { - base.Enqueue(item); - return true; - } - - return false; - } - } -} \ No newline at end of file diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/net452/BoundedQueue_Enqueue_Benchmark-report-github.md b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/net452/BoundedQueue_Enqueue_Benchmark-report-github.md deleted file mode 100644 index 73113a3..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/net452/BoundedQueue_Enqueue_Benchmark-report-github.md +++ /dev/null @@ -1,60 +0,0 @@ -``` ini - -BenchmarkDotNet=v0.10.3.0, OS=Microsoft Windows NT 6.2.9200.0 -Processor=Intel(R) Core(TM) i7-4790 CPU 3.60GHz, ProcessorCount=8 -Frequency=3507499 Hz, Resolution=285.1034 ns, Timer=TSC - [Host] : Clr 4.0.30319.42000, 32bit LegacyJIT-v4.6.1586.0 - DefaultJob : Clr 4.0.30319.42000, 32bit LegacyJIT-v4.6.1586.0 - - -``` - | Method | Items | Limit | ConcurrencyLevel | Mean | StdErr | StdDev | Scaled | Scaled-StdDev | - |----------------------- |------ |------ |----------------- |-------------- |---------- |---------- |------- |-------------- | - | **ConcurrentQueue** | **50** | **-1** | **-1** | **7.2804 us** | **0.0103 us** | **0.0401 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | -1 | -1 | 7.2908 us | 0.0388 us | 0.1503 us | 1.00 | 0.02 | - | BlockingCollection | 50 | -1 | -1 | 17.7474 us | 0.0848 us | 0.3059 us | 2.44 | 0.04 | - | SynchronizedQueue | 50 | -1 | -1 | 12.1052 us | 0.0216 us | 0.0837 us | 1.66 | 0.01 | - | **ConcurrentQueue** | **50** | **-1** | **1** | **3.4143 us** | **0.0338 us** | **0.1551 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | -1 | 1 | 3.5365 us | 0.0352 us | 0.1409 us | 1.04 | 0.06 | - | BlockingCollection | 50 | -1 | 1 | 6.8568 us | 0.0684 us | 0.3683 us | 2.01 | 0.14 | - | SynchronizedQueue | 50 | -1 | 1 | 4.4259 us | 0.0376 us | 0.1455 us | 1.30 | 0.07 | - | **ConcurrentQueue** | **50** | **50** | **-1** | **7.2713 us** | **0.0079 us** | **0.0297 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 50 | -1 | 8.4583 us | 0.0461 us | 0.1787 us | 1.16 | 0.02 | - | BlockingCollection | 50 | 50 | -1 | 23.0957 us | 0.1349 us | 0.5224 us | 3.18 | 0.07 | - | SynchronizedQueue | 50 | 50 | -1 | 12.0785 us | 0.0264 us | 0.1023 us | 1.66 | 0.02 | - | **ConcurrentQueue** | **50** | **50** | **1** | **3.3591 us** | **0.0283 us** | **0.1019 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 50 | 1 | 3.8662 us | 0.0378 us | 0.1730 us | 1.15 | 0.06 | - | BlockingCollection | 50 | 50 | 1 | 9.2685 us | 0.0939 us | 0.8237 us | 2.76 | 0.26 | - | SynchronizedQueue | 50 | 50 | 1 | 4.3973 us | 0.0456 us | 0.3288 us | 1.31 | 0.10 | - | **ConcurrentQueue** | **50** | **100** | **-1** | **7.2897 us** | **0.0112 us** | **0.0434 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 100 | -1 | 8.4600 us | 0.0271 us | 0.1049 us | 1.16 | 0.02 | - | BlockingCollection | 50 | 100 | -1 | 22.9237 us | 0.1219 us | 0.4723 us | 3.14 | 0.07 | - | SynchronizedQueue | 50 | 100 | -1 | 12.0344 us | 0.0149 us | 0.0578 us | 1.65 | 0.01 | - | **ConcurrentQueue** | **50** | **100** | **1** | **3.4225 us** | **0.0333 us** | **0.1596 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 100 | 1 | 4.0642 us | 0.0207 us | 0.0716 us | 1.19 | 0.06 | - | BlockingCollection | 50 | 100 | 1 | 9.2182 us | 0.0922 us | 0.8194 us | 2.70 | 0.27 | - | SynchronizedQueue | 50 | 100 | 1 | 4.3935 us | 0.0434 us | 0.2713 us | 1.29 | 0.10 | - | **ConcurrentQueue** | **100** | **-1** | **-1** | **10.4820 us** | **0.0080 us** | **0.0301 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | -1 | -1 | 11.2203 us | 0.0277 us | 0.1075 us | 1.07 | 0.01 | - | BlockingCollection | 100 | -1 | -1 | 36.4722 us | 0.2981 us | 1.1544 us | 3.48 | 0.11 | - | SynchronizedQueue | 100 | -1 | -1 | 22.3040 us | 0.1371 us | 0.5130 us | 2.13 | 0.05 | - | **ConcurrentQueue** | **100** | **-1** | **1** | **4.7591 us** | **0.0396 us** | **0.1534 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | -1 | 1 | 5.0724 us | 0.0504 us | 0.2715 us | 1.07 | 0.07 | - | BlockingCollection | 100 | -1 | 1 | 10.6618 us | 0.1146 us | 0.7856 us | 2.24 | 0.18 | - | SynchronizedQueue | 100 | -1 | 1 | 6.5492 us | 0.0650 us | 0.5438 us | 1.38 | 0.12 | - | **ConcurrentQueue** | **100** | **50** | **-1** | **10.5168 us** | **0.0088 us** | **0.0329 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 50 | -1 | 11.8475 us | 0.0321 us | 0.1201 us | 1.13 | 0.01 | - | BlockingCollection | 100 | 50 | -1 | 368.4878 us | 0.3846 us | 1.4392 us | 35.04 | 0.17 | - | SynchronizedQueue | 100 | 50 | -1 | 19.3906 us | 0.1377 us | 0.5333 us | 1.84 | 0.05 | - | **ConcurrentQueue** | **100** | **50** | **1** | **5.8222 us** | **0.0265 us** | **0.1243 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 50 | 1 | 5.0835 us | 0.0500 us | 0.2447 us | 0.87 | 0.05 | - | BlockingCollection | 100 | 50 | 1 | 1,418.6158 us | 1.9614 us | 7.5965 us | 243.77 | 5.69 | - | SynchronizedQueue | 100 | 50 | 1 | 6.1161 us | 0.0601 us | 0.4073 us | 1.05 | 0.07 | - | **ConcurrentQueue** | **100** | **100** | **-1** | **10.4971 us** | **0.0164 us** | **0.0633 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 100 | -1 | 12.6199 us | 0.0405 us | 0.1569 us | 1.20 | 0.02 | - | BlockingCollection | 100 | 100 | -1 | 51.6912 us | 0.5041 us | 2.5203 us | 4.92 | 0.24 | - | SynchronizedQueue | 100 | 100 | -1 | 23.4763 us | 0.1860 us | 0.7204 us | 2.24 | 0.07 | - | **ConcurrentQueue** | **100** | **100** | **1** | **4.8530 us** | **0.0388 us** | **0.1503 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 100 | 1 | 5.8461 us | 0.0580 us | 0.4141 us | 1.21 | 0.09 | - | BlockingCollection | 100 | 100 | 1 | 14.9473 us | 0.1493 us | 1.3015 us | 3.08 | 0.28 | - | SynchronizedQueue | 100 | 100 | 1 | 6.5940 us | 0.0655 us | 0.5360 us | 1.36 | 0.12 | diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/net452/BoundedQueue_Enqueue_Dequeue_Benchmark-report-github.md b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/net452/BoundedQueue_Enqueue_Dequeue_Benchmark-report-github.md deleted file mode 100644 index 46ec454..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/net452/BoundedQueue_Enqueue_Dequeue_Benchmark-report-github.md +++ /dev/null @@ -1,36 +0,0 @@ -``` ini - -BenchmarkDotNet=v0.10.3.0, OS=Microsoft Windows NT 6.2.9200.0 -Processor=Intel(R) Core(TM) i7-4790 CPU 3.60GHz, ProcessorCount=8 -Frequency=3507499 Hz, Resolution=285.1034 ns, Timer=TSC - [Host] : Clr 4.0.30319.42000, 32bit LegacyJIT-v4.6.1586.0 - DefaultJob : Clr 4.0.30319.42000, 32bit LegacyJIT-v4.6.1586.0 - - -``` - | Method | Items | Limit | Mean | StdErr | StdDev | Scaled | Scaled-StdDev | - |----------------------- |------ |------ |------------ |---------- |---------- |------- |-------------- | - | **ConcurrentQueue** | **50** | **-1** | **10.3018 us** | **0.0261 us** | **0.0940 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | -1 | 11.0476 us | 0.0208 us | 0.0804 us | 1.07 | 0.01 | - | BlockingCollection | 50 | -1 | 37.3304 us | 0.1895 us | 0.7091 us | 3.62 | 0.07 | - | SynchronizedQueue | 50 | -1 | 20.8889 us | 0.0861 us | 0.3223 us | 2.03 | 0.03 | - | **ConcurrentQueue** | **50** | **50** | **10.3639 us** | **0.0167 us** | **0.0648 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 50 | 12.5340 us | 0.0182 us | 0.0706 us | 1.21 | 0.01 | - | BlockingCollection | 50 | 50 | 52.3100 us | 0.5042 us | 1.8865 us | 5.05 | 0.18 | - | SynchronizedQueue | 50 | 50 | 20.8874 us | 0.1339 us | 0.5186 us | 2.02 | 0.05 | - | **ConcurrentQueue** | **50** | **100** | **10.2835 us** | **0.0262 us** | **0.0981 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 100 | 12.5809 us | 0.0318 us | 0.1230 us | 1.22 | 0.02 | - | BlockingCollection | 50 | 100 | 50.8533 us | 0.4826 us | 1.9898 us | 4.95 | 0.19 | - | SynchronizedQueue | 50 | 100 | 20.5994 us | 0.1159 us | 0.4338 us | 2.00 | 0.04 | - | **ConcurrentQueue** | **100** | **-1** | **14.7901 us** | **0.0334 us** | **0.1295 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | -1 | 15.3619 us | 0.0328 us | 0.1270 us | 1.04 | 0.01 | - | BlockingCollection | 100 | -1 | 67.4710 us | 0.3917 us | 1.4122 us | 4.56 | 0.10 | - | SynchronizedQueue | 100 | -1 | 41.1039 us | 0.4912 us | 2.0254 us | 2.78 | 0.13 | - | **ConcurrentQueue** | **100** | **50** | **14.8436 us** | **0.0276 us** | **0.0956 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 50 | 17.3593 us | 0.0352 us | 0.1364 us | 1.17 | 0.01 | - | BlockingCollection | 100 | 50 | 173.0738 us | 1.0031 us | 3.7533 us | 11.66 | 0.25 | - | SynchronizedQueue | 100 | 50 | 36.4733 us | 0.2613 us | 1.0121 us | 2.46 | 0.07 | - | **ConcurrentQueue** | **100** | **100** | **14.8464 us** | **0.0231 us** | **0.0894 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 100 | 17.4290 us | 0.0391 us | 0.1516 us | 1.17 | 0.01 | - | BlockingCollection | 100 | 100 | 91.5932 us | 0.9253 us | 4.3401 us | 6.17 | 0.29 | - | SynchronizedQueue | 100 | 100 | 40.4670 us | 0.3657 us | 1.3186 us | 2.73 | 0.09 | diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/netcoreapp1.1/BoundedQueue_Enqueue_Benchmark-report-github.md b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/netcoreapp1.1/BoundedQueue_Enqueue_Benchmark-report-github.md deleted file mode 100644 index d82acc4..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/netcoreapp1.1/BoundedQueue_Enqueue_Benchmark-report-github.md +++ /dev/null @@ -1,61 +0,0 @@ -``` ini - -BenchmarkDotNet=v0.10.3.0, OS=Microsoft Windows 10.0.14393 -Processor=Intel(R) Core(TM) i7-4790 CPU 3.60GHz, ProcessorCount=8 -Frequency=3507499 Hz, Resolution=285.1034 ns, Timer=TSC -dotnet cli version=1.0.0 - [Host] : .NET Core 4.6.25009.03, 64bit RyuJIT - DefaultJob : .NET Core 4.6.25009.03, 64bit RyuJIT - - -``` - | Method | Items | Limit | ConcurrencyLevel | Mean | StdErr | StdDev | Scaled | Scaled-StdDev | - |----------------------- |------ |------ |----------------- |----------- |---------- |---------- |------- |-------------- | - | **ConcurrentQueue** | **50** | **-1** | **-1** | **5.1811 us** | **0.0061 us** | **0.0237 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | -1 | -1 | 5.5466 us | 0.0111 us | 0.0431 us | 1.07 | 0.01 | - | BlockingCollection | 50 | -1 | -1 | 14.6056 us | 0.0977 us | 0.3783 us | 2.82 | 0.07 | - | SynchronizedQueue | 50 | -1 | -1 | 7.6025 us | 0.0130 us | 0.0505 us | 1.47 | 0.01 | - | **ConcurrentQueue** | **50** | **-1** | **1** | **1.9633 us** | **0.0018 us** | **0.0069 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | -1 | 1 | 2.1595 us | 0.0067 us | 0.0232 us | 1.10 | 0.01 | - | BlockingCollection | 50 | -1 | 1 | 4.4581 us | 0.0030 us | 0.0111 us | 2.27 | 0.01 | - | SynchronizedQueue | 50 | -1 | 1 | 2.4642 us | 0.0039 us | 0.0151 us | 1.26 | 0.01 | - | **ConcurrentQueue** | **50** | **50** | **-1** | **5.1484 us** | **0.0080 us** | **0.0312 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 50 | -1 | 6.3047 us | 0.0042 us | 0.0153 us | 1.22 | 0.01 | - | BlockingCollection | 50 | 50 | -1 | 21.8181 us | 0.2179 us | 1.1734 us | 4.24 | 0.23 | - | SynchronizedQueue | 50 | 50 | -1 | 7.5549 us | 0.0140 us | 0.0542 us | 1.47 | 0.01 | - | **ConcurrentQueue** | **50** | **50** | **1** | **1.9670 us** | **0.0023 us** | **0.0090 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 50 | 1 | 2.4802 us | 0.0022 us | 0.0083 us | 1.26 | 0.01 | - | BlockingCollection | 50 | 50 | 1 | 6.9471 us | 0.0075 us | 0.0281 us | 3.53 | 0.02 | - | SynchronizedQueue | 50 | 50 | 1 | 2.4790 us | 0.0012 us | 0.0048 us | 1.26 | 0.01 | - | **ConcurrentQueue** | **50** | **100** | **-1** | **5.5230 us** | **0.0074 us** | **0.0286 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 100 | -1 | 6.3623 us | 0.0122 us | 0.0456 us | 1.15 | 0.01 | - | BlockingCollection | 50 | 100 | -1 | 21.1418 us | 0.1764 us | 0.6600 us | 3.83 | 0.12 | - | SynchronizedQueue | 50 | 100 | -1 | 7.5030 us | 0.0113 us | 0.0422 us | 1.36 | 0.01 | - | **ConcurrentQueue** | **50** | **100** | **1** | **1.9761 us** | **0.0030 us** | **0.0118 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 100 | 1 | 2.4867 us | 0.0015 us | 0.0058 us | 1.26 | 0.01 | - | BlockingCollection | 50 | 100 | 1 | 6.9420 us | 0.0099 us | 0.0385 us | 3.51 | 0.03 | - | SynchronizedQueue | 50 | 100 | 1 | 2.4812 us | 0.0021 us | 0.0083 us | 1.26 | 0.01 | - | **ConcurrentQueue** | **100** | **-1** | **-1** | **7.6981 us** | **0.0179 us** | **0.0692 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | -1 | -1 | 8.0702 us | 0.0197 us | 0.0763 us | 1.05 | 0.01 | - | BlockingCollection | 100 | -1 | -1 | 32.4460 us | 0.4472 us | 1.6731 us | 4.22 | 0.21 | - | SynchronizedQueue | 100 | -1 | -1 | 12.7754 us | 0.0340 us | 0.1272 us | 1.66 | 0.02 | - | **ConcurrentQueue** | **100** | **-1** | **1** | **3.0726 us** | **0.0040 us** | **0.0154 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | -1 | 1 | 3.4069 us | 0.0024 us | 0.0095 us | 1.11 | 0.01 | - | BlockingCollection | 100 | -1 | 1 | 7.7433 us | 0.0116 us | 0.0435 us | 2.52 | 0.02 | - | SynchronizedQueue | 100 | -1 | 1 | 3.7620 us | 0.0022 us | 0.0079 us | 1.22 | 0.01 | - | **ConcurrentQueue** | **100** | **50** | **-1** | **7.8410 us** | **0.0306 us** | **0.1184 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 50 | -1 | 8.4654 us | 0.0106 us | 0.0395 us | 1.08 | 0.02 | - | BlockingCollection | 100 | 50 | -1 | 28.9868 us | 0.2876 us | 1.0761 us | 3.70 | 0.14 | - | SynchronizedQueue | 100 | 50 | -1 | 11.0092 us | 0.0305 us | 0.1183 us | 1.40 | 0.03 | - | **ConcurrentQueue** | **100** | **50** | **1** | **3.0622 us** | **0.0029 us** | **0.0103 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 50 | 1 | 3.3265 us | 0.0031 us | 0.0122 us | 1.09 | 0.01 | - | BlockingCollection | 100 | 50 | 1 | 8.1403 us | 0.0100 us | 0.0386 us | 2.66 | 0.01 | - | SynchronizedQueue | 100 | 50 | 1 | 3.4894 us | 0.0032 us | 0.0126 us | 1.14 | 0.01 | - | **ConcurrentQueue** | **100** | **100** | **-1** | **7.6139 us** | **0.0162 us** | **0.0607 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 100 | -1 | 9.5550 us | 0.0392 us | 0.1517 us | 1.26 | 0.02 | - | BlockingCollection | 100 | 100 | -1 | 47.2014 us | 0.4680 us | 2.3402 us | 6.20 | 0.30 | - | SynchronizedQueue | 100 | 100 | -1 | 12.6909 us | 0.0638 us | 0.2471 us | 1.67 | 0.03 | - | **ConcurrentQueue** | **100** | **100** | **1** | **3.0812 us** | **0.0045 us** | **0.0174 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 100 | 1 | 4.0404 us | 0.0046 us | 0.0179 us | 1.31 | 0.01 | - | BlockingCollection | 100 | 100 | 1 | 12.7325 us | 0.0157 us | 0.0607 us | 4.13 | 0.03 | - | SynchronizedQueue | 100 | 100 | 1 | 3.7607 us | 0.0032 us | 0.0119 us | 1.22 | 0.01 | diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/netcoreapp1.1/BoundedQueue_Enqueue_Dequeue_Benchmark-report-github.md b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/netcoreapp1.1/BoundedQueue_Enqueue_Dequeue_Benchmark-report-github.md deleted file mode 100644 index bbedcad..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/results/netcoreapp1.1/BoundedQueue_Enqueue_Dequeue_Benchmark-report-github.md +++ /dev/null @@ -1,37 +0,0 @@ -``` ini - -BenchmarkDotNet=v0.10.3.0, OS=Microsoft Windows 10.0.14393 -Processor=Intel(R) Core(TM) i7-4790 CPU 3.60GHz, ProcessorCount=8 -Frequency=3507499 Hz, Resolution=285.1034 ns, Timer=TSC -dotnet cli version=1.0.0 - [Host] : .NET Core 4.6.25009.03, 64bit RyuJIT - DefaultJob : .NET Core 4.6.25009.03, 64bit RyuJIT - - -``` - | Method | Items | Limit | Mean | StdDev | Scaled | Scaled-StdDev | - |----------------------- |------ |------ |----------- |---------- |------- |-------------- | - | **ConcurrentQueue** | **50** | **-1** | **7.2422 us** | **0.0720 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | -1 | 7.6356 us | 0.0919 us | 1.05 | 0.02 | - | BlockingCollection | 50 | -1 | 25.4929 us | 0.8222 us | 3.52 | 0.11 | - | SynchronizedQueue | 50 | -1 | 12.8276 us | 0.1913 us | 1.77 | 0.03 | - | **ConcurrentQueue** | **50** | **50** | **7.2297 us** | **0.0630 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 50 | 8.7806 us | 0.0856 us | 1.21 | 0.02 | - | BlockingCollection | 50 | 50 | 35.1334 us | 0.7982 us | 4.86 | 0.11 | - | SynchronizedQueue | 50 | 50 | 12.8640 us | 0.2109 us | 1.78 | 0.03 | - | **ConcurrentQueue** | **50** | **100** | **7.3600 us** | **0.1077 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 50 | 100 | 8.8626 us | 0.0979 us | 1.20 | 0.02 | - | BlockingCollection | 50 | 100 | 36.0215 us | 0.8452 us | 4.90 | 0.13 | - | SynchronizedQueue | 50 | 100 | 12.9318 us | 0.1471 us | 1.76 | 0.03 | - | **ConcurrentQueue** | **100** | **-1** | **10.6257 us** | **0.1468 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | -1 | 11.1374 us | 0.1332 us | 1.05 | 0.02 | - | BlockingCollection | 100 | -1 | 49.6138 us | 1.3730 us | 4.67 | 0.14 | - | SynchronizedQueue | 100 | -1 | 23.9550 us | 0.6100 us | 2.25 | 0.06 | - | **ConcurrentQueue** | **100** | **50** | **10.6249 us** | **0.0920 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 50 | 12.9409 us | 0.1868 us | 1.22 | 0.02 | - | BlockingCollection | 100 | 50 | 55.0996 us | 2.3733 us | 5.19 | 0.22 | - | SynchronizedQueue | 100 | 50 | 22.3050 us | 0.8641 us | 2.10 | 0.08 | - | **ConcurrentQueue** | **100** | **100** | **10.6000 us** | **0.1713 us** | **1.00** | **0.00** | - | BoundedConcurrentQueue | 100 | 100 | 13.2870 us | 0.2239 us | 1.25 | 0.03 | - | BlockingCollection | 100 | 100 | 59.8952 us | 2.8346 us | 5.65 | 0.28 | - | SynchronizedQueue | 100 | 100 | 23.5773 us | 0.6238 us | 2.22 | 0.07 | diff --git a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/xunit.runner.json b/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/xunit.runner.json deleted file mode 100644 index 07120b4..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.PerformanceTests/xunit.runner.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "shadowCopy": false -} diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/BackwardsCompatibilityTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/BackwardsCompatibilityTests.cs deleted file mode 100644 index 1cb2ed0..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/BackwardsCompatibilityTests.cs +++ /dev/null @@ -1,20 +0,0 @@ -#if FEATURE_ASYNCDISPOSABLE - -using Serilog.Sinks.PeriodicBatching.Tests.Support; -using Xunit; - -namespace Serilog.Sinks.PeriodicBatching.Tests; - -public class BackwardsCompatibilityTests -{ - [Fact] - public async Task LegacySinksAreDisposedWhenLoggerIsDisposedAsync() - { - var sink = new LegacyDisposeTrackingSink(); - var logger = new LoggerConfiguration().WriteTo.Sink(sink).CreateLogger(); - await logger.DisposeAsync(); - Assert.True(sink.IsDisposed); - } -} - -#endif diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/BoundedConcurrentQueueTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/BoundedConcurrentQueueTests.cs deleted file mode 100644 index 7e9524e..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/BoundedConcurrentQueueTests.cs +++ /dev/null @@ -1,26 +0,0 @@ -using Xunit; - -namespace Serilog.Sinks.PeriodicBatching.Tests; - -public class BoundedConcurrentQueueTests -{ - [Fact] - public void WhenBoundedShouldNotExceedLimit() - { - const int limit = 100; - var queue = new BoundedConcurrentQueue(limit); - - for (var i = 0; i < limit * 2; i++) - { - queue.TryEnqueue(i); - } - - Assert.Equal(limit, queue.Count); - } - - [Fact] - public void WhenInvalidLimitWillThrow() - { - Assert.Throws(() => new BoundedConcurrentQueue(-42)); - } -} \ No newline at end of file diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs index 91e73cb..1f31580 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs @@ -17,8 +17,8 @@ public void WhenAnEventIsEnqueuedItIsWrittenToABatchOnDispose() var evt = Some.InformationEvent(); pbs.Emit(evt); pbs.Dispose(); - Assert.Equal(1, bs.Batches.Count); - Assert.Equal(1, bs.Batches[0].Count); + Assert.Single(bs.Batches); + Assert.Single(bs.Batches[0]); Assert.Same(evt, bs.Batches[0][0]); Assert.True(bs.IsDisposed); Assert.False(bs.WasCalledAfterDisposal); @@ -61,7 +61,7 @@ public void WhenAnEventIsEnqueuedItIsWrittenToABatchOnTimer() Thread.Sleep(TinyWait + TinyWait); bs.Stop(); pbs.Dispose(); - Assert.Equal(1, bs.Batches.Count); + Assert.Single(bs.Batches); Assert.True(bs.IsDisposed); Assert.False(bs.WasCalledAfterDisposal); } @@ -75,7 +75,7 @@ public void WhenAnEventIsEnqueuedItIsWrittenToABatchOnDisposeWhileRunning() pbs.Emit(evt); Thread.Sleep(TinyWait); pbs.Dispose(); - Assert.Equal(1, bs.Batches.Count); + Assert.Single(bs.Batches); Assert.True(bs.IsDisposed); Assert.False(bs.WasCalledAfterDisposal); } diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/PortableTimerTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/PortableTimerTests.cs deleted file mode 100644 index 44549ab..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/PortableTimerTests.cs +++ /dev/null @@ -1,146 +0,0 @@ -using Serilog.Debugging; -using Xunit; - -#pragma warning disable 1998 - -// ReSharper disable AccessToModifiedClosure - -namespace Serilog.Sinks.PeriodicBatching.Tests; - -public class PortableTimerTests -{ - [Fact] - public void WhenItStartsItWaitsUntilHandled_OnDispose() - { - var wasCalled = false; - - var barrier = new Barrier(participantCount: 2); - - using (var timer = new PortableTimer( - async delegate - { - barrier.SignalAndWait(); - await Task.Delay(100); - wasCalled = true; - })) - { - timer.Start(TimeSpan.Zero); - barrier.SignalAndWait(); - } - - Assert.True(wasCalled); - } - - [Fact] - public void WhenWaitingShouldCancel_OnDispose() - { - var wasCalled = false; - var writtenToSelfLog = false; - - SelfLog.Enable(_ => writtenToSelfLog = true); - - using (var timer = new PortableTimer(async delegate { await Task.Delay(50); wasCalled = true; })) - { - timer.Start(TimeSpan.FromMilliseconds(20)); - } - - Thread.Sleep(100); - - Assert.False(wasCalled, "tick handler was called"); - Assert.False(writtenToSelfLog, "message was written to SelfLog"); - } - - [Fact] - public void WhenActiveShouldCancel_OnDispose() - { - var wasCalled = false; - var writtenToSelfLog = false; - - SelfLog.Enable(_ => writtenToSelfLog = true); - - var barrier = new Barrier(participantCount: 2); - - using (var timer = new PortableTimer( - async token => - { - // ReSharper disable once MethodSupportsCancellation - barrier.SignalAndWait(); - // ReSharper disable once MethodSupportsCancellation - await Task.Delay(20); - - wasCalled = true; - Interlocked.MemoryBarrier(); - await Task.Delay(100, token); - })) - { - timer.Start(TimeSpan.FromMilliseconds(20)); - barrier.SignalAndWait(); - } - - Thread.Sleep(100); - Interlocked.MemoryBarrier(); - - Assert.True(wasCalled, "tick handler wasn't called"); - Assert.True(writtenToSelfLog, "message wasn't written to SelfLog"); - } - - [Fact] - public void WhenDisposedWillThrow_OnStart() - { - var wasCalled = false; - var timer = new PortableTimer(async delegate { wasCalled = true; }); - timer.Start(TimeSpan.FromMilliseconds(100)); - timer.Dispose(); - - Assert.False(wasCalled); - Assert.Throws(() => timer.Start(TimeSpan.Zero)); - } - - [Fact] - public void WhenOverlapsShouldProcessOneAtTime_OnTick() - { - var userHandlerOverlapped = false; - - PortableTimer timer = null!; - timer = new( - async _ => - { - if (Monitor.TryEnter(timer)) - { - try - { - // ReSharper disable once PossibleNullReferenceException - timer.Start(TimeSpan.Zero); - Thread.Sleep(20); - } - finally - { - Monitor.Exit(timer); - } - } - else - { - userHandlerOverlapped = true; - } - }); - - timer.Start(TimeSpan.FromMilliseconds(1)); - Thread.Sleep(50); - timer.Dispose(); - - Assert.False(userHandlerOverlapped); - } - - [Fact] - public void CanBeDisposedFromMultipleThreads() - { - PortableTimer? timer = null; - // ReSharper disable once PossibleNullReferenceException - timer = new(async _ => timer?.Start(TimeSpan.FromMilliseconds(1))); - - timer.Start(TimeSpan.Zero); - Thread.Sleep(50); - - Parallel.For(0, Environment.ProcessorCount * 2, _ => timer.Dispose()); - } -} \ No newline at end of file diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/Support/LegacyDisposeTrackingSink.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/Support/LegacyDisposeTrackingSink.cs deleted file mode 100644 index df120d7..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/Support/LegacyDisposeTrackingSink.cs +++ /dev/null @@ -1,18 +0,0 @@ -#pragma warning disable CS0618 - -namespace Serilog.Sinks.PeriodicBatching.Tests.Support; - -public class LegacyDisposeTrackingSink : PeriodicBatchingSink -{ - public bool IsDisposed { get; private set; } - - public LegacyDisposeTrackingSink() - : base(10, TimeSpan.FromMinutes(1)) - { - } - - protected override void Dispose(bool disposing) - { - IsDisposed = true; - } -} \ No newline at end of file From 8b95611d4a69b5ea19da57b92f9366fcb1cae0f3 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Fri, 29 Dec 2023 13:03:55 +1000 Subject: [PATCH 03/16] Remove netstandard2.1, ignore .DS_Store --- .DS_Store | Bin 6148 -> 0 bytes .gitignore | 1 + .../Serilog.Sinks.PeriodicBatching.csproj | 6 +++--- test/.DS_Store | Bin 6148 -> 0 bytes 4 files changed, 4 insertions(+), 3 deletions(-) delete mode 100644 .DS_Store delete mode 100644 test/.DS_Store diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index a2eb533abf551d042629249e6fe5fb378b3b5ee0..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeH~u?oUK42Bc!Ah>jNyu}Cb4Gz&K=nFU~E>c0O^F6wMazU^xC+1b{XuyJ79K1T}(S!u1*@b}wNMJ-@TJzTK|1JE}{6A`8N&+PC zX9Tp_belC^D(=>|*R%RAsBuffer batches of log events to be flushed asynchronously. 4.0.0 Serilog Contributors - net462 - $(TargetFrameworks);netstandard2.0;netstandard2.1;net6.0 + net462 + $(TargetFrameworks);netstandard2.0;net6.0 true Serilog serilog;batching;timer @@ -17,7 +17,7 @@ enable - + $(DefineConstants);FEATURE_ASYNCDISPOSABLE diff --git a/test/.DS_Store b/test/.DS_Store deleted file mode 100644 index 5efd228bfa2b3231208ff19ce8b8f9c89b90a095..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHKyH3ME5S)b+k!Vs<-WP!Q1Ir3MlKB7-2?3GVk`2`E_-$q%phONW1?)<@5h9g)Y|&wZ3p9A1WFL2E?IrWq=vlL0-{J)W z_qvH#hZQp(P~#r2#3!y=JIym5d3KK-qb;Lr+)@W-4)0ChZo78lZ+ZMR(wht<1Ia)# zkPQ4A2DJB5=4Xy+lYwL)8Te*E?}tKFtO7?zyEmEh#_81e~G**aCG!?NDLnmCreByVyAQeV(E~|F>NxC3>-3W>dS@p|7+?G^Z#K{ zu9AUd;7>6ivt_fa`AN}RCm*N1wovb=U&35Vr?FN{v{uZ8w&L@>yrR#HSAnCWmD6wK P#QYIZUQ&{Q-(cVqiB%%M From a1b06ca3d1e7e2455e4b1bc7245b7a91fad95f39 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Fri, 29 Dec 2023 14:55:50 +1000 Subject: [PATCH 04/16] Implmement shut-down logic --- .../PeriodicBatching/PeriodicBatchingSink.cs | 148 +++++++++++++----- .../PeriodicBatchingSinkTests.cs | 15 +- ...erilog.Sinks.PeriodicBatching.Tests.csproj | 2 +- .../Support/InMemoryBatchedSink.cs | 15 +- 4 files changed, 123 insertions(+), 57 deletions(-) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index 01534b4..87d306e 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -32,12 +32,17 @@ public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable // Buffers events from the write- to the read side. readonly Channel _queue; - // Used by the write side to signal shutdown. + // These fields are used by the write side to signal shutdown. + // A mutex is required because the queue writer `Complete()` call is not idempotent and will throw if + // called multiple times, e.g. via multiple `Dispose()` calls on this sink. readonly object _stateLock = new(); + // Needed because the read loop needs to observe shutdown even when the target batched (remote) sink is + // unable to accept events (preventing the queue from being drained and completion being observed). readonly CancellationTokenSource _unloading = new(); + // The write side can wait on this to ensure shutdown has completed. readonly Task _loop; - // Used only by the read side + // Used only by the read side. readonly IBatchedLogEventSink _batchedLogEventSink; readonly int _batchSizeLimit; readonly bool _eagerlyEmitFirstEvent; @@ -67,25 +72,55 @@ public PeriodicBatchingSink(IBatchedLogEventSink batchedSink, PeriodicBatchingSi _status = new BatchedConnectionStatus(options.Period); _eagerlyEmitFirstEvent = options.EagerlyEmitFirstEvent; - _loop = Task.Factory.StartNew(LoopAsync, _unloading.Token, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); + _loop = Task.Run(LoopAsync, _unloading.Token); } + /// + /// Emit the provided log event to the sink. If the sink is being disposed or + /// the app domain unloaded, then the event is ignored. + /// + /// Log event to emit. + /// The event is null. + /// + /// The sink implements the contract that any events whose Emit() method has + /// completed at the time of sink disposal will be flushed (or attempted to, + /// depending on app domain state). + /// + public void Emit(LogEvent logEvent) + { + if (logEvent == null) throw new ArgumentNullException(nameof(logEvent)); + + if (_unloading.IsCancellationRequested) + return; + + _queue.Writer.TryWrite(logEvent); + } + /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// /// 2 public void Dispose() { - lock (_stateLock) + try { - if (!_unloading.IsCancellationRequested) + lock (_stateLock) { - _queue.Writer.Complete(); - _unloading.Cancel(); + if (!_unloading.IsCancellationRequested) + { + _queue.Writer.Complete(); + _unloading.Cancel(); + } } - } - _loop.Wait(); + _loop.Wait(); + } + catch (Exception ex) + { + // E.g. the task was canceled before ever being run, or internally failed and threw + // an unexpected exception. + SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}) caught exception during disposal: {ex}"); + } (_batchedLogEventSink as IDisposable)?.Dispose(); } @@ -94,16 +129,25 @@ public void Dispose() /// public async ValueTask DisposeAsync() { - lock (_stateLock) + try { - if (!_unloading.IsCancellationRequested) + lock (_stateLock) { - _queue.Writer.Complete(); - _unloading.Cancel(); + if (!_unloading.IsCancellationRequested) + { + _queue.Writer.Complete(); + _unloading.Cancel(); + } } - } - await _loop.ConfigureAwait(false); + await _loop.ConfigureAwait(false); + } + catch (Exception ex) + { + // E.g. the task was canceled before ever being run, or internally failed and threw + // an unexpected exception. + SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}): caught exception during async disposal: {ex}"); + } if (_batchedLogEventSink is IAsyncDisposable asyncDisposable) await asyncDisposable.DisposeAsync().ConfigureAwait(false); @@ -129,7 +173,7 @@ async Task LoopAsync() } } while ((_waitingBatch.Count < _batchSizeLimit || _waitingBatch.Count > 0 && isEagerBatch) && !_unloading.IsCancellationRequested && - await Task.WhenAny(fillBatch, _queue.Reader.WaitToReadAsync(_unloading.Token).AsTask()) != fillBatch); + await TryWaitToReadAsync(_queue.Reader, fillBatch, _unloading.Token)); try { @@ -148,45 +192,67 @@ async Task LoopAsync() } catch (Exception ex) { - SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex); + SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}) failed emitting a batch: {ex}"); _status.MarkFailure(); - } - finally - { + if (_status.ShouldDropBatch) _waitingBatch.Clear(); if (_status.ShouldDropQueue) { - // This is not ideal; the goal is to reduce memory pressure on the client if - // the server is offline for extended periods. May be worth reviewing and abandoning - // this. + // This is not ideal; the goal is to reduce memory pressure on the client if the server is offline + // for extended periods. May be worth reviewing and possibly abandoning this. while (_queue.Reader.TryRead(out _) && !_unloading.IsCancellationRequested) { } } } } while (!_unloading.IsCancellationRequested); - } + + // At this point: + // - The sink is being disposed + // - The queue has been completed + // - The queue may or may not be empty + // - The waiting batch may or may not be empty + // - The target sink may or may not be accepting events + + // Try flushing the rest of the queue, but bail out on any failure. Shutdown time is unbounded, but it + // doesn't make sense to pick an arbitrary limit - a future version might add a new option to control this. + try + { + while (_queue.Reader.TryPeek(out _)) + { + while (_waitingBatch.Count < _batchSizeLimit && + _queue.Reader.TryRead(out var next)) + { + _waitingBatch.Enqueue(next); + } + if (_waitingBatch.Count != 0) + { + await _batchedLogEventSink.EmitBatchAsync(_waitingBatch).ConfigureAwait(false); + _waitingBatch.Clear(); + } + } + } + catch (Exception ex) + { + SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}) failed emitting a batch during shutdown; dropping remaining queued events: {ex}"); + } + } - /// - /// Emit the provided log event to the sink. If the sink is being disposed or - /// the app domain unloaded, then the event is ignored. - /// - /// Log event to emit. - /// The event is null. - /// - /// The sink implements the contract that any events whose Emit() method has - /// completed at the time of sink disposal will be flushed (or attempted to, - /// depending on app domain state). - /// - public void Emit(LogEvent logEvent) + // Wait until `reader` has items to read. Returns `false` if the `timeout` task completes, or if the reader is cancelled. + async Task TryWaitToReadAsync(ChannelReader reader, Task timeout, CancellationToken cancellationToken) { - if (logEvent == null) throw new ArgumentNullException(nameof(logEvent)); + var completed = await Task.WhenAny(timeout, reader.WaitToReadAsync(cancellationToken).AsTask()); - if (_unloading.IsCancellationRequested) - return; - - _queue.Writer.TryWrite(logEvent); + // Avoid unobserved task exceptions in the cancellation and failure cases. Note that we may not end up observing + // both the timeout and read task cancellation exceptions during shutdown, may be some room to improve. + if (completed is { Exception: not null, IsCanceled: false }) + { + SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}) could not read from queue: {completed.Exception}"); + } + + // `Task.IsCompletedSuccessfully` not available in .NET Standard 2.0/Framework. + return completed != timeout && completed is { IsCompleted: true, IsCanceled: false, IsFaulted: false }; } } \ No newline at end of file diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs index 1f31580..e2399bf 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/PeriodicBatchingSinkTests.cs @@ -1,6 +1,8 @@ -using Serilog.Sinks.PeriodicBatching.Tests.Support; +using Serilog.Debugging; +using Serilog.Sinks.PeriodicBatching.Tests.Support; using Xunit; using Serilog.Tests.Support; +using Xunit.Abstractions; namespace Serilog.Sinks.PeriodicBatching.Tests; @@ -9,6 +11,11 @@ public class PeriodicBatchingSinkTests static readonly TimeSpan TinyWait = TimeSpan.FromMilliseconds(200); static readonly TimeSpan MicroWait = TimeSpan.FromMilliseconds(1); + public PeriodicBatchingSinkTests(ITestOutputHelper testOutputHelper) + { + SelfLog.Enable(testOutputHelper.WriteLine); + } + [Fact] public void WhenAnEventIsEnqueuedItIsWrittenToABatchOnDispose() { @@ -26,7 +33,7 @@ public void WhenAnEventIsEnqueuedItIsWrittenToABatchOnDispose() #if FEATURE_ASYNCDISPOSABLE [Fact] - public async ValueTask WhenAnEventIsEnqueuedItIsWrittenToABatchOnDisposeAsync() + public async Task WhenAnEventIsEnqueuedItIsWrittenToABatchOnDisposeAsync() { var bs = new InMemoryBatchedSink(TimeSpan.Zero); var pbs = new PeriodicBatchingSink( @@ -37,8 +44,8 @@ public async ValueTask WhenAnEventIsEnqueuedItIsWrittenToABatchOnDisposeAsync() var evt = Some.InformationEvent(); pbs.Emit(evt); await pbs.DisposeAsync(); - Assert.Equal(1, bs.Batches.Count); - Assert.Equal(1, bs.Batches[0].Count); + Assert.Single(bs.Batches); + Assert.Single(bs.Batches[0]); Assert.Same(evt, bs.Batches[0][0]); Assert.True(bs.IsDisposed); Assert.True(bs.IsDisposedAsync); diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj b/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj index 032b402..59d0280 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj @@ -8,7 +8,7 @@ true - + $(DefineConstants);FEATURE_ASYNCDISPOSABLE diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/Support/InMemoryBatchedSink.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/Support/InMemoryBatchedSink.cs index 094be42..71ce5d3 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/Support/InMemoryBatchedSink.cs +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/Support/InMemoryBatchedSink.cs @@ -2,26 +2,19 @@ namespace Serilog.Sinks.PeriodicBatching.Tests.Support; -sealed class InMemoryBatchedSink : IBatchedLogEventSink, IDisposable +sealed class InMemoryBatchedSink(TimeSpan batchEmitDelay) : IBatchedLogEventSink, IDisposable #if FEATURE_ASYNCDISPOSABLE - , IAsyncDisposable + , IAsyncDisposable #endif { - readonly TimeSpan _batchEmitDelay; readonly object _stateLock = new(); bool _stopped; // Postmortem only public bool WasCalledAfterDisposal { get; private set; } - public IList> Batches { get; } + public IList> Batches { get; } = new List>(); public bool IsDisposed { get; private set; } - public InMemoryBatchedSink(TimeSpan batchEmitDelay) - { - _batchEmitDelay = batchEmitDelay; - Batches = new List>(); - } - public void Stop() { lock (_stateLock) @@ -40,7 +33,7 @@ public Task EmitBatchAsync(IEnumerable events) if (IsDisposed) WasCalledAfterDisposal = true; - Thread.Sleep(_batchEmitDelay); + Thread.Sleep(batchEmitDelay); Batches.Add(events.ToList()); } From eb2f8e96eac32d34d4a079485a83de03a8315872 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Fri, 29 Dec 2023 14:59:03 +1000 Subject: [PATCH 05/16] Ensure build script detects failures --- Build.ps1 | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/Build.ps1 b/Build.ps1 index b9ed918..e482fe3 100644 --- a/Build.ps1 +++ b/Build.ps1 @@ -33,7 +33,7 @@ foreach ($src in Get-ChildItem src/*) { } else { & dotnet pack -c Release --no-build -o ..\..\artifacts } - if($LASTEXITCODE -ne 0) { exit 1 } + if($LASTEXITCODE -ne 0) { throw "Failed" } Pop-Location } @@ -44,18 +44,7 @@ foreach ($test in Get-ChildItem test/*.Tests) { Write-Output "build: Testing project in $test" & dotnet test -c Release - if($LASTEXITCODE -ne 0) { exit 3 } - - Pop-Location -} - -foreach ($test in Get-ChildItem test/*.PerformanceTests) { - Push-Location $test - - Write-Output "build: Building performance test project in $test" - - & dotnet build -c Release - if($LASTEXITCODE -ne 0) { exit 2 } + if($LASTEXITCODE -ne 0) { throw "Failed" } Pop-Location } From 04f2efe5a2120673376012a7a500b16900d135bd Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Fri, 29 Dec 2023 16:09:30 +1000 Subject: [PATCH 06/16] Fix bugs, add test harness --- serilog-sinks-periodicbatching.sln | 7 ++ .../PeriodicBatching/PeriodicBatchingSink.cs | 5 +- .../PeriodicBatchingSinkOptions.cs | 2 +- test/TestHarness/Program.cs | 102 ++++++++++++++++++ test/TestHarness/TestHarness.csproj | 14 +++ 5 files changed, 126 insertions(+), 4 deletions(-) create mode 100644 test/TestHarness/Program.cs create mode 100644 test/TestHarness/TestHarness.csproj diff --git a/serilog-sinks-periodicbatching.sln b/serilog-sinks-periodicbatching.sln index a7c9310..ef32185 100644 --- a/serilog-sinks-periodicbatching.sln +++ b/serilog-sinks-periodicbatching.sln @@ -30,6 +30,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sln", "sln", "{E49CF29C-764 Directory.Build.props = Directory.Build.props EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestHarness", "test\TestHarness\TestHarness.csproj", "{20A1F97F-3EEB-4D9B-981C-20818228FAC9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -44,6 +46,10 @@ Global {3C2D8E01-5580-426A-BDD9-EC59CD98E618}.Debug|Any CPU.Build.0 = Debug|Any CPU {3C2D8E01-5580-426A-BDD9-EC59CD98E618}.Release|Any CPU.ActiveCfg = Release|Any CPU {3C2D8E01-5580-426A-BDD9-EC59CD98E618}.Release|Any CPU.Build.0 = Release|Any CPU + {20A1F97F-3EEB-4D9B-981C-20818228FAC9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {20A1F97F-3EEB-4D9B-981C-20818228FAC9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {20A1F97F-3EEB-4D9B-981C-20818228FAC9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {20A1F97F-3EEB-4D9B-981C-20818228FAC9}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -51,5 +57,6 @@ Global GlobalSection(NestedProjects) = preSolution {324C2F52-D9F7-4844-9BC4-9906E228D380} = {037440DE-440B-4129-9F7A-09B42D00397E} {3C2D8E01-5580-426A-BDD9-EC59CD98E618} = {F6E07A13-B9D3-4019-B25A-DE1F6C17E108} + {20A1F97F-3EEB-4D9B-981C-20818228FAC9} = {F6E07A13-B9D3-4019-B25A-DE1F6C17E108} EndGlobalSection EndGlobal diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index 87d306e..247e7fd 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -161,8 +161,7 @@ async Task LoopAsync() var isEagerBatch = _eagerlyEmitFirstEvent; do { - using var fillBatch = Task.Delay(_status.NextInterval, _unloading.Token); - + var fillBatch = Task.Delay(_status.NextInterval); do { while (_waitingBatch.Count < _batchSizeLimit && @@ -246,7 +245,7 @@ async Task TryWaitToReadAsync(ChannelReader reader, Task timeout var completed = await Task.WhenAny(timeout, reader.WaitToReadAsync(cancellationToken).AsTask()); // Avoid unobserved task exceptions in the cancellation and failure cases. Note that we may not end up observing - // both the timeout and read task cancellation exceptions during shutdown, may be some room to improve. + // read task cancellation exceptions during shutdown, may be some room to improve. if (completed is { Exception: not null, IsCanceled: false }) { SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}) could not read from queue: {completed.Exception}"); diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSinkOptions.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSinkOptions.cs index 5aa14eb..c30a168 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSinkOptions.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSinkOptions.cs @@ -32,7 +32,7 @@ public class PeriodicBatchingSinkOptions public int BatchSizeLimit { get; set; } = 1000; /// - /// The time to wait between checking for event batches. The default is two seconds. + /// The maximum buffering delay between event batches. The default is two seconds. /// public TimeSpan Period { get; set; } = TimeSpan.FromSeconds(2); diff --git a/test/TestHarness/Program.cs b/test/TestHarness/Program.cs new file mode 100644 index 0000000..387245f --- /dev/null +++ b/test/TestHarness/Program.cs @@ -0,0 +1,102 @@ +using System.Diagnostics; +using Serilog; +using Serilog.Debugging; +using Serilog.Events; +using Serilog.Sinks.PeriodicBatching; +// ReSharper disable PossibleMultipleEnumeration + +// ReSharper disable PossibleLossOfFraction + +SelfLog.Enable(Console.Error); + +const int producers = 3; +const int countPerProducer = 1000; +var producerInterval = TimeSpan.FromMilliseconds(90); + +const int batchSize = 50; +var batchInterval = TimeSpan.FromMilliseconds(100); +var maximumBufferDelay = TimeSpan.FromMilliseconds(80); + +var sinkFailAfterEvents = (int?)50; + +Console.WriteLine($"Producers will take {countPerProducer * producerInterval} to complete"); +Console.WriteLine($"Producers will generate a total of {countPerProducer * producers} events"); +Console.WriteLine($"Consumer will take {countPerProducer * producers / batchSize * batchInterval} to write batches"); + +var options = new PeriodicBatchingSinkOptions +{ + EagerlyEmitFirstEvent = false, + QueueLimit = null, + BatchSizeLimit = batchSize, + Period = maximumBufferDelay +}; + +var batchedSink = new EmptyBatchedSink(batchInterval, sinkFailAfterEvents); + +var logger = new LoggerConfiguration() + .WriteTo.Sink(new PeriodicBatchingSink(batchedSink, options)) + .CreateLogger(); + +var threads = Enumerable.Range(0, producers) + .Select(id => new Thread(() => RunProducer(id, logger, producerInterval, countPerProducer))) + .ToArray(); + +var sw = Stopwatch.StartNew(); + +foreach (var thread in threads) +{ + thread.Start(); +} + +foreach (var thread in threads) +{ + thread.Join(); +} + +Console.WriteLine($"All producers done in {sw.Elapsed}"); + +await logger.DisposeAsync(); + +Console.WriteLine($"All batches processed in {sw.Elapsed}"); +Console.WriteLine($"Sink saw {batchedSink.EventCount} events in {batchedSink.BatchCount} batches"); +Console.WriteLine($"Largest batch was {batchedSink.MaxBatchSize}; smallest was {batchedSink.MinBatchSize}"); + +static void RunProducer(int id, ILogger logger, TimeSpan interval, int count) +{ + for (var i = 0; i < count; ++i) + { + logger.Information("Hello number {N} from producer {Id}!", i, id); + Thread.Sleep(interval); + } +} + +class EmptyBatchedSink(TimeSpan flushDelay, int? failAfterEvents): IBatchedLogEventSink +{ + bool _failureDone; + + public int BatchCount { get; private set; } + public int EventCount { get; private set; } + public int MaxBatchSize { get; private set; } + public int MinBatchSize { get; private set; } = int.MaxValue; + + public async Task EmitBatchAsync(IEnumerable batch) + { + if (failAfterEvents is { } f && EventCount >= f && !_failureDone) + { + _failureDone = true; + throw new Exception("This batch failed."); + } + + BatchCount++; + EventCount += batch.Count(); + MinBatchSize = Math.Min(MinBatchSize, batch.Count()); + MaxBatchSize = Math.Max(MaxBatchSize, batch.Count()); + + await Task.Delay(flushDelay); + } + + public Task OnEmptyBatchAsync() + { + return Task.CompletedTask; + } +} diff --git a/test/TestHarness/TestHarness.csproj b/test/TestHarness/TestHarness.csproj new file mode 100644 index 0000000..6f889fe --- /dev/null +++ b/test/TestHarness/TestHarness.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + From 258c30f2465ae95b985808d09f469a2e4082ac8e Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Fri, 29 Dec 2023 16:30:51 +1000 Subject: [PATCH 07/16] Back-off on failure --- .../Sinks/PeriodicBatching/PeriodicBatchingSink.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index 247e7fd..19b196e 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -48,6 +48,7 @@ public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable readonly bool _eagerlyEmitFirstEvent; readonly BatchedConnectionStatus _status; readonly Queue _waitingBatch = new(); + readonly Task _delayUntilUnload; /// /// Construct a . @@ -71,6 +72,8 @@ public PeriodicBatchingSink(IBatchedLogEventSink batchedSink, PeriodicBatchingSi : Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); _status = new BatchedConnectionStatus(options.Period); _eagerlyEmitFirstEvent = options.EagerlyEmitFirstEvent; + _delayUntilUnload = Task.Delay(Timeout.InfiniteTimeSpan, _unloading.Token) + .ContinueWith(e => e.Exception, TaskContinuationOptions.OnlyOnFaulted); _loop = Task.Run(LoopAsync, _unloading.Token); } @@ -203,6 +206,11 @@ async Task LoopAsync() // for extended periods. May be worth reviewing and possibly abandoning this. while (_queue.Reader.TryRead(out _) && !_unloading.IsCancellationRequested) { } } + + // Wait out the remainder of the batch fill time so that we don't overwhelm the server. With each + // successive failure the interval will increase. Needs special handling so that we don't need to + // make `fillBatch` cancellable (and thus fallible). + await Task.WhenAny(fillBatch, _delayUntilUnload); } } while (!_unloading.IsCancellationRequested); @@ -238,7 +246,7 @@ async Task LoopAsync() SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}) failed emitting a batch during shutdown; dropping remaining queued events: {ex}"); } } - + // Wait until `reader` has items to read. Returns `false` if the `timeout` task completes, or if the reader is cancelled. async Task TryWaitToReadAsync(ChannelReader reader, Task timeout, CancellationToken cancellationToken) { From 685ed4da52b0dd41b279a209ab00e50c1abda79a Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Fri, 29 Dec 2023 16:49:16 +1000 Subject: [PATCH 08/16] ConfigureAwait(false) --- .../Sinks/PeriodicBatching/PeriodicBatchingSink.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index 19b196e..a6f4f48 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -175,7 +175,7 @@ async Task LoopAsync() } } while ((_waitingBatch.Count < _batchSizeLimit || _waitingBatch.Count > 0 && isEagerBatch) && !_unloading.IsCancellationRequested && - await TryWaitToReadAsync(_queue.Reader, fillBatch, _unloading.Token)); + await TryWaitToReadAsync(_queue.Reader, fillBatch, _unloading.Token).ConfigureAwait(false)); try { @@ -210,7 +210,7 @@ async Task LoopAsync() // Wait out the remainder of the batch fill time so that we don't overwhelm the server. With each // successive failure the interval will increase. Needs special handling so that we don't need to // make `fillBatch` cancellable (and thus fallible). - await Task.WhenAny(fillBatch, _delayUntilUnload); + await Task.WhenAny(fillBatch, _delayUntilUnload).ConfigureAwait(false); } } while (!_unloading.IsCancellationRequested); @@ -250,7 +250,7 @@ async Task LoopAsync() // Wait until `reader` has items to read. Returns `false` if the `timeout` task completes, or if the reader is cancelled. async Task TryWaitToReadAsync(ChannelReader reader, Task timeout, CancellationToken cancellationToken) { - var completed = await Task.WhenAny(timeout, reader.WaitToReadAsync(cancellationToken).AsTask()); + var completed = await Task.WhenAny(timeout, reader.WaitToReadAsync(cancellationToken).AsTask()).ConfigureAwait(false); // Avoid unobserved task exceptions in the cancellation and failure cases. Note that we may not end up observing // read task cancellation exceptions during shutdown, may be some room to improve. From 6cda3fb2f68ee41ed50651371559d0ed9985c718 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Fri, 29 Dec 2023 16:59:10 +1000 Subject: [PATCH 09/16] Remove unused TaskUtil; we may need to reintroduce this somehow, but currently the sink pushes all work to the thread pool so deadlocks in dispose should be avoided --- .../Sinks/PeriodicBatching/TaskUtil.cs | 32 ------------------- 1 file changed, 32 deletions(-) delete mode 100644 src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/TaskUtil.cs diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/TaskUtil.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/TaskUtil.cs deleted file mode 100644 index ac18b44..0000000 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/TaskUtil.cs +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright © Serilog Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -namespace Serilog.Sinks.PeriodicBatching; - -static class TaskUtil -{ - public static void ResetSyncContextAndWait(Func taskFactory) - { - var prevContext = SynchronizationContext.Current; - SynchronizationContext.SetSynchronizationContext(null); - try - { - taskFactory().Wait(); - } - finally - { - SynchronizationContext.SetSynchronizationContext(prevContext); - } - } -} \ No newline at end of file From 7adfee7156adad0180ac484e32b88f8587b8543b Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Sat, 30 Dec 2023 08:08:44 +1000 Subject: [PATCH 10/16] Internal type renames --- ...tatus.cs => FailureAwareBatchScheduler.cs} | 7 +- .../PeriodicBatching/PeriodicBatchingSink.cs | 98 +++++++++---------- ....cs => FailureAwareBatchSchedulerTests.cs} | 20 ++-- ...erilog.Sinks.PeriodicBatching.Tests.csproj | 2 + 4 files changed, 65 insertions(+), 62 deletions(-) rename src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/{BatchedConnectionStatus.cs => FailureAwareBatchScheduler.cs} (93%) rename test/Serilog.Sinks.PeriodicBatching.Tests/{BatchedConnectionStatusTests.cs => FailureAwareBatchSchedulerTests.cs} (80%) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BatchedConnectionStatus.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/FailureAwareBatchScheduler.cs similarity index 93% rename from src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BatchedConnectionStatus.cs rename to src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/FailureAwareBatchScheduler.cs index 3298ca6..8b6366f 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/BatchedConnectionStatus.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/FailureAwareBatchScheduler.cs @@ -25,7 +25,7 @@ namespace Serilog.Sinks.PeriodicBatching; /// also overproduction (the second, queue-dropping response). In combination these should provide a /// reasonable delivery effort but ultimately protect the sender from memory exhaustion. /// -class BatchedConnectionStatus +class FailureAwareBatchScheduler { static readonly TimeSpan MinimumBackoffPeriod = TimeSpan.FromSeconds(5); static readonly TimeSpan MaximumBackoffInterval = TimeSpan.FromMinutes(10); @@ -37,9 +37,10 @@ class BatchedConnectionStatus int _failuresSinceSuccessfulBatch; - public BatchedConnectionStatus(TimeSpan period) + public FailureAwareBatchScheduler(TimeSpan period) { - if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(period), "The batching period must be a positive timespan"); + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(period), "The batching period must be a positive timespan."); _period = period; } diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index a6f4f48..4676136 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -38,17 +38,17 @@ public sealed class PeriodicBatchingSink : ILogEventSink, IDisposable readonly object _stateLock = new(); // Needed because the read loop needs to observe shutdown even when the target batched (remote) sink is // unable to accept events (preventing the queue from being drained and completion being observed). - readonly CancellationTokenSource _unloading = new(); + readonly CancellationTokenSource _shutdownSignal = new(); // The write side can wait on this to ensure shutdown has completed. - readonly Task _loop; + readonly Task _runLoop; // Used only by the read side. - readonly IBatchedLogEventSink _batchedLogEventSink; + readonly IBatchedLogEventSink _targetSink; readonly int _batchSizeLimit; readonly bool _eagerlyEmitFirstEvent; - readonly BatchedConnectionStatus _status; - readonly Queue _waitingBatch = new(); - readonly Task _delayUntilUnload; + readonly FailureAwareBatchScheduler _batchScheduler; + readonly Queue _currentBatch = new(); + readonly Task _waitForShutdownSignal; /// /// Construct a . @@ -65,17 +65,17 @@ public PeriodicBatchingSink(IBatchedLogEventSink batchedSink, PeriodicBatchingSi if (options.Period <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(options), "The period must be greater than zero."); - _batchedLogEventSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink)); + _targetSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink)); _batchSizeLimit = options.BatchSizeLimit; _queue = options.QueueLimit is { } limit ? Channel.CreateBounded(new BoundedChannelOptions(limit) { SingleReader = true }) : Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); - _status = new BatchedConnectionStatus(options.Period); + _batchScheduler = new FailureAwareBatchScheduler(options.Period); _eagerlyEmitFirstEvent = options.EagerlyEmitFirstEvent; - _delayUntilUnload = Task.Delay(Timeout.InfiniteTimeSpan, _unloading.Token) + _waitForShutdownSignal = Task.Delay(Timeout.InfiniteTimeSpan, _shutdownSignal.Token) .ContinueWith(e => e.Exception, TaskContinuationOptions.OnlyOnFaulted); - _loop = Task.Run(LoopAsync, _unloading.Token); + _runLoop = Task.Run(LoopAsync, _shutdownSignal.Token); } /// @@ -93,7 +93,7 @@ public void Emit(LogEvent logEvent) { if (logEvent == null) throw new ArgumentNullException(nameof(logEvent)); - if (_unloading.IsCancellationRequested) + if (_shutdownSignal.IsCancellationRequested) return; _queue.Writer.TryWrite(logEvent); @@ -109,23 +109,23 @@ public void Dispose() { lock (_stateLock) { - if (!_unloading.IsCancellationRequested) + if (!_shutdownSignal.IsCancellationRequested) { _queue.Writer.Complete(); - _unloading.Cancel(); + _shutdownSignal.Cancel(); } } - _loop.Wait(); + _runLoop.Wait(); } catch (Exception ex) { // E.g. the task was canceled before ever being run, or internally failed and threw // an unexpected exception. - SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}) caught exception during disposal: {ex}"); + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) caught exception during disposal: {ex}"); } - (_batchedLogEventSink as IDisposable)?.Dispose(); + (_targetSink as IDisposable)?.Dispose(); } #if FEATURE_ASYNCDISPOSABLE @@ -136,26 +136,26 @@ public async ValueTask DisposeAsync() { lock (_stateLock) { - if (!_unloading.IsCancellationRequested) + if (!_shutdownSignal.IsCancellationRequested) { _queue.Writer.Complete(); - _unloading.Cancel(); + _shutdownSignal.Cancel(); } } - await _loop.ConfigureAwait(false); + await _runLoop.ConfigureAwait(false); } catch (Exception ex) { // E.g. the task was canceled before ever being run, or internally failed and threw // an unexpected exception. - SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}): caught exception during async disposal: {ex}"); + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): caught exception during async disposal: {ex}"); } - if (_batchedLogEventSink is IAsyncDisposable asyncDisposable) + if (_targetSink is IAsyncDisposable asyncDisposable) await asyncDisposable.DisposeAsync().ConfigureAwait(false); else - (_batchedLogEventSink as IDisposable)?.Dispose(); + (_targetSink as IDisposable)?.Dispose(); } #endif @@ -164,56 +164,56 @@ async Task LoopAsync() var isEagerBatch = _eagerlyEmitFirstEvent; do { - var fillBatch = Task.Delay(_status.NextInterval); + var fillBatch = Task.Delay(_batchScheduler.NextInterval); do { - while (_waitingBatch.Count < _batchSizeLimit && - !_unloading.IsCancellationRequested && + while (_currentBatch.Count < _batchSizeLimit && + !_shutdownSignal.IsCancellationRequested && _queue.Reader.TryRead(out var next)) { - _waitingBatch.Enqueue(next); + _currentBatch.Enqueue(next); } - } while ((_waitingBatch.Count < _batchSizeLimit || _waitingBatch.Count > 0 && isEagerBatch) && - !_unloading.IsCancellationRequested && - await TryWaitToReadAsync(_queue.Reader, fillBatch, _unloading.Token).ConfigureAwait(false)); + } while ((_currentBatch.Count < _batchSizeLimit || _currentBatch.Count > 0 && isEagerBatch) && + !_shutdownSignal.IsCancellationRequested && + await TryWaitToReadAsync(_queue.Reader, fillBatch, _shutdownSignal.Token).ConfigureAwait(false)); try { - if (_waitingBatch.Count == 0) + if (_currentBatch.Count == 0) { - await _batchedLogEventSink.OnEmptyBatchAsync().ConfigureAwait(false); + await _targetSink.OnEmptyBatchAsync().ConfigureAwait(false); } else { isEagerBatch = false; - await _batchedLogEventSink.EmitBatchAsync(_waitingBatch).ConfigureAwait(false); + await _targetSink.EmitBatchAsync(_currentBatch).ConfigureAwait(false); } - _waitingBatch.Clear(); - _status.MarkSuccess(); + _currentBatch.Clear(); + _batchScheduler.MarkSuccess(); } catch (Exception ex) { - SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}) failed emitting a batch: {ex}"); - _status.MarkFailure(); + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) failed emitting a batch: {ex}"); + _batchScheduler.MarkFailure(); - if (_status.ShouldDropBatch) - _waitingBatch.Clear(); + if (_batchScheduler.ShouldDropBatch) + _currentBatch.Clear(); - if (_status.ShouldDropQueue) + if (_batchScheduler.ShouldDropQueue) { // This is not ideal; the goal is to reduce memory pressure on the client if the server is offline // for extended periods. May be worth reviewing and possibly abandoning this. - while (_queue.Reader.TryRead(out _) && !_unloading.IsCancellationRequested) { } + while (_queue.Reader.TryRead(out _) && !_shutdownSignal.IsCancellationRequested) { } } // Wait out the remainder of the batch fill time so that we don't overwhelm the server. With each // successive failure the interval will increase. Needs special handling so that we don't need to // make `fillBatch` cancellable (and thus fallible). - await Task.WhenAny(fillBatch, _delayUntilUnload).ConfigureAwait(false); + await Task.WhenAny(fillBatch, _waitForShutdownSignal).ConfigureAwait(false); } } - while (!_unloading.IsCancellationRequested); + while (!_shutdownSignal.IsCancellationRequested); // At this point: // - The sink is being disposed @@ -228,22 +228,22 @@ async Task LoopAsync() { while (_queue.Reader.TryPeek(out _)) { - while (_waitingBatch.Count < _batchSizeLimit && + while (_currentBatch.Count < _batchSizeLimit && _queue.Reader.TryRead(out var next)) { - _waitingBatch.Enqueue(next); + _currentBatch.Enqueue(next); } - if (_waitingBatch.Count != 0) + if (_currentBatch.Count != 0) { - await _batchedLogEventSink.EmitBatchAsync(_waitingBatch).ConfigureAwait(false); - _waitingBatch.Clear(); + await _targetSink.EmitBatchAsync(_currentBatch).ConfigureAwait(false); + _currentBatch.Clear(); } } } catch (Exception ex) { - SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}) failed emitting a batch during shutdown; dropping remaining queued events: {ex}"); + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) failed emitting a batch during shutdown; dropping remaining queued events: {ex}"); } } @@ -256,7 +256,7 @@ async Task TryWaitToReadAsync(ChannelReader reader, Task timeout // read task cancellation exceptions during shutdown, may be some room to improve. if (completed is { Exception: not null, IsCanceled: false }) { - SelfLog.WriteLine($"PeriodicBatchingSink ({_batchedLogEventSink}) could not read from queue: {completed.Exception}"); + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) could not read from queue: {completed.Exception}"); } // `Task.IsCompletedSuccessfully` not available in .NET Standard 2.0/Framework. diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/BatchedConnectionStatusTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs similarity index 80% rename from test/Serilog.Sinks.PeriodicBatching.Tests/BatchedConnectionStatusTests.cs rename to test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs index f1b30b5..a1ef9a9 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/BatchedConnectionStatusTests.cs +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs @@ -3,21 +3,21 @@ namespace Serilog.Sinks.PeriodicBatching.Tests; -public class BatchedConnectionStatusTests +public class FailureAwareBatchSchedulerTests { readonly TimeSpan _defaultPeriod = TimeSpan.FromSeconds(2); [Fact] public void WhenNoFailuresHaveOccurredTheRegularIntervalIsUsed() { - var bcs = new BatchedConnectionStatus(_defaultPeriod); + var bcs = new FailureAwareBatchScheduler(_defaultPeriod); Assert.Equal(_defaultPeriod, bcs.NextInterval); } [Fact] public void WhenOneFailureHasOccurredTheRegularIntervalIsUsed() { - var bcs = new BatchedConnectionStatus(_defaultPeriod); + var bcs = new FailureAwareBatchScheduler(_defaultPeriod); bcs.MarkFailure(); Assert.Equal(_defaultPeriod, bcs.NextInterval); } @@ -25,7 +25,7 @@ public void WhenOneFailureHasOccurredTheRegularIntervalIsUsed() [Fact] public void WhenTwoFailuresHaveOccurredTheIntervalBacksOff() { - var bcs = new BatchedConnectionStatus(_defaultPeriod); + var bcs = new FailureAwareBatchScheduler(_defaultPeriod); bcs.MarkFailure(); bcs.MarkFailure(); Assert.Equal(TimeSpan.FromSeconds(10), bcs.NextInterval); @@ -34,7 +34,7 @@ public void WhenTwoFailuresHaveOccurredTheIntervalBacksOff() [Fact] public void WhenABatchSucceedsTheStatusResets() { - var bcs = new BatchedConnectionStatus(_defaultPeriod); + var bcs = new FailureAwareBatchScheduler(_defaultPeriod); bcs.MarkFailure(); bcs.MarkFailure(); bcs.MarkSuccess(); @@ -44,7 +44,7 @@ public void WhenABatchSucceedsTheStatusResets() [Fact] public void WhenThreeFailuresHaveOccurredTheIntervalBacksOff() { - var bcs = new BatchedConnectionStatus(_defaultPeriod); + var bcs = new FailureAwareBatchScheduler(_defaultPeriod); bcs.MarkFailure(); bcs.MarkFailure(); bcs.MarkFailure(); @@ -55,7 +55,7 @@ public void WhenThreeFailuresHaveOccurredTheIntervalBacksOff() [Fact] public void When8FailuresHaveOccurredTheIntervalBacksOffAndBatchIsDropped() { - var bcs = new BatchedConnectionStatus(_defaultPeriod); + var bcs = new FailureAwareBatchScheduler(_defaultPeriod); for (var i = 0; i < 8; ++i) { Assert.False(bcs.ShouldDropBatch); @@ -69,7 +69,7 @@ public void When8FailuresHaveOccurredTheIntervalBacksOffAndBatchIsDropped() [Fact] public void When10FailuresHaveOccurredTheQueueIsDropped() { - var bcs = new BatchedConnectionStatus(_defaultPeriod); + var bcs = new FailureAwareBatchScheduler(_defaultPeriod); for (var i = 0; i < 10; ++i) { Assert.False(bcs.ShouldDropQueue); @@ -81,7 +81,7 @@ public void When10FailuresHaveOccurredTheQueueIsDropped() [Fact] public void AtTheDefaultIntervalRetriesFor10MinutesBeforeDroppingBatch() { - var bcs = new BatchedConnectionStatus(_defaultPeriod); + var bcs = new FailureAwareBatchScheduler(_defaultPeriod); var cumulative = TimeSpan.Zero; do { @@ -98,7 +98,7 @@ public void AtTheDefaultIntervalRetriesFor10MinutesBeforeDroppingBatch() [Fact] public void AtTheDefaultIntervalRetriesFor30MinutesBeforeDroppingQueue() { - var bcs = new BatchedConnectionStatus(_defaultPeriod); + var bcs = new FailureAwareBatchScheduler(_defaultPeriod); var cumulative = TimeSpan.Zero; do { diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj b/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj index 59d0280..1383a8e 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/Serilog.Sinks.PeriodicBatching.Tests.csproj @@ -6,6 +6,8 @@ ../../assets/Serilog.snk true true + latest + enable From f61250cfe9798559da7321b23d5f645d86a7bb0f Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Sat, 30 Dec 2023 08:14:18 +1000 Subject: [PATCH 11/16] Add README to package --- .../Serilog.Sinks.PeriodicBatching.csproj | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj b/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj index 6e02281..b2b628d 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj +++ b/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj @@ -15,6 +15,7 @@ https://github.com/serilog/serilog-sinks-periodicbatching git enable + README.md @@ -28,6 +29,7 @@ + From 81b23fa3162dbbd14f3dfb9fc6e608fdb8971248 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Sat, 30 Dec 2023 08:17:59 +1000 Subject: [PATCH 12/16] Don't allow the run loop to be canceled before execution, since it violates the flush-on-shutdown contract --- .../Sinks/PeriodicBatching/PeriodicBatchingSink.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index 4676136..95ad675 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -75,7 +75,7 @@ public PeriodicBatchingSink(IBatchedLogEventSink batchedSink, PeriodicBatchingSi _waitForShutdownSignal = Task.Delay(Timeout.InfiniteTimeSpan, _shutdownSignal.Token) .ContinueWith(e => e.Exception, TaskContinuationOptions.OnlyOnFaulted); - _runLoop = Task.Run(LoopAsync, _shutdownSignal.Token); + _runLoop = Task.Run(LoopAsync); } /// From 9f2cea4cc461d28ba15ca0f36c69d49189b22250 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Sat, 30 Dec 2023 08:20:09 +1000 Subject: [PATCH 13/16] Inherit docs for IDisposable.Dispose() --- .../Sinks/PeriodicBatching/PeriodicBatchingSink.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index 95ad675..ad1865d 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -99,10 +99,7 @@ public void Emit(LogEvent logEvent) _queue.Writer.TryWrite(logEvent); } - /// - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - /// - /// 2 + /// public void Dispose() { try From 535a290298ae593110f4592659cb8025bf87e7f1 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Sat, 30 Dec 2023 11:13:20 +1000 Subject: [PATCH 14/16] Comment explaining why the period-bounded batch filling loop is outside the try block --- .../Sinks/PeriodicBatching/PeriodicBatchingSink.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index ad1865d..cdd5cf4 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -161,6 +161,10 @@ async Task LoopAsync() var isEagerBatch = _eagerlyEmitFirstEvent; do { + // Code from here through to the `try` block is expected to be infallible. It's structured this way because + // any failure modes within it haven't been accounted for in the rest of the sink design, and would need + // consideration in order for the sink to function robustly (i.e. to avoid hot/infinite looping). + var fillBatch = Task.Delay(_batchScheduler.NextInterval); do { From 7983a13d16f3a33f19d19eae5fc93afa180014f4 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Sun, 31 Dec 2023 06:59:16 +1000 Subject: [PATCH 15/16] Review feedback --- .../PeriodicBatching/PeriodicBatchingSink.cs | 135 +++++++++--------- .../FailureAwareBatchSchedulerTests.cs | 90 ++++++------ 2 files changed, 112 insertions(+), 113 deletions(-) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index cdd5cf4..fbc0db7 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -98,63 +98,6 @@ public void Emit(LogEvent logEvent) _queue.Writer.TryWrite(logEvent); } - - /// - public void Dispose() - { - try - { - lock (_stateLock) - { - if (!_shutdownSignal.IsCancellationRequested) - { - _queue.Writer.Complete(); - _shutdownSignal.Cancel(); - } - } - - _runLoop.Wait(); - } - catch (Exception ex) - { - // E.g. the task was canceled before ever being run, or internally failed and threw - // an unexpected exception. - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) caught exception during disposal: {ex}"); - } - - (_targetSink as IDisposable)?.Dispose(); - } - -#if FEATURE_ASYNCDISPOSABLE - /// - public async ValueTask DisposeAsync() - { - try - { - lock (_stateLock) - { - if (!_shutdownSignal.IsCancellationRequested) - { - _queue.Writer.Complete(); - _shutdownSignal.Cancel(); - } - } - - await _runLoop.ConfigureAwait(false); - } - catch (Exception ex) - { - // E.g. the task was canceled before ever being run, or internally failed and threw - // an unexpected exception. - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): caught exception during async disposal: {ex}"); - } - - if (_targetSink is IAsyncDisposable asyncDisposable) - await asyncDisposable.DisposeAsync().ConfigureAwait(false); - else - (_targetSink as IDisposable)?.Dispose(); - } -#endif async Task LoopAsync() { @@ -187,24 +130,31 @@ async Task LoopAsync() else { isEagerBatch = false; + await _targetSink.EmitBatchAsync(_currentBatch).ConfigureAwait(false); - } - _currentBatch.Clear(); - _batchScheduler.MarkSuccess(); + _currentBatch.Clear(); + _batchScheduler.MarkSuccess(); + } } catch (Exception ex) { SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) failed emitting a batch: {ex}"); _batchScheduler.MarkFailure(); - + if (_batchScheduler.ShouldDropBatch) + { + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) dropping the current batch"); _currentBatch.Clear(); + } if (_batchScheduler.ShouldDropQueue) { - // This is not ideal; the goal is to reduce memory pressure on the client if the server is offline - // for extended periods. May be worth reviewing and possibly abandoning this. + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) dropping all queued events"); + + // Not ideal, uses some CPU capacity unnecessarily and doesn't complete in bounded time. The goal is + // to reduce memory pressure on the client if the server is offline for extended periods. May be + // worth reviewing and possibly abandoning this. while (_queue.Reader.TryRead(out _) && !_shutdownSignal.IsCancellationRequested) { } } @@ -263,4 +213,61 @@ async Task TryWaitToReadAsync(ChannelReader reader, Task timeout // `Task.IsCompletedSuccessfully` not available in .NET Standard 2.0/Framework. return completed != timeout && completed is { IsCompleted: true, IsCanceled: false, IsFaulted: false }; } -} \ No newline at end of file + + /// + public void Dispose() + { + SignalShutdown(); + + try + { + _runLoop.Wait(); + } + catch (Exception ex) + { + // E.g. the task was canceled before ever being run, or internally failed and threw + // an unexpected exception. + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) caught exception during disposal: {ex}"); + } + + (_targetSink as IDisposable)?.Dispose(); + } + +#if FEATURE_ASYNCDISPOSABLE + /// + public async ValueTask DisposeAsync() + { + SignalShutdown(); + + try + { + await _runLoop.ConfigureAwait(false); + } + catch (Exception ex) + { + // E.g. the task was canceled before ever being run, or internally failed and threw + // an unexpected exception. + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): caught exception during async disposal: {ex}"); + } + + if (_targetSink is IAsyncDisposable asyncDisposable) + await asyncDisposable.DisposeAsync().ConfigureAwait(false); + else + (_targetSink as IDisposable)?.Dispose(); + } +#endif + + void SignalShutdown() + { + lock (_stateLock) + { + if (!_shutdownSignal.IsCancellationRequested) + { + // Relies on synchronization via `_stateLock`: once the writer is completed, subsequent attempts to + // complete it will throw. + _queue.Writer.Complete(); + _shutdownSignal.Cancel(); + } + } + } +} diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs index a1ef9a9..2ac40e8 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs @@ -5,109 +5,101 @@ namespace Serilog.Sinks.PeriodicBatching.Tests; public class FailureAwareBatchSchedulerTests { - readonly TimeSpan _defaultPeriod = TimeSpan.FromSeconds(2); + static TimeSpan Period => TimeSpan.FromSeconds(2); + FailureAwareBatchScheduler Scheduler { get; } = new(Period); [Fact] - public void WhenNoFailuresHaveOccurredTheRegularIntervalIsUsed() + public void WhenNoFailuresHaveOccurredTheInitialIntervalIsUsed() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); - Assert.Equal(_defaultPeriod, bcs.NextInterval); + Assert.Equal(Period, Scheduler.NextInterval); } [Fact] - public void WhenOneFailureHasOccurredTheRegularIntervalIsUsed() + public void WhenOneFailureHasOccurredTheInitialIntervalIsUsed() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); - bcs.MarkFailure(); - Assert.Equal(_defaultPeriod, bcs.NextInterval); + Scheduler.MarkFailure(); + Assert.Equal(Period, Scheduler.NextInterval); } [Fact] public void WhenTwoFailuresHaveOccurredTheIntervalBacksOff() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); - bcs.MarkFailure(); - bcs.MarkFailure(); - Assert.Equal(TimeSpan.FromSeconds(10), bcs.NextInterval); + Scheduler.MarkFailure(); + Scheduler.MarkFailure(); + Assert.Equal(TimeSpan.FromSeconds(10), Scheduler.NextInterval); } [Fact] public void WhenABatchSucceedsTheStatusResets() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); - bcs.MarkFailure(); - bcs.MarkFailure(); - bcs.MarkSuccess(); - Assert.Equal(_defaultPeriod, bcs.NextInterval); + Scheduler.MarkFailure(); + Scheduler.MarkFailure(); + Scheduler.MarkSuccess(); + Assert.Equal(Period, Scheduler.NextInterval); } [Fact] public void WhenThreeFailuresHaveOccurredTheIntervalBacksOff() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); - bcs.MarkFailure(); - bcs.MarkFailure(); - bcs.MarkFailure(); - Assert.Equal(TimeSpan.FromSeconds(20), bcs.NextInterval); - Assert.False(bcs.ShouldDropBatch); + Scheduler.MarkFailure(); + Scheduler.MarkFailure(); + Scheduler.MarkFailure(); + Assert.Equal(TimeSpan.FromSeconds(20), Scheduler.NextInterval); + Assert.False(Scheduler.ShouldDropBatch); } [Fact] - public void When8FailuresHaveOccurredTheIntervalBacksOffAndBatchIsDropped() + public void WhenEightFailuresHaveOccurredTheIntervalBacksOffAndBatchIsDropped() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); for (var i = 0; i < 8; ++i) { - Assert.False(bcs.ShouldDropBatch); - bcs.MarkFailure(); + Assert.False(Scheduler.ShouldDropBatch); + Scheduler.MarkFailure(); } - Assert.Equal(TimeSpan.FromMinutes(10), bcs.NextInterval); - Assert.True(bcs.ShouldDropBatch); - Assert.False(bcs.ShouldDropQueue); + Assert.Equal(TimeSpan.FromMinutes(10), Scheduler.NextInterval); + Assert.True(Scheduler.ShouldDropBatch); + Assert.False(Scheduler.ShouldDropQueue); } [Fact] - public void When10FailuresHaveOccurredTheQueueIsDropped() + public void WhenTenFailuresHaveOccurredTheQueueIsDropped() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); for (var i = 0; i < 10; ++i) { - Assert.False(bcs.ShouldDropQueue); - bcs.MarkFailure(); + Assert.False(Scheduler.ShouldDropQueue); + Scheduler.MarkFailure(); } - Assert.True(bcs.ShouldDropQueue); + Assert.True(Scheduler.ShouldDropQueue); } [Fact] - public void AtTheDefaultIntervalRetriesFor10MinutesBeforeDroppingBatch() + public void AtTheDefaultIntervalRetriesForTenMinutesBeforeDroppingBatch() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); var cumulative = TimeSpan.Zero; do { - bcs.MarkFailure(); + Scheduler.MarkFailure(); - if (!bcs.ShouldDropBatch) - cumulative += bcs.NextInterval; - } while (!bcs.ShouldDropBatch); + if (!Scheduler.ShouldDropBatch) + cumulative += Scheduler.NextInterval; + } while (!Scheduler.ShouldDropBatch); - Assert.False(bcs.ShouldDropQueue); + Assert.False(Scheduler.ShouldDropQueue); Assert.Equal(TimeSpan.Parse("00:10:32", CultureInfo.InvariantCulture), cumulative); } [Fact] - public void AtTheDefaultIntervalRetriesFor30MinutesBeforeDroppingQueue() + public void AtTheDefaultIntervalRetriesForThirtyMinutesBeforeDroppingQueue() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); var cumulative = TimeSpan.Zero; do { - bcs.MarkFailure(); + Scheduler.MarkFailure(); - if (!bcs.ShouldDropQueue) - cumulative += bcs.NextInterval; - } while (!bcs.ShouldDropQueue); + if (!Scheduler.ShouldDropQueue) + cumulative += Scheduler.NextInterval; + } while (!Scheduler.ShouldDropQueue); Assert.Equal(TimeSpan.Parse("00:30:32", CultureInfo.InvariantCulture), cumulative); } -} \ No newline at end of file +} From 83f9133ff87183c97f882ec8964fab0dc6e843fd Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Mon, 1 Jan 2024 10:34:06 +1000 Subject: [PATCH 16/16] Factor out SelfLog calls and improve formatting --- .../PeriodicBatching/PeriodicBatchingSink.cs | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index fbc0db7..2415ea0 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -139,18 +139,18 @@ async Task LoopAsync() } catch (Exception ex) { - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) failed emitting a batch: {ex}"); + WriteToSelfLog("failed emitting a batch", ex); _batchScheduler.MarkFailure(); if (_batchScheduler.ShouldDropBatch) { - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) dropping the current batch"); + WriteToSelfLog("dropping the current batch"); _currentBatch.Clear(); } if (_batchScheduler.ShouldDropQueue) { - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) dropping all queued events"); + WriteToSelfLog("dropping all queued events"); // Not ideal, uses some CPU capacity unnecessarily and doesn't complete in bounded time. The goal is // to reduce memory pressure on the client if the server is offline for extended periods. May be @@ -194,7 +194,7 @@ async Task LoopAsync() } catch (Exception ex) { - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) failed emitting a batch during shutdown; dropping remaining queued events: {ex}"); + WriteToSelfLog("failed emitting a batch during shutdown; dropping remaining queued events", ex); } } @@ -207,7 +207,7 @@ async Task TryWaitToReadAsync(ChannelReader reader, Task timeout // read task cancellation exceptions during shutdown, may be some room to improve. if (completed is { Exception: not null, IsCanceled: false }) { - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) could not read from queue: {completed.Exception}"); + WriteToSelfLog($"could not read from queue: {completed.Exception}"); } // `Task.IsCompletedSuccessfully` not available in .NET Standard 2.0/Framework. @@ -227,7 +227,7 @@ public void Dispose() { // E.g. the task was canceled before ever being run, or internally failed and threw // an unexpected exception. - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) caught exception during disposal: {ex}"); + WriteToSelfLog("caught exception during disposal", ex); } (_targetSink as IDisposable)?.Dispose(); @@ -247,7 +247,7 @@ public async ValueTask DisposeAsync() { // E.g. the task was canceled before ever being run, or internally failed and threw // an unexpected exception. - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): caught exception during async disposal: {ex}"); + WriteToSelfLog("caught exception during async disposal", ex); } if (_targetSink is IAsyncDisposable asyncDisposable) @@ -270,4 +270,10 @@ void SignalShutdown() } } } + + void WriteToSelfLog(string message, Exception? exception = null) + { + var ex = exception != null ? $"{Environment.NewLine}{exception}" : ""; + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): {message}{ex}"); + } }