Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/enhance logging #51

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@

<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.49.0" Condition="'$(TargetFramework)' != 'net462'" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.7.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
Expand Down
17 changes: 10 additions & 7 deletions GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.21.9" />
<PackageReference Include="Grpc" Version="2.46.5" />
<PackageReference Include="Grpc.Core.Api" Version="2.49.0" />
<PackageReference Include="Grpc.Tools" Version="2.50.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
<PackageReference Include="Google.Protobuf" Version="3.24.3" />
<PackageReference Include="Grpc" Version="2.46.6" />
<PackageReference Include="Grpc.Core.Api" Version="2.57.0" />
<PackageReference Include="Grpc.Tools" Version="2.58.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.7.2" />
<PackageReference Include="System.IO.Pipes.AccessControl" Version="5.0.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
<PackageReference Include="xunit" Version="2.5.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class NamedPipeChannelContextFactory : ChannelContextFactory
public ChannelContext Create(NamedPipeServerOptions options, ITestOutputHelper output)
{
var impl = new TestServiceImpl();
var server = new NamedPipeServer(_pipeName, options, output != null ? output.WriteLine : null);
var server = new NamedPipeServer(_pipeName, options, output != null ? output.WriteLine : null, output != null ? output.WriteLine : null);
TestService.BindService(server.ServiceBinder, impl);
server.Start();
return new ChannelContext
Expand All @@ -44,6 +44,7 @@ public override TestService.TestServiceClient CreateClient(ITestOutputHelper out
{
var channel = new NamedPipeChannel(".", _pipeName,
new NamedPipeChannelOptions { ConnectionTimeout = _connectionTimeout },
output != null ? output.WriteLine : null,
output != null ? output.WriteLine : null);
channel.PipeCallback = PipeCallback;
return new TestService.TestServiceClient(channel);
Expand Down
10 changes: 5 additions & 5 deletions GrpcDotNetNamedPipes/GrpcDotNetNamedPipes.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
<LangVersion>11</LangVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>

<Version>2.1.0</Version>
<PackageVersion>2.1.0</PackageVersion>
<Version>2.2.0</Version>
<PackageVersion>2.2.0</PackageVersion>
<AssemblyVersion>2.0.0.0</AssemblyVersion>

<Authors>Ben Olden-Cooligan</Authors>
Expand All @@ -25,9 +25,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.21.9" />
<PackageReference Include="Grpc.Core.Api" Version="2.49.0" />
<PackageReference Include="Grpc.Tools" Version="2.50.0" PrivateAssets="All" />
<PackageReference Include="Google.Protobuf" Version="3.24.3" />
<PackageReference Include="Grpc.Core.Api" Version="2.57.0" />
<PackageReference Include="Grpc.Tools" Version="2.58.0" PrivateAssets="All" />
<PackageReference Include="System.IO.Pipes.AccessControl" Version="5.0.0" Condition="'$(TargetFramework)' == 'net5.0-windows'" />
<PackageReference Include="System.Memory" Version="4.5.5" />
</ItemGroup>
Expand Down
23 changes: 16 additions & 7 deletions GrpcDotNetNamedPipes/Internal/Helpers/ConnectionLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ internal class ConnectionLogger
private static int _lastId;

private static int NextId() => Interlocked.Increment(ref _lastId);
public static ConnectionLogger Client(Action<string> log) => new(log, "CLIENT", log != null ? NextId() : 0);
public static ConnectionLogger Server(Action<string> log) => new(log, "SERVER", 0);
public static ConnectionLogger Client(Action<string> traceLog, Action<string> errorLog) => new(traceLog, errorLog, "CLIENT", traceLog != null ? NextId() : 0);
public static ConnectionLogger Server(Action<string> traceLog, Action<string> errorLog) => new(traceLog, errorLog, "SERVER", 0);

private readonly Action<string> _log;
private readonly Action<string> _traceLog;
private readonly Action<string> _errorLog;
private readonly string _type;

private ConnectionLogger(Action<string> log, string type, int id)
private ConnectionLogger(Action<string> traceLog, Action<string> errorLog, string type, int id)
{
_log = log;
_traceLog = traceLog;
_errorLog = errorLog;
_type = type;
ConnectionId = id;
}
Expand All @@ -38,8 +40,15 @@ private ConnectionLogger(Action<string> log, string type, int id)

public void Log(string message)
{
if (_log == null) return;
if (_traceLog == null) return;
var id = ConnectionId > 0 ? ConnectionId.ToString() : "?";
_log($"[{_type}][{id}] {message}");
_traceLog($"[{_type}][{id}] {message}");
}

public void LogError(string message)
{
if(_errorLog == null) return;
var id = ConnectionId > 0 ? ConnectionId.ToString() : "?";
_errorLog($"[{_type}][{id}] {message}");
}
}
4 changes: 2 additions & 2 deletions GrpcDotNetNamedPipes/Internal/PipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ await _transport.Read(_messageHandler).ConfigureAwait(false))
}
catch (EndOfPipeException)
{
_logger.Log("End of pipe");
_logger.LogError("End of pipe");
}
catch (Exception error)
{
_logger.Log("Pipe read error");
_logger.LogError("Pipe read error");
_onError?.Invoke(error);
}
finally
Expand Down
23 changes: 18 additions & 5 deletions GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ internal class ServerConnectionContext : TransportMessageHandler, IDisposable
private readonly ConnectionLogger _logger;
private readonly Dictionary<string, Func<ServerConnectionContext, Task>> _methodHandlers;
private readonly PayloadQueue _payloadQueue;

