Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable application-defined Synchronize gate in AsyncRx.NET #2153

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Threading;
using System.Threading;

namespace System.Reactive.Linq
Expand All @@ -16,7 +17,7 @@ public static IAsyncObservable<TSource> Synchronize<TSource>(this IAsyncObservab
return Create(source, static (source, observer) => source.SubscribeSafeAsync(AsyncObserver.Synchronize(observer)));
}

public static IAsyncObservable<TSource> Synchronize<TSource>(this IAsyncObservable<TSource> source, AsyncGate gate)
public static IAsyncObservable<TSource> Synchronize<TSource>(this IAsyncObservable<TSource> source, IAsyncGate gate)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
Expand All @@ -40,7 +41,7 @@ public static IAsyncObserver<TSource> Synchronize<TSource>(IAsyncObserver<TSourc
return Synchronize(observer, new AsyncGate());
}

public static IAsyncObserver<TSource> Synchronize<TSource>(IAsyncObserver<TSource> observer, AsyncGate gate)
public static IAsyncObserver<TSource> Synchronize<TSource>(IAsyncObserver<TSource> observer, IAsyncGate gate)
{
if (observer == null)
throw new ArgumentNullException(nameof(observer));
Expand Down
20 changes: 6 additions & 14 deletions AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
// See the LICENSE file in the project root for more information.

using System.Diagnostics;
using System.Reactive.Threading;
using System.Threading.Tasks;

namespace System.Threading
{
public sealed class AsyncGate
public sealed class AsyncGate : IAsyncGate
{
private readonly object _gate = new();
private readonly SemaphoreSlim _semaphore = new(1, 1);
private readonly AsyncLocal<int> _recursionCount = new();

public ValueTask<Releaser> LockAsync()
public ValueTask<AsyncGateReleaser> LockAsync()
{
var shouldAcquire = false;

Expand All @@ -32,13 +33,13 @@ public ValueTask<Releaser> LockAsync()

if (shouldAcquire)
{
return new ValueTask<Releaser>(_semaphore.WaitAsync().ContinueWith(_ => new Releaser(this)));
return new ValueTask<AsyncGateReleaser>(_semaphore.WaitAsync().ContinueWith(_ => new AsyncGateReleaser(this)));
}

return new ValueTask<Releaser>(new Releaser(this));
return new ValueTask<AsyncGateReleaser>(new AsyncGateReleaser(this));
}

private void Release()
void IAsyncGate.Release()
{
lock (_gate)
{
Expand All @@ -50,14 +51,5 @@ private void Release()
}
}
}

public readonly struct Releaser : IDisposable
{
private readonly AsyncGate _parent;

public Releaser(AsyncGate parent) => _parent = parent;

public void Dispose() => _parent.Release();
}
}
}
15 changes: 15 additions & 0 deletions AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

