Skip to content

Commit

Permalink
High performance logging
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Jan 12, 2025
1 parent 8a5fff7 commit 704caa9
Show file tree
Hide file tree
Showing 23 changed files with 719 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,46 @@
/// <summary>
/// Resolves the <see cref="IMessageBus"/> from the current ASP.NET Core web request (if present, otherwise falls back to the application root container).
/// </summary>
public class HttpContextAccessorCurrentMessageBusProvider(
public partial class HttpContextAccessorCurrentMessageBusProvider(
ILogger<HttpContextAccessorCurrentMessageBusProvider> logger,
IHttpContextAccessor httpContextAccessor,
IServiceProvider serviceProvider)
: CurrentMessageBusProvider(serviceProvider)
{
private readonly ILogger<HttpContextAccessorCurrentMessageBusProvider> _logger = logger;

public override IMessageBus GetCurrent()
{
// When the call to resolve the given type is made within an HTTP Request, use the request scope service provider
var httpContext = httpContextAccessor?.HttpContext;
if (httpContext != null)
{
logger.LogTrace("The type IMessageBus will be requested from the per-request scope");
LogCurrentFrom("request");
return httpContext.RequestServices.GetService<IMessageBus>();
}

// otherwise use the app wide scope provider
logger.LogTrace("The type IMessageBus will be requested from the app scope");
LogCurrentFrom("root");
return base.GetCurrent();
}

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Trace,
Message = "The type IMessageBus will be requested from the {ScopeName} scope")]
private partial void LogCurrentFrom(string scopeName);

#endregion
}

#if NETSTANDARD2_0

public partial class HttpContextAccessorCurrentMessageBusProvider
{
private partial void LogCurrentFrom(string scopeName)
=> _logger.LogTrace("The type IMessageBus will be requested from the {ScopeName} scope", scopeName);
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/// <summary>
/// Circuit breaker to toggle consumer status on an external events.
/// </summary>
internal sealed class CircuitBreakerAbstractConsumerInterceptor : IAbstractConsumerInterceptor
internal sealed class CircuitBreakerAbstractConsumerInterceptor(ILogger logger) : IAbstractConsumerInterceptor
{
public int Order => 100;

Expand Down Expand Up @@ -33,12 +33,12 @@ async Task BreakerChanged(Circuit state)
var bus = consumer.Settings[0].MessageBusSettings.Name ?? "default";
if (shouldPause)
{
consumer.Logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus);
logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus);
await consumer.DoStop().ConfigureAwait(false);
}
else
{
consumer.Logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus);
logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus);
await consumer.DoStart().ConfigureAwait(false);
}
consumer.SetIsPaused(shouldPause);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
namespace SlimMessageBus.Host.Memory;

public abstract class AbstractMessageProcessorQueue(IMessageProcessor<object> messageProcessor, ILogger logger) : IMessageProcessorQueue
public abstract partial class AbstractMessageProcessorQueue(IMessageProcessor<object> messageProcessor, ILogger logger) : IMessageProcessorQueue
{
private readonly ILogger _logger = logger;

public abstract void Enqueue(object transportMessage, IReadOnlyDictionary<string, object> messageHeaders);

protected async Task ProcessMessage(object transportMessage, IReadOnlyDictionary<string, object> messageHeaders, CancellationToken cancellationToken)
Expand All @@ -23,7 +25,27 @@ protected async Task ProcessMessage(object transportMessage, IReadOnlyDictionary
if (r.Exception != null)
{
// We rely on the IMessageProcessor to execute the ConsumerErrorHandler<T>, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost.
logger.LogError(r.Exception, "Error processing message {Message} of type {MessageType}", transportMessage, transportMessage.GetType());
LogMessageError(transportMessage, transportMessage.GetType(), r.Exception);
}
}

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Error,
Message = "Error processing message {TransportMessage} of type {TransportMessageType}")]
private partial void LogMessageError(object transportMessage, Type transportMessageType, Exception e);

