From 0589f8b660b065d57cc8909870ea3cb23da7cd42 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Mon, 17 Jun 2024 14:33:53 +0900 Subject: [PATCH 1/4] Use Dictionary + lock instead of ConcurrentDictionary --- .../Microbenchmark.Client/Program.cs | 3 +- .../StreamingHubClientBase.cs | 44 ++++++++++++++----- .../StreamingHubClientBase.cs | 44 ++++++++++++++----- 3 files changed, 66 insertions(+), 25 deletions(-) diff --git a/perf/Microbenchmark/Microbenchmark.Client/Program.cs b/perf/Microbenchmark/Microbenchmark.Client/Program.cs index 1984c62ac..a14ec7067 100644 --- a/perf/Microbenchmark/Microbenchmark.Client/Program.cs +++ b/perf/Microbenchmark/Microbenchmark.Client/Program.cs @@ -2,7 +2,8 @@ using MagicOnion; using Microbenchmark.Client; -BenchmarkRunner.Run(); +//BenchmarkRunner.Run(); +//BenchmarkRunner.Run(); class MySynchronizationContext : SynchronizationContext; diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs index 528e431da..9518d098e 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs @@ -84,7 +84,7 @@ public abstract class StreamingHubClientBase readonly IMagicOnionSerializer messageSerializer; readonly Method duplexStreamingConnectMethod; // {messageId, TaskCompletionSource} - readonly ConcurrentDictionary responseFutures = new(); + readonly Dictionary responseFutures = new(); readonly TaskCompletionSource waitForDisconnect = new(); readonly CancellationTokenSource cancellationTokenSource = new(); @@ -319,19 +319,26 @@ SendOrPostCallback CreateBroadcastCallback(int methodId, int consumed) void ProcessResponse(SynchronizationContext? syncContext, StreamingHubPayload payload, ref StreamingHubClientMessageReader messageReader) { var message = messageReader.ReadResponseMessage(); - if (responseFutures.Remove(message.MessageId, out var future)) + + ITaskCompletion? future; + lock (responseFutures) { - try + if (!responseFutures.Remove(message.MessageId, out future)) { - OnResponseEvent(message.MethodId, future, message.Body); - StreamingHubPayloadPool.Shared.Return(payload); + return; } - catch (Exception ex) + } + + try + { + OnResponseEvent(message.MethodId, future, message.Body); + StreamingHubPayloadPool.Shared.Return(payload); + } + catch (Exception ex) + { + if (!future.TrySetException(ex)) { - if (!future.TrySetException(ex)) - { - throw; - } + throw; } } } @@ -339,7 +346,17 @@ void ProcessResponse(SynchronizationContext? syncContext, StreamingHubPayload pa void ProcessResponseWithError(SynchronizationContext? syncContext, StreamingHubPayload payload, ref StreamingHubClientMessageReader messageReader) { var message = messageReader.ReadResponseWithErrorMessage(); - if (responseFutures.Remove(message.MessageId, out var future)) + + ITaskCompletion? future; + lock (responseFutures) + { + if (!responseFutures.Remove(message.MessageId, out future)) + { + return; + } + } + + if (responseFutures.Remove(message.MessageId, out future)) { RpcException ex; if (string.IsNullOrWhiteSpace(message.Error)) @@ -462,7 +479,10 @@ protected Task WriteMessageWithResponseAsync(int TaskCreationOptions.RunContinuationsAsynchronously #endif ); - responseFutures[mid] = tcs; + lock (responseFutures) + { + responseFutures[mid] = tcs; + } var v = BuildRequestMessage(methodId, mid, message); _ = writerQueue.Writer.TryWrite(v); diff --git a/src/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client/StreamingHubClientBase.cs index 528e431da..9518d098e 100644 --- a/src/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client/StreamingHubClientBase.cs @@ -84,7 +84,7 @@ public abstract class StreamingHubClientBase readonly IMagicOnionSerializer messageSerializer; readonly Method duplexStreamingConnectMethod; // {messageId, TaskCompletionSource} - readonly ConcurrentDictionary responseFutures = new(); + readonly Dictionary responseFutures = new(); readonly TaskCompletionSource waitForDisconnect = new(); readonly CancellationTokenSource cancellationTokenSource = new(); @@ -319,19 +319,26 @@ SendOrPostCallback CreateBroadcastCallback(int methodId, int consumed) void ProcessResponse(SynchronizationContext? syncContext, StreamingHubPayload payload, ref StreamingHubClientMessageReader messageReader) { var message = messageReader.ReadResponseMessage(); - if (responseFutures.Remove(message.MessageId, out var future)) + + ITaskCompletion? future; + lock (responseFutures) { - try + if (!responseFutures.Remove(message.MessageId, out future)) { - OnResponseEvent(message.MethodId, future, message.Body); - StreamingHubPayloadPool.Shared.Return(payload); + return; } - catch (Exception ex) + } + + try + { + OnResponseEvent(message.MethodId, future, message.Body); + StreamingHubPayloadPool.Shared.Return(payload); + } + catch (Exception ex) + { + if (!future.TrySetException(ex)) { - if (!future.TrySetException(ex)) - { - throw; - } + throw; } } } @@ -339,7 +346,17 @@ void ProcessResponse(SynchronizationContext? syncContext, StreamingHubPayload pa void ProcessResponseWithError(SynchronizationContext? syncContext, StreamingHubPayload payload, ref StreamingHubClientMessageReader messageReader) { var message = messageReader.ReadResponseWithErrorMessage(); - if (responseFutures.Remove(message.MessageId, out var future)) + + ITaskCompletion? future; + lock (responseFutures) + { + if (!responseFutures.Remove(message.MessageId, out future)) + { + return; + } + } + + if (responseFutures.Remove(message.MessageId, out future)) { RpcException ex; if (string.IsNullOrWhiteSpace(message.Error)) @@ -462,7 +479,10 @@ protected Task WriteMessageWithResponseAsync(int TaskCreationOptions.RunContinuationsAsynchronously #endif ); - responseFutures[mid] = tcs; + lock (responseFutures) + { + responseFutures[mid] = tcs; + } var v = BuildRequestMessage(methodId, mid, message); _ = writerQueue.Writer.TryWrite(v); From 0d71347e152f267239abc808ec402699906d5064 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Mon, 17 Jun 2024 14:49:57 +0900 Subject: [PATCH 2/4] Update benchmark --- .../ChannelAsyncStreamReader.cs | 9 ++++- .../Microbenchmark.Client/Program.cs | 6 ++++ .../StreamingHubClientTestHelper.cs | 35 +++++++++++++++---- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/perf/Microbenchmark/Microbenchmark.Client/ChannelAsyncStreamReader.cs b/perf/Microbenchmark/Microbenchmark.Client/ChannelAsyncStreamReader.cs index 8d3fc2427..bfa5bf7bb 100644 --- a/perf/Microbenchmark/Microbenchmark.Client/ChannelAsyncStreamReader.cs +++ b/perf/Microbenchmark/Microbenchmark.Client/ChannelAsyncStreamReader.cs @@ -1,3 +1,4 @@ +using System.Runtime.CompilerServices; using System.Threading.Channels; using Grpc.Core; @@ -14,7 +15,13 @@ public ChannelAsyncStreamReader(Channel channel) reader = channel.Reader; } - public async Task MoveNext(CancellationToken cancellationToken) + public Task MoveNext(CancellationToken cancellationToken) + { + return MoveNextCore(cancellationToken).AsTask(); + } + + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + async ValueTask MoveNextCore(CancellationToken cancellationToken) { if (await reader.WaitToReadAsync()) { diff --git a/perf/Microbenchmark/Microbenchmark.Client/Program.cs b/perf/Microbenchmark/Microbenchmark.Client/Program.cs index a14ec7067..86af100d0 100644 --- a/perf/Microbenchmark/Microbenchmark.Client/Program.cs +++ b/perf/Microbenchmark/Microbenchmark.Client/Program.cs @@ -5,6 +5,12 @@ //BenchmarkRunner.Run(); //BenchmarkRunner.Run(); +#if FALSE +var b = new HubMethodBenchmarks(); +for (var i = 0; i < 1000000; i++) + await b.Parameter_Zero_Return_ValueType(); +#endif + class MySynchronizationContext : SynchronizationContext; class TestHubReceiver : ITestHubReceiver diff --git a/perf/Microbenchmark/Microbenchmark.Client/StreamingHubClientTestHelper.cs b/perf/Microbenchmark/Microbenchmark.Client/StreamingHubClientTestHelper.cs index ebb85bf12..c6490db6f 100644 --- a/perf/Microbenchmark/Microbenchmark.Client/StreamingHubClientTestHelper.cs +++ b/perf/Microbenchmark/Microbenchmark.Client/StreamingHubClientTestHelper.cs @@ -86,28 +86,49 @@ public async Task ConnectAsync(StreamingHubClientOptions options, ); } - public async Task> ReadRequestRawAsync() + public async ValueTask> ReadRequestRawAsync() { var requestPayload = await requestChannel.Reader.ReadAsync(); return requestPayload.Memory; } - public async Task<(int MessageId, int MethodId, T Request)> ReadRequestAsync() + public async ValueTask<(int MessageId, int MethodId, T Request)> ReadRequestAsync() { var requestPayload = await requestChannel.Reader.ReadAsync(); - return ReadRequestPayload(requestPayload.Memory); + try + { + return ReadRequestPayload(requestPayload.Memory); + } + finally + { + StreamingHubPayloadPool.Shared.Return(requestPayload); + } } - public async Task<(int MessageId, int MethodId, ReadOnlyMemory Request)> ReadRequestNoDeserializeAsync() + public async ValueTask<(int MessageId, int MethodId, ReadOnlyMemory Request)> ReadRequestNoDeserializeAsync() { var requestPayload = await requestChannel.Reader.ReadAsync(); - return ReadRequestPayload(requestPayload.Memory); + try + { + return ReadRequestPayload(requestPayload.Memory); + } + finally + { + StreamingHubPayloadPool.Shared.Return(requestPayload); + } } - public async Task<(int MethodId, T Request)> ReadFireAndForgetRequestAsync() + public async ValueTask<(int MethodId, T Request)> ReadFireAndForgetRequestAsync() { var requestPayload = await requestChannel.Reader.ReadAsync(); - return ReadFireAndForgetRequestPayload(requestPayload.Memory); + try + { + return ReadFireAndForgetRequestPayload(requestPayload.Memory); + } + finally + { + StreamingHubPayloadPool.Shared.Return(requestPayload); + } } public void WriteResponseRaw(ReadOnlySpan data) From 0e5cb85e2254a0d51a60f1acee2234e0e2c3b63e Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Mon, 17 Jun 2024 14:52:34 +0900 Subject: [PATCH 3/4] Fix build --- perf/Microbenchmark/Microbenchmark.Client/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/perf/Microbenchmark/Microbenchmark.Client/Program.cs b/perf/Microbenchmark/Microbenchmark.Client/Program.cs index 86af100d0..60e603a1b 100644 --- a/perf/Microbenchmark/Microbenchmark.Client/Program.cs +++ b/perf/Microbenchmark/Microbenchmark.Client/Program.cs @@ -3,7 +3,7 @@ using Microbenchmark.Client; //BenchmarkRunner.Run(); -//BenchmarkRunner.Run(); +BenchmarkRunner.Run(); #if FALSE var b = new HubMethodBenchmarks(); From 036e8137a81a098fbd504454769c9a60ad7532b7 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Mon, 17 Jun 2024 15:07:52 +0900 Subject: [PATCH 4/4] Fix --- .../StreamingHubClientBase.cs | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client/StreamingHubClientBase.cs index 9518d098e..fcb51a355 100644 --- a/src/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client/StreamingHubClientBase.cs @@ -356,21 +356,18 @@ void ProcessResponseWithError(SynchronizationContext? syncContext, StreamingHubP } } - if (responseFutures.Remove(message.MessageId, out future)) + RpcException ex; + if (string.IsNullOrWhiteSpace(message.Error)) { - RpcException ex; - if (string.IsNullOrWhiteSpace(message.Error)) - { - ex = new RpcException(new Status((StatusCode)message.StatusCode, message.Detail ?? string.Empty)); - } - else - { - ex = new RpcException(new Status((StatusCode)message.StatusCode, message.Detail ?? string.Empty), message.Detail + Environment.NewLine + message.Error); - } - - future.TrySetException(ex); - StreamingHubPayloadPool.Shared.Return(payload); + ex = new RpcException(new Status((StatusCode)message.StatusCode, message.Detail ?? string.Empty)); } + else + { + ex = new RpcException(new Status((StatusCode)message.StatusCode, message.Detail ?? string.Empty), message.Detail + Environment.NewLine + message.Error); + } + + future.TrySetException(ex); + StreamingHubPayloadPool.Shared.Return(payload); } void ProcessClientResultRequest(SynchronizationContext? syncContext, StreamingHubPayload payload, ref StreamingHubClientMessageReader messageReader)