diff --git a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs index fa7677ee9..a53f49267 100644 --- a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs @@ -19,14 +19,16 @@ public void ToObservable_Null() Assert.Throws(() => AsyncEnumerable.ToObservable(null)); } - [Fact] - public void ToObservable1() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable1(bool ignoreExceptionsAfterUnsubscribe) { using var evt = new ManualResetEvent(false); var fail = false; - var xs = AsyncEnumerable.Empty().ToObservable(); + var xs = AsyncEnumerable.Empty().ToObservable(ignoreExceptionsAfterUnsubscribe); xs.Subscribe(new MyObserver( x => { @@ -47,15 +49,17 @@ public void ToObservable1() Assert.False(fail); } - [Fact] - public void ToObservable2() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable2(bool ignoreExceptionsAfterUnsubscribe) { using var evt = new ManualResetEvent(false); var lst = new List(); var fail = false; - var xs = Return42.ToObservable(); + var xs = Return42.ToObservable(ignoreExceptionsAfterUnsubscribe); xs.Subscribe(new MyObserver( x => { @@ -77,15 +81,17 @@ public void ToObservable2() Assert.True(lst.SequenceEqual([42])); } - [Fact] - public void ToObservable3() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable3(bool ignoreExceptionsAfterUnsubscribe) { using var evt = new ManualResetEvent(false); var lst = new List(); var fail = false; - var xs = AsyncEnumerable.Range(0, 10).ToObservable(); + var xs = AsyncEnumerable.Range(0, 10).ToObservable(ignoreExceptionsAfterUnsubscribe); xs.Subscribe(new MyObserver( x => { @@ -107,8 +113,10 @@ public void ToObservable3() Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10))); } - [Fact] - public void ToObservable_ThrowOnMoveNext() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable_ThrowOnMoveNext(bool ignoreExceptionsAfterUnsubscribe) { using var evt = new ManualResetEvent(false); @@ -116,7 +124,7 @@ public void ToObservable_ThrowOnMoveNext() var ex_ = default(Exception); var fail = false; - var xs = Throw(ex1).ToObservable(); + var xs = Throw(ex1).ToObservable(ignoreExceptionsAfterUnsubscribe); xs.Subscribe(new MyObserver( x => { @@ -139,8 +147,10 @@ public void ToObservable_ThrowOnMoveNext() Assert.Equal(ex1, ex_); } - [Fact] - public void ToObservable_ThrowOnCurrent() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable_ThrowOnCurrent(bool ignoreExceptionsAfterUnsubscribe) { var ex1 = new Exception("Bang!"); var ex_ = default(Exception); @@ -150,7 +160,7 @@ public void ToObservable_ThrowOnCurrent() _ => new ThrowOnCurrentAsyncEnumerator(ex1) ); - ae.ToObservable() + ae.ToObservable(ignoreExceptionsAfterUnsubscribe) .Subscribe(new MyObserver( x => { @@ -170,8 +180,10 @@ public void ToObservable_ThrowOnCurrent() Assert.Equal(ex1, ex_); } - [Fact] - public void ToObservable_DisposesEnumeratorOnCompletion() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable_DisposesEnumeratorOnCompletion(bool ignoreExceptionsAfterUnsubscribe) { using var evt = new ManualResetEvent(false); @@ -184,7 +196,7 @@ public void ToObservable_DisposesEnumeratorOnCompletion() () => { evt.Set(); return default; })); ae - .ToObservable() + .ToObservable(ignoreExceptionsAfterUnsubscribe) .Subscribe(new MyObserver( x => { @@ -203,8 +215,10 @@ public void ToObservable_DisposesEnumeratorOnCompletion() Assert.False(fail); } - [Fact] - public void ToObservable_DisposesEnumeratorWhenSubscriptionIsDisposed() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable_DisposesEnumeratorWhenSubscriptionIsDisposed(bool ignoreExceptionsAfterUnsubscribe) { using var evt = new ManualResetEvent(false); @@ -227,7 +241,7 @@ public void ToObservable_DisposesEnumeratorWhenSubscriptionIsDisposed() })); subscription = ae - .ToObservable() + .ToObservable(ignoreExceptionsAfterUnsubscribe) .Subscribe(new MyObserver( x => { @@ -250,8 +264,10 @@ public void ToObservable_DisposesEnumeratorWhenSubscriptionIsDisposed() Assert.False(fail); } - [Fact] - public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed(bool ignoreExceptionsAfterUnsubscribe) { using var evt = new ManualResetEvent(false); @@ -277,7 +293,7 @@ public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed() })); subscription = ae - .ToObservable() + .ToObservable(ignoreExceptionsAfterUnsubscribe) .Subscribe(new MyObserver( x => { @@ -301,14 +317,16 @@ public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed() Assert.False(fail); } - [Fact] - public void ToObservable_SupportsLargeEnumerable() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable_SupportsLargeEnumerable(bool ignoreExceptionsAfterUnsubscribe) { using var evt = new ManualResetEvent(false); var fail = false; - var xs = AsyncEnumerable.Range(0, 10000).ToObservable(); + var xs = AsyncEnumerable.Range(0, 10000).ToObservable(ignoreExceptionsAfterUnsubscribe); xs.Subscribe(new MyObserver( x => { @@ -329,6 +347,64 @@ public void ToObservable_SupportsLargeEnumerable() Assert.False(fail); } + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable_ShouldNotCrashOnEnumeratorDispose(bool ignoreExceptionsAfterUnsubscribe) + { + var exception = new Exception("Exception message"); + Exception? received = null; + var enumerable = AsyncEnumerable.Create(_ => throw exception); + using var evt = new ManualResetEvent(false); + + var observable = enumerable.ToObservable(ignoreExceptionsAfterUnsubscribe); + observable.Subscribe(new MyObserver(_ => + { + evt.Set(); + }, + e => + { + received = e; + evt.Set(); + }, () => + { + evt.Set(); + })); + + evt.WaitOne(); + Assert.NotNull(received); + Assert.Equal(exception.Message, received!.Message); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void ToObservable_ShouldForwardExceptionOnGetEnumeratorAsync(bool ignoreExceptionsAfterUnsubscribe) + { + var exception = new Exception("Exception message"); + Exception? recievedException = null; + var enumerable = AsyncEnumerable.Create(_ => throw exception); + using var evt = new ManualResetEvent(false); + + var observable = enumerable.ToObservable(ignoreExceptionsAfterUnsubscribe); + observable.Subscribe(new MyObserver(_ => + { + evt.Set(); + }, + e => + { + recievedException = e; + evt.Set(); + }, () => + { + evt.Set(); + })); + + evt.WaitOne(); + Assert.NotNull(recievedException); + Assert.Equal(exception.Message, recievedException!.Message); + } + private sealed class MyObserver : IObserver { private readonly Action _onNext; diff --git a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs index 0e60ff045..87ab3e63c 100644 --- a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; +using System.Threading.Tasks; namespace System.Linq { @@ -20,61 +21,121 @@ public static IObservable ToObservable(this IAsyncEnumerable(source); + return new ToObservableObservable(source, false); } + + /// + /// Converts an async-enumerable sequence to an observable sequence. + /// + /// The type of the elements in the source sequence. + /// Enumerable sequence to convert to an observable sequence. + /// If this is true, exceptions that occur after all observers have unsubscribed will be handled and silently ignored. + /// If false, they will go unobserved, meaning they will eventually emerge through + /// The observable sequence whose elements are pulled from the given enumerable sequence. + /// is null. + public static IObservable ToObservable(this IAsyncEnumerable source, bool ignoreExceptionsAfterUnsubscribe) + { + if (source == null) + throw Error.ArgumentNull(nameof(source)); + + return new ToObservableObservable(source, ignoreExceptionsAfterUnsubscribe); + } + + private sealed class ToObservableObservable : IObservable { private readonly IAsyncEnumerable _source; + private readonly bool _ignoreExceptionsAfterUnsubscribe; - public ToObservableObservable(IAsyncEnumerable source) + public ToObservableObservable(IAsyncEnumerable source, bool ignoreExceptionsAfterUnsubscribe) { _source = source; + _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe; } public IDisposable Subscribe(IObserver observer) { var ctd = new CancellationTokenDisposable(); - async void Core() + async ValueTask Core() { - await using var e = _source.GetAsyncEnumerator(ctd.Token); - do + IAsyncEnumerator e; + + try + { + e = _source.GetAsyncEnumerator(ctd.Token); + } + catch (Exception ex) { - bool hasNext; - var value = default(T)!; + if (!ctd.Token.IsCancellationRequested) + { + observer.OnError(ex); + } - try + return; + } + + + try + { + do { - hasNext = await e.MoveNextAsync().ConfigureAwait(false); - if (hasNext) + bool hasNext; + var value = default(T)!; + + try { - value = e.Current; + hasNext = await e.MoveNextAsync().ConfigureAwait(false); + if (hasNext) + { + value = e.Current; + } } - } - catch (Exception ex) - { - if (!ctd.Token.IsCancellationRequested) + catch (Exception ex) { - observer.OnError(ex); + if (!ctd.Token.IsCancellationRequested) + { + observer.OnError(ex); + } + + return; } - return; - } + if (!hasNext) + { + observer.OnCompleted(); + return; + } - if (!hasNext) + observer.OnNext(value); + } while (!ctd.Token.IsCancellationRequested); + } + finally + { + if (_ignoreExceptionsAfterUnsubscribe) { - observer.OnCompleted(); - return; + try + { + await e.DisposeAsync().ConfigureAwait(false); + } + catch + { + // Ignored + } } + else + { + // Exceptions will go in TaskScheduler.UnobservedTaskException. + // This behavior is similar to Observable.FromAsync - observer.OnNext(value); + await e.DisposeAsync().ConfigureAwait(false); + } } - while (!ctd.Token.IsCancellationRequested); } // Fire and forget - Core(); + _ = Core(); return ctd; }