Skip to content

Commit

Permalink
High performance logging
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Jan 11, 2025
1 parent 8a5fff7 commit 57a5b93
Show file tree
Hide file tree
Showing 19 changed files with 593 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/// <summary>
/// Circuit breaker to toggle consumer status on an external events.
/// </summary>
internal sealed class CircuitBreakerAbstractConsumerInterceptor : IAbstractConsumerInterceptor
internal sealed class CircuitBreakerAbstractConsumerInterceptor(ILogger logger) : IAbstractConsumerInterceptor
{
public int Order => 100;

Expand Down Expand Up @@ -33,12 +33,12 @@ async Task BreakerChanged(Circuit state)
var bus = consumer.Settings[0].MessageBusSettings.Name ?? "default";
if (shouldPause)
{
consumer.Logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus);
logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus);
await consumer.DoStop().ConfigureAwait(false);
}
else
{
consumer.Logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus);
logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus);
await consumer.DoStart().ConfigureAwait(false);
}
consumer.SetIsPaused(shouldPause);
Expand Down
37 changes: 33 additions & 4 deletions src/SlimMessageBus.Host/Collections/ProducerByMessageTypeCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/// The message type hierarchy is discovered at runtime and cached for faster access.
/// </summary>
/// <typeparam name="TProducer">The producer type</typeparam>
public class ProducerByMessageTypeCache<TProducer> : IReadOnlyCache<Type, TProducer>
public partial class ProducerByMessageTypeCache<TProducer> : IReadOnlyCache<Type, TProducer>
where TProducer : class
{
private readonly ILogger _logger;
Expand Down Expand Up @@ -37,7 +37,7 @@ private TProducer CalculateProducer(Type messageType)
var assignableProducer = assignableProducers.FirstOrDefault();
if (assignableProducer.Key != null)
{
_logger.LogDebug("Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}", assignableProducer.Key, messageType);
LogMatchedProducerForMessageType(messageType, assignableProducer.Key);
return assignableProducer.Value;
}

Expand All @@ -52,7 +52,7 @@ private TProducer CalculateProducer(Type messageType)
}
}

_logger.LogDebug("Unable to match any declared producer for dispatched message type {MessageType}", messageType);
LogUnmatchedProducerForMessageType(messageType);

// Note: Nulls are also added to dictionary, so that we don't look them up using reflection next time (cached).
return null;
Expand All @@ -74,4 +74,33 @@ private static int CalculateBaseClassDistance(Type type, Type baseType)

return distance;
}
}

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Debug,
Message = "Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}")]
partial void LogMatchedProducerForMessageType(Type messageType, Type producerMessageType);

[LoggerMessage(
EventId = 1,
Level = LogLevel.Debug,
Message = "Unable to match any declared producer for dispatched message type {MessageType}")]
partial void LogUnmatchedProducerForMessageType(Type messageType);

#endregion
}

#if NETSTANDARD2_0

public partial class ProducerByMessageTypeCache<TProducer>
{
partial void LogMatchedProducerForMessageType(Type messageType, Type producerMessageType)
=> _logger.LogDebug("Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}", producerMessageType, messageType);

partial void LogUnmatchedProducerForMessageType(Type messageType)
=> _logger.LogDebug("Unable to match any declared producer for dispatched message type {MessageType}", messageType);
}

#endif
27 changes: 24 additions & 3 deletions src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
namespace SlimMessageBus.Host;

public abstract class AbstractConsumer : HasProviderExtensions, IAsyncDisposable, IConsumerControl
public abstract partial class AbstractConsumer : HasProviderExtensions, IAsyncDisposable, IConsumerControl
{
protected readonly ILogger Logger;
private readonly SemaphoreSlim _semaphore;
private readonly IReadOnlyList<IAbstractConsumerInterceptor> _interceptors;
private CancellationTokenSource _cancellationTokenSource;
Expand All @@ -10,7 +11,6 @@ public abstract class AbstractConsumer : HasProviderExtensions, IAsyncDisposable

public bool IsStarted { get; private set; }
public string Path { get; }
public ILogger Logger { get; }
public IReadOnlyList<AbstractConsumerSettings> Settings { get; }
protected CancellationToken CancellationToken => _cancellationTokenSource.Token;

Expand Down Expand Up @@ -39,7 +39,7 @@ private async Task<bool> CallInterceptor(Func<IAbstractConsumerInterceptor, Task
}
catch (Exception e)
{
Logger.LogError(e, "Interceptor {Interceptor} failed with error: {Error}", interceptor.GetType().Name, e.Message);
LogInterceptorFailed(interceptor.GetType(), e.Message, e);
}
}
return true;
Expand Down Expand Up @@ -178,4 +178,25 @@ protected async virtual ValueTask DisposeAsyncCore()
}

