diff --git a/Directory.Build.props b/Directory.Build.props index ab5a8273f..56248125f 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 3.7.0$(BaseVersionSuffix) + 3.7.1$(BaseVersionSuffix) 1 $(BaseVersionSuffix) diff --git a/docs/releases.md b/docs/releases.md index 6d75f3270..a6bc1b4c1 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -4,6 +4,12 @@ uid: releases # Releases +## [3.7.1](https://github.com/BEagle1984/silverback/releases/tag/v3.7.1) + +### Fixes + +* Improve message streams abort process to avoid first chance exceptions (e.g. during dispose) + ## [3.7.0](https://github.com/BEagle1984/silverback/releases/tag/v3.7.0) ### What's new diff --git a/src/Silverback.Core/Messaging/Messages/LazyMessageStreamEnumerable`1.cs b/src/Silverback.Core/Messaging/Messages/LazyMessageStreamEnumerable`1.cs index fafadad4d..3c72e3cde 100644 --- a/src/Silverback.Core/Messaging/Messages/LazyMessageStreamEnumerable`1.cs +++ b/src/Silverback.Core/Messaging/Messages/LazyMessageStreamEnumerable`1.cs @@ -20,6 +20,8 @@ internal sealed class LazyMessageStreamEnumerable private MessageStreamEnumerable? _stream; + private bool _disposed; + public LazyMessageStreamEnumerable(IReadOnlyCollection? filters = null) { Filters = filters; @@ -45,6 +47,9 @@ public LazyMessageStreamEnumerable(IReadOnlyCollection? filters /// public IMessageStreamEnumerable GetOrCreateStream() { + if (_disposed) + throw new ObjectDisposedException(GetType().FullName); + if (_stream == null) { _stream = new MessageStreamEnumerable(); @@ -59,8 +64,11 @@ public IMessageStreamEnumerable GetOrCreateStream() public void Dispose() { + if (_disposed) + return; + _stream?.Dispose(); - _stream = null; + _disposed = true; } } } diff --git a/src/Silverback.Core/Messaging/Messages/MessageStreamProvider`1.cs b/src/Silverback.Core/Messaging/Messages/MessageStreamProvider`1.cs index 98a5518b1..8ea8e6802 100644 --- a/src/Silverback.Core/Messaging/Messages/MessageStreamProvider`1.cs +++ b/src/Silverback.Core/Messaging/Messages/MessageStreamProvider`1.cs @@ -28,6 +28,10 @@ internal sealed class MessageStreamProvider : IMessageStreamProvider, private MethodInfo? _genericCreateStreamMethodInfo; + private bool _completed; + + private bool _aborted; + /// public Type MessageType => typeof(TMessage); @@ -82,6 +86,9 @@ public async Task PushAsync( { Check.NotNull(message, nameof(message)); + if (_completed || _aborted) + throw new InvalidOperationException("The streams are already completed or aborted."); + var messageId = Interlocked.Increment(ref _messagesCount); var processingTasks = PushToCompatibleStreams(messageId, message, cancellationToken).ToList(); @@ -94,20 +101,41 @@ public async Task PushAsync( return processingTasks.Count; } + /// + /// + /// The abort is performed only if the streams haven't been completed already. + /// + public void AbortIfPending() + { + if (!_completed) + Abort(); + } + /// /// Aborts the ongoing enumerations and the pending calls to /// , then marks the /// stream as complete. Calling this method will cause an to be /// thrown by the enumerators and the method. /// - public void Abort() => _lazyStreams.ParallelForEach( - lazyStream => - { - if (lazyStream.Stream != null) - lazyStream.Stream.Abort(); - else - lazyStream.Cancel(); - }); + public void Abort() + { + if (_completed) + throw new InvalidOperationException("The streams are already completed."); + + if (_aborted) + return; + + _lazyStreams.ParallelForEach( + lazyStream => + { + if (lazyStream.Stream != null) + lazyStream.Stream.Abort(); + else + lazyStream.Cancel(); + }); + + _aborted = true; + } /// /// Marks the stream as complete, meaning no more messages will be pushed. @@ -118,15 +146,25 @@ public void Abort() => _lazyStreams.ParallelForEach( /// /// A representing the asynchronous operation. /// - public Task CompleteAsync(CancellationToken cancellationToken = default) => - _lazyStreams.ParallelForEachAsync( + public async Task CompleteAsync(CancellationToken cancellationToken = default) + { + if (_aborted) + throw new InvalidOperationException("The stream are already aborted."); + + if (_completed) + return; + + await _lazyStreams.ParallelForEachAsync( async lazyStream => { if (lazyStream.Stream != null) await lazyStream.Stream.CompleteAsync(cancellationToken).ConfigureAwait(false); else lazyStream.Cancel(); - }); + }).ConfigureAwait(false); + + _completed = true; + } /// public IMessageStreamEnumerable CreateStream( @@ -138,8 +176,7 @@ public IMessageStreamEnumerable CreateStream( } /// - public IMessageStreamEnumerable CreateStream( - IReadOnlyCollection? filters = null) + public IMessageStreamEnumerable CreateStream(IReadOnlyCollection? filters = null) { var lazyStream = (ILazyMessageStreamEnumerable)CreateLazyStream(filters); return (IMessageStreamEnumerable)lazyStream.GetOrCreateStream(); @@ -163,8 +200,7 @@ public ILazyMessageStreamEnumerable CreateLazyStream( } /// - public ILazyMessageStreamEnumerable CreateLazyStream( - IReadOnlyCollection? filters = null) + public ILazyMessageStreamEnumerable CreateLazyStream(IReadOnlyCollection? filters = null) { var stream = CreateLazyStreamCore(filters); @@ -179,11 +215,11 @@ public ILazyMessageStreamEnumerable CreateLazyStream public void Dispose() { - AsyncHelper.RunSynchronously(() => CompleteAsync()); + if (!_aborted && !_completed) + AsyncHelper.RunSynchronously(() => CompleteAsync()); } - private static ILazyMessageStreamEnumerable CreateLazyStreamCore( - IReadOnlyCollection? filters = null) => + private static ILazyMessageStreamEnumerable CreateLazyStreamCore(IReadOnlyCollection? filters = null) => new LazyMessageStreamEnumerable(filters); private static bool PushIfCompatibleType( @@ -232,11 +268,11 @@ private IEnumerable PushToCompatibleStreams( foreach (var lazyStream in _lazyStreams) { if (PushIfCompatibleType( - lazyStream, - messageId, - message, - cancellationToken, - out var processingTask)) + lazyStream, + messageId, + message, + cancellationToken, + out var processingTask)) { yield return processingTask; } diff --git a/src/Silverback.Integration/Messaging/Sequences/SequenceBase`1.cs b/src/Silverback.Integration/Messaging/Sequences/SequenceBase`1.cs index 487a1246f..67bcba936 100644 --- a/src/Silverback.Integration/Messaging/Sequences/SequenceBase`1.cs +++ b/src/Silverback.Integration/Messaging/Sequences/SequenceBase`1.cs @@ -588,7 +588,7 @@ private async Task AbortCoreAsync(SequenceAbortReason reason, Exception? excepti if (await RollbackTransactionAndNotifyProcessingCompletedAsync(exception).ConfigureAwait(false)) LogAbort(); - _streamProvider.Abort(); + _streamProvider.AbortIfPending(); _abortCancellationTokenSource.Cancel(); _abortingTaskCompletionSource?.SetResult(true);