From fd44e21743237d198c2546394677f698e7c27f67 Mon Sep 17 00:00:00 2001 From: Zach Baker Date: Tue, 5 Sep 2023 14:43:51 -0500 Subject: [PATCH 1/3] allow fifo task execution --- .../Internal/ServerConnectionContext.cs | 12 ++++++++---- GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs | 3 ++- GrpcDotNetNamedPipes/NamedPipeServer.cs | 6 ++++-- GrpcDotNetNamedPipes/NamedPipeServerOptions.cs | 7 +++++++ 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs b/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs index 812536b..b65e5cc 100644 --- a/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs +++ b/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs @@ -21,9 +21,10 @@ internal class ServerConnectionContext : TransportMessageHandler, IDisposable private readonly ConnectionLogger _logger; private readonly Dictionary> _methodHandlers; private readonly PayloadQueue _payloadQueue; - + private readonly TaskFactory _taskFactory; + public ServerConnectionContext(NamedPipeServerStream pipeStream, ConnectionLogger logger, - Dictionary> methodHandlers) + Dictionary> methodHandlers, TaskFactory taskFactory) { CallContext = new NamedPipeCallContext(this); PipeStream = pipeStream; @@ -32,6 +33,7 @@ public ServerConnectionContext(NamedPipeServerStream pipeStream, ConnectionLogge _methodHandlers = methodHandlers; _payloadQueue = new PayloadQueue(); CancellationTokenSource = new CancellationTokenSource(); + _taskFactory = taskFactory; } public NamedPipeServerStream PipeStream { get; } @@ -58,11 +60,13 @@ public IServerStreamWriter CreateResponseStream(Marshaller return new ResponseStreamWriterImpl(Transport, CancellationToken.None, responseMarshaller, () => IsCompleted); } - + public override void HandleRequestInit(string methodFullName, DateTime? deadline) { Deadline = new Deadline(deadline); - Task.Run(async () => await _methodHandlers[methodFullName](this).ConfigureAwait(false)); + // Note the use of .ConfigureAwait(false) here... + // https://blog.stephencleary.com/2012/07/dont-block-on-async-code.html + _taskFactory.StartNew(async () => await _methodHandlers[methodFullName](this).ConfigureAwait(false)); } public override void HandleHeaders(Metadata headers) => RequestHeaders = headers; diff --git a/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs b/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs index e3806f7..7049b92 100644 --- a/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs +++ b/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs @@ -161,7 +161,8 @@ private void RunHandleConnection(NamedPipeServerStream pipeServer) try { await _handleConnection(pipeServer); - pipeServer.Disconnect(); + if (pipeServer.IsConnected) + pipeServer.Disconnect(); } catch (Exception error) { diff --git a/GrpcDotNetNamedPipes/NamedPipeServer.cs b/GrpcDotNetNamedPipes/NamedPipeServer.cs index 7a396f7..e83bd05 100644 --- a/GrpcDotNetNamedPipes/NamedPipeServer.cs +++ b/GrpcDotNetNamedPipes/NamedPipeServer.cs @@ -19,6 +19,7 @@ namespace GrpcDotNetNamedPipes; public class NamedPipeServer : IDisposable { private readonly ServerStreamPool _pool; + private readonly TaskFactory _taskFactory; private readonly Action _log; private readonly Dictionary> _methodHandlers = new(); @@ -32,11 +33,12 @@ public NamedPipeServer(string pipeName, NamedPipeServerOptions options) { } - internal NamedPipeServer(string pipeName, NamedPipeServerOptions options, Action log) + public NamedPipeServer(string pipeName, NamedPipeServerOptions options, Action log) { _pool = new ServerStreamPool(pipeName, options, HandleConnection, InvokeError); _log = log; ServiceBinder = new ServiceBinderImpl(this); + _taskFactory = options.TaskFactory ?? new TaskFactory(); } public ServiceBinderBase ServiceBinder { get; } @@ -66,7 +68,7 @@ public void Dispose() private async Task HandleConnection(NamedPipeServerStream pipeStream) { var logger = ConnectionLogger.Server(_log); - var ctx = new ServerConnectionContext(pipeStream, logger, _methodHandlers); + var ctx = new ServerConnectionContext(pipeStream, logger, _methodHandlers, _taskFactory); await Task.Run(new PipeReader(pipeStream, ctx, logger, ctx.Dispose, InvokeError).ReadLoop); } diff --git a/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs b/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs index 52e7461..8c0acd8 100644 --- a/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs +++ b/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs @@ -31,4 +31,11 @@ public class NamedPipeServerOptions /// public PipeSecurity PipeSecurity { get; set; } #endif + + /// + /// Gets or sets a Custom Task Factory to control how tasks are serviced. + /// For example, causing threads to be processsed in FIFO rather than LIFO + /// by using TaskCreationOptions.preferFairness + /// + public TaskFactory TaskFactory { get; set; } } \ No newline at end of file From b5f49a691ace577653ece178fc39f00486314235 Mon Sep 17 00:00:00 2001 From: Zach Baker Date: Fri, 8 Sep 2023 16:14:29 -0500 Subject: [PATCH 2/3] listener pool control --- .../GrpcDotNetNamedPipes.PerfTests.csproj | 7 +++++-- .../GrpcDotNetNamedPipes.Tests.csproj | 17 ++++++++++------- .../Internal/ServerStreamPool.cs | 4 +++- GrpcDotNetNamedPipes/NamedPipeServerOptions.cs | 6 ++++++ 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj b/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj index 354a359..0bec2b2 100644 --- a/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj +++ b/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj @@ -10,8 +10,11 @@ - - + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj b/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj index 4876e8a..e6d16f5 100644 --- a/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj +++ b/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj @@ -9,14 +9,17 @@ - - - - - + + + + + - - + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs b/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs index 7049b92..1318d58 100644 --- a/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs +++ b/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs @@ -18,7 +18,7 @@ namespace GrpcDotNetNamedPipes.Internal; internal class ServerStreamPool : IDisposable { - private const int PoolSize = 4; + private readonly int PoolSize = 4; private const int FallbackMin = 100; private const int FallbackMax = 10_000; @@ -37,6 +37,8 @@ public ServerStreamPool(string pipeName, NamedPipeServerOptions options, _options = options; _handleConnection = handleConnection; _invokeError = invokeError; + if (options.ThreadPoolSize > 0) + PoolSize = options.ThreadPoolSize; } private NamedPipeServerStream CreatePipeServer() diff --git a/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs b/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs index 8c0acd8..0051e10 100644 --- a/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs +++ b/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs @@ -38,4 +38,10 @@ public class NamedPipeServerOptions /// by using TaskCreationOptions.preferFairness /// public TaskFactory TaskFactory { get; set; } + + /// + /// Gets or sets a count of threads to use for the listener. + /// If you need to address a synchronous code execution issue, try increasing + /// + public int ThreadPoolSize { get; set; } = 4; } \ No newline at end of file From 2a654e7de0ab112afb43bc41d9fdabcb239c92c1 Mon Sep 17 00:00:00 2001 From: Zach Baker Date: Fri, 8 Sep 2023 17:38:11 -0500 Subject: [PATCH 3/3] expose logging --- .../Helpers/NamedPipeChannelContextFactory.cs | 3 ++- .../GrpcDotNetNamedPipes.csproj | 10 ++++---- .../Internal/Helpers/ConnectionLogger.cs | 23 +++++++++++++------ GrpcDotNetNamedPipes/Internal/PipeReader.cs | 4 ++-- .../Internal/ServerConnectionContext.cs | 11 ++++++++- GrpcDotNetNamedPipes/NamedPipeChannel.cs | 8 ++++--- GrpcDotNetNamedPipes/NamedPipeServer.cs | 8 ++++--- 7 files changed, 45 insertions(+), 22 deletions(-) diff --git a/GrpcDotNetNamedPipes.Tests/Helpers/NamedPipeChannelContextFactory.cs b/GrpcDotNetNamedPipes.Tests/Helpers/NamedPipeChannelContextFactory.cs index b69d0d9..5572c06 100644 --- a/GrpcDotNetNamedPipes.Tests/Helpers/NamedPipeChannelContextFactory.cs +++ b/GrpcDotNetNamedPipes.Tests/Helpers/NamedPipeChannelContextFactory.cs @@ -24,7 +24,7 @@ public class NamedPipeChannelContextFactory : ChannelContextFactory public ChannelContext Create(NamedPipeServerOptions options, ITestOutputHelper output) { var impl = new TestServiceImpl(); - var server = new NamedPipeServer(_pipeName, options, output != null ? output.WriteLine : null); + var server = new NamedPipeServer(_pipeName, options, output != null ? output.WriteLine : null, output != null ? output.WriteLine : null); TestService.BindService(server.ServiceBinder, impl); server.Start(); return new ChannelContext @@ -44,6 +44,7 @@ public override TestService.TestServiceClient CreateClient(ITestOutputHelper out { var channel = new NamedPipeChannel(".", _pipeName, new NamedPipeChannelOptions { ConnectionTimeout = _connectionTimeout }, + output != null ? output.WriteLine : null, output != null ? output.WriteLine : null); channel.PipeCallback = PipeCallback; return new TestService.TestServiceClient(channel); diff --git a/GrpcDotNetNamedPipes/GrpcDotNetNamedPipes.csproj b/GrpcDotNetNamedPipes/GrpcDotNetNamedPipes.csproj index 50b11b1..057db92 100644 --- a/GrpcDotNetNamedPipes/GrpcDotNetNamedPipes.csproj +++ b/GrpcDotNetNamedPipes/GrpcDotNetNamedPipes.csproj @@ -5,8 +5,8 @@ 11 true - 2.1.0 - 2.1.0 + 2.2.0 + 2.2.0 2.0.0.0 Ben Olden-Cooligan @@ -25,9 +25,9 @@ - - - + + + diff --git a/GrpcDotNetNamedPipes/Internal/Helpers/ConnectionLogger.cs b/GrpcDotNetNamedPipes/Internal/Helpers/ConnectionLogger.cs index c1c183a..e492ce1 100644 --- a/GrpcDotNetNamedPipes/Internal/Helpers/ConnectionLogger.cs +++ b/GrpcDotNetNamedPipes/Internal/Helpers/ConnectionLogger.cs @@ -21,15 +21,17 @@ internal class ConnectionLogger private static int _lastId; private static int NextId() => Interlocked.Increment(ref _lastId); - public static ConnectionLogger Client(Action log) => new(log, "CLIENT", log != null ? NextId() : 0); - public static ConnectionLogger Server(Action log) => new(log, "SERVER", 0); + public static ConnectionLogger Client(Action traceLog, Action errorLog) => new(traceLog, errorLog, "CLIENT", traceLog != null ? NextId() : 0); + public static ConnectionLogger Server(Action traceLog, Action errorLog) => new(traceLog, errorLog, "SERVER", 0); - private readonly Action _log; + private readonly Action _traceLog; + private readonly Action _errorLog; private readonly string _type; - private ConnectionLogger(Action log, string type, int id) + private ConnectionLogger(Action traceLog, Action errorLog, string type, int id) { - _log = log; + _traceLog = traceLog; + _errorLog = errorLog; _type = type; ConnectionId = id; } @@ -38,8 +40,15 @@ private ConnectionLogger(Action log, string type, int id) public void Log(string message) { - if (_log == null) return; + if (_traceLog == null) return; var id = ConnectionId > 0 ? ConnectionId.ToString() : "?"; - _log($"[{_type}][{id}] {message}"); + _traceLog($"[{_type}][{id}] {message}"); + } + + public void LogError(string message) + { + if(_errorLog == null) return; + var id = ConnectionId > 0 ? ConnectionId.ToString() : "?"; + _errorLog($"[{_type}][{id}] {message}"); } } \ No newline at end of file diff --git a/GrpcDotNetNamedPipes/Internal/PipeReader.cs b/GrpcDotNetNamedPipes/Internal/PipeReader.cs index dce42c5..7ca30c5 100644 --- a/GrpcDotNetNamedPipes/Internal/PipeReader.cs +++ b/GrpcDotNetNamedPipes/Internal/PipeReader.cs @@ -49,11 +49,11 @@ await _transport.Read(_messageHandler).ConfigureAwait(false)) } catch (EndOfPipeException) { - _logger.Log("End of pipe"); + _logger.LogError("End of pipe"); } catch (Exception error) { - _logger.Log("Pipe read error"); + _logger.LogError("Pipe read error"); _onError?.Invoke(error); } finally diff --git a/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs b/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs index b65e5cc..c6a5603 100644 --- a/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs +++ b/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs @@ -79,22 +79,31 @@ public override void HandleRequestInit(string methodFullName, DateTime? deadline public void Error(Exception ex) { - _logger.Log("RPC error"); IsCompleted = true; if (Deadline != null && Deadline.IsExpired) { + _logger.LogError("RPC Warning: Deadline Exceeded"); WriteTrailers(StatusCode.DeadlineExceeded, ""); } else if (CancellationTokenSource.IsCancellationRequested) { + + _logger.LogError("RPC Warning: Cancelled"); WriteTrailers(StatusCode.Cancelled, ""); } else if (ex is RpcException rpcException) { + + _logger.LogError($"RPC Exception: {rpcException.Message} at {rpcException.StackTrace}"); + if (rpcException.InnerException != null) + _logger.LogError($"Inner exception: {rpcException.InnerException.Message} at {rpcException.InnerException.StackTrace}"); WriteTrailers(rpcException.StatusCode, rpcException.Status.Detail); } else { + _logger.LogError($"RPC Exception (unknown): {ex.Message} at {ex.StackTrace}"); + if (ex.InnerException != null) + _logger.LogError($"Inner exception: {ex.InnerException.Message} at {ex.InnerException.StackTrace}"); WriteTrailers(StatusCode.Unknown, "Exception was thrown by handler."); } } diff --git a/GrpcDotNetNamedPipes/NamedPipeChannel.cs b/GrpcDotNetNamedPipes/NamedPipeChannel.cs index e7c1e57..e6540dc 100644 --- a/GrpcDotNetNamedPipes/NamedPipeChannel.cs +++ b/GrpcDotNetNamedPipes/NamedPipeChannel.cs @@ -22,6 +22,7 @@ public class NamedPipeChannel : CallInvoker private readonly string _pipeName; private readonly NamedPipeChannelOptions _options; private readonly Action _log; + private readonly Action _errorLog; public NamedPipeChannel(string serverName, string pipeName) : this(serverName, pipeName, new NamedPipeChannelOptions()) @@ -29,16 +30,17 @@ public NamedPipeChannel(string serverName, string pipeName) } public NamedPipeChannel(string serverName, string pipeName, NamedPipeChannelOptions options) - : this(serverName, pipeName, options, null) + : this(serverName, pipeName, options, null, null) { } - internal NamedPipeChannel(string serverName, string pipeName, NamedPipeChannelOptions options, Action log) + public NamedPipeChannel(string serverName, string pipeName, NamedPipeChannelOptions options, Action log, Action errorLog) { _serverName = serverName; _pipeName = pipeName; _options = options; _log = log; + _errorLog = errorLog; } internal Action PipeCallback { get; set; } @@ -62,7 +64,7 @@ private ClientConnectionContext CreateConnectionContext( try { bool isServerUnary = method.Type == MethodType.Unary || method.Type == MethodType.ClientStreaming; - var logger = ConnectionLogger.Client(_log); + var logger = ConnectionLogger.Client(_log, _errorLog); var ctx = new ClientConnectionContext(stream, callOptions, isServerUnary, _options.ConnectionTimeout, logger); ctx.InitCall(method, request); diff --git a/GrpcDotNetNamedPipes/NamedPipeServer.cs b/GrpcDotNetNamedPipes/NamedPipeServer.cs index e83bd05..a7cc28b 100644 --- a/GrpcDotNetNamedPipes/NamedPipeServer.cs +++ b/GrpcDotNetNamedPipes/NamedPipeServer.cs @@ -21,6 +21,7 @@ public class NamedPipeServer : IDisposable private readonly ServerStreamPool _pool; private readonly TaskFactory _taskFactory; private readonly Action _log; + private readonly Action _errorLog; private readonly Dictionary> _methodHandlers = new(); public NamedPipeServer(string pipeName) @@ -29,14 +30,15 @@ public NamedPipeServer(string pipeName) } public NamedPipeServer(string pipeName, NamedPipeServerOptions options) - : this(pipeName, options, null) + : this(pipeName, options, null, null) { } - public NamedPipeServer(string pipeName, NamedPipeServerOptions options, Action log) + public NamedPipeServer(string pipeName, NamedPipeServerOptions options, Action log, Action errorLog) { _pool = new ServerStreamPool(pipeName, options, HandleConnection, InvokeError); _log = log; + _errorLog = errorLog; ServiceBinder = new ServiceBinderImpl(this); _taskFactory = options.TaskFactory ?? new TaskFactory(); } @@ -67,7 +69,7 @@ public void Dispose() private async Task HandleConnection(NamedPipeServerStream pipeStream) { - var logger = ConnectionLogger.Server(_log); + var logger = ConnectionLogger.Server(_log, _errorLog); var ctx = new ServerConnectionContext(pipeStream, logger, _methodHandlers, _taskFactory); await Task.Run(new PipeReader(pipeStream, ctx, logger, ctx.Dispose, InvokeError).ReadLoop); }