Skip to content

Commit

Permalink
fix: improve streams abort process to avoid first chance exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Jun 27, 2022
1 parent 57d11ad commit ba1ded0
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix></BaseVersionSuffix>
<BaseVersion>3.7.0$(BaseVersionSuffix)</BaseVersion>
<BaseVersion>3.7.1$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
</PropertyGroup>
Expand Down
6 changes: 6 additions & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ uid: releases

# Releases

## [3.7.1](https://github.com/BEagle1984/silverback/releases/tag/v3.7.1)

### Fixes

* Improve message streams abort process to avoid first chance exceptions (e.g. during dispose)

## [3.7.0](https://github.com/BEagle1984/silverback/releases/tag/v3.7.0)

### What's new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ internal sealed class LazyMessageStreamEnumerable<TMessage>

private MessageStreamEnumerable<TMessage>? _stream;

private bool _disposed;

public LazyMessageStreamEnumerable(IReadOnlyCollection<IMessageFilter>? filters = null)
{
Filters = filters;
Expand All @@ -45,6 +47,9 @@ public LazyMessageStreamEnumerable(IReadOnlyCollection<IMessageFilter>? filters
/// <inheritdoc cref="ILazyMessageStreamEnumerable.GetOrCreateStream" />
public IMessageStreamEnumerable GetOrCreateStream()
{
if (_disposed)
throw new ObjectDisposedException(GetType().FullName);

if (_stream == null)
{
_stream = new MessageStreamEnumerable<TMessage>();
Expand All @@ -59,8 +64,11 @@ public IMessageStreamEnumerable GetOrCreateStream()

public void Dispose()
{
if (_disposed)
return;

_stream?.Dispose();
_stream = null;
_disposed = true;
}
}
}
82 changes: 59 additions & 23 deletions src/Silverback.Core/Messaging/Messages/MessageStreamProvider`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ internal sealed class MessageStreamProvider<TMessage> : IMessageStreamProvider,

private MethodInfo? _genericCreateStreamMethodInfo;

private bool _completed;

private bool _aborted;

/// <inheritdoc cref="IMessageStreamProvider.MessageType" />
public Type MessageType => typeof(TMessage);

Expand Down Expand Up @@ -82,6 +86,9 @@ public async Task<int> PushAsync(
{
Check.NotNull<object>(message, nameof(message));

if (_completed || _aborted)
throw new InvalidOperationException("The streams are already completed or aborted.");

var messageId = Interlocked.Increment(ref _messagesCount);

var processingTasks = PushToCompatibleStreams(messageId, message, cancellationToken).ToList();
Expand All @@ -94,20 +101,41 @@ public async Task<int> PushAsync(
return processingTasks.Count;
}

/// <inheritdoc cref="Abort"/>
/// <remarks>
/// The abort is performed only if the streams haven't been completed already.
/// </remarks>
public void AbortIfPending()
{
if (!_completed)
Abort();
}

/// <summary>
/// Aborts the ongoing enumerations and the pending calls to
/// <see cref="PushAsync(TMessage,CancellationToken)" />, then marks the
/// stream as complete. Calling this method will cause an <see cref="OperationCanceledException" /> to be
/// thrown by the enumerators and the <see cref="PushAsync(TMessage,CancellationToken)" /> method.
/// </summary>
public void Abort() => _lazyStreams.ParallelForEach(
lazyStream =>
{
if (lazyStream.Stream != null)
lazyStream.Stream.Abort();
else
lazyStream.Cancel();
});
public void Abort()
{
if (_completed)
throw new InvalidOperationException("The streams are already completed.");

if (_aborted)
return;

_lazyStreams.ParallelForEach(
lazyStream =>
{
if (lazyStream.Stream != null)
lazyStream.Stream.Abort();
else
lazyStream.Cancel();
});

_aborted = true;
}

/// <summary>
/// Marks the stream as complete, meaning no more messages will be pushed.
Expand All @@ -118,15 +146,25 @@ public void Abort() => _lazyStreams.ParallelForEach(
/// <returns>
/// A <see cref="Task" /> representing the asynchronous operation.
/// </returns>
public Task CompleteAsync(CancellationToken cancellationToken = default) =>
_lazyStreams.ParallelForEachAsync(
public async Task CompleteAsync(CancellationToken cancellationToken = default)
{
if (_aborted)
throw new InvalidOperationException("The stream are already aborted.");

if (_completed)
return;

await _lazyStreams.ParallelForEachAsync(
async lazyStream =>
{
if (lazyStream.Stream != null)
await lazyStream.Stream.CompleteAsync(cancellationToken).ConfigureAwait(false);
else
lazyStream.Cancel();
});
}).ConfigureAwait(false);

_completed = true;
}

/// <inheritdoc cref="IMessageStreamProvider.CreateStream" />
public IMessageStreamEnumerable<object> CreateStream(
Expand All @@ -138,8 +176,7 @@ public IMessageStreamEnumerable<object> CreateStream(
}

/// <inheritdoc cref="IMessageStreamProvider.CreateStream{TMessage}" />
public IMessageStreamEnumerable<TMessageLinked> CreateStream<TMessageLinked>(
IReadOnlyCollection<IMessageFilter>? filters = null)
public IMessageStreamEnumerable<TMessageLinked> CreateStream<TMessageLinked>(IReadOnlyCollection<IMessageFilter>? filters = null)
{
var lazyStream = (ILazyMessageStreamEnumerable)CreateLazyStream<TMessageLinked>(filters);
return (IMessageStreamEnumerable<TMessageLinked>)lazyStream.GetOrCreateStream();
Expand All @@ -163,8 +200,7 @@ public ILazyMessageStreamEnumerable<object> CreateLazyStream(
}

/// <inheritdoc cref="IMessageStreamProvider.CreateLazyStream{TMessage}" />
public ILazyMessageStreamEnumerable<TMessageLinked> CreateLazyStream<TMessageLinked>(
IReadOnlyCollection<IMessageFilter>? filters = null)
public ILazyMessageStreamEnumerable<TMessageLinked> CreateLazyStream<TMessageLinked>(IReadOnlyCollection<IMessageFilter>? filters = null)
{
var stream = CreateLazyStreamCore<TMessageLinked>(filters);

Expand All @@ -179,11 +215,11 @@ public ILazyMessageStreamEnumerable<TMessageLinked> CreateLazyStream<TMessageLin
/// <inheritdoc cref="IDisposable.Dispose" />
public void Dispose()
{
AsyncHelper.RunSynchronously(() => CompleteAsync());
if (!_aborted && !_completed)
AsyncHelper.RunSynchronously(() => CompleteAsync());
}

private static ILazyMessageStreamEnumerable<TMessageLinked> CreateLazyStreamCore<TMessageLinked>(
IReadOnlyCollection<IMessageFilter>? filters = null) =>
private static ILazyMessageStreamEnumerable<TMessageLinked> CreateLazyStreamCore<TMessageLinked>(IReadOnlyCollection<IMessageFilter>? filters = null) =>
new LazyMessageStreamEnumerable<TMessageLinked>(filters);

private static bool PushIfCompatibleType(
Expand Down Expand Up @@ -232,11 +268,11 @@ private IEnumerable<Task> PushToCompatibleStreams(
foreach (var lazyStream in _lazyStreams)
{
if (PushIfCompatibleType(
lazyStream,
messageId,
message,
cancellationToken,
out var processingTask))
lazyStream,
messageId,
message,
cancellationToken,
out var processingTask))
{
yield return processingTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ private async Task AbortCoreAsync(SequenceAbortReason reason, Exception? excepti
if (await RollbackTransactionAndNotifyProcessingCompletedAsync(exception).ConfigureAwait(false))
LogAbort();

_streamProvider.Abort();
_streamProvider.AbortIfPending();

_abortCancellationTokenSource.Cancel();
_abortingTaskCompletionSource?.SetResult(true);
Expand Down

0 comments on commit ba1ded0

Please sign in to comment.