#endregion
}

#if NETSTANDARD2_0

public abstract partial class AbstractMessageProcessorQueue
{
private partial void LogMessageError(object transportMessage, Type transportMessageType, Exception e)
=> _logger.LogError(e, "Error processing message {TransportMessage} of type {TransportMessageType}", transportMessage, transportMessageType);
}

#endif
24 changes: 22 additions & 2 deletions src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/// <summary>
/// In-memory message bus <see cref="IMessageBus"/> implementation to use for in process message passing.
/// </summary>
public class MemoryMessageBus : MessageBusBase<MemoryMessageBusSettings>
public partial class MemoryMessageBus : MessageBusBase<MemoryMessageBusSettings>
{
private readonly ILogger _logger;
private IDictionary<string, IMessageProcessor<object>> _messageProcessorByPath;
Expand Down Expand Up @@ -133,7 +133,7 @@ private async Task<TResponseMessage> ProduceInternal<TResponseMessage>(object me
path ??= GetDefaultPath(producerSettings.MessageType, producerSettings);
if (!_messageProcessorByPath.TryGetValue(path, out var messageProcessor))
{
_logger.LogDebug("No consumers interested in message type {MessageType} on path {Path}", messageType, path);
LogNoConsumerInterestedInMessageType(path, messageType);
return default;
}

Expand Down Expand Up @@ -165,4 +165,24 @@ private async Task<TResponseMessage> ProduceInternal<TResponseMessage>(object me
}
return (TResponseMessage)r.Response;
}

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Debug,
Message = "No consumers interested in message type {MessageType} on path {Path}")]
private partial void LogNoConsumerInterestedInMessageType(string path, Type messageType);

#endregion
}

#if NETSTANDARD2_0

public partial class MemoryMessageBus
{
private partial void LogNoConsumerInterestedInMessageType(string path, Type messageType)
=> _logger.LogDebug("No consumers interested in message type {MessageType} on path {Path}", messageType, path);
}

#endif
37 changes: 33 additions & 4 deletions src/SlimMessageBus.Host/Collections/ProducerByMessageTypeCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/// The message type hierarchy is discovered at runtime and cached for faster access.
/// </summary>
/// <typeparam name="TProducer">The producer type</typeparam>
public class ProducerByMessageTypeCache<TProducer> : IReadOnlyCache<Type, TProducer>
public partial class ProducerByMessageTypeCache<TProducer> : IReadOnlyCache<Type, TProducer>
where TProducer : class
{
private readonly ILogger _logger;
Expand Down Expand Up @@ -37,7 +37,7 @@ private TProducer CalculateProducer(Type messageType)
var assignableProducer = assignableProducers.FirstOrDefault();
if (assignableProducer.Key != null)
{
_logger.LogDebug("Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}", assignableProducer.Key, messageType);
LogMatchedProducerForMessageType(messageType, assignableProducer.Key);
return assignableProducer.Value;
}

Expand All @@ -52,7 +52,7 @@ private TProducer CalculateProducer(Type messageType)
}
}

_logger.LogDebug("Unable to match any declared producer for dispatched message type {MessageType}", messageType);
LogUnmatchedProducerForMessageType(messageType);

// Note: Nulls are also added to dictionary, so that we don't look them up using reflection next time (cached).
return null;
Expand All @@ -74,4 +74,33 @@ private static int CalculateBaseClassDistance(Type type, Type baseType)

return distance;
}
}

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Debug,
Message = "Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}")]
private partial void LogMatchedProducerForMessageType(Type messageType, Type producerMessageType);

[LoggerMessage(
EventId = 1,
Level = LogLevel.Debug,
Message = "Unable to match any declared producer for dispatched message type {MessageType}")]
private partial void LogUnmatchedProducerForMessageType(Type messageType);

#endregion
}

#if NETSTANDARD2_0