namespace System.Reactive.Threading
{
public readonly struct AsyncGateReleaser : IDisposable
{
private readonly IAsyncGate _parent;

public AsyncGateReleaser(IAsyncGate parent) => _parent = parent;

public void Dispose() => _parent.Release();
}
}
73 changes: 73 additions & 0 deletions AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Threading
{
/// <summary>
/// Synchronization primitive that provides <see cref="System.Threading.Monitor"/>-style
/// exclusive access semantics, but with an asynchronous API.
/// </summary>
/// <remarks>
/// <para>
/// This enables <see cref="AsyncObservable.Synchronize{TSource}(IAsyncObservable{TSource}, IAsyncGate)"/>
/// and <see cref="AsyncObserver.Synchronize{TSource}(IAsyncObserver{TSource}, IAsyncGate)"/>
/// to be used to synchronize access to an observer with a custom synchronization primitive.
/// </para>
/// <para>
/// These methods model the equivalents for <see cref="IObservable{T}"/> and <see cref="IObserver{T}"/>
/// in <c>System.Reactive</c>. Those offer overloads accepting a 'gate' parameter, and if you pass
/// the same object to multiple calls to these methods, they will all synchronize their operation
/// through that same gate object. The <c>gate</c> parameter in those methods is of type
/// <see cref="System.Object"/>, which works because all .NET objects have an associated monitor.
/// (It's created on demand when you first use <c>lock</c> or something equivalent.)
/// </para>
/// <para>
/// That approach is problematic in an async world, because this built-in monitor blocks the
/// calling thread when contention occurs. The basic idea of AsyncRx.NET is to avoid such
/// blocking. It can't always be avoided, and in cases where we can be certain that lock
/// acquisition times will be short, the conventional .NET monitor is still a good choice.
/// But since these <c>Synchronize</c> operators allow the caller to pass a gate which the
/// application code itself might lock, we have no control over how long the lock might be
/// held. So it would be inappropriate to use a monitor here.
/// </para>
/// <para>
/// Since the .NET runtime does not currently offer any asynchronous direct equivalent to
/// monitor, this interface defines the required API. The <see cref="AsyncGate"/> class
/// provide a basic implementation. If applications require additional features, (e.g.
/// if they want cancellation support when the application tries to acquire the lock)
/// they can provide their own implementation.
/// </para>
/// </remarks>
public interface IAsyncGate
{
/// <summary>
/// Acquires the lock.
/// </summary>
/// <returns>
/// A task that completes when the lock has been acquired, returning an <see cref="AsyncGateReleaser"/>
/// which can be disposed to release the lock.
/// </returns>
/// <remarks>
/// <para>
/// Applications release the lock by disposing the <see cref="AsyncGateReleaser"/> returned by this
/// method. Typically this is done with a <c>using</c> statement or declaration.
/// </para>
/// </remarks>
public ValueTask<AsyncGateReleaser> LockAsync();

/// <summary>
/// Releases the lock. Applications typically won't call this directly, and will use
/// the <see cref="AsyncGateReleaser"/> returned by <see cref="LockAsync"/> instead.
/// </summary>
/// <remarks>
/// This method needs to be publicly accessible so that a single <see cref="AsyncGateReleaser"/>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make more sense to get rid of this method, make AsyncGateReleaser internal, and make LockAsync return ValueTask<IDisposable>, and let other implementers decide how to release the lock in the dispose?

And actually, I don't really understand the advantage of having a separate AsyncGateReleaser class vs returning Disposable.Create(Release) in AsyncGate.LockAsync

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is it to keep the semantics of a lock separate from the semantics of IDisposable?

Copy link
Collaborator Author

@idg10 idg10 Jul 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for not returning IDisposable is that that would require an allocation on the GC heap every time you called LockAsync.

In the current implementation (i.e., before this PR) LockAsync returns a struct, and although that does implement IDisposable, C# is able to use its Dispose method from using statements or declarations directly without needing to cast it to IDisposable. (Casting a struct to an interface type causes it to be boxed.)

So when you write this sort of thing:

using (await gate.LockAsync())
{
    // Do work
}

it becomes something like this:

// Note: variable is of `struct` type, so no heap allocation required
AsyncGate.Releaser d = await gate.LockAsync();
try
{
     // Do work...
}
finally
{
    d.Dispose();
}

But if you make LockAsync return ValueTask<IDisposable> C# has to do something more like this:

// Note: variable is of an interface type, so it has to refer to an object on the heap
IDisposable d = await gate.LockAsync();
try
{
     // Do work...
}
finally
{
    d.Dispose();
}

With the existing design, in cases where there was no contention for the lock, the whole 'acquire, release' sequence could be completely allocation-free. We're using ValueTask<T> so a non-blocking await is allocation-free, and by having it return a struct, no allocation is required for the using logic either.

I wanted to preserve that characteristic. Most people aren't going to bring their own gate implementation, and I didn't want to increase the cost for the most common case in which people just use AsyncGate.

I did consider making the interface generic: IAsyncGate<TReleaser> where TReleaser : struct, IDisposable. This would enable each implementation to bring its own releaser implementation. But really the only point of the releaser is to enable using, so I didn't think there was really any use in making every implementation write its own version.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@idg10 would it make sense to consider adding a ValueDisposable helper class to assist creation of allocation-free dispose semantics?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@glopesdev what would this ValueDisposable take as its input?

This AsyncGateReleaser takes the IAsyncGate, which defines the Leave method that it calls. So it is specialized for this scenario.

A more general-purpose ValueDisposable would need some more general purpose resource release mechanism, which would presumably be IDisposable. But the whole point of AsyncGateReleaser is that gives you an IDisposable (so you can use using) in a scenario where you don't already have one.

If you've already got an IDisposable, you don't need to wrap it. You can already use using.

So I think this pattern is only useful in cases where you have something other than IDisposable to start with. In this particular case, a pair of Enter and Leave methods is a common kind of API for locks, so it makes sense. (And it doesn't make sense for AsyncGate to implement IDisposable because Leave very much isn't the same thing as Dispose.) We could imagine an ILeaveable which captured this "I'm done with this for now, but might use it again" (as opposed to the inherently terminal IDispose) then I could see how a generic type of this kind could work. But there isn't such an interface, and it feels scope-creepish for Rx to define it.

But perhaps I'm missing how this would work in the absence of such an interface.

/// can be shared by all implementations of this interface.
/// </remarks>
public void Release();
}
}
Loading