Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
nblumhardt committed Dec 30, 2023
1 parent 535a290 commit 7983a13
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,63 +98,6 @@ public void Emit(LogEvent logEvent)

_queue.Writer.TryWrite(logEvent);
}

/// <inheritdoc/>
public void Dispose()
{
try
{
lock (_stateLock)
{
if (!_shutdownSignal.IsCancellationRequested)
{
_queue.Writer.Complete();
_shutdownSignal.Cancel();
}
}

_runLoop.Wait();
}
catch (Exception ex)
{
// E.g. the task was canceled before ever being run, or internally failed and threw
// an unexpected exception.
SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) caught exception during disposal: {ex}");
}

(_targetSink as IDisposable)?.Dispose();
}

#if FEATURE_ASYNCDISPOSABLE
/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
try
{
lock (_stateLock)
{
if (!_shutdownSignal.IsCancellationRequested)
{
_queue.Writer.Complete();
_shutdownSignal.Cancel();
}
}

await _runLoop.ConfigureAwait(false);
}
catch (Exception ex)
{
// E.g. the task was canceled before ever being run, or internally failed and threw
// an unexpected exception.
SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): caught exception during async disposal: {ex}");
}

if (_targetSink is IAsyncDisposable asyncDisposable)
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
else
(_targetSink as IDisposable)?.Dispose();
}
#endif

async Task LoopAsync()
{
Expand Down Expand Up @@ -187,24 +130,31 @@ async Task LoopAsync()
else
{
isEagerBatch = false;

await _targetSink.EmitBatchAsync(_currentBatch).ConfigureAwait(false);
}

_currentBatch.Clear();
_batchScheduler.MarkSuccess();
_currentBatch.Clear();
_batchScheduler.MarkSuccess();
}
}
catch (Exception ex)
{
SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) failed emitting a batch: {ex}");
_batchScheduler.MarkFailure();

if (_batchScheduler.ShouldDropBatch)
{
SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) dropping the current batch");
_currentBatch.Clear();
}

if (_batchScheduler.ShouldDropQueue)
{
// This is not ideal; the goal is to reduce memory pressure on the client if the server is offline
// for extended periods. May be worth reviewing and possibly abandoning this.
SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) dropping all queued events");

// Not ideal, uses some CPU capacity unnecessarily and doesn't complete in bounded time. The goal is
// to reduce memory pressure on the client if the server is offline for extended periods. May be
// worth reviewing and possibly abandoning this.
while (_queue.Reader.TryRead(out _) && !_shutdownSignal.IsCancellationRequested) { }
}

Expand Down Expand Up @@ -263,4 +213,61 @@ async Task<bool> TryWaitToReadAsync(ChannelReader<LogEvent> reader, Task timeout
// `Task.IsCompletedSuccessfully` not available in .NET Standard 2.0/Framework.
return completed != timeout && completed is { IsCompleted: true, IsCanceled: false, IsFaulted: false };
}
}

/// <inheritdoc/>
public void Dispose()
{
SignalShutdown();

try
{
_runLoop.Wait();
}
catch (Exception ex)
{
// E.g. the task was canceled before ever being run, or internally failed and threw
// an unexpected exception.
SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}) caught exception during disposal: {ex}");
}

(_targetSink as IDisposable)?.Dispose();
}

#if FEATURE_ASYNCDISPOSABLE
/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
SignalShutdown();

try
{
await _runLoop.ConfigureAwait(false);
}
catch (Exception ex)
{
// E.g. the task was canceled before ever being run, or internally failed and threw
// an unexpected exception.
SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): caught exception during async disposal: {ex}");
}

if (_targetSink is IAsyncDisposable asyncDisposable)
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
else
(_targetSink as IDisposable)?.Dispose();
}
#endif