#endregion

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Error,
Message = "Interceptor {InterceptorType} failed with error: {Error}")]
partial void LogInterceptorFailed(Type interceptorType, string error, Exception ex);

#endregion
}


#if NETSTANDARD2_0

public partial class AbstractConsumer
{
partial void LogInterceptorFailed(Type interceptorType, string error, Exception ex)
=> Logger.LogError(ex, "Interceptor {InterceptorType} failed with error: {Error}", interceptorType, error);
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using System.Diagnostics;

public class CheckpointTrigger : ICheckpointTrigger
public partial class CheckpointTrigger : ICheckpointTrigger
{
private readonly ILogger<CheckpointTrigger> _logger;

Expand Down Expand Up @@ -35,7 +35,6 @@ public static CheckpointValue GetCheckpointValue(HasProviderExtensions settings)
=> new(settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault),
settings.GetOrDefault(CheckpointSettings.CheckpointDuration, CheckpointSettings.CheckpointDurationDefault));


#region Implementation of ICheckpointTrigger

public bool IsEnabled
Expand All @@ -53,17 +52,37 @@ public bool Increment()
var enabled = IsEnabled;
if (enabled && _logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)", _lastCheckpointCount, _lastCheckpointDuration.Elapsed.Seconds);
LogCheckpointTriggered(_lastCheckpointCount, _lastCheckpointDuration.Elapsed.Seconds);
}

return enabled;
}

}

public void Reset()
{
_lastCheckpointCount = 0;
_lastCheckpointDuration.Restart();
}

}

#endregion

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Debug,
Message = "Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)")]
partial void LogCheckpointTriggered(int checkpointCount, int checkpointDuration);

#endregion
}

#if NETSTANDARD2_0

