From 1b9be4190e9f1202f78724ebc44980f45c894368 Mon Sep 17 00:00:00 2001 From: Tomasz Maruszak Date: Sun, 12 Jan 2025 00:46:35 +0100 Subject: [PATCH] High performance logging Signed-off-by: Tomasz Maruszak --- ...ontextAccessorCurrentMessageBusProvider.cs | 28 ++- ...rcuitBreakerAbstractConsumerInterceptor.cs | 6 +- .../AbstractMessageProcessorQueue.cs | 26 ++- .../MemoryMessageBus.cs | 29 ++- .../Collections/KindMapping.cs | 4 +- .../Collections/ProducerByMessageTypeCache.cs | 37 +++- .../Consumer/AbstractConsumer.cs | 27 ++- .../Checkpointing/CheckpointTrigger.cs | 33 ++- .../ConcurrentMessageProcessorDecorator.cs | 38 +++- .../MessageProcessors/MessageHandler.cs | 41 +++- .../MessageProcessors/MessageProcessor.cs | 82 ++++++- .../ResponseMessageProcessor.cs | 70 +++++- .../Helpers/CompatAttributes.cs | 45 ++++ .../Helpers/CompatMethods.cs | 5 +- .../Helpers/CompatRecord.cs | 2 +- .../{ => Helpers}/Retry.cs | 9 - src/SlimMessageBus.Host/Helpers/Utils.cs | 22 +- .../Hybrid/HybridMessageBus.cs | 36 +++- src/SlimMessageBus.Host/MessageBusBase.cs | 204 +++++++++++++++--- .../RequestResponse/PendingRequestManager.cs | 27 ++- .../RequestResponse/PendingRequestState.cs | 9 +- .../Services/MessageHeaderService.cs | 25 ++- ...BreakerAbstractConsumerInterceptorTests.cs | 2 +- .../MemoryMessageBusTests.cs | 31 +++ .../Consumer/AbstractConsumerTests.cs | 53 ++++- .../Hybrid/HybridMessageBusTest.cs | 7 +- 26 files changed, 756 insertions(+), 142 deletions(-) create mode 100644 src/SlimMessageBus.Host/Helpers/CompatAttributes.cs rename src/SlimMessageBus.Host/{ => Helpers}/Retry.cs (88%) diff --git a/src/SlimMessageBus.Host.AspNetCore/HttpContextAccessorCurrentMessageBusProvider.cs b/src/SlimMessageBus.Host.AspNetCore/HttpContextAccessorCurrentMessageBusProvider.cs index ef231f89..2bd0f717 100644 --- a/src/SlimMessageBus.Host.AspNetCore/HttpContextAccessorCurrentMessageBusProvider.cs +++ b/src/SlimMessageBus.Host.AspNetCore/HttpContextAccessorCurrentMessageBusProvider.cs @@ -3,24 +3,46 @@ /// /// Resolves the from the current ASP.NET Core web request (if present, otherwise falls back to the application root container). /// -public class HttpContextAccessorCurrentMessageBusProvider( +public partial class HttpContextAccessorCurrentMessageBusProvider( ILogger logger, IHttpContextAccessor httpContextAccessor, IServiceProvider serviceProvider) : CurrentMessageBusProvider(serviceProvider) { + private readonly ILogger _logger = logger; + public override IMessageBus GetCurrent() { // When the call to resolve the given type is made within an HTTP Request, use the request scope service provider var httpContext = httpContextAccessor?.HttpContext; if (httpContext != null) { - logger.LogTrace("The type IMessageBus will be requested from the per-request scope"); + LogCurrentFrom("request"); return httpContext.RequestServices.GetService(); } // otherwise use the app wide scope provider - logger.LogTrace("The type IMessageBus will be requested from the app scope"); + LogCurrentFrom("root"); return base.GetCurrent(); } + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Trace, + Message = "The type IMessageBus will be requested from the {ScopeName} scope")] + private partial void LogCurrentFrom(string scopeName); + + #endregion } + +#if NETSTANDARD2_0 + +public partial class HttpContextAccessorCurrentMessageBusProvider +{ + private partial void LogCurrentFrom(string scopeName) + => _logger.LogTrace("The type IMessageBus will be requested from the {ScopeName} scope", scopeName); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs index 30658f06..f46f14d4 100644 --- a/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs +++ b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs @@ -3,7 +3,7 @@ /// /// Circuit breaker to toggle consumer status on an external events. /// -internal sealed class CircuitBreakerAbstractConsumerInterceptor : IAbstractConsumerInterceptor +internal sealed class CircuitBreakerAbstractConsumerInterceptor(ILogger logger) : IAbstractConsumerInterceptor { public int Order => 100; @@ -33,12 +33,12 @@ async Task BreakerChanged(Circuit state) var bus = consumer.Settings[0].MessageBusSettings.Name ?? "default"; if (shouldPause) { - consumer.Logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus); + logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus); await consumer.DoStop().ConfigureAwait(false); } else { - consumer.Logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus); + logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus); await consumer.DoStart().ConfigureAwait(false); } consumer.SetIsPaused(shouldPause); diff --git a/src/SlimMessageBus.Host.Memory/Consumers/AbstractMessageProcessorQueue.cs b/src/SlimMessageBus.Host.Memory/Consumers/AbstractMessageProcessorQueue.cs index 4ceb8ce0..d634c221 100644 --- a/src/SlimMessageBus.Host.Memory/Consumers/AbstractMessageProcessorQueue.cs +++ b/src/SlimMessageBus.Host.Memory/Consumers/AbstractMessageProcessorQueue.cs @@ -1,7 +1,9 @@ namespace SlimMessageBus.Host.Memory; -public abstract class AbstractMessageProcessorQueue(IMessageProcessor messageProcessor, ILogger logger) : IMessageProcessorQueue +public abstract partial class AbstractMessageProcessorQueue(IMessageProcessor messageProcessor, ILogger logger) : IMessageProcessorQueue { + private readonly ILogger _logger = logger; + public abstract void Enqueue(object transportMessage, IReadOnlyDictionary messageHeaders); protected async Task ProcessMessage(object transportMessage, IReadOnlyDictionary messageHeaders, CancellationToken cancellationToken) @@ -23,7 +25,27 @@ protected async Task ProcessMessage(object transportMessage, IReadOnlyDictionary if (r.Exception != null) { // We rely on the IMessageProcessor to execute the ConsumerErrorHandler, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost. - logger.LogError(r.Exception, "Error processing message {Message} of type {MessageType}", transportMessage, transportMessage.GetType()); + LogMessageError(transportMessage, transportMessage.GetType(), r.Exception); } } + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Error, + Message = "Error processing message {TransportMessage} of type {TransportMessageType}")] + private partial void LogMessageError(object transportMessage, Type transportMessageType, Exception e); + + #endregion } + +#if NETSTANDARD2_0 + +public abstract partial class AbstractMessageProcessorQueue +{ + private partial void LogMessageError(object transportMessage, Type transportMessageType, Exception e) + => _logger.LogError(e, "Error processing message {TransportMessage} of type {TransportMessageType}", transportMessage, transportMessageType); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs index 20b36e1a..44d1a7af 100644 --- a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs +++ b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs @@ -5,7 +5,7 @@ /// /// In-memory message bus implementation to use for in process message passing. /// -public class MemoryMessageBus : MessageBusBase +public partial class MemoryMessageBus : MessageBusBase { private readonly ILogger _logger; private IDictionary> _messageProcessorByPath; @@ -63,11 +63,8 @@ public override IDictionary CreateHeaders() public override bool IsMessageScopeEnabled(ConsumerSettings consumerSettings, IDictionary consumerContextProperties) { -#if NETSTANDARD2_0 if (consumerSettings is null) throw new ArgumentNullException(nameof(consumerSettings)); -#else - ArgumentNullException.ThrowIfNull(consumerSettings); -#endif + if (consumerContextProperties != null && consumerContextProperties.ContainsKey(MemoryMessageBusProperties.CreateScope)) { return true; @@ -133,7 +130,7 @@ private async Task ProduceInternal(object me path ??= GetDefaultPath(producerSettings.MessageType, producerSettings); if (!_messageProcessorByPath.TryGetValue(path, out var messageProcessor)) { - _logger.LogDebug("No consumers interested in message type {MessageType} on path {Path}", messageType, path); + LogNoConsumerInterestedInMessageType(path, messageType); return default; } @@ -165,4 +162,24 @@ private async Task ProduceInternal(object me } return (TResponseMessage)r.Response; } + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "No consumers interested in message type {MessageType} on path {Path}")] + private partial void LogNoConsumerInterestedInMessageType(string path, Type messageType); + + #endregion } + +#if NETSTANDARD2_0 + +public partial class MemoryMessageBus +{ + private partial void LogNoConsumerInterestedInMessageType(string path, Type messageType) + => _logger.LogDebug("No consumers interested in message type {MessageType} on path {Path}", messageType, path); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Collections/KindMapping.cs b/src/SlimMessageBus.Host/Collections/KindMapping.cs index 21acf909..a14f6bb5 100644 --- a/src/SlimMessageBus.Host/Collections/KindMapping.cs +++ b/src/SlimMessageBus.Host/Collections/KindMapping.cs @@ -2,8 +2,8 @@ public class KindMapping { - private readonly Dictionary _kindByTopic = new(); - private readonly Dictionary _kindByMessageType = new(); + private readonly Dictionary _kindByTopic = []; + private readonly Dictionary _kindByMessageType = []; public void Configure(MessageBusSettings settings) { diff --git a/src/SlimMessageBus.Host/Collections/ProducerByMessageTypeCache.cs b/src/SlimMessageBus.Host/Collections/ProducerByMessageTypeCache.cs index ff6e2bcb..614416a2 100644 --- a/src/SlimMessageBus.Host/Collections/ProducerByMessageTypeCache.cs +++ b/src/SlimMessageBus.Host/Collections/ProducerByMessageTypeCache.cs @@ -5,7 +5,7 @@ /// The message type hierarchy is discovered at runtime and cached for faster access. /// /// The producer type -public class ProducerByMessageTypeCache : IReadOnlyCache +public partial class ProducerByMessageTypeCache : IReadOnlyCache where TProducer : class { private readonly ILogger _logger; @@ -37,7 +37,7 @@ private TProducer CalculateProducer(Type messageType) var assignableProducer = assignableProducers.FirstOrDefault(); if (assignableProducer.Key != null) { - _logger.LogDebug("Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}", assignableProducer.Key, messageType); + LogMatchedProducerForMessageType(messageType, assignableProducer.Key); return assignableProducer.Value; } @@ -52,7 +52,7 @@ private TProducer CalculateProducer(Type messageType) } } - _logger.LogDebug("Unable to match any declared producer for dispatched message type {MessageType}", messageType); + LogUnmatchedProducerForMessageType(messageType); // Note: Nulls are also added to dictionary, so that we don't look them up using reflection next time (cached). return null; @@ -74,4 +74,33 @@ private static int CalculateBaseClassDistance(Type type, Type baseType) return distance; } -} \ No newline at end of file + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}")] + private partial void LogMatchedProducerForMessageType(Type messageType, Type producerMessageType); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Unable to match any declared producer for dispatched message type {MessageType}")] + private partial void LogUnmatchedProducerForMessageType(Type messageType); + + #endregion +} + +#if NETSTANDARD2_0 + +public partial class ProducerByMessageTypeCache +{ + private partial void LogMatchedProducerForMessageType(Type messageType, Type producerMessageType) + => _logger.LogDebug("Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}", producerMessageType, messageType); + + private partial void LogUnmatchedProducerForMessageType(Type messageType) + => _logger.LogDebug("Unable to match any declared producer for dispatched message type {MessageType}", messageType); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs index c5c8d736..364485c0 100644 --- a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs +++ b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs @@ -1,7 +1,8 @@ namespace SlimMessageBus.Host; -public abstract class AbstractConsumer : HasProviderExtensions, IAsyncDisposable, IConsumerControl +public abstract partial class AbstractConsumer : HasProviderExtensions, IAsyncDisposable, IConsumerControl { + protected readonly ILogger Logger; private readonly SemaphoreSlim _semaphore; private readonly IReadOnlyList _interceptors; private CancellationTokenSource _cancellationTokenSource; @@ -10,7 +11,6 @@ public abstract class AbstractConsumer : HasProviderExtensions, IAsyncDisposable public bool IsStarted { get; private set; } public string Path { get; } - public ILogger Logger { get; } public IReadOnlyList Settings { get; } protected CancellationToken CancellationToken => _cancellationTokenSource.Token; @@ -39,7 +39,7 @@ private async Task CallInterceptor(Func Logger.LogError(ex, "Interceptor {InterceptorType} failed with error: {Error}", interceptorType, error); } + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/Checkpointing/CheckpointTrigger.cs b/src/SlimMessageBus.Host/Consumer/Checkpointing/CheckpointTrigger.cs index 49f474a9..7787b853 100644 --- a/src/SlimMessageBus.Host/Consumer/Checkpointing/CheckpointTrigger.cs +++ b/src/SlimMessageBus.Host/Consumer/Checkpointing/CheckpointTrigger.cs @@ -2,7 +2,7 @@ using System.Diagnostics; -public class CheckpointTrigger : ICheckpointTrigger +public partial class CheckpointTrigger : ICheckpointTrigger { private readonly ILogger _logger; @@ -35,7 +35,6 @@ public static CheckpointValue GetCheckpointValue(HasProviderExtensions settings) => new(settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault), settings.GetOrDefault(CheckpointSettings.CheckpointDuration, CheckpointSettings.CheckpointDurationDefault)); - #region Implementation of ICheckpointTrigger public bool IsEnabled @@ -53,17 +52,37 @@ public bool Increment() var enabled = IsEnabled; if (enabled && _logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug("Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)", _lastCheckpointCount, _lastCheckpointDuration.Elapsed.Seconds); + LogCheckpointTriggered(_lastCheckpointCount, _lastCheckpointDuration.Elapsed.Seconds); } return enabled; - } - + } + public void Reset() { _lastCheckpointCount = 0; _lastCheckpointDuration.Restart(); - } - + } + + #endregion + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)")] + private partial void LogCheckpointTriggered(int checkpointCount, int checkpointDuration); + #endregion } + +#if NETSTANDARD2_0 + +public partial class CheckpointTrigger +{ + private partial void LogCheckpointTriggered(int checkpointCount, int checkpointDuration) + => _logger.LogDebug("Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)", checkpointCount, checkpointDuration); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs index 0fba6f97..5c0230ae 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs @@ -5,7 +5,7 @@ /// The expectation is that will be executed synchronously (in sequential order) by the caller on which we want to increase amount of concurrent transportMessage being processed. /// /// -public sealed class ConcurrentMessageProcessorDecorator : IMessageProcessor, IDisposable +public sealed partial class ConcurrentMessageProcessorDecorator : IMessageProcessor, IDisposable { private readonly ILogger _logger; private SemaphoreSlim _concurrentSemaphore; @@ -87,7 +87,8 @@ private async Task ProcessInBackground(TMessage transportMessage, IReadOnlyDicti { try { - _logger.LogDebug("Entering ProcessMessages for message {MessageType}", typeof(TMessage)); + LogEntering(typeof(TMessage)); + var r = await _target.ProcessMessage(transportMessage, messageHeaders, consumerContextProperties, currentServiceProvider, cancellationToken).ConfigureAwait(false); if (r.Exception != null) { @@ -105,10 +106,39 @@ private async Task ProcessInBackground(TMessage transportMessage, IReadOnlyDicti } finally { - _logger.LogDebug("Leaving ProcessMessages for message {MessageType}", typeof(TMessage)); + LogLeaving(typeof(TMessage)); _concurrentSemaphore?.Release(); Interlocked.Decrement(ref _pendingCount); } } -} \ No newline at end of file + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Entering ProcessMessages for message {MessageType}")] + private partial void LogEntering(Type messageType); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Leaving ProcessMessages for message {MessageType}")] + private partial void LogLeaving(Type messageType); + + #endregion +} + +#if NETSTANDARD2_0 + +public partial class ConcurrentMessageProcessorDecorator +{ + private partial void LogEntering(Type messageType) + => _logger.LogDebug("Entering ProcessMessages for message {MessageType}", messageType); + + private partial void LogLeaving(Type messageType) + => _logger.LogDebug("Leaving ProcessMessages for message {MessageType}", messageType); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs index 62d873ae..e785d199 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs @@ -2,7 +2,7 @@ using SlimMessageBus.Host.Consumer; -public class MessageHandler : IMessageHandler +public partial class MessageHandler : IMessageHandler { private readonly ILogger _logger; private readonly IMessageScopeFactory _messageScopeFactory; @@ -31,11 +31,7 @@ public MessageHandler( string path, Type consumerErrorHandlerOpenGenericType = null) { -#if NETSTANDARD2_0 if (messageBus is null) throw new ArgumentNullException(nameof(messageBus)); -#else - ArgumentNullException.ThrowIfNull(messageBus); -#endif _logger = messageBus.LoggerFactory.CreateLogger(); _messageScopeFactory = messageScopeFactory; @@ -127,7 +123,7 @@ public MessageHandler( { if (consumerInvoker.ParentSettings.IsDisposeConsumerEnabled && consumerInstance is IDisposable consumerInstanceDisposable) { - _logger.LogDebug("Disposing consumer instance {Consumer} of type {ConsumerType}", consumerInstance, consumerType); + LogDisposingConsumer(consumerType, consumerInstance); consumerInstanceDisposable.DisposeSilently("ConsumerInstance", _logger); } } @@ -162,7 +158,7 @@ private async Task DoHandleError(object message, Type messageType if (consumerErrorHandler != null) { - _logger.LogDebug(ex, "Consumer error handler of type {ConsumerErrorHandlerType} will be used to handle the exception during processing of message of type {MessageType}", consumerErrorHandler.GetType(), messageType); + LogConsumerErrorHandlerWillBeUsed(messageType, consumerErrorHandler.GetType(), ex); var consumerErrorHandlerMethod = RuntimeTypeCache.ConsumerErrorHandlerType[messageType]; errorHandlerResult = await consumerErrorHandlerMethod(consumerErrorHandler, message, consumerContext, ex, attempts).ConfigureAwait(false); @@ -211,4 +207,33 @@ public async Task ExecuteConsumer(object message, IConsumerContext consu return null; } -} \ No newline at end of file + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Disposing consumer instance {Consumer} of type {ConsumerType}")] + private partial void LogDisposingConsumer(Type consumerType, object consumer); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Consumer error handler of type {ConsumerErrorHandlerType} will be used to handle the exception during processing of message of type {MessageType}")] + private partial void LogConsumerErrorHandlerWillBeUsed(Type messageType, Type consumerErrorHandlerType, Exception ex); + + #endregion +} + +#if NETSTANDARD2_0 + +public partial class MessageHandler +{ + private partial void LogDisposingConsumer(Type consumerType, object consumer) + => _logger.LogDebug("Disposing consumer instance {Consumer} of type {ConsumerType}", consumer, consumerType); + + private partial void LogConsumerErrorHandlerWillBeUsed(Type messageType, Type consumerErrorHandlerType, Exception ex) + => _logger.LogDebug(ex, "Consumer error handler of type {ConsumerErrorHandlerType} will be used to handle the exception during processing of message of type {MessageType}", consumerErrorHandlerType, messageType); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs index 74f19aa2..47d4b727 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs @@ -6,7 +6,7 @@ /// Implementation of that performs orchestration around processing of a new message using an instance of the declared consumer ( or interface). /// /// -public class MessageProcessor : MessageHandler, IMessageProcessor +public partial class MessageProcessor : MessageHandler, IMessageProcessor { private readonly ILogger _logger; private readonly MessageProvider _messageProvider; @@ -131,14 +131,14 @@ public async virtual Task ProcessMessage(TTransportMessage } catch (Exception e) { - _logger.LogDebug(e, "Processing of the message {TransportMessage} of type {MessageType} failed", transportMessage, messageType); + LogProcessingMessageFailedTypeKnown(transportMessage, messageType, e); lastException ??= e; } } } catch (Exception e) { - _logger.LogDebug(e, "Processing of the message {TransportMessage} failed", transportMessage); + LogProcessingMessageFailed(transportMessage, e); lastException = e; } return new(result, lastException, lastException != null ? lastConsumerInvoker?.ParentSettings : null, lastResponse); @@ -151,7 +151,7 @@ protected Type GetMessageType(IReadOnlyDictionary headers) var messageType = MessageTypeResolver.ToType(messageTypeName); if (messageType != null) { - _logger.LogDebug("Message type {MessageType} was declared in the message header", messageType); + LogMessageTypeDeclaredInHeader(messageType); return messageType; } @@ -164,11 +164,11 @@ protected Type GetMessageType(IReadOnlyDictionary headers) if (_singleInvoker != null) { - _logger.LogDebug("No message type header was present, defaulting to the only declared message type {MessageType}", _singleInvoker.MessageType); + LogMessageTypeHeaderMissingAndDefaulting(_singleInvoker.MessageType); return _singleInvoker.MessageType; } - _logger.LogDebug("No message type header was present in the message header, multiple consumer types declared therefore cannot infer the message type"); + LogNoMessageTypeHeaderPresent(); if (_shouldFailWhenUnrecognizedMessageType) { @@ -198,7 +198,8 @@ protected IEnumerable TryMatchConsumerInvok { if (_shouldLogWhenUnrecognizedMessageType) { - _logger.LogInformation("The message on path {Path} declared {HeaderName} header of type {MessageType}, but none of the known consumer types {ConsumerTypes} was able to handle it", Path, MessageHeaders.MessageType, messageType, string.Join(",", _invokers.Select(x => x.ConsumerType.Name))); + var consumerTypes = string.Join(",", _invokers.Select(x => x.ConsumerType.Name)); + LogNoConsumerTypeMatched(messageType, Path, MessageHeaders.MessageType, consumerTypes); } if (_shouldFailWhenUnrecognizedMessageType) @@ -208,4 +209,69 @@ protected IEnumerable TryMatchConsumerInvok } } } -} \ No newline at end of file + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Processing of the message {TransportMessage} of type {MessageType} failed")] + private partial void LogProcessingMessageFailedTypeKnown(TTransportMessage transportMessage, Type messageType, Exception e); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Processing of the message {TransportMessage} failed")] + private partial void LogProcessingMessageFailed(TTransportMessage transportMessage, Exception e); + + [LoggerMessage( + EventId = 2, + Level = LogLevel.Debug, + Message = "Message type {MessageType} was declared in the message header")] + private partial void LogMessageTypeDeclaredInHeader(Type messageType); + + [LoggerMessage( + EventId = 3, + Level = LogLevel.Debug, + Message = "No message type header was present, defaulting to the only declared message type {MessageType}")] + private partial void LogMessageTypeHeaderMissingAndDefaulting(Type messageType); + + [LoggerMessage( + EventId = 4, + Level = LogLevel.Debug, + Message = "No message type header was present in the message header, multiple consumer types declared therefore cannot infer the message type")] + private partial void LogNoMessageTypeHeaderPresent(); + + [LoggerMessage( + EventId = 5, + Level = LogLevel.Information, + Message = "The message on path {Path} declared {HeaderName} header of type {MessageType}, but none of the known consumer types {ConsumerTypes} was able to handle it")] + private partial void LogNoConsumerTypeMatched(Type messageType, string path, string headerName, string consumerTypes); + + #endregion +} + +#if NETSTANDARD2_0 + +public partial class MessageProcessor +{ + private partial void LogProcessingMessageFailedTypeKnown(TTransportMessage transportMessage, Type messageType, Exception e) + => _logger.LogDebug(e, "Processing of the message {TransportMessage} of type {MessageType} failed", transportMessage, messageType); + + private partial void LogProcessingMessageFailed(TTransportMessage transportMessage, Exception e) + => _logger.LogDebug(e, "Processing of the message {TransportMessage} failed", transportMessage); + + private partial void LogMessageTypeDeclaredInHeader(Type messageType) + => _logger.LogDebug("Message type {MessageType} was declared in the message header", messageType); + + private partial void LogMessageTypeHeaderMissingAndDefaulting(Type messageType) + => _logger.LogDebug("No message type header was present, defaulting to the only declared message type {MessageType}", messageType); + + private partial void LogNoMessageTypeHeaderPresent() + => _logger.LogDebug("No message type header was present in the message header, multiple consumer types declared therefore cannot infer the message type"); + + private partial void LogNoConsumerTypeMatched(Type messageType, string path, string headerName, string consumerTypes) + => _logger.LogInformation("The message on path {Path} declared {HeaderName} header of type {MessageType}, but none of the known consumer types {ConsumerTypes} was able to handle it", path, headerName, messageType, consumerTypes); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs index 2290b8f8..b1f02b40 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs @@ -6,7 +6,7 @@ public abstract class ResponseMessageProcessor; /// The implementation that processes the responses arriving to the bus. /// /// -public class ResponseMessageProcessor : ResponseMessageProcessor, IMessageProcessor +public partial class ResponseMessageProcessor : ResponseMessageProcessor, IMessageProcessor { private readonly ILogger _logger; private readonly RequestResponseSettings _requestResponseSettings; @@ -42,7 +42,7 @@ public Task ProcessMessage(TTransportMessage transportMess } catch (Exception e) { - _logger.LogError(e, "Error occurred while consuming response message, {Message}", transportMessage); + LogErrorConsumingResponse(transportMessage, e); // We can only continue and process all messages in the lease ex = e; } @@ -68,7 +68,7 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p var requestState = _pendingRequestStore.GetById(requestId); if (requestState == null) { - _logger.LogDebug("The response message for request id {RequestId} arriving on path {Path} will be disregarded. Either the request had already expired, had been cancelled or it was already handled (this response message is a duplicate).", requestId, path); + LogResponseWillBeDiscarded(path, requestId); // ToDo: add and API hook to these kind of situation return null; } @@ -77,8 +77,8 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p { if (_logger.IsEnabled(LogLevel.Debug)) { - var tookTimespan = _currentTimeProvider.CurrentTime.Subtract(requestState.Created); - _logger.LogDebug("Response arrived for {Request} on path {Path} (time: {RequestTime} ms)", requestState, path, tookTimespan); + var requestTime = _currentTimeProvider.CurrentTime.Subtract(requestState.Created); + LogResponseArrived(path, requestState, requestTime); } if (responseHeaders.TryGetHeader(ReqRespMessageHeaders.Error, out string errorMessage)) @@ -86,7 +86,7 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p // error response arrived var responseException = new RequestHandlerFaultedMessageBusException(errorMessage); - _logger.LogDebug(responseException, "Response arrived for {Request} on path {Path} with error: {ResponseError}", requestState, path, responseException.Message); + LogResponseArrivedWithError(path, requestState, responseException, responseException.Message); requestState.TaskCompletionSource.TrySetException(responseException); } else @@ -104,7 +104,7 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p } catch (Exception e) { - _logger.LogDebug(e, "Could not deserialize the response message for {Request} arriving on path {Path}", requestState, path); + LogResponseCouldNotDeserialize(path, requestState, e); requestState.TaskCompletionSource.TrySetException(e); } } @@ -117,4 +117,60 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p return null; } + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Error, + Message = "Error occurred while consuming response message, {Message}")] + private partial void LogErrorConsumingResponse(TTransportMessage message, Exception e); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "The response message for request id {RequestId} arriving on path {Path} will be disregarded. Either the request had already expired, had been cancelled or it was already handled (this response message is a duplicate).")] + private partial void LogResponseWillBeDiscarded(string path, string requestId); + + [LoggerMessage( + EventId = 2, + Level = LogLevel.Debug, + Message = "Response arrived for {RequestState} on path {Path} (time: {RequestTime} ms)")] + private partial void LogResponseArrived(string path, PendingRequestState requestState, TimeSpan requestTime); + + [LoggerMessage( + EventId = 3, + Level = LogLevel.Debug, + Message = "Response arrived for {RequestState} on path {Path} with error: {ResponseError}")] + private partial void LogResponseArrivedWithError(string path, PendingRequestState requestState, Exception e, string responseError); + + [LoggerMessage( + EventId = 4, + Level = LogLevel.Debug, + Message = "Could not deserialize the response message for {RequestState} arriving on path {Path}")] + private partial void LogResponseCouldNotDeserialize(string path, PendingRequestState requestState, Exception e); + + #endregion +} + +#if NETSTANDARD2_0 + +public partial class ResponseMessageProcessor +{ + private partial void LogErrorConsumingResponse(TTransportMessage message, Exception e) + => _logger.LogError(e, "Error occurred while consuming response message, {Message}", message); + + private partial void LogResponseWillBeDiscarded(string path, string requestId) + => _logger.LogDebug("The response message for request id {RequestId} arriving on path {Path} will be disregarded. Either the request had already expired, had been cancelled or it was already handled (this response message is a duplicate).", requestId, path); + + private partial void LogResponseArrived(string path, PendingRequestState requestState, TimeSpan requestTime) + => _logger.LogDebug("Response arrived for {RequestState} on path {Path} (time: {RequestTime} ms)", requestState, path, requestTime); + + private partial void LogResponseArrivedWithError(string path, PendingRequestState requestState, Exception e, string responseError) + => _logger.LogDebug(e, "Response arrived for {RequestState} on path {Path} with error: {ResponseError}", requestState, path, responseError); + + private partial void LogResponseCouldNotDeserialize(string path, PendingRequestState requestState, Exception e) + => _logger.LogDebug(e, "Could not deserialize the response message for {RequestState} arriving on path {Path}", requestState, path); } + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Helpers/CompatAttributes.cs b/src/SlimMessageBus.Host/Helpers/CompatAttributes.cs new file mode 100644 index 00000000..3e322b75 --- /dev/null +++ b/src/SlimMessageBus.Host/Helpers/CompatAttributes.cs @@ -0,0 +1,45 @@ +#if NETSTANDARD2_0 + +namespace SlimMessageBus.Host; + +[AttributeUsage(AttributeTargets.Method)] +public class LoggerMessageAttribute : Attribute +{ + /// + /// Gets the logging event id for the logging method. + /// + public int EventId { get; set; } = -1; + + /// + /// Gets or sets the logging event name for the logging method. + /// + /// + /// This will equal the method name if not specified. + /// + public string EventName { get; set; } + + /// + /// Gets the logging level for the logging method. + /// + public LogLevel Level { get; set; } = LogLevel.None; + + /// + /// Gets the message text for the logging method. + /// + public string Message { get; set; } = ""; + + /// + /// Gets the flag to skip IsEnabled check for the logging method. + /// + public bool SkipEnabledCheck { get; set; } + + public LoggerMessageAttribute() + { + } + + public LoggerMessageAttribute(int eventId, LogLevel level, string message) + { + } +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Helpers/CompatMethods.cs b/src/SlimMessageBus.Host/Helpers/CompatMethods.cs index 145d8865..d02ebaa3 100644 --- a/src/SlimMessageBus.Host/Helpers/CompatMethods.cs +++ b/src/SlimMessageBus.Host/Helpers/CompatMethods.cs @@ -25,8 +25,6 @@ public static bool TryAdd(this IDictionary dict, K key, V value) public static HashSet ToHashSet(this IEnumerable items) => new(items); -#if NETSTANDARD2_0 - public static IEnumerable> Chunk(this IEnumerable items, int size) { var chunk = new List(size); @@ -50,7 +48,6 @@ public static IEnumerable> Chunk(this IEnumerable i } } -#endif } public static class TimeSpanExtensions @@ -59,4 +56,4 @@ public static TimeSpan Multiply(this TimeSpan timeSpan, double factor) => TimeSpan.FromMilliseconds(timeSpan.TotalMilliseconds * factor); } -#endif +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Helpers/CompatRecord.cs b/src/SlimMessageBus.Host/Helpers/CompatRecord.cs index 56a4fc7d..118d95fe 100644 --- a/src/SlimMessageBus.Host/Helpers/CompatRecord.cs +++ b/src/SlimMessageBus.Host/Helpers/CompatRecord.cs @@ -1,4 +1,4 @@ -#if NETSTANDARD2_0 || NETSTANDARD2_1 || NETCOREAPP2_0 || NETCOREAPP2_1 || NETCOREAPP2_2 || NETCOREAPP3_0 || NETCOREAPP3_1 || NET45 || NET451 || NET452 || NET46 || NET461 || NET462 || NET47 || NET471 || NET472 || NET48 +#if NETSTANDARD2_0 // See https://github.com/dotnet/roslyn/issues/45510#issuecomment-725091019 diff --git a/src/SlimMessageBus.Host/Retry.cs b/src/SlimMessageBus.Host/Helpers/Retry.cs similarity index 88% rename from src/SlimMessageBus.Host/Retry.cs rename to src/SlimMessageBus.Host/Helpers/Retry.cs index 5c58579d..b56cbdc7 100644 --- a/src/SlimMessageBus.Host/Retry.cs +++ b/src/SlimMessageBus.Host/Helpers/Retry.cs @@ -6,17 +6,8 @@ public static class Retry public static async Task WithDelay(Func operation, Func shouldRetry, TimeSpan? delay, TimeSpan? jitter = default, CancellationToken cancellationToken = default) { -#if NETSTANDARD2_0 if (operation is null) throw new ArgumentNullException(nameof(operation)); -#else - ArgumentNullException.ThrowIfNull(operation); -#endif - -#if NETSTANDARD2_0 if (shouldRetry is null) throw new ArgumentNullException(nameof(shouldRetry)); -#else - ArgumentNullException.ThrowIfNull(shouldRetry); -#endif var attempt = 0; do diff --git a/src/SlimMessageBus.Host/Helpers/Utils.cs b/src/SlimMessageBus.Host/Helpers/Utils.cs index 8d1cd9a1..8d4c544d 100644 --- a/src/SlimMessageBus.Host/Helpers/Utils.cs +++ b/src/SlimMessageBus.Host/Helpers/Utils.cs @@ -30,22 +30,18 @@ public static async ValueTask DisposeSilently(this IAsyncDisposable disposable, } public static void DisposeSilently(this IDisposable disposable, string name, ILogger logger) - { - disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", name)); - } - - public static void DisposeSilently(this IDisposable disposable, Func nameFunc, ILogger logger) - { - disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", nameFunc())); - } + => disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", name)); public static ValueTask DisposeSilently(this IAsyncDisposable disposable, Func nameFunc, ILogger logger) - { - return disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", nameFunc())); - } + => disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", nameFunc())); public static ValueTask DisposeSilently(this IAsyncDisposable disposable, string name, ILogger logger) + => disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", name)); + + public static string JoinOrSingle(this T[] values, Func selector, string separator = ",") => values.Length switch { - return disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", name)); - } + 0 => string.Empty, + 1 => selector(values[0]), + _ => string.Join(separator, values.Select(selector)) + }; } \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs b/src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs index 0cffd609..66a47ba9 100644 --- a/src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs +++ b/src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs @@ -4,7 +4,7 @@ using SlimMessageBus.Host.Serialization; -public class HybridMessageBus : IMasterMessageBus, ICompositeMessageBus, IDisposable, IAsyncDisposable +public partial class HybridMessageBus : IMasterMessageBus, ICompositeMessageBus, IDisposable, IAsyncDisposable { private readonly ILogger _logger; private readonly Dictionary _busByName; @@ -121,7 +121,8 @@ protected virtual MessageBusBase[] Route(object message, string path) { if (_logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug("Resolved bus {BusName} for message type: {MessageType} and path {Path}", string.Join(",", buses.Select(x => x.Settings.Name)), messageType, path); + var busName = buses.JoinOrSingle(x => x.Settings.Name); + LogResolvedBus(path, messageType, busName); } return buses; } @@ -134,7 +135,7 @@ protected virtual MessageBusBase[] Route(object message, string path) // Add the message type, so that we only emit warn log once if (ProviderSettings.UndeclaredMessageTypeMode == UndeclaredMessageTypeMode.RaiseOneTimeLog && _undeclaredMessageType.TryAdd(messageType, true)) { - _logger.LogInformation("Could not find any bus that produces the message type: {MessageType}. Messages of that type will not be delivered to any child bus. Double check the message bus configuration.", messageType); + LogCouldNotFindBus(messageType); } return []; @@ -198,4 +199,33 @@ public IMasterMessageBus GetChildBus(string name) public IEnumerable GetChildBuses() => _busByName.Values; #endregion + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Resolved bus {BusName} for message type {MessageType} and path {Path}")] + private partial void LogResolvedBus(string path, Type messageType, string busName); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Information, + Message = "Could not find any bus that produces the message type {MessageType}. Messages of that type will not be delivered to any child bus. Double check the message bus configuration.")] + private partial void LogCouldNotFindBus(Type messageType); + + #endregion } + +#if NETSTANDARD2_0 + +public partial class HybridMessageBus +{ + private partial void LogResolvedBus(string path, Type messageType, string busName) + => _logger.LogDebug("Resolved bus {BusName} for message type {MessageType} and path {Path}", busName, messageType, path); + + private partial void LogCouldNotFindBus(Type messageType) + => _logger.LogInformation("Could not find any bus that produces the message type {MessageType}. Messages of that type will not be delivered to any child bus. Double check the message bus configuration.", messageType); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index 02384e3a..827bf499 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -12,7 +12,7 @@ public abstract class MessageBusBase(MessageBusSettings setti public TProviderSettings ProviderSettings { get; } = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings)); } -public abstract class MessageBusBase : IDisposable, IAsyncDisposable, +public abstract partial class MessageBusBase : IDisposable, IAsyncDisposable, IMasterMessageBus, IMessageScopeFactory, IMessageHeadersFactory, @@ -111,7 +111,7 @@ protected MessageBusBase(MessageBusSettings settings) protected virtual IMessageSerializer GetSerializer() => Settings.GetSerializer(Settings.ServiceProvider); - protected virtual IMessageBusSettingsValidationService ValidationService { get => new DefaultMessageBusSettingsValidationService(Settings); } + protected virtual IMessageBusSettingsValidationService ValidationService => new DefaultMessageBusSettingsValidationService(Settings); /// /// Called by the provider to initialize the bus. @@ -137,7 +137,7 @@ protected void OnBuildProvider() } catch (Exception e) { - _logger.LogError(e, "Could not auto start consumers"); + LogCouldNotStartConsumers(e); } }); } @@ -191,17 +191,16 @@ public async Task Start() try { - await InitTaskList.EnsureAllFinished(); - - _logger.LogInformation("Starting consumers for {BusName} bus...", Name); + await InitTaskList.EnsureAllFinished().ConfigureAwait(false); + LogStartingConsumers(Name); await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false); - await CreateConsumers(); + await CreateConsumers().ConfigureAwait(false); await OnStart().ConfigureAwait(false); await Task.WhenAll(_consumers.Select(x => x.Start())).ConfigureAwait(false); await OnBusLifecycle(MessageBusLifecycleEventType.Started).ConfigureAwait(false); - _logger.LogInformation("Started consumers for {BusName} bus", Name); + LogStartedConsumers(Name); lock (_startLock) { @@ -230,9 +229,9 @@ public async Task Stop() try { - await InitTaskList.EnsureAllFinished(); + await InitTaskList.EnsureAllFinished().ConfigureAwait(false); - _logger.LogInformation("Stopping consumers for {BusName} bus...", Name); + LogStoppingConsumers(Name); await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false); await Task.WhenAll(_consumers.Select(x => x.Stop())).ConfigureAwait(false); @@ -240,7 +239,7 @@ public async Task Stop() await DestroyConsumers().ConfigureAwait(false); await OnBusLifecycle(MessageBusLifecycleEventType.Stopped).ConfigureAwait(false); - _logger.LogInformation("Stopped consumers for {BusName} bus", Name); + LogStoppedConsumers(Name); lock (_startLock) { @@ -332,13 +331,13 @@ protected async virtual ValueTask DisposeAsyncCore() protected virtual Task CreateConsumers() { - _logger.LogInformation("Creating consumers for {BusName} bus...", Name); + LogCreatingConsumers(Name); return Task.CompletedTask; } protected async virtual Task DestroyConsumers() { - _logger.LogInformation("Destroying consumers for {BusName} bus...", Name); + LogDestroyingConsumers(Name); foreach (var consumer in _consumers) { @@ -370,7 +369,7 @@ protected virtual string GetDefaultPath(Type messageType, ProducerSettings produ var path = producerSettings.DefaultPath ?? throw new ProducerMessageBusException($"An attempt to produce message of type {messageType} without specifying path, but there was no default path configured. Double check your configuration."); - _logger.LogDebug("Applying default path {Path} for message type {MessageType}", path, messageType); + LogApplyingDefaultPath(messageType, path); return path; } @@ -383,10 +382,10 @@ public abstract Task ProduceToTransport( CancellationToken cancellationToken); protected void OnProduceToTransport(object message, - Type messageType, - string path, - IDictionary messageHeaders) - => _logger.LogDebug("Producing message {Message} of type {MessageType} to path {Path}", message, messageType, path); + Type messageType, + string path, + IDictionary messageHeaders) + => LogProducingMessageToPath(message, messageType, path); public virtual int? MaxMessagesPerTransaction => null; @@ -509,7 +508,7 @@ protected virtual TimeSpan GetDefaultRequestTimeout(Type requestType, ProducerSe if (producerSettings == null) throw new ArgumentNullException(nameof(producerSettings)); var timeout = producerSettings.Timeout ?? Settings.RequestResponse.Timeout; - _logger.LogDebug("Applying default timeout {MessageTimeout} for message type {MessageType}", timeout, requestType); + LogApplyingDefaultTimeout(requestType, timeout); return timeout; } @@ -584,12 +583,12 @@ protected async internal virtual Task SendInternal SendInternal consumerContextProperties, IServiceProvider currentServiceProvider) { var createMessageScope = IsMessageScopeEnabled(consumerSettings, consumerContextProperties); - if (createMessageScope) { - _logger.LogDebug("Creating message scope for {Message} of type {MessageType}", message, message.GetType()); + LogCreatingScope(message, message.GetType()); } return new MessageScopeWrapper(currentServiceProvider ?? Settings.ServiceProvider, createMessageScope); } public virtual Task ProvisionTopology() => Task.CompletedTask; + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Error, + Message = "Could not auto start consumers")] + private partial void LogCouldNotStartConsumers(Exception ex); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Creating message scope for {Message} of type {MessageType}")] + private partial void LogCreatingScope(object message, Type messageType); + + [LoggerMessage( + EventId = 2, + Level = LogLevel.Debug, + Message = "Publishing of request message failed")] + private partial void LogPublishOfRequestFailed(Exception ex); + + [LoggerMessage( + EventId = 3, + Level = LogLevel.Information, + Message = "Starting consumers for {BusName} bus...")] + private partial void LogStartingConsumers(string busName); + + [LoggerMessage( + EventId = 4, + Level = LogLevel.Information, + Message = "Started consumers for {BusName} bus")] + private partial void LogStartedConsumers(string busName); + + [LoggerMessage( + EventId = 5, + Level = LogLevel.Information, + Message = "Stopping consumers for {BusName} bus...")] + private partial void LogStoppingConsumers(string busName); + + [LoggerMessage( + EventId = 6, + Level = LogLevel.Information, + Message = "Stopped consumers for {BusName} bus")] + private partial void LogStoppedConsumers(string busName); + + [LoggerMessage( + EventId = 7, + Level = LogLevel.Information, + Message = "Creating consumers for {BusName} bus...")] + private partial void LogCreatingConsumers(string busName); + + [LoggerMessage( + EventId = 8, + Level = LogLevel.Information, + Message = "Destroying consumers for {BusName} bus...")] + private partial void LogDestroyingConsumers(string busName); + + [LoggerMessage( + EventId = 9, + Level = LogLevel.Debug, + Message = "Applying default path {Path} for message type {MessageType}")] + private partial void LogApplyingDefaultPath(Type messageType, string path); + + [LoggerMessage( + EventId = 10, + Level = LogLevel.Debug, + Message = "Applying default timeout {MessageTimeout} for message type {MessageType}")] + private partial void LogApplyingDefaultTimeout(Type messageType, TimeSpan messageTimeout); + + [LoggerMessage( + EventId = 11, + Level = LogLevel.Debug, + Message = "Producing message {Message} of type {MessageType} to path {Path}")] + private partial void LogProducingMessageToPath(object message, Type messageType, string path); + + [LoggerMessage( + EventId = 12, + Level = LogLevel.Trace, + Message = "Added to PendingRequests, total is {RequestCount}")] + private partial void LogAddedToPendingRequests(int requestCount); + + [LoggerMessage( + EventId = 13, + Level = LogLevel.Debug, + Message = "Sending request message {MessageType} to path {Path} with reply to {ReplyTo}")] + private partial void LogSendingRequestMessage(string path, Type messageType, string replyTo); + + [LoggerMessage( + EventId = 14, + Level = LogLevel.Debug, + Message = "Skipping sending response {Response} of type {MessageType} as the header {HeaderName} is missing for RequestId: {RequestId}")] + private partial void LogSkippingSendingResponseMessage(string requestId, object response, Type messageType, string headerName); + + [LoggerMessage( + EventId = 15, + Level = LogLevel.Debug, + Message = "Sending the response {Response} of type {MessageType} for RequestId: {RequestId}...")] + private partial void LogSendingResponseMessage(string requestId, object response, Type messageType); + + #endregion +} + +#if NETSTANDARD2_0 +public abstract partial class MessageBusBase +{ + private partial void LogCouldNotStartConsumers(Exception ex) + => _logger.LogError(ex, "Could not auto start consumers"); + + private partial void LogCreatingScope(object message, Type messageType) + => _logger.LogDebug("Creating message scope for {Message} of type {MessageType}", message, messageType); + + private partial void LogPublishOfRequestFailed(Exception ex) + => _logger.LogDebug(ex, "Publishing of request message failed"); + + private partial void LogStartingConsumers(string busName) + => _logger.LogInformation("Starting consumers for {BusName} bus...", busName); + + private partial void LogStartedConsumers(string busName) + => _logger.LogInformation("Started consumers for {BusName} bus", busName); + + private partial void LogStoppingConsumers(string busName) + => _logger.LogInformation("Stopping consumers for {BusName} bus...", busName); + + private partial void LogStoppedConsumers(string busName) + => _logger.LogInformation("Stopped consumers for {BusName} bus", busName); + + private partial void LogCreatingConsumers(string busName) + => _logger.LogInformation("Creating consumers for {BusName} bus...", busName); + + private partial void LogDestroyingConsumers(string busName) + => _logger.LogInformation("Destroying consumers for {BusName} bus...", busName); + + private partial void LogApplyingDefaultPath(Type messageType, string path) + => _logger.LogDebug("Applying default path {Path} for message type {MessageType}", path, messageType); + + private partial void LogApplyingDefaultTimeout(Type messageType, TimeSpan messageTimeout) + => _logger.LogDebug("Applying default timeout {MessageTimeout} for message type {MessageType}", messageTimeout, messageType); + + private partial void LogProducingMessageToPath(object message, Type messageType, string path) + => _logger.LogDebug("Producing message {Message} of type {MessageType} to path {Path}", message, messageType, path); + + private partial void LogAddedToPendingRequests(int requestCount) + => _logger.LogTrace("Added to PendingRequests, total is {RequestCount}", requestCount); + + private partial void LogSendingRequestMessage(string path, Type messageType, string replyTo) + => _logger.LogDebug("Sending request message {MessageType} to path {Path} with reply to {ReplyTo}", messageType, path, replyTo); + + private partial void LogSkippingSendingResponseMessage(string requestId, object response, Type messageType, string headerName) + => _logger.LogDebug("Skipping sending response {Response} of type {MessageType} as the header {HeaderName} is missing for RequestId: {RequestId}", response, messageType, headerName, requestId); + + private partial void LogSendingResponseMessage(string requestId, object response, Type messageType) + => _logger.LogDebug("Sending the response {Response} of type {MessageType} for RequestId: {RequestId}...", response, messageType, requestId); } + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs b/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs index a4731927..f37f0dcb 100644 --- a/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs +++ b/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs @@ -3,7 +3,7 @@ /// /// Manages the pending requests - ensure requests which exceeded the allotted timeout period are removed. /// -public class PendingRequestManager : IPendingRequestManager, IDisposable +public partial class PendingRequestManager : IPendingRequestManager, IDisposable { private readonly ILogger _logger; @@ -85,10 +85,31 @@ public virtual void CleanPendingRequests() if (canceled) { - _logger.LogDebug("Pending request timed-out: {RequestState}, now: {TimeNow}", requestState, now); + LogPendingRequestTimeout(now, requestState); _onRequestTimeout?.Invoke(requestState.Request); } } Store.RemoveAll(requestsToCancel.Select(x => x.Id)); - } + } + + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Pending request timed-out: {RequestState}, now: {TimeNow}")] + private partial void LogPendingRequestTimeout(DateTimeOffset timeNow, PendingRequestState requestState); + + #endregion } + +#if NETSTANDARD2_0 + +public partial class PendingRequestManager +{ + private partial void LogPendingRequestTimeout(DateTimeOffset timeNow, PendingRequestState requestState) + => _logger.LogDebug("Pending request timed-out: {RequestState}, now: {TimeNow}", requestState, timeNow); +} + +#endif diff --git a/src/SlimMessageBus.Host/RequestResponse/PendingRequestState.cs b/src/SlimMessageBus.Host/RequestResponse/PendingRequestState.cs index 96f92fcf..73a0ef56 100644 --- a/src/SlimMessageBus.Host/RequestResponse/PendingRequestState.cs +++ b/src/SlimMessageBus.Host/RequestResponse/PendingRequestState.cs @@ -24,12 +24,5 @@ public PendingRequestState(string id, object request, Type requestType, Type res CancellationToken = cancellationToken; } - #region Overrides of Object - - public override string ToString() - { - return $"Request(Id: {Id}, RequestType: {RequestType}, ResponseType: {ResponseType}, Created: {Created}, Expires: {Expires})"; - } - - #endregion + public override string ToString() => $"Request(Id: {Id}, RequestType: {RequestType}, ResponseType: {ResponseType}, Created: {Created}, Expires: {Expires})"; } \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Services/MessageHeaderService.cs b/src/SlimMessageBus.Host/Services/MessageHeaderService.cs index f6877245..1357c55a 100644 --- a/src/SlimMessageBus.Host/Services/MessageHeaderService.cs +++ b/src/SlimMessageBus.Host/Services/MessageHeaderService.cs @@ -6,7 +6,7 @@ internal interface IMessageHeaderService void AddMessageTypeHeader(object message, IDictionary headers); } -internal class MessageHeaderService : IMessageHeaderService +internal partial class MessageHeaderService : IMessageHeaderService { private readonly ILogger _logger; private readonly MessageBusSettings _settings; @@ -35,14 +35,14 @@ public void AddMessageHeaders(IDictionary messageHeaders, IDicti if (producerSettings.HeaderModifier != null) { // Call header hook - _logger.LogTrace($"Executing producer {nameof(ProducerSettings.HeaderModifier)}"); + LogExecutingHeaderModifier("producer"); producerSettings.HeaderModifier(messageHeaders, message); } if (_settings.HeaderModifier != null) { // Call header hook - _logger.LogTrace($"Executing bus {nameof(MessageBusSettings.HeaderModifier)}"); + LogExecutingHeaderModifier("bus"); _settings.HeaderModifier(messageHeaders, message); } } @@ -54,5 +54,24 @@ public void AddMessageTypeHeader(object message, IDictionary hea headers.SetHeader(MessageHeaders.MessageType, _messageTypeResolver.ToName(message.GetType())); } } + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Trace, + Message = $"Executing {{ConfigLevel}} {nameof(ProducerSettings.HeaderModifier)}")] + private partial void LogExecutingHeaderModifier(string configLevel); + + #endregion +} + +#if NETSTANDARD2_0 + +internal partial class MessageHeaderService +{ + private partial void LogExecutingHeaderModifier(string configLevel) + => _logger.LogTrace($"Executing {{ConfigLevel}} {nameof(ProducerSettings.HeaderModifier)}", configLevel); } +#endif \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs index 8003c373..74aa2b9c 100644 --- a/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs @@ -59,7 +59,7 @@ public CircuitBreakerAbstractConsumerInterceptorTests() { accessor = new CircuitBreakerAccessor(); - var h = new CircuitBreakerAbstractConsumerInterceptor(); + var h = new CircuitBreakerAbstractConsumerInterceptor(NullLogger.Instance); var serviceCollection = new ServiceCollection(); serviceCollection.TryAddSingleton(accessor); diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs index 6f0316d9..d41a5b10 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs @@ -507,6 +507,37 @@ public async Task When_Send_Given_AHandlerThatThrowsException_Then_ExceptionIsBu await act.Should().ThrowAsync(); } } + + [Fact] + public async Task When_Publish_Given_NoConsumerRegistered_Then_NoOp() + { + const string topic = "topic-a"; + + _builder.Produce(x => x.DefaultTopic(topic)); + + var request = new SomeRequest(Guid.NewGuid()); + + // act + await _subject.Value.ProducePublish(request); + + // assert + } + + [Fact] + public async Task When_Send_Given_NoHandlerRegistered_Then_ResponseIsNull() + { + const string topic = "topic-a"; + + _builder.Produce(x => x.DefaultTopic(topic)); + + var request = new SomeRequest(Guid.NewGuid()); + + // act + var response = await _subject.Value.ProduceSend(request); + + // assert + response.Should().BeNull(); + } } public record SomeMessageA(Guid Value); diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs index a05b74cf..280ed2f5 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs @@ -1,5 +1,4 @@ -namespace SlimMessageBus.Host.Test.Consumer; - +namespace SlimMessageBus.Host.Test.Consumer; public class AbstractConsumerTests { private class TestConsumer(ILogger logger, IEnumerable settings, IEnumerable interceptors) @@ -35,12 +34,21 @@ public AbstractConsumerTests() } [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task When_Start_Then_Interceptor_CanStartIsCalled(bool canStart) + [InlineData(true, false)] + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, true)] + public async Task When_Start_Then_Interceptor_CanStartIsCalled(bool canStart, bool interceptorThrowsException) { - // Arrange - _interceptor.Setup(x => x.CanStart(_target)).ReturnsAsync(canStart); + // Arrange + if (interceptorThrowsException) + { + _interceptor.Setup(x => x.CanStart(_target)).ThrowsAsync(new Exception()); + } + else + { + _interceptor.Setup(x => x.CanStart(_target)).ReturnsAsync(canStart); + } // Act await _target.Start(); @@ -48,15 +56,38 @@ public async Task When_Start_Then_Interceptor_CanStartIsCalled(bool canStart) // Assert _target.IsStarted.Should().BeTrue(); - _interceptor.Verify(x => x.CanStart(_target), Times.Once); - _interceptor.Verify(x => x.Started(_target), canStart ? Times.Once : Times.Never); _interceptor.VerifyGet(x => x.Order, Times.Once); + _interceptor.Verify(x => x.CanStart(_target), Times.Once); + _interceptor.Verify(x => x.Started(_target), canStart || interceptorThrowsException ? Times.Once : Times.Never); _interceptor.VerifyNoOtherCalls(); - _targetMock.Verify(x => x.OnStart(), canStart ? Times.Once : Times.Never); + _targetMock.Verify(x => x.OnStart(), canStart || interceptorThrowsException ? Times.Once : Times.Never); _targetMock.VerifyNoOtherCalls(); } + [Fact] + public async Task When_Start_Givn_CalledConcurrently_Then_ItWillStartOnce() + { + // Arrange + _interceptor.Setup(x => x.CanStart(_target)).ReturnsAsync(true); + + var startTasks = Enumerable.Range(0, 100).Select(_ => _target.Start()).ToArray(); + + // Act + await Task.WhenAll(startTasks); + + // Assert + _target.IsStarted.Should().BeTrue(); + + _interceptor.VerifyGet(x => x.Order, Times.Once); + _interceptor.Verify(x => x.CanStart(_target), Times.Once); + _interceptor.Verify(x => x.Started(_target), Times.Once); + _interceptor.VerifyNoOtherCalls(); + + _targetMock.Verify(x => x.OnStart(), Times.Once); + _targetMock.VerifyNoOtherCalls(); + } + [Theory] [InlineData(true)] [InlineData(false)] @@ -74,11 +105,11 @@ public async Task When_Stop_Then_Interceptor_CanStopIsCalled(bool canStop) // Assert _target.IsStarted.Should().BeFalse(); + _interceptor.VerifyGet(x => x.Order, Times.Once); _interceptor.Verify(x => x.CanStart(_target), Times.Once); _interceptor.Verify(x => x.CanStop(_target), Times.Once); _interceptor.Verify(x => x.Started(_target), Times.Once); _interceptor.Verify(x => x.Stopped(_target), canStop ? Times.Once : Times.Never); - _interceptor.VerifyGet(x => x.Order, Times.Once); _interceptor.VerifyNoOtherCalls(); _targetMock.Verify(x => x.OnStart(), Times.Once); diff --git a/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs b/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs index 6a3350aa..482f0243 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs @@ -44,6 +44,7 @@ public HybridMessageBusTest() _serviceProviderMock.Setup(x => x.GetService(typeof(RuntimeTypeCache))).Returns(new RuntimeTypeCache()); _serviceProviderMock.Setup(x => x.GetService(typeof(IPendingRequestManager))).Returns(() => new PendingRequestManager(new InMemoryPendingRequestStore(), new CurrentTimeProvider(), NullLoggerFactory.Instance)); + _loggerMock.Setup(x => x.IsEnabled(It.IsAny())).Returns(true); _loggerFactoryMock.Setup(x => x.CreateLogger(It.IsAny())).Returns(_loggerMock.Object); _messageBusBuilder.AddChildBus("bus1", (mbb) => @@ -179,7 +180,7 @@ public async Task Given_UndeclareMessageType_When_Publish_Then_FollowsSettingsMo _loggerMock.Verify(x => x.Log( LogLevel.Information, It.IsAny(), - It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type: "))), + It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type "))), It.IsAny(), It.IsAny>()), mode == UndeclaredMessageTypeMode.RaiseOneTimeLog ? Times.Once : Times.Never); } @@ -218,7 +219,7 @@ public async Task Given_UndeclaredRequestType_When_Send_Then_FollowsSettingsMode _loggerMock.Verify(x => x.Log( LogLevel.Information, It.IsAny(), - It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type: "))), + It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type "))), It.IsAny(), It.IsAny>()), mode == UndeclaredMessageTypeMode.RaiseOneTimeLog ? Times.Once : Times.Never); } @@ -256,7 +257,7 @@ public async Task Given_UndeclaredRequestTypeWithoutResponse_When_Send_Then_Foll _loggerMock.Verify(x => x.Log( LogLevel.Information, It.IsAny(), - It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type: "))), + It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type "))), It.IsAny(), It.IsAny>()), mode == UndeclaredMessageTypeMode.RaiseOneTimeLog ? Times.Once : Times.Never); }