void SignalShutdown()
{
lock (_stateLock)
{
if (!_shutdownSignal.IsCancellationRequested)
{
// Relies on synchronization via `_stateLock`: once the writer is completed, subsequent attempts to
// complete it will throw.
_queue.Writer.Complete();
_shutdownSignal.Cancel();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,109 +5,101 @@ namespace Serilog.Sinks.PeriodicBatching.Tests;

public class FailureAwareBatchSchedulerTests
{
readonly TimeSpan _defaultPeriod = TimeSpan.FromSeconds(2);
static TimeSpan Period => TimeSpan.FromSeconds(2);
FailureAwareBatchScheduler Scheduler { get; } = new(Period);

[Fact]
public void WhenNoFailuresHaveOccurredTheRegularIntervalIsUsed()
public void WhenNoFailuresHaveOccurredTheInitialIntervalIsUsed()
{
var bcs = new FailureAwareBatchScheduler(_defaultPeriod);
Assert.Equal(_defaultPeriod, bcs.NextInterval);
Assert.Equal(Period, Scheduler.NextInterval);
}

[Fact]
public void WhenOneFailureHasOccurredTheRegularIntervalIsUsed()
public void WhenOneFailureHasOccurredTheInitialIntervalIsUsed()
{
var bcs = new FailureAwareBatchScheduler(_defaultPeriod);
bcs.MarkFailure();
Assert.Equal(_defaultPeriod, bcs.NextInterval);
Scheduler.MarkFailure();
Assert.Equal(Period, Scheduler.NextInterval);
}

[Fact]
public void WhenTwoFailuresHaveOccurredTheIntervalBacksOff()
{
var bcs = new FailureAwareBatchScheduler(_defaultPeriod);
bcs.MarkFailure();
bcs.MarkFailure();
Assert.Equal(TimeSpan.FromSeconds(10), bcs.NextInterval);
Scheduler.MarkFailure();
Scheduler.MarkFailure();
Assert.Equal(TimeSpan.FromSeconds(10), Scheduler.NextInterval);
}

[Fact]
public void WhenABatchSucceedsTheStatusResets()
{
var bcs = new FailureAwareBatchScheduler(_defaultPeriod);
bcs.MarkFailure();
bcs.MarkFailure();
bcs.MarkSuccess();
Assert.Equal(_defaultPeriod, bcs.NextInterval);
Scheduler.MarkFailure();
Scheduler.MarkFailure();
Scheduler.MarkSuccess();
Assert.Equal(Period, Scheduler.NextInterval);
}

[Fact]
public void WhenThreeFailuresHaveOccurredTheIntervalBacksOff()
{
var bcs = new FailureAwareBatchScheduler(_defaultPeriod);
bcs.MarkFailure();
bcs.MarkFailure();
bcs.MarkFailure();
Assert.Equal(TimeSpan.FromSeconds(20), bcs.NextInterval);
Assert.False(bcs.ShouldDropBatch);
Scheduler.MarkFailure();
Scheduler.MarkFailure();
Scheduler.MarkFailure();
Assert.Equal(TimeSpan.FromSeconds(20), Scheduler.NextInterval);
Assert.False(Scheduler.ShouldDropBatch);
}

[Fact]
public void When8FailuresHaveOccurredTheIntervalBacksOffAndBatchIsDropped()
public void WhenEightFailuresHaveOccurredTheIntervalBacksOffAndBatchIsDropped()
{
var bcs = new FailureAwareBatchScheduler(_defaultPeriod);
for (var i = 0; i < 8; ++i)
{
Assert.False(bcs.ShouldDropBatch);
bcs.MarkFailure();
Assert.False(Scheduler.ShouldDropBatch);
Scheduler.MarkFailure();
}
Assert.Equal(TimeSpan.FromMinutes(10), bcs.NextInterval);
Assert.True(bcs.ShouldDropBatch);
Assert.False(bcs.ShouldDropQueue);
Assert.Equal(TimeSpan.FromMinutes(10), Scheduler.NextInterval);
Assert.True(Scheduler.ShouldDropBatch);
Assert.False(Scheduler.ShouldDropQueue);
}

[Fact]
public void When10FailuresHaveOccurredTheQueueIsDropped()
public void WhenTenFailuresHaveOccurredTheQueueIsDropped()
{
var bcs = new FailureAwareBatchScheduler(_defaultPeriod);
for (var i = 0; i < 10; ++i)
{
Assert.False(bcs.ShouldDropQueue);
bcs.MarkFailure();
Assert.False(Scheduler.ShouldDropQueue);
Scheduler.MarkFailure();
}
Assert.True(bcs.ShouldDropQueue);
Assert.True(Scheduler.ShouldDropQueue);
}

[Fact]
public void AtTheDefaultIntervalRetriesFor10MinutesBeforeDroppingBatch()
public void AtTheDefaultIntervalRetriesForTenMinutesBeforeDroppingBatch()
{
var bcs = new FailureAwareBatchScheduler(_defaultPeriod);
var cumulative = TimeSpan.Zero;
do
{
bcs.MarkFailure();
Scheduler.MarkFailure();

if (!bcs.ShouldDropBatch)
cumulative += bcs.NextInterval;
} while (!bcs.ShouldDropBatch);
if (!Scheduler.ShouldDropBatch)
cumulative += Scheduler.NextInterval;
} while (!Scheduler.ShouldDropBatch);

Assert.False(bcs.ShouldDropQueue);
Assert.False(Scheduler.ShouldDropQueue);
Assert.Equal(TimeSpan.Parse("00:10:32", CultureInfo.InvariantCulture), cumulative);
}

[Fact]
public void AtTheDefaultIntervalRetriesFor30MinutesBeforeDroppingQueue()
public void AtTheDefaultIntervalRetriesForThirtyMinutesBeforeDroppingQueue()
{
var bcs = new FailureAwareBatchScheduler(_defaultPeriod);
var cumulative = TimeSpan.Zero;
do
{
bcs.MarkFailure();
Scheduler.MarkFailure();

if (!bcs.ShouldDropQueue)
cumulative += bcs.NextInterval;
} while (!bcs.ShouldDropQueue);
if (!Scheduler.ShouldDropQueue)
cumulative += Scheduler.NextInterval;
} while (!Scheduler.ShouldDropQueue);

Assert.Equal(TimeSpan.Parse("00:30:32", CultureInfo.InvariantCulture), cumulative);
}
}
}

0 comments on commit 7983a13

Please sign in to comment.