public partial class CheckpointTrigger
{
partial void LogCheckpointTriggered(int checkpointCount, int checkpointDuration)
=> _logger.LogDebug("Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)", checkpointCount, checkpointDuration);
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/// The expectation is that <see cref="IMessageProcessor{TMessage}.ProcessMessage(TMessage)"/> will be executed synchronously (in sequential order) by the caller on which we want to increase amount of concurrent transportMessage being processed.
/// </summary>
/// <typeparam name="TMessage"></typeparam>
public sealed class ConcurrentMessageProcessorDecorator<TMessage> : IMessageProcessor<TMessage>, IDisposable
public sealed partial class ConcurrentMessageProcessorDecorator<TMessage> : IMessageProcessor<TMessage>, IDisposable
{
private readonly ILogger _logger;
private SemaphoreSlim _concurrentSemaphore;
Expand Down Expand Up @@ -87,7 +87,8 @@ private async Task ProcessInBackground(TMessage transportMessage, IReadOnlyDicti
{
try
{
_logger.LogDebug("Entering ProcessMessages for message {MessageType}", typeof(TMessage));
LogEntering(typeof(TMessage));

var r = await _target.ProcessMessage(transportMessage, messageHeaders, consumerContextProperties, currentServiceProvider, cancellationToken).ConfigureAwait(false);
if (r.Exception != null)
{
Expand All @@ -105,10 +106,39 @@ private async Task ProcessInBackground(TMessage transportMessage, IReadOnlyDicti
}
finally
{
_logger.LogDebug("Leaving ProcessMessages for message {MessageType}", typeof(TMessage));
LogLeaving(typeof(TMessage));
_concurrentSemaphore?.Release();

Interlocked.Decrement(ref _pendingCount);
}
}
}

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Debug,
Message = "Entering ProcessMessages for message {MessageType}")]
partial void LogEntering(Type messageType);

[LoggerMessage(
EventId = 1,
Level = LogLevel.Debug,
Message = "Leaving ProcessMessages for message {MessageType}")]
partial void LogLeaving(Type messageType);

#endregion
}

#if NETSTANDARD2_0

public partial class ConcurrentMessageProcessorDecorator<TMessage>
{
partial void LogEntering(Type messageType)
=> _logger.LogDebug("Entering ProcessMessages for message {MessageType}", messageType);

partial void LogLeaving(Type messageType)
=> _logger.LogDebug("Leaving ProcessMessages for message {MessageType}", messageType);
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using SlimMessageBus.Host.Consumer;

public class MessageHandler : IMessageHandler
public partial class MessageHandler : IMessageHandler
{
private readonly ILogger _logger;
private readonly IMessageScopeFactory _messageScopeFactory;
Expand Down Expand Up @@ -31,11 +31,7 @@ public MessageHandler(
string path,
Type consumerErrorHandlerOpenGenericType = null)
{
#if NETSTANDARD2_0
if (messageBus is null) throw new ArgumentNullException(nameof(messageBus));
#else
ArgumentNullException.ThrowIfNull(messageBus);
#endif

_logger = messageBus.LoggerFactory.CreateLogger<MessageHandler>();
_messageScopeFactory = messageScopeFactory;
Expand Down Expand Up @@ -127,7 +123,7 @@ public MessageHandler(
{
if (consumerInvoker.ParentSettings.IsDisposeConsumerEnabled && consumerInstance is IDisposable consumerInstanceDisposable)
{
_logger.LogDebug("Disposing consumer instance {Consumer} of type {ConsumerType}", consumerInstance, consumerType);
LogDisposingConsumer(consumerType, consumerInstance);
consumerInstanceDisposable.DisposeSilently("ConsumerInstance", _logger);
}
}
Expand Down Expand Up @@ -162,7 +158,7 @@ private async Task<ProcessResult> DoHandleError(object message, Type messageType

if (consumerErrorHandler != null)
{
_logger.LogDebug(ex, "Consumer error handler of type {ConsumerErrorHandlerType} will be used to handle the exception during processing of message of type {MessageType}", consumerErrorHandler.GetType(), messageType);
LogConsumerErrorHandlerWillBeUsed(messageType, consumerErrorHandler.GetType(), ex);

var consumerErrorHandlerMethod = RuntimeTypeCache.ConsumerErrorHandlerType[messageType];
errorHandlerResult = await consumerErrorHandlerMethod(consumerErrorHandler, message, consumerContext, ex, attempts).ConfigureAwait(false);
Expand Down Expand Up @@ -211,4 +207,33 @@ public async Task<object> ExecuteConsumer(object message, IConsumerContext consu

return null;
}
}

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Debug,
Message = "Disposing consumer instance {Consumer} of type {ConsumerType}")]
partial void LogDisposingConsumer(Type consumerType, object consumer);

[LoggerMessage(
EventId = 1,
Level = LogLevel.Debug,
Message = "Consumer error handler of type {ConsumerErrorHandlerType} will be used to handle the exception during processing of message of type {MessageType}")]
partial void LogConsumerErrorHandlerWillBeUsed(Type messageType, Type consumerErrorHandlerType, Exception ex);

#endregion
}

#if NETSTANDARD2_0

public partial class MessageHandler
{
partial void LogDisposingConsumer(Type consumerType, object consumer)
=> _logger.LogDebug("Disposing consumer instance {Consumer} of type {ConsumerType}", consumer, consumerType);

partial void LogConsumerErrorHandlerWillBeUsed(Type messageType, Type consumerErrorHandlerType, Exception ex)
=> _logger.LogDebug(ex, "Consumer error handler of type {ConsumerErrorHandlerType} will be used to handle the exception during processing of message of type {MessageType}", consumerErrorHandlerType, messageType);
}

#endif
Loading

0 comments on commit 57a5b93

Please sign in to comment.