diff --git a/src/VitalRouter.Unity/Assets/VitalRouter.Tests/R3ExtensionsTest.cs b/src/VitalRouter.Unity/Assets/VitalRouter.Tests/R3ExtensionsTest.cs index c160523..87ca1b6 100644 --- a/src/VitalRouter.Unity/Assets/VitalRouter.Tests/R3ExtensionsTest.cs +++ b/src/VitalRouter.Unity/Assets/VitalRouter.Tests/R3ExtensionsTest.cs @@ -157,7 +157,7 @@ public TestAsyncSubscriber(FrameProvider frameProvider) this.frameProvider = frameProvider; } - public UniTask ReceiveAsync(T command, PublishContext context) where T : ICommand + public ValueTask ReceiveAsync(T command, PublishContext context) where T : ICommand { Queue.Enqueue(command); tcs = new UniTaskCompletionSource(); diff --git a/src/VitalRouter.Unity/Assets/VitalRouter/Runtime/R3Extensions.cs b/src/VitalRouter.Unity/Assets/VitalRouter/Runtime/R3Extensions.cs index 2c668e2..a4a897d 100644 --- a/src/VitalRouter.Unity/Assets/VitalRouter/Runtime/R3Extensions.cs +++ b/src/VitalRouter.Unity/Assets/VitalRouter/Runtime/R3Extensions.cs @@ -1,8 +1,9 @@ #if VITALROUTER_R3_INTEGRATION using System; using System.Collections.Generic; +using System.Runtime.CompilerServices; using System.Threading; -using Cysharp.Threading.Tasks; +using System.Threading.Tasks; using R3; using VitalRouter.Internal; @@ -13,10 +14,10 @@ public static class R3Extensions public static IDisposable SubscribeToPublish(this Observable source, ICommandPublisher publisher) where T : ICommand { - return source.Subscribe(publisher, (x, p) => p.PublishAsync(x).Forget()); + return source.Subscribe(publisher, (x, p) => p.PublishAsync(x)); } - public static UniTask ForEachPublishAndForgetAsync(this Observable source, ICommandPublisher publisher, CancellationToken cancellation = default) + public static Task ForEachPublishAndForgetAsync(this Observable source, ICommandPublisher publisher, CancellationToken cancellation = default) where T : ICommand { var observer = new ForEachPublishAndForgetObserver(publisher, cancellation); @@ -24,7 +25,7 @@ public static UniTask ForEachPublishAndForgetAsync(this Observable source, return observer.Task; } - public static UniTask ForEachPublishAndAwaitAsync(this Observable source, ICommandPublisher publisher, CancellationToken cancellation = default) + public static Task ForEachPublishAndAwaitAsync(this Observable source, ICommandPublisher publisher, CancellationToken cancellation = default) where T : ICommand { var observer = new ForEachPublishAndAwaitObserver(publisher, cancellation); @@ -65,13 +66,13 @@ public void Receive(TReceive command, PublishContext context) where TR sealed class ForEachPublishAndForgetObserver : Observer where T : ICommand { - public UniTask Task => outerCompletionSource.Task; + public Task Task => outerCompletionSource.Task; readonly ICommandPublisher publisher; - readonly UniTaskCompletionSource outerCompletionSource = new(); + readonly TaskCompletionSource outerCompletionSource = new(); - CancellationTokenRegistration tokenRegistration; - CancellationToken cancellationToken; + readonly CancellationTokenRegistration tokenRegistration; + readonly CancellationToken cancellationToken; bool isStopped; @@ -93,7 +94,7 @@ public ForEachPublishAndForgetObserver(ICommandPublisher publisher, Cancellation protected override void OnNextCore(T value) { - publisher.PublishAsync(value, cancellationToken).Forget(); + _ = publisher.PublishAsync(value, cancellationToken); } protected override void OnErrorResumeCore(Exception error) @@ -125,7 +126,7 @@ protected override void OnCompletedCore(Result result) { try { - outerCompletionSource.TrySetResult(); + outerCompletionSource.TrySetResult(true); } finally { @@ -143,16 +144,16 @@ protected override void DisposeCore() sealed class ForEachPublishAndAwaitObserver : Observer where T : ICommand { - public UniTask Task => outerCompletionSource.Task; + public Task Task => outerCompletionSource.Task; readonly ICommandPublisher publisher; - readonly UniTaskCompletionSource outerCompletionSource = new(); + readonly TaskCompletionSource outerCompletionSource = new(); readonly Queue commandQueue = new(); - readonly Action continuation; + readonly Action continuation; - UniTask.Awaiter currentAwaiter; - CancellationTokenRegistration tokenRegistration; - CancellationToken cancellationToken; + ValueTaskAwaiter currentAwaiter; + readonly CancellationTokenRegistration tokenRegistration; + readonly CancellationToken cancellationToken; bool isStopped; @@ -171,7 +172,7 @@ public ForEachPublishAndAwaitObserver(ICommandPublisher publisher, CancellationT }, this, useSynchronizationContext: false); } - continuation = static x => ((ForEachPublishAndAwaitObserver)x).Continue(); + continuation = Continue; } protected override void OnNextCore(T value) @@ -187,7 +188,7 @@ protected override void OnNextCore(T value) } else { - currentAwaiter.SourceOnCompleted(continuation, this); + currentAwaiter.UnsafeOnCompleted(continuation); } } else @@ -231,7 +232,7 @@ protected override void OnCompletedCore(Result result) { try { - outerCompletionSource.TrySetResult(); + outerCompletionSource.TrySetResult(true); } finally { @@ -273,7 +274,7 @@ void Continue() } else { - currentAwaiter.SourceOnCompleted(continuation, this); + currentAwaiter.UnsafeOnCompleted(continuation); } } else @@ -282,7 +283,7 @@ void Continue() { try { - outerCompletionSource.TrySetResult(); + outerCompletionSource.TrySetResult(true); } finally {