Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ToObservable operator can throw unhandled exception #2172

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ public void ToObservable_Null()
Assert.Throws<ArgumentNullException>(() => AsyncEnumerable.ToObservable<int>(null));
}

[Fact]
public void ToObservable1()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable1(bool ignoreExceptionsAfterUnsubscribe)
{
using var evt = new ManualResetEvent(false);

var fail = false;

var xs = AsyncEnumerable.Empty<int>().ToObservable();
var xs = AsyncEnumerable.Empty<int>().ToObservable(ignoreExceptionsAfterUnsubscribe);
xs.Subscribe(new MyObserver<int>(
x =>
{
Expand All @@ -47,15 +49,17 @@ public void ToObservable1()
Assert.False(fail);
}

[Fact]
public void ToObservable2()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable2(bool ignoreExceptionsAfterUnsubscribe)
{
using var evt = new ManualResetEvent(false);

var lst = new List<int>();
var fail = false;

var xs = Return42.ToObservable();
var xs = Return42.ToObservable(ignoreExceptionsAfterUnsubscribe);
xs.Subscribe(new MyObserver<int>(
x =>
{
Expand All @@ -77,15 +81,17 @@ public void ToObservable2()
Assert.True(lst.SequenceEqual([42]));
}

[Fact]
public void ToObservable3()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable3(bool ignoreExceptionsAfterUnsubscribe)
{
using var evt = new ManualResetEvent(false);

var lst = new List<int>();
var fail = false;

var xs = AsyncEnumerable.Range(0, 10).ToObservable();
var xs = AsyncEnumerable.Range(0, 10).ToObservable(ignoreExceptionsAfterUnsubscribe);
xs.Subscribe(new MyObserver<int>(
x =>
{
Expand All @@ -107,16 +113,18 @@ public void ToObservable3()
Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10)));
}

[Fact]
public void ToObservable_ThrowOnMoveNext()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable_ThrowOnMoveNext(bool ignoreExceptionsAfterUnsubscribe)
{
using var evt = new ManualResetEvent(false);

var ex1 = new Exception("Bang!");
var ex_ = default(Exception);
var fail = false;

var xs = Throw<int>(ex1).ToObservable();
var xs = Throw<int>(ex1).ToObservable(ignoreExceptionsAfterUnsubscribe);
xs.Subscribe(new MyObserver<int>(
x =>
{
Expand All @@ -139,8 +147,10 @@ public void ToObservable_ThrowOnMoveNext()
Assert.Equal(ex1, ex_);
}

[Fact]
public void ToObservable_ThrowOnCurrent()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable_ThrowOnCurrent(bool ignoreExceptionsAfterUnsubscribe)
{
var ex1 = new Exception("Bang!");
var ex_ = default(Exception);
Expand All @@ -150,7 +160,7 @@ public void ToObservable_ThrowOnCurrent()
_ => new ThrowOnCurrentAsyncEnumerator(ex1)
);

ae.ToObservable()
ae.ToObservable(ignoreExceptionsAfterUnsubscribe)
.Subscribe(new MyObserver<int>(
x =>
{
Expand All @@ -170,8 +180,10 @@ public void ToObservable_ThrowOnCurrent()
Assert.Equal(ex1, ex_);
}

[Fact]
public void ToObservable_DisposesEnumeratorOnCompletion()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable_DisposesEnumeratorOnCompletion(bool ignoreExceptionsAfterUnsubscribe)
{
using var evt = new ManualResetEvent(false);

Expand All @@ -184,7 +196,7 @@ public void ToObservable_DisposesEnumeratorOnCompletion()
() => { evt.Set(); return default; }));

ae
.ToObservable()
.ToObservable(ignoreExceptionsAfterUnsubscribe)
.Subscribe(new MyObserver<int>(
x =>
{
Expand All @@ -203,8 +215,10 @@ public void ToObservable_DisposesEnumeratorOnCompletion()
Assert.False(fail);
}

[Fact]
public void ToObservable_DisposesEnumeratorWhenSubscriptionIsDisposed()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable_DisposesEnumeratorWhenSubscriptionIsDisposed(bool ignoreExceptionsAfterUnsubscribe)
{
using var evt = new ManualResetEvent(false);

Expand All @@ -227,7 +241,7 @@ public void ToObservable_DisposesEnumeratorWhenSubscriptionIsDisposed()
}));

subscription = ae
.ToObservable()
.ToObservable(ignoreExceptionsAfterUnsubscribe)
.Subscribe(new MyObserver<int>(
x =>
{
Expand All @@ -250,8 +264,10 @@ public void ToObservable_DisposesEnumeratorWhenSubscriptionIsDisposed()
Assert.False(fail);
}

[Fact]
public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed(bool ignoreExceptionsAfterUnsubscribe)
{
using var evt = new ManualResetEvent(false);

Expand All @@ -277,7 +293,7 @@ public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed()
}));

subscription = ae
.ToObservable()
.ToObservable(ignoreExceptionsAfterUnsubscribe)
.Subscribe(new MyObserver<int>(
x =>
{
Expand All @@ -301,14 +317,16 @@ public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed()
Assert.False(fail);
}

[Fact]
public void ToObservable_SupportsLargeEnumerable()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable_SupportsLargeEnumerable(bool ignoreExceptionsAfterUnsubscribe)
{
using var evt = new ManualResetEvent(false);

var fail = false;

var xs = AsyncEnumerable.Range(0, 10000).ToObservable();
var xs = AsyncEnumerable.Range(0, 10000).ToObservable(ignoreExceptionsAfterUnsubscribe);
xs.Subscribe(new MyObserver<int>(
x =>
{
Expand All @@ -329,6 +347,64 @@ public void ToObservable_SupportsLargeEnumerable()
Assert.False(fail);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable_ShouldNotCrashOnEnumeratorDispose(bool ignoreExceptionsAfterUnsubscribe)
{
var exception = new Exception("Exception message");
Exception? received = null;
var enumerable = AsyncEnumerable.Create<int>(_ => throw exception);
using var evt = new ManualResetEvent(false);

var observable = enumerable.ToObservable(ignoreExceptionsAfterUnsubscribe);
observable.Subscribe(new MyObserver<int>(_ =>
{
evt.Set();
},
e =>
{
received = e;
evt.Set();
}, () =>
{
evt.Set();
}));

evt.WaitOne();
Assert.NotNull(received);
Assert.Equal(exception.Message, received!.Message);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToObservable_ShouldForwardExceptionOnGetEnumeratorAsync(bool ignoreExceptionsAfterUnsubscribe)
{
var exception = new Exception("Exception message");
Exception? recievedException = null;
var enumerable = AsyncEnumerable.Create<int>(_ => throw exception);
using var evt = new ManualResetEvent(false);

var observable = enumerable.ToObservable(ignoreExceptionsAfterUnsubscribe);
observable.Subscribe(new MyObserver<int>(_ =>
{
evt.Set();
},
e =>
{
recievedException = e;
evt.Set();
}, () =>
{
evt.Set();
}));

evt.WaitOne();
Assert.NotNull(recievedException);
Assert.Equal(exception.Message, recievedException!.Message);
}

private sealed class MyObserver<T> : IObserver<T>
{
private readonly Action<T> _onNext;
Expand Down
Loading