Skip to content

Commit

Permalink
fix: handle race condition in BatchSequence with timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Mar 11, 2022
1 parent b6894f6 commit 0a255b0
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 59 deletions.
4 changes: 2 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix></BaseVersionSuffix>
<BaseVersion>3.6.0$(BaseVersionSuffix)</BaseVersion>
<BaseVersionSuffix>-beta.1</BaseVersionSuffix>
<BaseVersion>3.6.1$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
</PropertyGroup>
Expand Down
8 changes: 8 additions & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ uid: releases

# Releases

## [3.6.1](https://github.com/BEagle1984/silverback/releases/tag/v3.6.1)

### Fixes

* Handle race condition in [BatchSequence](xref:Silverback.Messaging.Sequences.Batch.BatchSequence) with timeout

# Releases

## [3.6.0](https://github.com/BEagle1984/silverback/releases/tag/v3.6.0)

### What's new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ public static class IntegrationLogEvents

/// <summary>
/// Gets the <see cref="LogEvent" /> representing the log that is written when an error occurs while
/// aborting an inbound sequence.
/// executing the timeout action on an inbound sequence.
/// </summary>
public static LogEvent ErrorAbortingInboundSequence { get; } = new(
public static LogEvent SequenceTimeoutError { get; } = new(
LogLevel.Warning,
GetEventId(110, nameof(ErrorAbortingInboundSequence)),
"Error occurred aborting the {sequenceType} '{sequenceId}'.");
GetEventId(110, nameof(SequenceTimeoutError)),
"Error occurred executing the timeout for the {sequenceType} '{sequenceId}'.");

/// <summary>
/// Gets the <see cref="LogEvent" /> representing the log that is written when connecting to the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ internal static class IntegrationLoggerExtensions
SilverbackLoggerMessage.Define<string>(IntegrationLogEvents.SkippingIncompleteSequence);

private static readonly Action<ILogger, string, string, Exception?>
ErrorAbortingInboundSequence =
SequenceTimeoutError =
SilverbackLoggerMessage.Define<string, string>(
IntegrationLogEvents.ErrorAbortingInboundSequence);
IntegrationLogEvents.SequenceTimeoutError);

private static readonly Action<ILogger, string, Exception?>
BrokerConnecting =
Expand Down Expand Up @@ -286,15 +286,15 @@ public static void LogSkippingIncompleteSequence(
sequence.SequenceId,
null);

