From 7983a13d16f3a33f19d19eae5fc93afa180014f4 Mon Sep 17 00:00:00 2001 From: Nicholas Blumhardt Date: Sun, 31 Dec 2023 06:59:16 +1000 Subject: [PATCH] Review feedback --- .../PeriodicBatching/PeriodicBatchingSink.cs | 135 +++++++++--------- .../FailureAwareBatchSchedulerTests.cs | 90 ++++++------ 2 files changed, 112 insertions(+), 113 deletions(-) diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index cdd5cf4..fbc0db7 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -98,63 +98,6 @@ public void Emit(LogEvent logEvent) _queue.Writer.TryWrite(logEvent); } - - /// - public void Dispose() - { - try - { - lock (_stateLock) - { - if (!_shutdownSignal.IsCancellationRequested) - { - _queue.Writer.Complete(); - _shutdownSignal.Cancel(); - } - } - - _runLoop.Wait(); - } - catch (Exception ex) - { - // E.g. the task was canceled before ever being run, or internally failed and threw - // an unexpected exception. - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) caught exception during disposal: {ex}"); - } - - (_targetSink as IDisposable)?.Dispose(); - } - -#if FEATURE_ASYNCDISPOSABLE - /// - public async ValueTask DisposeAsync() - { - try - { - lock (_stateLock) - { - if (!_shutdownSignal.IsCancellationRequested) - { - _queue.Writer.Complete(); - _shutdownSignal.Cancel(); - } - } - - await _runLoop.ConfigureAwait(false); - } - catch (Exception ex) - { - // E.g. the task was canceled before ever being run, or internally failed and threw - // an unexpected exception. - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): caught exception during async disposal: {ex}"); - } - - if (_targetSink is IAsyncDisposable asyncDisposable) - await asyncDisposable.DisposeAsync().ConfigureAwait(false); - else - (_targetSink as IDisposable)?.Dispose(); - } -#endif async Task LoopAsync() { @@ -187,24 +130,31 @@ async Task LoopAsync() else { isEagerBatch = false; + await _targetSink.EmitBatchAsync(_currentBatch).ConfigureAwait(false); - } - _currentBatch.Clear(); - _batchScheduler.MarkSuccess(); + _currentBatch.Clear(); + _batchScheduler.MarkSuccess(); + } } catch (Exception ex) { SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) failed emitting a batch: {ex}"); _batchScheduler.MarkFailure(); - + if (_batchScheduler.ShouldDropBatch) + { + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) dropping the current batch"); _currentBatch.Clear(); + } if (_batchScheduler.ShouldDropQueue) { - // This is not ideal; the goal is to reduce memory pressure on the client if the server is offline - // for extended periods. May be worth reviewing and possibly abandoning this. + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) dropping all queued events"); + + // Not ideal, uses some CPU capacity unnecessarily and doesn't complete in bounded time. The goal is + // to reduce memory pressure on the client if the server is offline for extended periods. May be + // worth reviewing and possibly abandoning this. while (_queue.Reader.TryRead(out _) && !_shutdownSignal.IsCancellationRequested) { } } @@ -263,4 +213,61 @@ async Task TryWaitToReadAsync(ChannelReader reader, Task timeout // `Task.IsCompletedSuccessfully` not available in .NET Standard 2.0/Framework. return completed != timeout && completed is { IsCompleted: true, IsCanceled: false, IsFaulted: false }; } -} \ No newline at end of file + + /// + public void Dispose() + { + SignalShutdown(); + + try + { + _runLoop.Wait(); + } + catch (Exception ex) + { + // E.g. the task was canceled before ever being run, or internally failed and threw + // an unexpected exception. + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) caught exception during disposal: {ex}"); + } + + (_targetSink as IDisposable)?.Dispose(); + } + +#if FEATURE_ASYNCDISPOSABLE + /// + public async ValueTask DisposeAsync() + { + SignalShutdown(); + + try + { + await _runLoop.ConfigureAwait(false); + } + catch (Exception ex) + { + // E.g. the task was canceled before ever being run, or internally failed and threw + // an unexpected exception. + SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): caught exception during async disposal: {ex}"); + } + + if (_targetSink is IAsyncDisposable asyncDisposable) + await asyncDisposable.DisposeAsync().ConfigureAwait(false); + else + (_targetSink as IDisposable)?.Dispose(); + } +#endif + + void SignalShutdown() + { + lock (_stateLock) + { + if (!_shutdownSignal.IsCancellationRequested) + { + // Relies on synchronization via `_stateLock`: once the writer is completed, subsequent attempts to + // complete it will throw. + _queue.Writer.Complete(); + _shutdownSignal.Cancel(); + } + } + } +} diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs index a1ef9a9..2ac40e8 100644 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs +++ b/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs @@ -5,109 +5,101 @@ namespace Serilog.Sinks.PeriodicBatching.Tests; public class FailureAwareBatchSchedulerTests { - readonly TimeSpan _defaultPeriod = TimeSpan.FromSeconds(2); + static TimeSpan Period => TimeSpan.FromSeconds(2); + FailureAwareBatchScheduler Scheduler { get; } = new(Period); [Fact] - public void WhenNoFailuresHaveOccurredTheRegularIntervalIsUsed() + public void WhenNoFailuresHaveOccurredTheInitialIntervalIsUsed() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); - Assert.Equal(_defaultPeriod, bcs.NextInterval); + Assert.Equal(Period, Scheduler.NextInterval); } [Fact] - public void WhenOneFailureHasOccurredTheRegularIntervalIsUsed() + public void WhenOneFailureHasOccurredTheInitialIntervalIsUsed() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); - bcs.MarkFailure(); - Assert.Equal(_defaultPeriod, bcs.NextInterval); + Scheduler.MarkFailure(); + Assert.Equal(Period, Scheduler.NextInterval); } [Fact] public void WhenTwoFailuresHaveOccurredTheIntervalBacksOff() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); - bcs.MarkFailure(); - bcs.MarkFailure(); - Assert.Equal(TimeSpan.FromSeconds(10), bcs.NextInterval); + Scheduler.MarkFailure(); + Scheduler.MarkFailure(); + Assert.Equal(TimeSpan.FromSeconds(10), Scheduler.NextInterval); } [Fact] public void WhenABatchSucceedsTheStatusResets() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); - bcs.MarkFailure(); - bcs.MarkFailure(); - bcs.MarkSuccess(); - Assert.Equal(_defaultPeriod, bcs.NextInterval); + Scheduler.MarkFailure(); + Scheduler.MarkFailure(); + Scheduler.MarkSuccess(); + Assert.Equal(Period, Scheduler.NextInterval); } [Fact] public void WhenThreeFailuresHaveOccurredTheIntervalBacksOff() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); - bcs.MarkFailure(); - bcs.MarkFailure(); - bcs.MarkFailure(); - Assert.Equal(TimeSpan.FromSeconds(20), bcs.NextInterval); - Assert.False(bcs.ShouldDropBatch); + Scheduler.MarkFailure(); + Scheduler.MarkFailure(); + Scheduler.MarkFailure(); + Assert.Equal(TimeSpan.FromSeconds(20), Scheduler.NextInterval); + Assert.False(Scheduler.ShouldDropBatch); } [Fact] - public void When8FailuresHaveOccurredTheIntervalBacksOffAndBatchIsDropped() + public void WhenEightFailuresHaveOccurredTheIntervalBacksOffAndBatchIsDropped() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); for (var i = 0; i < 8; ++i) { - Assert.False(bcs.ShouldDropBatch); - bcs.MarkFailure(); + Assert.False(Scheduler.ShouldDropBatch); + Scheduler.MarkFailure(); } - Assert.Equal(TimeSpan.FromMinutes(10), bcs.NextInterval); - Assert.True(bcs.ShouldDropBatch); - Assert.False(bcs.ShouldDropQueue); + Assert.Equal(TimeSpan.FromMinutes(10), Scheduler.NextInterval); + Assert.True(Scheduler.ShouldDropBatch); + Assert.False(Scheduler.ShouldDropQueue); } [Fact] - public void When10FailuresHaveOccurredTheQueueIsDropped() + public void WhenTenFailuresHaveOccurredTheQueueIsDropped() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); for (var i = 0; i < 10; ++i) { - Assert.False(bcs.ShouldDropQueue); - bcs.MarkFailure(); + Assert.False(Scheduler.ShouldDropQueue); + Scheduler.MarkFailure(); } - Assert.True(bcs.ShouldDropQueue); + Assert.True(Scheduler.ShouldDropQueue); } [Fact] - public void AtTheDefaultIntervalRetriesFor10MinutesBeforeDroppingBatch() + public void AtTheDefaultIntervalRetriesForTenMinutesBeforeDroppingBatch() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); var cumulative = TimeSpan.Zero; do { - bcs.MarkFailure(); + Scheduler.MarkFailure(); - if (!bcs.ShouldDropBatch) - cumulative += bcs.NextInterval; - } while (!bcs.ShouldDropBatch); + if (!Scheduler.ShouldDropBatch) + cumulative += Scheduler.NextInterval; + } while (!Scheduler.ShouldDropBatch); - Assert.False(bcs.ShouldDropQueue); + Assert.False(Scheduler.ShouldDropQueue); Assert.Equal(TimeSpan.Parse("00:10:32", CultureInfo.InvariantCulture), cumulative); } [Fact] - public void AtTheDefaultIntervalRetriesFor30MinutesBeforeDroppingQueue() + public void AtTheDefaultIntervalRetriesForThirtyMinutesBeforeDroppingQueue() { - var bcs = new FailureAwareBatchScheduler(_defaultPeriod); var cumulative = TimeSpan.Zero; do { - bcs.MarkFailure(); + Scheduler.MarkFailure(); - if (!bcs.ShouldDropQueue) - cumulative += bcs.NextInterval; - } while (!bcs.ShouldDropQueue); + if (!Scheduler.ShouldDropQueue) + cumulative += Scheduler.NextInterval; + } while (!Scheduler.ShouldDropQueue); Assert.Equal(TimeSpan.Parse("00:30:32", CultureInfo.InvariantCulture), cumulative); } -} \ No newline at end of file +}