public partial class ProducerByMessageTypeCache<TProducer>
{
private partial void LogMatchedProducerForMessageType(Type messageType, Type producerMessageType)
=> _logger.LogDebug("Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}", producerMessageType, messageType);

private partial void LogUnmatchedProducerForMessageType(Type messageType)
=> _logger.LogDebug("Unable to match any declared producer for dispatched message type {MessageType}", messageType);
}

#endif
27 changes: 24 additions & 3 deletions src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
namespace SlimMessageBus.Host;

public abstract class AbstractConsumer : HasProviderExtensions, IAsyncDisposable, IConsumerControl
public abstract partial class AbstractConsumer : HasProviderExtensions, IAsyncDisposable, IConsumerControl
{
protected readonly ILogger Logger;
private readonly SemaphoreSlim _semaphore;
private readonly IReadOnlyList<IAbstractConsumerInterceptor> _interceptors;
private CancellationTokenSource _cancellationTokenSource;
Expand All @@ -10,7 +11,6 @@ public abstract class AbstractConsumer : HasProviderExtensions, IAsyncDisposable

public bool IsStarted { get; private set; }
public string Path { get; }
public ILogger Logger { get; }
public IReadOnlyList<AbstractConsumerSettings> Settings { get; }
protected CancellationToken CancellationToken => _cancellationTokenSource.Token;

Expand Down Expand Up @@ -39,7 +39,7 @@ private async Task<bool> CallInterceptor(Func<IAbstractConsumerInterceptor, Task
}
catch (Exception e)
{
Logger.LogError(e, "Interceptor {Interceptor} failed with error: {Error}", interceptor.GetType().Name, e.Message);
LogInterceptorFailed(interceptor.GetType(), e.Message, e);
}
}
return true;
Expand Down Expand Up @@ -178,4 +178,25 @@ protected async virtual ValueTask DisposeAsyncCore()
}

#endregion

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Error,
Message = "Interceptor {InterceptorType} failed with error: {Error}")]
private partial void LogInterceptorFailed(Type interceptorType, string error, Exception ex);

#endregion
}


#if NETSTANDARD2_0

public partial class AbstractConsumer
{
private partial void LogInterceptorFailed(Type interceptorType, string error, Exception ex)
=> Logger.LogError(ex, "Interceptor {InterceptorType} failed with error: {Error}", interceptorType, error);
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using System.Diagnostics;

public class CheckpointTrigger : ICheckpointTrigger
public partial class CheckpointTrigger : ICheckpointTrigger
{
private readonly ILogger<CheckpointTrigger> _logger;

Expand Down Expand Up @@ -35,7 +35,6 @@ public static CheckpointValue GetCheckpointValue(HasProviderExtensions settings)
=> new(settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault),
settings.GetOrDefault(CheckpointSettings.CheckpointDuration, CheckpointSettings.CheckpointDurationDefault));


#region Implementation of ICheckpointTrigger

public bool IsEnabled
Expand All @@ -53,17 +52,37 @@ public bool Increment()
var enabled = IsEnabled;
if (enabled && _logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)", _lastCheckpointCount, _lastCheckpointDuration.Elapsed.Seconds);
LogCheckpointTriggered(_lastCheckpointCount, _lastCheckpointDuration.Elapsed.Seconds);
}

return enabled;
}

}

public void Reset()
{
_lastCheckpointCount = 0;
_lastCheckpointDuration.Restart();
}

}

#endregion

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Debug,
Message = "Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)")]
private partial void LogCheckpointTriggered(int checkpointCount, int checkpointDuration);

#endregion
}

#if NETSTANDARD2_0

public partial class CheckpointTrigger
{
private partial void LogCheckpointTriggered(int checkpointCount, int checkpointDuration)
=> _logger.LogDebug("Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)", checkpointCount, checkpointDuration);
}

#endif
Loading

0 comments on commit 704caa9

Please sign in to comment.