public static void LogSequenceAbortingError(
public static void LogSequenceTimeoutError(
this ISilverbackLogger logger,
ISequence sequence,
Exception exception)
{
if (!logger.IsEnabled(IntegrationLogEvents.ErrorAbortingInboundSequence))
if (!logger.IsEnabled(IntegrationLogEvents.SequenceTimeoutError))
return;

ErrorAbortingInboundSequence(
SequenceTimeoutError(
logger.InnerLogger,
sequence.GetType().Name,
sequence.SequenceId,
Expand Down Expand Up @@ -398,7 +398,11 @@ public static void LogConsumerDisconnectError(
this ISilverbackLogger logger,
IConsumer consumer,
Exception exception) =>
ConsumerDisconnectError(logger.InnerLogger, consumer.Id, consumer.Endpoint.DisplayName, exception);
ConsumerDisconnectError(
logger.InnerLogger,
consumer.Id,
consumer.Endpoint.DisplayName,
exception);

public static void LogConsumerStartError(
this ISilverbackLogger logger,
Expand Down Expand Up @@ -451,10 +455,14 @@ public static void LogErrorProcessingOutbox(
Exception exception) =>
ErrorProcessingOutbox(logger.InnerLogger, exception);

public static void LogInvalidMessageProduced(this ISilverbackLogger logger, string validationErrors) =>
public static void LogInvalidMessageProduced(
this ISilverbackLogger logger,
string validationErrors) =>
InvalidMessageProduced(logger.InnerLogger, validationErrors, null);

public static void LogInvalidMessageProcessed(this ISilverbackLogger logger, string validationErrors) =>
public static void LogInvalidMessageProcessed(
this ISilverbackLogger logger,
string validationErrors) =>
InvalidMessageProcessed(logger.InnerLogger, validationErrors, null);

public static void LogInvalidEndpointConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,12 @@ await PublishEnvelopeAsync(context, context.Envelope.Endpoint.ThrowIfUnhandled)
{
var unboundedSequence = await GetUnboundedSequenceAsync(context).ConfigureAwait(false);

int pushedStreamsCount =
await unboundedSequence!.AddAsync(envelope, null, false).ConfigureAwait(false);
var result = await unboundedSequence!.AddAsync(envelope, null, false).ConfigureAwait(false);

if (unboundedSequence.IsAborted && unboundedSequence.AbortException != null)
throw unboundedSequence.AbortException;

throwIfUnhandled &= pushedStreamsCount == 0;
throwIfUnhandled &= result.PushedStreamsCount == 0;
}

await PublishEnvelopeAsync(context, throwIfUnhandled).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

namespace Silverback.Messaging.Sequences
{
/// <summary>
/// Encapsulates the result of the <see cref="ISequence.AddAsync" /> method.
/// </summary>
/// <param name="IsSuccess">
/// A value indicating whether the operation was successful or not.
/// </param>
/// <param name="PushedStreamsCount">
/// The number of streams that have been actually pushed.
/// </param>
public record AddToSequenceResult(bool IsSuccess, int PushedStreamsCount)
{
/// <summary>
/// Gets a static instance representing a failed call to <see cref="ISequence.AddAsync" />.
/// </summary>
public static AddToSequenceResult Failed { get; } = new(false, -1);

/// <summary>
/// Returns a new instance representing a successful call to <see cref="ISequence.AddAsync" />.
/// </summary>
/// <param name="pushedStreams">
/// The number of streams that have been actually pushed.
/// </param>
/// <returns>
/// The <see cref="AddToSequenceResult" />.
/// </returns>
public static AddToSequenceResult Success(int pushedStreams) => new(true, pushedStreams);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public ChunkSequence(string sequenceId, int? totalLength, ConsumerPipelineContex
}

/// <inheritdoc cref="SequenceBase{TEnvelope}.AddCoreAsync" />
protected override Task<int> AddCoreAsync(
protected override Task<AddToSequenceResult> AddCoreAsync(
IRawInboundEnvelope envelope,
ISequence? sequence,
bool throwIfUnhandled)
Expand All @@ -47,7 +47,7 @@ protected override Task<int> AddCoreAsync(
throw new InvalidOperationException("Chunk index header not found.");

if (!EnsureOrdering(chunkIndex))
return Task.FromResult(0);
return Task.FromResult(AddToSequenceResult.Success(0));

return base.AddCoreAsync(envelope, sequence, throwIfUnhandled);
}
Expand Down
10 changes: 7 additions & 3 deletions src/Silverback.Integration/Messaging/Sequences/ISequence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,14 @@ IMessageStreamEnumerable<TMessage> CreateStream<TMessage>(
/// message.
/// </param>
/// <returns>
/// A <see cref="Task{TResult}" /> representing the asynchronous operation. The task result contains the
/// number of streams that have been pushed.
/// A <see cref="Task{TResult}" /> representing the asynchronous operation. The task result contains
/// a flag indicating whether the operation was successful and the number of streams that have been
/// actually pushed.
/// </returns>
Task<int> AddAsync(IRawInboundEnvelope envelope, ISequence? sequence, bool throwIfUnhandled = true);
Task<AddToSequenceResult> AddAsync(
IRawInboundEnvelope envelope,
ISequence? sequence,
bool throwIfUnhandled = true);

/// <summary>
/// Aborts the sequence processing. Used for example to signal that an exception occurred or the
Expand Down
64 changes: 39 additions & 25 deletions src/Silverback.Integration/Messaging/Sequences/SequenceBase`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public IMessageStreamEnumerable<TMessage> CreateStream<TMessage>(
StreamProvider.CreateStream<TMessage>(filters);

/// <inheritdoc cref="ISequence.AddAsync" />
public Task<int> AddAsync(
public Task<AddToSequenceResult> AddAsync(
IRawInboundEnvelope envelope,
ISequence? sequence,
bool throwIfUnhandled = true)
Expand Down Expand Up @@ -265,35 +265,39 @@ public void Dispose()
/// message.
/// </param>
/// <returns>
/// A <see cref="Task{TResult}" /> representing the asynchronous operation. The task result contains the
/// number of streams that have been pushed.
/// A <see cref="Task{TResult}" /> representing the asynchronous operation. The task result contains
/// a flag indicating whether the operation was successful and the number of streams that have been
/// actually pushed.
/// </returns>
protected virtual async Task<int> AddCoreAsync(
protected virtual async Task<AddToSequenceResult> AddCoreAsync(
TEnvelope envelope,
ISequence? sequence,
bool throwIfUnhandled)
{
if (!IsPending || IsCompleting)
return 0;

ResetTimeout();
await _addingSemaphoreSlim.WaitAsync().ConfigureAwait(false);

if (sequence != null && sequence != this)
{
_sequences ??= new List<ISequence>();
_sequences.Add(sequence);
(sequence as ISequenceImplementation)?.SetParentSequence(this);
}
else if (_trackIdentifiers)
{
_messageIdentifiers ??= new List<IBrokerMessageIdentifier>();
_messageIdentifiers.Add(envelope.BrokerMessageIdentifier);
}
if (IsComplete || IsCompleting)
return AddToSequenceResult.Failed;

await _addingSemaphoreSlim.WaitAsync().ConfigureAwait(false);
if (IsAborted)
throw new InvalidOperationException("The sequence has been aborted.");

try
{
ResetTimeout();

if (sequence != null && sequence != this)
{
_sequences ??= new List<ISequence>();
_sequences.Add(sequence);
(sequence as ISequenceImplementation)?.SetParentSequence(this);
}
else if (_trackIdentifiers)
{
_messageIdentifiers ??= new List<IBrokerMessageIdentifier>();
_messageIdentifiers.Add(envelope.BrokerMessageIdentifier);
}

_abortCancellationTokenSource.Token.ThrowIfCancellationRequested();

Length++;
Expand Down Expand Up @@ -322,12 +326,12 @@ protected virtual async Task<int> AddCoreAsync(
if (IsCompleting)
await CompleteAsync().ConfigureAwait(false);

return pushedStreamsCount;
return AddToSequenceResult.Success(pushedStreamsCount);
}
catch (OperationCanceledException)
{
// Ignore
return 0;
// Ignore and consider successful, it just means that the sequence was aborted.
return AddToSequenceResult.Success(0);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -473,7 +477,7 @@ private void CompleteLinkedSequence(ISequenceImplementation sequence)
}
}

[SuppressMessage("", "CA1031", Justification = Justifications.ExceptionLogged)]
[SuppressMessage("", "CA1031", Justification = "Logged in OnTimeoutElapsedAsync if needed")]
[SuppressMessage("", "VSTHRD110", Justification = Justifications.FireAndForget)]
private void ResetTimeout()
{
Expand All @@ -492,13 +496,18 @@ private void ResetTimeout()
_timeoutCancellationTokenSource = new CancellationTokenSource();
var cancellationToken = _timeoutCancellationTokenSource.Token;

bool releaseAddingSemaphore = false;

Task.Run(
async () =>
{
try
{
await Task.Delay(_timeout, cancellationToken).ConfigureAwait(false);
await _addingSemaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
releaseAddingSemaphore = true;
await OnTimeoutElapsedAsync().ConfigureAwait(false);
}
catch (OperationCanceledException)
Expand All @@ -507,7 +516,12 @@ private void ResetTimeout()
}
catch (Exception ex)
{
_logger.LogSequenceAbortingError(this, ex);
_logger.LogSequenceTimeoutError(this, ex);
}
finally
{
if (releaseAddingSemaphore)
_addingSemaphoreSlim.Release();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,14 @@ public virtual async Task HandleAsync(ConsumerPipelineContext context, ConsumerB
return;

// Loop again if the retrieved sequence has completed already in the meanwhile
// ...unless it was a new sequence, in which case it can only mean that an error
// occurred in the subscriber before consuming the actual first message and it doesn't
// make sense to recreate and publish once again the sequence.
if (!sequence.IsPending || sequence.IsCompleting)
{
if (sequence.IsNew)
break;

continue;
}

await sequence.AddAsync(originalEnvelope, previousSequence).ConfigureAwait(false);
AddToSequenceResult addResult =
await sequence.AddAsync(originalEnvelope, previousSequence).ConfigureAwait(false);

if (!addResult.IsSuccess)
continue;

_logger.LogMessageAddedToSequence(context.Envelope, sequence);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ public void LogSkippingIncompleteSequence_Logged()
}

[Fact]
public void LogSequenceAbortingError_Logged()
public void LogSequenceTimeoutError_Logged()
{
var expectedMessage = "Error occurred aborting the FakeSequence 'fake1'.";
var expectedMessage = "Error occurred executing the timeout for the FakeSequence 'fake1'.";

_silverbackLogger.LogSequenceAbortingError(new FakeSequence(), new TimeoutException());
_silverbackLogger.LogSequenceTimeoutError(new FakeSequence(), new TimeoutException());

_loggerSubstitute.Received(LogLevel.Warning, typeof(TimeoutException), expectedMessage, 1110);
}
Expand Down

0 comments on commit 0a255b0

Please sign in to comment.