Skip to content

Commit

Permalink
Fix compilation errors with r3
Browse files Browse the repository at this point in the history
  • Loading branch information
hadashiA committed Sep 30, 2024
1 parent f8857f1 commit 17e1e56
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public TestAsyncSubscriber(FrameProvider frameProvider)
this.frameProvider = frameProvider;
}

public UniTask ReceiveAsync<T>(T command, PublishContext context) where T : ICommand
public ValueTask ReceiveAsync<T>(T command, PublishContext context) where T : ICommand
{
Queue.Enqueue(command);
tcs = new UniTaskCompletionSource();
Expand Down
43 changes: 22 additions & 21 deletions src/VitalRouter.Unity/Assets/VitalRouter/Runtime/R3Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#if VITALROUTER_R3_INTEGRATION
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using Cysharp.Threading.Tasks;
using System.Threading.Tasks;
using R3;
using VitalRouter.Internal;

Expand All @@ -13,18 +14,18 @@ public static class R3Extensions
public static IDisposable SubscribeToPublish<T>(this Observable<T> source, ICommandPublisher publisher)
where T : ICommand
{
return source.Subscribe(publisher, (x, p) => p.PublishAsync(x).Forget());
return source.Subscribe(publisher, (x, p) => p.PublishAsync(x));
}

public static UniTask ForEachPublishAndForgetAsync<T>(this Observable<T> source, ICommandPublisher publisher, CancellationToken cancellation = default)
public static Task ForEachPublishAndForgetAsync<T>(this Observable<T> source, ICommandPublisher publisher, CancellationToken cancellation = default)
where T : ICommand
{
var observer = new ForEachPublishAndForgetObserver<T>(publisher, cancellation);
source.Subscribe(observer);
return observer.Task;
}

public static UniTask ForEachPublishAndAwaitAsync<T>(this Observable<T> source, ICommandPublisher publisher, CancellationToken cancellation = default)
public static Task ForEachPublishAndAwaitAsync<T>(this Observable<T> source, ICommandPublisher publisher, CancellationToken cancellation = default)
where T : ICommand
{
var observer = new ForEachPublishAndAwaitObserver<T>(publisher, cancellation);
Expand Down Expand Up @@ -65,13 +66,13 @@ public void Receive<TReceive>(TReceive command, PublishContext context) where TR

sealed class ForEachPublishAndForgetObserver<T> : Observer<T> where T : ICommand
{
public UniTask Task => outerCompletionSource.Task;
public Task Task => outerCompletionSource.Task;

readonly ICommandPublisher publisher;
readonly UniTaskCompletionSource outerCompletionSource = new();
readonly TaskCompletionSource<bool> outerCompletionSource = new();

CancellationTokenRegistration tokenRegistration;
CancellationToken cancellationToken;
readonly CancellationTokenRegistration tokenRegistration;
readonly CancellationToken cancellationToken;

bool isStopped;

Expand All @@ -93,7 +94,7 @@ public ForEachPublishAndForgetObserver(ICommandPublisher publisher, Cancellation

protected override void OnNextCore(T value)
{
publisher.PublishAsync(value, cancellationToken).Forget();
_ = publisher.PublishAsync(value, cancellationToken);
}

protected override void OnErrorResumeCore(Exception error)
Expand Down Expand Up @@ -125,7 +126,7 @@ protected override void OnCompletedCore(Result result)
{
try
{
outerCompletionSource.TrySetResult();
outerCompletionSource.TrySetResult(true);
}
finally
{
Expand All @@ -143,16 +144,16 @@ protected override void DisposeCore()

sealed class ForEachPublishAndAwaitObserver<T> : Observer<T> where T : ICommand
{
public UniTask Task => outerCompletionSource.Task;
public Task Task => outerCompletionSource.Task;

readonly ICommandPublisher publisher;
readonly UniTaskCompletionSource outerCompletionSource = new();
readonly TaskCompletionSource<bool> outerCompletionSource = new();
readonly Queue<T> commandQueue = new();
readonly Action<object> continuation;
readonly Action continuation;

UniTask.Awaiter currentAwaiter;
CancellationTokenRegistration tokenRegistration;
CancellationToken cancellationToken;
ValueTaskAwaiter currentAwaiter;
readonly CancellationTokenRegistration tokenRegistration;
readonly CancellationToken cancellationToken;

bool isStopped;

Expand All @@ -171,7 +172,7 @@ public ForEachPublishAndAwaitObserver(ICommandPublisher publisher, CancellationT
}, this, useSynchronizationContext: false);
}

continuation = static x => ((ForEachPublishAndAwaitObserver<T>)x).Continue();
continuation = Continue;
}

protected override void OnNextCore(T value)
Expand All @@ -187,7 +188,7 @@ protected override void OnNextCore(T value)
}
else
{
currentAwaiter.SourceOnCompleted(continuation, this);
currentAwaiter.UnsafeOnCompleted(continuation);
}
}
else
Expand Down Expand Up @@ -231,7 +232,7 @@ protected override void OnCompletedCore(Result result)
{
try
{
outerCompletionSource.TrySetResult();
outerCompletionSource.TrySetResult(true);
}
finally
{
Expand Down Expand Up @@ -273,7 +274,7 @@ void Continue()
}
else
{
currentAwaiter.SourceOnCompleted(continuation, this);
currentAwaiter.UnsafeOnCompleted(continuation);
}
}
else
Expand All @@ -282,7 +283,7 @@ void Continue()
{
try
{
outerCompletionSource.TrySetResult();
outerCompletionSource.TrySetResult(true);
}
finally
{
Expand Down

0 comments on commit 17e1e56

Please sign in to comment.