private readonly TaskFactory _taskFactory;

public ServerConnectionContext(NamedPipeServerStream pipeStream, ConnectionLogger logger,
Dictionary<string, Func<ServerConnectionContext, Task>> methodHandlers)
Dictionary<string, Func<ServerConnectionContext, Task>> methodHandlers, TaskFactory taskFactory)
{
CallContext = new NamedPipeCallContext(this);
PipeStream = pipeStream;
Expand All @@ -32,6 +33,7 @@ public ServerConnectionContext(NamedPipeServerStream pipeStream, ConnectionLogge
_methodHandlers = methodHandlers;
_payloadQueue = new PayloadQueue();
CancellationTokenSource = new CancellationTokenSource();
_taskFactory = taskFactory;
}

public NamedPipeServerStream PipeStream { get; }
Expand All @@ -58,11 +60,13 @@ public IServerStreamWriter<TResponse> CreateResponseStream<TResponse>(Marshaller
return new ResponseStreamWriterImpl<TResponse>(Transport, CancellationToken.None, responseMarshaller,
() => IsCompleted);
}

public override void HandleRequestInit(string methodFullName, DateTime? deadline)
{
Deadline = new Deadline(deadline);
Task.Run(async () => await _methodHandlers[methodFullName](this).ConfigureAwait(false));
// Note the use of .ConfigureAwait(false) here...
// https://blog.stephencleary.com/2012/07/dont-block-on-async-code.html
_taskFactory.StartNew(async () => await _methodHandlers[methodFullName](this).ConfigureAwait(false));
}

public override void HandleHeaders(Metadata headers) => RequestHeaders = headers;
Expand All @@ -75,22 +79,31 @@ public override void HandleRequestInit(string methodFullName, DateTime? deadline

public void Error(Exception ex)
{
_logger.Log("RPC error");
IsCompleted = true;
if (Deadline != null && Deadline.IsExpired)
{
_logger.LogError("RPC Warning: Deadline Exceeded");
WriteTrailers(StatusCode.DeadlineExceeded, "");
}
else if (CancellationTokenSource.IsCancellationRequested)
{

_logger.LogError("RPC Warning: Cancelled");
WriteTrailers(StatusCode.Cancelled, "");
}
else if (ex is RpcException rpcException)
{

_logger.LogError($"RPC Exception: {rpcException.Message} at {rpcException.StackTrace}");
if (rpcException.InnerException != null)
_logger.LogError($"Inner exception: {rpcException.InnerException.Message} at {rpcException.InnerException.StackTrace}");
WriteTrailers(rpcException.StatusCode, rpcException.Status.Detail);
}
else
{
_logger.LogError($"RPC Exception (unknown): {ex.Message} at {ex.StackTrace}");
if (ex.InnerException != null)
_logger.LogError($"Inner exception: {ex.InnerException.Message} at {ex.InnerException.StackTrace}");
WriteTrailers(StatusCode.Unknown, "Exception was thrown by handler.");
}
}
Expand Down
7 changes: 5 additions & 2 deletions GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace GrpcDotNetNamedPipes.Internal;

internal class ServerStreamPool : IDisposable
{
private const int PoolSize = 4;
private readonly int PoolSize = 4;
private const int FallbackMin = 100;
private const int FallbackMax = 10_000;

Expand All @@ -37,6 +37,8 @@ public ServerStreamPool(string pipeName, NamedPipeServerOptions options,
_options = options;
_handleConnection = handleConnection;
_invokeError = invokeError;
if (options.ThreadPoolSize > 0)
PoolSize = options.ThreadPoolSize;
}

private NamedPipeServerStream CreatePipeServer()
Expand Down Expand Up @@ -161,7 +163,8 @@ private void RunHandleConnection(NamedPipeServerStream pipeServer)
try
{
await _handleConnection(pipeServer);
pipeServer.Disconnect();
if (pipeServer.IsConnected)
pipeServer.Disconnect();
}
catch (Exception error)
{
Expand Down
8 changes: 5 additions & 3 deletions GrpcDotNetNamedPipes/NamedPipeChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,25 @@ public class NamedPipeChannel : CallInvoker
private readonly string _pipeName;
private readonly NamedPipeChannelOptions _options;
private readonly Action<string> _log;
private readonly Action<string> _errorLog;

public NamedPipeChannel(string serverName, string pipeName)
: this(serverName, pipeName, new NamedPipeChannelOptions())
{
}

public NamedPipeChannel(string serverName, string pipeName, NamedPipeChannelOptions options)
: this(serverName, pipeName, options, null)
: this(serverName, pipeName, options, null, null)
{
}

internal NamedPipeChannel(string serverName, string pipeName, NamedPipeChannelOptions options, Action<string> log)
public NamedPipeChannel(string serverName, string pipeName, NamedPipeChannelOptions options, Action<string> log, Action<string> errorLog)
{
_serverName = serverName;
_pipeName = pipeName;
_options = options;
_log = log;
_errorLog = errorLog;
}

internal Action<NamedPipeClientStream> PipeCallback { get; set; }
Expand All @@ -62,7 +64,7 @@ private ClientConnectionContext CreateConnectionContext<TRequest, TResponse>(
try
{
bool isServerUnary = method.Type == MethodType.Unary || method.Type == MethodType.ClientStreaming;
var logger = ConnectionLogger.Client(_log);
var logger = ConnectionLogger.Client(_log, _errorLog);
var ctx = new ClientConnectionContext(stream, callOptions, isServerUnary, _options.ConnectionTimeout,
logger);
ctx.InitCall(method, request);
Expand Down
12 changes: 8 additions & 4 deletions GrpcDotNetNamedPipes/NamedPipeServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ namespace GrpcDotNetNamedPipes;
public class NamedPipeServer : IDisposable
{
private readonly ServerStreamPool _pool;
private readonly TaskFactory _taskFactory;
private readonly Action<string> _log;
private readonly Action<string> _errorLog;
private readonly Dictionary<string, Func<ServerConnectionContext, Task>> _methodHandlers = new();

public NamedPipeServer(string pipeName)
Expand All @@ -28,15 +30,17 @@ public NamedPipeServer(string pipeName)
}

public NamedPipeServer(string pipeName, NamedPipeServerOptions options)
: this(pipeName, options, null)
: this(pipeName, options, null, null)
{
}

internal NamedPipeServer(string pipeName, NamedPipeServerOptions options, Action<string> log)
public NamedPipeServer(string pipeName, NamedPipeServerOptions options, Action<string> log, Action<string> errorLog)
{
_pool = new ServerStreamPool(pipeName, options, HandleConnection, InvokeError);
_log = log;
_errorLog = errorLog;
ServiceBinder = new ServiceBinderImpl(this);
_taskFactory = options.TaskFactory ?? new TaskFactory();
}

public ServiceBinderBase ServiceBinder { get; }
Expand Down Expand Up @@ -65,8 +69,8 @@ public void Dispose()

private async Task HandleConnection(NamedPipeServerStream pipeStream)
{
var logger = ConnectionLogger.Server(_log);
var ctx = new ServerConnectionContext(pipeStream, logger, _methodHandlers);
var logger = ConnectionLogger.Server(_log, _errorLog);
var ctx = new ServerConnectionContext(pipeStream, logger, _methodHandlers, _taskFactory);
await Task.Run(new PipeReader(pipeStream, ctx, logger, ctx.Dispose, InvokeError).ReadLoop);
}

Expand Down
13 changes: 13 additions & 0 deletions GrpcDotNetNamedPipes/NamedPipeServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,17 @@ public class NamedPipeServerOptions
/// </summary>
public PipeSecurity PipeSecurity { get; set; }
#endif

/// <summary>
/// Gets or sets a Custom Task Factory to control how tasks are serviced.
/// For example, causing threads to be processsed in FIFO rather than LIFO
/// by using TaskCreationOptions.preferFairness
/// </summary>
public TaskFactory TaskFactory { get; set; }

/// <summary>
/// Gets or sets a count of threads to use for the listener.
/// If you need to address a synchronous code execution issue, try increasing
/// </summary>
public int ThreadPoolSize { get; set; } = 4;
}