From 2e5662df3cdd068186b5503fca78fe33564e9d17 Mon Sep 17 00:00:00 2001 From: Richard Pringle Date: Sun, 29 Dec 2024 15:46:01 +0800 Subject: [PATCH] #347 Refactor IConsumerErrorHandler to return a 'retry' response instead of supplying a 'retry' delegate Signed-off-by: Richard Pringle --- docs/intro.md | 51 +- docs/intro.t.md | 35 +- .../Consumer/ISqsConsumerErrorHandler.cs | 6 +- .../GlobalUsings.cs | 2 +- .../Consumer/IEventHubConsumerErrorHandler.cs | 6 +- .../GlobalUsings.cs | 12 +- .../IServiceBusConsumerErrorHandler.cs | 6 +- .../GlobalUsings.cs | 10 +- .../Consumer/IKafkaConsumerErrorHandler.cs | 6 +- src/SlimMessageBus.Host.Kafka/GlobalUsings.cs | 4 +- .../Consumers/IMemoryConsumerErrorHandler.cs | 6 +- .../GlobalUsings.cs | 3 +- src/SlimMessageBus.Host.Mqtt/GlobalUsings.cs | 3 +- .../IMqttConsumerErrorHandler.cs | 6 +- src/SlimMessageBus.Host.Nats/GlobalUsings.cs | 5 +- .../INatsConsumerErrorHandler.cs | 6 +- .../IRabbitMqConsumerErrorHandler.cs | 6 +- .../GlobalUsings.cs | 4 +- .../Consumers/IRedisConsumerErrorHandler.cs | 6 +- src/SlimMessageBus.Host.Redis/GlobalUsings.cs | 3 +- .../Collections/RuntimeTypeCache.cs | 4 +- .../ErrorHandling/ConsumerErrorHandler.cs | 13 + .../ConsumerErrorHandlerResult.cs | 37 +- .../ErrorHandling/IConsumerErrorHandler.cs | 18 +- .../MessageProcessors/MessageHandler.cs | 63 +- src/SlimMessageBus.Host/MessageBusBase.cs | 926 +++++++++--------- src/SlimMessageBus.sln | 12 +- src/SlimMessageBus/IMessageBus.cs | 6 +- .../MemoryMessageBusTests.cs | 4 +- .../IntegrationTests/RabbitMqMessageBusIt.cs | 3 +- .../Consumer/MessageHandlerTest.cs | 72 +- 31 files changed, 754 insertions(+), 590 deletions(-) create mode 100644 src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs diff --git a/docs/intro.md b/docs/intro.md index 909c2f5a..872ce976 100644 --- a/docs/intro.md +++ b/docs/intro.md @@ -1059,28 +1059,41 @@ public class LoggingConsumerInterceptor : IConsumerInterceptor](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) enables the definition of custom error handling for specific message types: +Message processing by consumers or handlers may result in exceptions. The [IConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) provides a standard way to integrate custom error handling logic across different transports. ```cs public interface IConsumerErrorHandler { /// - /// Executed when the message consumer (or handler) errors out. This interface allows to intercept and handle the exception. - /// Use the consumer context to get ahold of transport specific options to proceed (acknowledge/reject message). + /// + /// Executed when the message consumer (or handler) errors out. The interface allows for interception of + /// exceptions to manipulate the processing pipeline (success/fail/retry). + /// + /// + /// The consumer context is available to apply transport specific operations (acknowledge/reject/dead letter/etc). + /// + /// + /// If message execution is to be re-attempted, any delays/jitter should be applied before the method returns. + /// /// /// The message that failed to process. - /// Performs another message processing try. The return value is relevant if the consumer was a request handler (it will be its response value). Ensure to pass the return value to the result of the error handler. /// The consumer context for the message processing pipeline. /// Exception that occurred during message processing. + /// The number of times the message has been attempted to be processed. /// The error handling result. - Task OnHandleError(T message, Func> retry, IConsumerContext consumerContext, Exception exception); + Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts); } ``` -> The `retry()` parameter allows the message processing pipeline, including consumer interceptors, to retry processing when transient errors occur and retries are desired. +The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline. +| Result | Description | +|---------|-------------| +| Failure | The message failed to be processed and should be returned to the queue | +| Success | The pipeline must treat the message as having been processed successfully | +| SuccessWithResponse | The pipeline to treat the messagage as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) | +| Retry | Execute the pipeline again (any delay/jitter should be applied before returning from method)[^1] | + +[^1]: `Retry` will recreate the message scope on every atttempt if `PerMessageScopeEnabled` has been enabled. To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework: @@ -1106,6 +1119,26 @@ Transport plugins provide specialized error handling interfaces. Examples includ This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized. +Sample retry with exponential back-off: +```cs +public class RetryHandler : ConsumerErrorHandler +{ + private static readonly Random _random = new(); + + public override async Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) + { + if (attempts < 3) + { + var delay = (attempts * 1000) + (_random.Next(1000) - 500); + await Task.Delay(delay, consumerContext.CancellationToken); + return Retry(); + } + + return Failure(); + } +} +``` + ## Logging SlimMessageBus uses [Microsoft.Extensions.Logging.Abstractions](https://www.nuget.org/packages/Microsoft.Extensions.Logging.Abstractions): diff --git a/docs/intro.t.md b/docs/intro.t.md index f5512cba..e998bb39 100644 --- a/docs/intro.t.md +++ b/docs/intro.t.md @@ -1059,14 +1059,19 @@ public class LoggingConsumerInterceptor : IConsumerInterceptor](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) enables the definition of custom error handling for specific message types: +Message processing by consumers or handlers may result in exceptions. The [IConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) provides a standard way to integrate custom error handling logic across different transports. @[:cs](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs,Interface) -> The `retry()` parameter allows the message processing pipeline, including consumer interceptors, to retry processing when transient errors occur and retries are desired. +The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline. +| Result | Description | +|---------|-------------| +| Failure | The message failed to be processed and should be returned to the queue | +| Success | The pipeline must treat the message as having been processed successfully | +| SuccessWithResponse | The pipeline to treat the messagage as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) | +| Retry | Execute the pipeline again (any delay/jitter should be applied before returning from method)[^1] | + +[^1]: `Retry` will recreate the message scope on every atttempt if `PerMessageScopeEnabled` has been enabled. To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework: @@ -1092,6 +1097,26 @@ Transport plugins provide specialized error handling interfaces. Examples includ This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized. +Sample retry with exponential back-off: +```cs +public class RetryHandler : ConsumerErrorHandler +{ + private static readonly Random _random = new(); + + public override async Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) + { + if (attempts < 3) + { + var delay = (attempts * 1000) + (_random.Next(1000) - 500); + await Task.Delay(delay, consumerContext.CancellationToken); + return Retry(); + } + + return Failure(); + } +} +``` + ## Logging SlimMessageBus uses [Microsoft.Extensions.Logging.Abstractions](https://www.nuget.org/packages/Microsoft.Extensions.Logging.Abstractions): diff --git a/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs b/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs index 6172cfb5..c4221549 100644 --- a/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs @@ -1,5 +1,5 @@ namespace SlimMessageBus.Host.AmazonSQS; -public interface ISqsConsumerErrorHandler : IConsumerErrorHandler -{ -} +public interface ISqsConsumerErrorHandler : IConsumerErrorHandler; + +public abstract class SqsConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs b/src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs index ea8ffa7a..cf7639ba 100644 --- a/src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs @@ -7,4 +7,4 @@ global using Microsoft.Extensions.DependencyInjection.Extensions; global using Microsoft.Extensions.Logging; -global using SlimMessageBus.Host.Serialization; +global using SlimMessageBus.Host.Consumer.ErrorHandling; diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs index 89c8c322..335f02bb 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs @@ -1,5 +1,5 @@ namespace SlimMessageBus.Host.AzureEventHub; -public interface IEventHubConsumerErrorHandler : IConsumerErrorHandler -{ -} \ No newline at end of file +public interface IEventHubConsumerErrorHandler : IConsumerErrorHandler; + +public abstract class EventHubConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureEventHub/GlobalUsings.cs b/src/SlimMessageBus.Host.AzureEventHub/GlobalUsings.cs index a6282365..b55064f5 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/GlobalUsings.cs @@ -1,9 +1,9 @@ -global using Microsoft.Extensions.Logging; +global using Azure.Messaging.EventHubs; +global using Azure.Messaging.EventHubs.Producer; +global using Azure.Storage.Blobs; + +global using Microsoft.Extensions.Logging; -global using SlimMessageBus.Host; global using SlimMessageBus.Host.Collections; +global using SlimMessageBus.Host.Consumer.ErrorHandling; global using SlimMessageBus.Host.Services; - -global using Azure.Messaging.EventHubs; -global using Azure.Messaging.EventHubs.Producer; -global using Azure.Storage.Blobs; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs index 4d1832ee..a8ad8166 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs @@ -1,5 +1,5 @@ namespace SlimMessageBus.Host.AzureServiceBus; -public interface IServiceBusConsumerErrorHandler : IConsumerErrorHandler -{ -} \ No newline at end of file +public interface IServiceBusConsumerErrorHandler : IConsumerErrorHandler; + +public abstract class ServiceBusConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureServiceBus/GlobalUsings.cs b/src/SlimMessageBus.Host.AzureServiceBus/GlobalUsings.cs index bbed1600..f7c6beaa 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/GlobalUsings.cs @@ -1,8 +1,8 @@ -global using Microsoft.Extensions.Logging; +global using Azure.Messaging.ServiceBus; +global using Azure.Messaging.ServiceBus.Administration; + +global using Microsoft.Extensions.Logging; -global using SlimMessageBus.Host; global using SlimMessageBus.Host.Collections; +global using SlimMessageBus.Host.Consumer.ErrorHandling; global using SlimMessageBus.Host.Services; - -global using Azure.Messaging.ServiceBus; -global using Azure.Messaging.ServiceBus.Administration; diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs b/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs index 7e980d1e..a7f391bf 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs @@ -1,5 +1,5 @@ namespace SlimMessageBus.Host.Kafka; -public interface IKafkaConsumerErrorHandler : IConsumerErrorHandler -{ -} \ No newline at end of file +public interface IKafkaConsumerErrorHandler : IConsumerErrorHandler; + +public abstract class KafkaConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Kafka/GlobalUsings.cs b/src/SlimMessageBus.Host.Kafka/GlobalUsings.cs index 652c8bea..b5028412 100644 --- a/src/SlimMessageBus.Host.Kafka/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.Kafka/GlobalUsings.cs @@ -2,7 +2,7 @@ global using Microsoft.Extensions.Logging; -global using SlimMessageBus.Host; -global using SlimMessageBus.Host.Serialization; global using SlimMessageBus.Host.Collections; +global using SlimMessageBus.Host.Consumer.ErrorHandling; +global using SlimMessageBus.Host.Serialization; global using SlimMessageBus.Host.Services; diff --git a/src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs b/src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs index 6dd77086..f235ab2c 100644 --- a/src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs @@ -1,5 +1,5 @@ namespace SlimMessageBus.Host.Memory; -public interface IMemoryConsumerErrorHandler : IConsumerErrorHandler -{ -} \ No newline at end of file +public interface IMemoryConsumerErrorHandler : IConsumerErrorHandler; + +public abstract class MemoryConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Memory/GlobalUsings.cs b/src/SlimMessageBus.Host.Memory/GlobalUsings.cs index ff869fd3..756dc51e 100644 --- a/src/SlimMessageBus.Host.Memory/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.Memory/GlobalUsings.cs @@ -1,3 +1,4 @@ global using Microsoft.Extensions.Logging; -global using SlimMessageBus.Host.Serialization; \ No newline at end of file +global using SlimMessageBus.Host.Consumer.ErrorHandling; +global using SlimMessageBus.Host.Serialization; diff --git a/src/SlimMessageBus.Host.Mqtt/GlobalUsings.cs b/src/SlimMessageBus.Host.Mqtt/GlobalUsings.cs index dac948d8..3d32ac81 100644 --- a/src/SlimMessageBus.Host.Mqtt/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.Mqtt/GlobalUsings.cs @@ -4,4 +4,5 @@ global using MQTTnet.Client; global using MQTTnet.Extensions.ManagedClient; -global using SlimMessageBus.Host.Services; \ No newline at end of file +global using SlimMessageBus.Host.Consumer.ErrorHandling; +global using SlimMessageBus.Host.Services; diff --git a/src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs b/src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs index 7a47506e..9bef2034 100644 --- a/src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs @@ -1,5 +1,5 @@ namespace SlimMessageBus.Host.Mqtt; -public interface IMqttConsumerErrorHandler : IConsumerErrorHandler -{ -} \ No newline at end of file +public interface IMqttConsumerErrorHandler : IConsumerErrorHandler; + +public abstract class MqttConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Nats/GlobalUsings.cs b/src/SlimMessageBus.Host.Nats/GlobalUsings.cs index b5c4039b..2c8e9b12 100644 --- a/src/SlimMessageBus.Host.Nats/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.Nats/GlobalUsings.cs @@ -1,5 +1,6 @@ global using Microsoft.Extensions.Logging; -global using SlimMessageBus.Host.Services; +global using NATS.Client.Core; -global using NATS.Client.Core; \ No newline at end of file +global using SlimMessageBus.Host.Consumer.ErrorHandling; +global using SlimMessageBus.Host.Services; diff --git a/src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs b/src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs index e2725a0a..8867a961 100644 --- a/src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs @@ -1,5 +1,5 @@ namespace SlimMessageBus.Host.Nats; -public interface INatsConsumerErrorHandler : IConsumerErrorHandler -{ -} \ No newline at end of file +public interface INatsConsumerErrorHandler : IConsumerErrorHandler; + +public abstract class NatsConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs index b22819b9..2a48eab3 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs @@ -1,5 +1,5 @@ namespace SlimMessageBus.Host.RabbitMQ; -public interface IRabbitMqConsumerErrorHandler : IConsumerErrorHandler -{ -} \ No newline at end of file +public interface IRabbitMqConsumerErrorHandler : IConsumerErrorHandler; + +public abstract class RabbitMqConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.RabbitMQ/GlobalUsings.cs b/src/SlimMessageBus.Host.RabbitMQ/GlobalUsings.cs index ed6111b3..d4ef3ccb 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/GlobalUsings.cs @@ -3,5 +3,5 @@ global using RabbitMQ.Client; global using RabbitMQ.Client.Events; -global using SlimMessageBus.Host.Serialization; -global using SlimMessageBus.Host.Services; \ No newline at end of file +global using SlimMessageBus.Host.Consumer.ErrorHandling; +global using SlimMessageBus.Host.Services; diff --git a/src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs b/src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs index a52070b7..d041b78b 100644 --- a/src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs @@ -1,5 +1,5 @@ namespace SlimMessageBus.Host.Redis; -public interface IRedisConsumerErrorHandler : IConsumerErrorHandler -{ -} \ No newline at end of file +public interface IRedisConsumerErrorHandler : IConsumerErrorHandler; + +public abstract class RedisConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Redis/GlobalUsings.cs b/src/SlimMessageBus.Host.Redis/GlobalUsings.cs index f0ff2e4a..47b60afa 100644 --- a/src/SlimMessageBus.Host.Redis/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.Redis/GlobalUsings.cs @@ -1,7 +1,8 @@ global using Microsoft.Extensions.Logging; global using SlimMessageBus.Host.Collections; +global using SlimMessageBus.Host.Consumer.ErrorHandling; global using SlimMessageBus.Host.Serialization; global using SlimMessageBus.Host.Services; -global using StackExchange.Redis; \ No newline at end of file +global using StackExchange.Redis; diff --git a/src/SlimMessageBus.Host/Collections/RuntimeTypeCache.cs b/src/SlimMessageBus.Host/Collections/RuntimeTypeCache.cs index 76c8813a..1b525653 100644 --- a/src/SlimMessageBus.Host/Collections/RuntimeTypeCache.cs +++ b/src/SlimMessageBus.Host/Collections/RuntimeTypeCache.cs @@ -16,7 +16,7 @@ public class RuntimeTypeCache : IRuntimeTypeCache public IGenericTypeCache>, IConsumerContext, Task>> ConsumerInterceptorType { get; } public IGenericTypeCache2> HandlerInterceptorType { get; } - public IGenericTypeCache>, IConsumerContext, Exception, Task>> ConsumerErrorHandlerType { get; } + public IGenericTypeCache>> ConsumerErrorHandlerType { get; } public RuntimeTypeCache() { @@ -78,7 +78,7 @@ public RuntimeTypeCache() typeof(IRequestHandlerInterceptor<,>), nameof(IRequestHandlerInterceptor.OnHandle)); - ConsumerErrorHandlerType = new GenericTypeCache>, IConsumerContext, Exception, Task>>( + ConsumerErrorHandlerType = new GenericTypeCache>>( typeof(IConsumerErrorHandler<>), nameof(IConsumerErrorHandler.OnHandleError)); } diff --git a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs new file mode 100644 index 00000000..94d584c7 --- /dev/null +++ b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs @@ -0,0 +1,13 @@ +namespace SlimMessageBus.Host.Consumer.ErrorHandling; + +public abstract class ConsumerErrorHandler : BaseConsumerErrorHandler, IConsumerErrorHandler +{ + public abstract Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts); +} + +public abstract class BaseConsumerErrorHandler +{ + public static ConsumerErrorHandlerResult Failure() => ConsumerErrorHandlerResult.Failure; + public static ConsumerErrorHandlerResult Retry() => ConsumerErrorHandlerResult.Retry; + public static ConsumerErrorHandlerResult Success(object response = null) => response == null ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.SuccessWithResponse(response); +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs index a80fed9a..7b914fee 100644 --- a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs +++ b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs @@ -2,23 +2,42 @@ public record ConsumerErrorHandlerResult { - private static readonly object NoResponse = new(); + private static readonly object _noResponse = new(); - public bool Handled { get; private set; } + private ConsumerErrorHandlerResult(ConsumerErrorHandlerResultEnum result, object response = null) + { + Result = result; + Response = response ?? _noResponse; + } + + public ConsumerErrorHandlerResultEnum Result { get; private set; } public object Response { get; private set; } - public bool HasResponse => !ReferenceEquals(Response, NoResponse); + public bool HasResponse => !ReferenceEquals(Response, _noResponse); /// - /// The error handler was not able to handle the exception. + /// The message should be placed back into the queue. /// - public static readonly ConsumerErrorHandlerResult Failure = new() { Handled = false, Response = NoResponse }; + public static readonly ConsumerErrorHandlerResult Failure = new(ConsumerErrorHandlerResultEnum.Fail); + /// - /// The error handler was able to handle the exception. + /// The message processor should evaluate the message as having been processed successfully. /// - public static readonly ConsumerErrorHandlerResult Success = new() { Handled = true, Response = NoResponse }; + public static readonly ConsumerErrorHandlerResult Success = new(ConsumerErrorHandlerResultEnum.Success); /// - /// The error handler was able to handle the exception, and has a fallback response for the or . + /// The message processor should evaluate the message as having been processed successfully and use the specified fallback response for the or . /// - public static ConsumerErrorHandlerResult SuccessWithResponse(object response) => new() { Handled = true, Response = response }; + public static ConsumerErrorHandlerResult SuccessWithResponse(object response) => new(ConsumerErrorHandlerResultEnum.Success, response); + + /// + /// Retry processing the message without placing it back in the queue. + /// + public static readonly ConsumerErrorHandlerResult Retry = new(ConsumerErrorHandlerResultEnum.Retry); +} + +public enum ConsumerErrorHandlerResultEnum +{ + Fail, + Retry, + Success } \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs b/src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs index 557ed44d..5c40030d 100644 --- a/src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs @@ -4,14 +4,22 @@ public interface IConsumerErrorHandler { /// - /// Executed when the message consumer (or handler) errors out. This interface allows to intercept and handle the exception. - /// Use the consumer context to get ahold of transport specific options to proceed (acknowledge/reject message). + /// + /// Executed when the message consumer (or handler) errors out. The interface allows for interception of + /// exceptions to manipulate the processing pipeline (success/fail/retry). + /// + /// + /// The consumer context is available to apply transport specific operations (acknowledge/reject/dead letter/etc). + /// + /// + /// If message execution is to be re-attempted, any delays/jitter should be applied before the method returns. + /// /// /// The message that failed to process. - /// Performs another message processing try. The return value is relevant if the consumer was a request handler (it will be its response value). Ensure to pass the return value to the result of the error handler. /// The consumer context for the message processing pipeline. /// Exception that occurred during message processing. + /// The number of times the message has been attempted to be processed. /// The error handling result. - Task OnHandleError(T message, Func> retry, IConsumerContext consumerContext, Exception exception); + Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts); } -// doc:fragment:Interface +// doc:fragment:Interface \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs index 40d594f0..21717605 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs @@ -65,63 +65,61 @@ public MessageHandler( var hasResponse = consumerInvoker.ParentSettings.ConsumerMode == ConsumerMode.RequestResponse; var responseType = hasResponse ? consumerInvoker.ParentSettings.ResponseType ?? typeof(Void) : null; - object response = null; - Exception responseException = null; string requestId = null; - if (hasResponse && messageHeaders != null) { messageHeaders.TryGetHeader(ReqRespMessageHeaders.RequestId, out requestId); } - await using (var messageScope = _messageScopeFactory.CreateMessageScope(consumerInvoker.ParentSettings, message, consumerContextProperties, currentServiceProvider)) + DateTimeOffset? messageExpires = null; + if (messageHeaders != null && messageHeaders.TryGetHeader(ReqRespMessageHeaders.Expires, out DateTimeOffset? expires) && expires != null) { - if (messageHeaders != null && messageHeaders.TryGetHeader(ReqRespMessageHeaders.Expires, out DateTimeOffset? expires) && expires != null) - { - // Verify if the request/message is already expired - var currentTime = _currentTimeProvider.CurrentTime; - if (currentTime > expires.Value) - { - // ToDo: Call interceptor + messageExpires = expires; + } - // Do not process the expired message - return (ResponseForExpiredRequest, null, requestId); - } + var attempts = 0; + var consumerType = consumerInvoker.ConsumerType; + + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + + await using var messageScope = _messageScopeFactory.CreateMessageScope(consumerInvoker.ParentSettings, message, consumerContextProperties, currentServiceProvider); + if (messageExpires != null && messageExpires < _currentTimeProvider.CurrentTime) + { + // ToDo: Call interceptor + // Do not process the expired message + return (ResponseForExpiredRequest, null, requestId); } var messageBusTarget = new MessageBusProxy(MessageBus, messageScope.ServiceProvider); - - Type consumerType = null; object consumerInstance = null; - try { - consumerType = consumerInvoker.ConsumerType; consumerInstance = messageScope.ServiceProvider.GetService(consumerType) ?? throw new ConfigurationMessageBusException($"Could not resolve consumer/handler type {consumerType} from the DI container. Please check that the configured type {consumerType} is registered within the DI container."); var consumerContext = CreateConsumerContext(messageHeaders, consumerInvoker, transportMessage, consumerInstance, messageBusTarget, consumerContextProperties, cancellationToken); try { - response = await DoHandleInternal(message, consumerInvoker, messageType, hasResponse, responseType, messageScope, consumerContext).ConfigureAwait(false); + var response = await DoHandleInternal(message, consumerInvoker, messageType, hasResponse, responseType, messageScope, consumerContext).ConfigureAwait(false); + return (response, null, requestId); } catch (Exception ex) { - // Give the consumer error handler a chance to take action - var handleErrorResult = await DoHandleError(message, consumerInvoker, messageType, hasResponse, responseType, messageScope, consumerContext, ex).ConfigureAwait(false); - if (!handleErrorResult.Handled) + attempts++; + var handleErrorResult = await DoHandleError(message, messageType, messageScope, consumerContext, ex, attempts, cancellationToken).ConfigureAwait(false); + if (handleErrorResult.Result != ConsumerErrorHandlerResultEnum.Retry) { - responseException = ex; - } - if (handleErrorResult.HasResponse) - { - response = handleErrorResult.Response; + var exception = handleErrorResult.Result != ConsumerErrorHandlerResultEnum.Success ? ex : null; + var response = handleErrorResult.HasResponse ? handleErrorResult.Response : null; + return (response, exception, requestId); } } } catch (Exception e) { - responseException = e; + return (null, e, requestId); } finally { @@ -132,8 +130,6 @@ public MessageHandler( } } } - - return (response, responseException, requestId); } private async Task DoHandleInternal(object message, IMessageTypeConsumerInvokerSettings consumerInvoker, Type messageType, bool hasResponse, Type responseType, IMessageScope messageScope, IConsumerContext consumerContext) @@ -150,7 +146,7 @@ private async Task DoHandleInternal(object message, IMessageTypeConsumer return await ExecuteConsumer(message, consumerContext, consumerInvoker, responseType).ConfigureAwait(false); } - private async Task DoHandleError(object message, IMessageTypeConsumerInvokerSettings consumerInvoker, Type messageType, bool hasResponse, Type responseType, IMessageScope messageScope, IConsumerContext consumerContext, Exception ex) + private async Task DoHandleError(object message, Type messageType, IMessageScope messageScope, IConsumerContext consumerContext, Exception ex, int attempts, CancellationToken cancellationToken) { var errorHandlerResult = ConsumerErrorHandlerResult.Failure; @@ -166,11 +162,8 @@ private async Task DoHandleError(object message, IMe { _logger.LogDebug(ex, "Consumer error handler of type {ConsumerErrorHandlerType} will be used to handle the exception during processing of message of type {MessageType}", consumerErrorHandler.GetType(), messageType); - // Give a chance to the consumer error handler to take action - Task retry() => DoHandleInternal(message, consumerInvoker, messageType, hasResponse, responseType, messageScope, consumerContext); - var consumerErrorHandlerMethod = RuntimeTypeCache.ConsumerErrorHandlerType[messageType]; - errorHandlerResult = await consumerErrorHandlerMethod(consumerErrorHandler, message, retry, consumerContext, ex).ConfigureAwait(false); + errorHandlerResult = await consumerErrorHandlerMethod(consumerErrorHandler, message, consumerContext, ex, attempts).ConfigureAwait(false); } return errorHandlerResult; diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index e5e99498..02384e3a 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -1,17 +1,17 @@ -namespace SlimMessageBus.Host; - -using System.Globalization; +namespace SlimMessageBus.Host; + +using System.Globalization; using System.Runtime.ExceptionServices; -using SlimMessageBus.Host.Consumer; -using SlimMessageBus.Host.Services; +using SlimMessageBus.Host.Consumer; +using SlimMessageBus.Host.Services; public abstract class MessageBusBase(MessageBusSettings settings, TProviderSettings providerSettings) : MessageBusBase(settings) - where TProviderSettings : class -{ - public TProviderSettings ProviderSettings { get; } = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings)); + where TProviderSettings : class +{ + public TProviderSettings ProviderSettings { get; } = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings)); } - + public abstract class MessageBusBase : IDisposable, IAsyncDisposable, IMasterMessageBus, IMessageScopeFactory, @@ -19,359 +19,359 @@ public abstract class MessageBusBase : IDisposable, IAsyncDisposable, IResponseProducer, ITransportProducer, ITransportBulkProducer -{ - private readonly ILogger _logger; - private CancellationTokenSource _cancellationTokenSource = new(); - private IMessageSerializer _serializer; - private readonly MessageHeaderService _headerService; +{ + private readonly ILogger _logger; + private CancellationTokenSource _cancellationTokenSource = new(); + private IMessageSerializer _serializer; + private readonly MessageHeaderService _headerService; private readonly List _consumers = []; public ILoggerFactory LoggerFactory { get; protected set; } - - /// - /// Special market reference that signifies a dummy producer settings for response types. - /// - protected static readonly ProducerSettings MarkerProducerSettingsForResponses = new(); - - public RuntimeTypeCache RuntimeTypeCache { get; } - - public virtual MessageBusSettings Settings { get; } - - public virtual IMessageSerializer Serializer => _serializer ??= GetSerializer(); - - public IMessageTypeResolver MessageTypeResolver { get; } - - /// - /// Default that corresponds to the root DI container, and pointing at self as the bus target. - /// - public virtual IMessageBusTarget MessageBusTarget { get; } - - protected ProducerByMessageTypeCache ProducerSettingsByMessageType { get; private set; } - - protected IPendingRequestStore PendingRequestStore { get; set; } - protected IPendingRequestManager PendingRequestManager { get; set; } - - public CancellationToken CancellationToken => _cancellationTokenSource.Token; - - #region Disposing - - protected bool IsDisposing { get; private set; } - protected bool IsDisposed { get; private set; } - - #endregion + + /// + /// Special market reference that signifies a dummy producer settings for response types. + /// + protected static readonly ProducerSettings MarkerProducerSettingsForResponses = new(); + + public RuntimeTypeCache RuntimeTypeCache { get; } + + public virtual MessageBusSettings Settings { get; } + + public virtual IMessageSerializer Serializer => _serializer ??= GetSerializer(); + + public IMessageTypeResolver MessageTypeResolver { get; } + + /// + /// Default that corresponds to the root DI container, and pointing at self as the bus target. + /// + public virtual IMessageBusTarget MessageBusTarget { get; } + + protected ProducerByMessageTypeCache ProducerSettingsByMessageType { get; private set; } + + protected IPendingRequestStore PendingRequestStore { get; set; } + protected IPendingRequestManager PendingRequestManager { get; set; } + + public CancellationToken CancellationToken => _cancellationTokenSource.Token; + + #region Disposing + + protected bool IsDisposing { get; private set; } + protected bool IsDisposed { get; private set; } + + #endregion /// /// Maintains a list of tasks that should be completed before the bus can produce the first message or start consumers. /// Add async things like /// - connection creations here to the underlying transport client /// - provision topology - /// - protected readonly AsyncTaskList InitTaskList = new(); - - #region Start & Stop - - private readonly object _startLock = new(); - - public bool IsStarted { get; private set; } - - protected bool IsStarting { get; private set; } - protected bool IsStopping { get; private set; } - - #endregion - - public virtual string Name => Settings.Name ?? "Main"; - - public IReadOnlyCollection Consumers => _consumers; - - protected MessageBusBase(MessageBusSettings settings) - { - Settings = settings ?? throw new ArgumentNullException(nameof(settings)); - - if (settings.ServiceProvider is null) throw new ConfigurationMessageBusException($"The bus {Name} has no {nameof(settings.ServiceProvider)} configured"); - - // Try to resolve from DI. If not available, suppress logging by using the NullLoggerFactory - LoggerFactory = settings.ServiceProvider.GetService() ?? NullLoggerFactory.Instance; - - _logger = LoggerFactory.CreateLogger(); - - var messageTypeResolverType = settings.MessageTypeResolverType ?? typeof(IMessageTypeResolver); - MessageTypeResolver = (IMessageTypeResolver)settings.ServiceProvider.GetService(messageTypeResolverType) - ?? throw new ConfigurationMessageBusException($"The bus {Name} could not resolve the required type {messageTypeResolverType.Name} from {nameof(Settings.ServiceProvider)}"); - - _headerService = new MessageHeaderService(LoggerFactory.CreateLogger(), Settings, MessageTypeResolver); - - RuntimeTypeCache = settings.ServiceProvider.GetRequiredService(); - + /// + protected readonly AsyncTaskList InitTaskList = new(); + + #region Start & Stop + + private readonly object _startLock = new(); + + public bool IsStarted { get; private set; } + + protected bool IsStarting { get; private set; } + protected bool IsStopping { get; private set; } + + #endregion + + public virtual string Name => Settings.Name ?? "Main"; + + public IReadOnlyCollection Consumers => _consumers; + + protected MessageBusBase(MessageBusSettings settings) + { + Settings = settings ?? throw new ArgumentNullException(nameof(settings)); + + if (settings.ServiceProvider is null) throw new ConfigurationMessageBusException($"The bus {Name} has no {nameof(settings.ServiceProvider)} configured"); + + // Try to resolve from DI. If not available, suppress logging by using the NullLoggerFactory + LoggerFactory = settings.ServiceProvider.GetService() ?? NullLoggerFactory.Instance; + + _logger = LoggerFactory.CreateLogger(); + + var messageTypeResolverType = settings.MessageTypeResolverType ?? typeof(IMessageTypeResolver); + MessageTypeResolver = (IMessageTypeResolver)settings.ServiceProvider.GetService(messageTypeResolverType) + ?? throw new ConfigurationMessageBusException($"The bus {Name} could not resolve the required type {messageTypeResolverType.Name} from {nameof(Settings.ServiceProvider)}"); + + _headerService = new MessageHeaderService(LoggerFactory.CreateLogger(), Settings, MessageTypeResolver); + + RuntimeTypeCache = settings.ServiceProvider.GetRequiredService(); + MessageBusTarget = new MessageBusProxy(this, Settings.ServiceProvider); CurrentTimeProvider = settings.ServiceProvider.GetRequiredService(); - PendingRequestManager = settings.ServiceProvider.GetRequiredService(); - PendingRequestStore = PendingRequestManager.Store; - } - - protected virtual IMessageSerializer GetSerializer() => Settings.GetSerializer(Settings.ServiceProvider); - - protected virtual IMessageBusSettingsValidationService ValidationService { get => new DefaultMessageBusSettingsValidationService(Settings); } - - /// - /// Called by the provider to initialize the bus. - /// - protected void OnBuildProvider() - { - ValidationService.AssertSettings(); - - Build(); - - // Notify the bus has been created - before any message can be produced - InitTaskList.Add(() => OnBusLifecycle(MessageBusLifecycleEventType.Created), CancellationToken); - - // Auto start consumers if enabled - if (Settings.AutoStartConsumers) - { - // Fire and forget start - _ = Task.Run(async () => - { - try - { - await Start().ConfigureAwait(false); - } - catch (Exception e) - { - _logger.LogError(e, "Could not auto start consumers"); - } - }); - } - } - - protected virtual void Build() - { - ProducerSettingsByMessageType = new ProducerByMessageTypeCache(_logger, BuildProducerByBaseMessageType(), RuntimeTypeCache); - } - - private Dictionary BuildProducerByBaseMessageType() - { - var producerByBaseMessageType = Settings.Producers.ToDictionary(producerSettings => producerSettings.MessageType); - - foreach (var consumerSettings in Settings.Consumers.Where(x => x.ResponseType != null)) - { - // A response type can be used across different requests hence TryAdd - producerByBaseMessageType.TryAdd(consumerSettings.ResponseType, MarkerProducerSettingsForResponses); - } - return producerByBaseMessageType; - } - - private IEnumerable _lifecycleInterceptors; - - private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType) - { - _lifecycleInterceptors ??= Settings.ServiceProvider?.GetServices(); - if (_lifecycleInterceptors != null) - { - foreach (var i in _lifecycleInterceptors) - { - var task = i.OnBusLifecycle(eventType, MessageBusTarget); - if (task != null) - { - await task; - } - } - } - } - - public async Task Start() - { - lock (_startLock) - { - if (IsStarting || IsStarted) - { - return; - } - IsStarting = true; - } - - try - { - await InitTaskList.EnsureAllFinished(); - - _logger.LogInformation("Starting consumers for {BusName} bus...", Name); - await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false); - - await CreateConsumers(); - await OnStart().ConfigureAwait(false); - await Task.WhenAll(_consumers.Select(x => x.Start())).ConfigureAwait(false); - - await OnBusLifecycle(MessageBusLifecycleEventType.Started).ConfigureAwait(false); - _logger.LogInformation("Started consumers for {BusName} bus", Name); - - lock (_startLock) - { - IsStarted = true; - } - } - finally - { - lock (_startLock) - { - IsStarting = false; - } - } - } - - public async Task Stop() - { - lock (_startLock) - { - if (IsStopping || !IsStarted) - { - return; - } - IsStopping = true; - } - - try - { - await InitTaskList.EnsureAllFinished(); - - _logger.LogInformation("Stopping consumers for {BusName} bus...", Name); - await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false); - - await Task.WhenAll(_consumers.Select(x => x.Stop())).ConfigureAwait(false); - await OnStop().ConfigureAwait(false); - await DestroyConsumers().ConfigureAwait(false); - - await OnBusLifecycle(MessageBusLifecycleEventType.Stopped).ConfigureAwait(false); - _logger.LogInformation("Stopped consumers for {BusName} bus", Name); - - lock (_startLock) - { - IsStarted = false; - } - } - finally - { - lock (_startLock) - { - IsStopping = false; - } - } - } - - protected internal virtual Task OnStart() => Task.CompletedTask; - protected internal virtual Task OnStop() => Task.CompletedTask; - - protected void AssertActive() - { - if (IsDisposed) - { - throw new MessageBusException("The message bus is disposed at this time"); - } - } - - protected virtual void AssertRequestResponseConfigured() - { - if (Settings.RequestResponse == null) - { - throw new SendMessageBusException("An attempt to send request when request/response communication was not configured for the message bus. Ensure you configure the bus properly before the application starts."); - } - } - - #region Implementation of IDisposable and IAsyncDisposable - - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - { - DisposeAsyncInternal().ConfigureAwait(false).GetAwaiter().GetResult(); - } - } - - public async ValueTask DisposeAsync() - { - await DisposeAsyncInternal().ConfigureAwait(false); - GC.SuppressFinalize(this); - } - - private async ValueTask DisposeAsyncInternal() - { - if (!IsDisposed && !IsDisposing) - { - IsDisposing = true; - try - { - await DisposeAsyncCore().ConfigureAwait(false); - } - finally - { - IsDisposed = true; - IsDisposing = false; - } - } - } - - /// - /// Stops the consumers and disposes of internal bus objects. - /// - /// - protected async virtual ValueTask DisposeAsyncCore() - { - await Stop().ConfigureAwait(false); - - if (_cancellationTokenSource != null) + PendingRequestManager = settings.ServiceProvider.GetRequiredService(); + PendingRequestStore = PendingRequestManager.Store; + } + + protected virtual IMessageSerializer GetSerializer() => Settings.GetSerializer(Settings.ServiceProvider); + + protected virtual IMessageBusSettingsValidationService ValidationService { get => new DefaultMessageBusSettingsValidationService(Settings); } + + /// + /// Called by the provider to initialize the bus. + /// + protected void OnBuildProvider() + { + ValidationService.AssertSettings(); + + Build(); + + // Notify the bus has been created - before any message can be produced + InitTaskList.Add(() => OnBusLifecycle(MessageBusLifecycleEventType.Created), CancellationToken); + + // Auto start consumers if enabled + if (Settings.AutoStartConsumers) + { + // Fire and forget start + _ = Task.Run(async () => + { + try + { + await Start().ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogError(e, "Could not auto start consumers"); + } + }); + } + } + + protected virtual void Build() + { + ProducerSettingsByMessageType = new ProducerByMessageTypeCache(_logger, BuildProducerByBaseMessageType(), RuntimeTypeCache); + } + + private Dictionary BuildProducerByBaseMessageType() + { + var producerByBaseMessageType = Settings.Producers.ToDictionary(producerSettings => producerSettings.MessageType); + + foreach (var consumerSettings in Settings.Consumers.Where(x => x.ResponseType != null)) + { + // A response type can be used across different requests hence TryAdd + producerByBaseMessageType.TryAdd(consumerSettings.ResponseType, MarkerProducerSettingsForResponses); + } + return producerByBaseMessageType; + } + + private IEnumerable _lifecycleInterceptors; + + private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType) + { + _lifecycleInterceptors ??= Settings.ServiceProvider?.GetServices(); + if (_lifecycleInterceptors != null) + { + foreach (var i in _lifecycleInterceptors) + { + var task = i.OnBusLifecycle(eventType, MessageBusTarget); + if (task != null) + { + await task; + } + } + } + } + + public async Task Start() + { + lock (_startLock) + { + if (IsStarting || IsStarted) + { + return; + } + IsStarting = true; + } + + try + { + await InitTaskList.EnsureAllFinished(); + + _logger.LogInformation("Starting consumers for {BusName} bus...", Name); + await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false); + + await CreateConsumers(); + await OnStart().ConfigureAwait(false); + await Task.WhenAll(_consumers.Select(x => x.Start())).ConfigureAwait(false); + + await OnBusLifecycle(MessageBusLifecycleEventType.Started).ConfigureAwait(false); + _logger.LogInformation("Started consumers for {BusName} bus", Name); + + lock (_startLock) + { + IsStarted = true; + } + } + finally + { + lock (_startLock) + { + IsStarting = false; + } + } + } + + public async Task Stop() + { + lock (_startLock) + { + if (IsStopping || !IsStarted) + { + return; + } + IsStopping = true; + } + + try + { + await InitTaskList.EnsureAllFinished(); + + _logger.LogInformation("Stopping consumers for {BusName} bus...", Name); + await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false); + + await Task.WhenAll(_consumers.Select(x => x.Stop())).ConfigureAwait(false); + await OnStop().ConfigureAwait(false); + await DestroyConsumers().ConfigureAwait(false); + + await OnBusLifecycle(MessageBusLifecycleEventType.Stopped).ConfigureAwait(false); + _logger.LogInformation("Stopped consumers for {BusName} bus", Name); + + lock (_startLock) + { + IsStarted = false; + } + } + finally + { + lock (_startLock) + { + IsStopping = false; + } + } + } + + protected internal virtual Task OnStart() => Task.CompletedTask; + protected internal virtual Task OnStop() => Task.CompletedTask; + + protected void AssertActive() + { + if (IsDisposed) + { + throw new MessageBusException("The message bus is disposed at this time"); + } + } + + protected virtual void AssertRequestResponseConfigured() + { + if (Settings.RequestResponse == null) + { + throw new SendMessageBusException("An attempt to send request when request/response communication was not configured for the message bus. Ensure you configure the bus properly before the application starts."); + } + } + + #region Implementation of IDisposable and IAsyncDisposable + + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + DisposeAsyncInternal().ConfigureAwait(false).GetAwaiter().GetResult(); + } + } + + public async ValueTask DisposeAsync() + { + await DisposeAsyncInternal().ConfigureAwait(false); + GC.SuppressFinalize(this); + } + + private async ValueTask DisposeAsyncInternal() + { + if (!IsDisposed && !IsDisposing) + { + IsDisposing = true; + try + { + await DisposeAsyncCore().ConfigureAwait(false); + } + finally + { + IsDisposed = true; + IsDisposing = false; + } + } + } + + /// + /// Stops the consumers and disposes of internal bus objects. + /// + /// + protected async virtual ValueTask DisposeAsyncCore() + { + await Stop().ConfigureAwait(false); + + if (_cancellationTokenSource != null) { await _cancellationTokenSource.CancelAsync(); - _cancellationTokenSource.Dispose(); - _cancellationTokenSource = null; - } - } - - protected virtual Task CreateConsumers() - { - _logger.LogInformation("Creating consumers for {BusName} bus...", Name); - return Task.CompletedTask; - } - - protected async virtual Task DestroyConsumers() - { - _logger.LogInformation("Destroying consumers for {BusName} bus...", Name); - - foreach (var consumer in _consumers) - { - await consumer.DisposeSilently("Consumer", _logger).ConfigureAwait(false); - } - _consumers.Clear(); - } - - #endregion - - protected void AddConsumer(AbstractConsumer consumer) => _consumers.Add(consumer); - - public ICurrentTimeProvider CurrentTimeProvider { get; protected set; } - - protected ProducerSettings GetProducerSettings(Type messageType) - { - var producerSettings = ProducerSettingsByMessageType[messageType]; - if (producerSettings == null && !ReferenceEquals(producerSettings, MarkerProducerSettingsForResponses)) - { - throw new ProducerMessageBusException($"Message of type {messageType} was not registered as a supported produce message. Please check your MessageBus configuration and include this type or one of its base types."); - } - return producerSettings; - } - - protected virtual string GetDefaultPath(Type messageType, ProducerSettings producerSettings) - { - if (producerSettings == null) throw new ArgumentNullException(nameof(producerSettings)); - - var path = producerSettings.DefaultPath - ?? throw new ProducerMessageBusException($"An attempt to produce message of type {messageType} without specifying path, but there was no default path configured. Double check your configuration."); - - _logger.LogDebug("Applying default path {Path} for message type {MessageType}", path, messageType); - return path; + _cancellationTokenSource.Dispose(); + _cancellationTokenSource = null; + } + } + + protected virtual Task CreateConsumers() + { + _logger.LogInformation("Creating consumers for {BusName} bus...", Name); + return Task.CompletedTask; + } + + protected async virtual Task DestroyConsumers() + { + _logger.LogInformation("Destroying consumers for {BusName} bus...", Name); + + foreach (var consumer in _consumers) + { + await consumer.DisposeSilently("Consumer", _logger).ConfigureAwait(false); + } + _consumers.Clear(); + } + + #endregion + + protected void AddConsumer(AbstractConsumer consumer) => _consumers.Add(consumer); + + public ICurrentTimeProvider CurrentTimeProvider { get; protected set; } + + protected ProducerSettings GetProducerSettings(Type messageType) + { + var producerSettings = ProducerSettingsByMessageType[messageType]; + if (producerSettings == null && !ReferenceEquals(producerSettings, MarkerProducerSettingsForResponses)) + { + throw new ProducerMessageBusException($"Message of type {messageType} was not registered as a supported produce message. Please check your MessageBus configuration and include this type or one of its base types."); + } + return producerSettings; + } + + protected virtual string GetDefaultPath(Type messageType, ProducerSettings producerSettings) + { + if (producerSettings == null) throw new ArgumentNullException(nameof(producerSettings)); + + var path = producerSettings.DefaultPath + ?? throw new ProducerMessageBusException($"An attempt to produce message of type {messageType} without specifying path, but there was no default path configured. Double check your configuration."); + + _logger.LogDebug("Applying default path {Path} for message type {MessageType}", path, messageType); + return path; } public abstract Task ProduceToTransport( @@ -386,10 +386,10 @@ protected void OnProduceToTransport(object message, Type messageType, string path, IDictionary messageHeaders) - => _logger.LogDebug("Producing message {Message} of type {MessageType} to path {Path}", message, messageType, path); + => _logger.LogDebug("Producing message {Message} of type {MessageType} to path {Path}", message, messageType, path); + + public virtual int? MaxMessagesPerTransaction => null; - public virtual int? MaxMessagesPerTransaction => null; - public async virtual Task> ProduceToTransportBulk( IReadOnlyCollection envelopes, string path, @@ -407,21 +407,21 @@ await ProduceToTransport(envelope.Message, envelope.MessageType, path, envelope. dispatched.Add(envelope); } - return new(dispatched, null); + return new(dispatched, null); } catch (Exception ex) { return new(dispatched, ex); } - } - - public async virtual Task ProducePublish(object message, string path = null, IDictionary headers = null, IMessageBusTarget targetBus = null, CancellationToken cancellationToken = default) - { - if (message == null) throw new ArgumentNullException(nameof(message)); + } + + public async virtual Task ProducePublish(object message, string path = null, IDictionary headers = null, IMessageBusTarget targetBus = null, CancellationToken cancellationToken = default) + { + if (message == null) throw new ArgumentNullException(nameof(message)); AssertActive(); - await InitTaskList.EnsureAllFinished(); - - // check if the cancellation was already requested + await InitTaskList.EnsureAllFinished(); + + // check if the cancellation was already requested cancellationToken.ThrowIfCancellationRequested(); var messageType = message.GetType(); @@ -488,10 +488,10 @@ public async virtual Task ProducePublish(object message, string path = null, IDi protected static string GetProducerErrorMessage(string path, object message, Type messageType, Exception ex) => $"Producing message {message} of type {messageType?.Name} to path {path} resulted in error: {ex.Message}"; - /// - /// Create an instance of message headers. - /// - /// + /// + /// Create an instance of message headers. + /// + /// public virtual IDictionary CreateHeaders() => new Dictionary(10); private IDictionary GetMessageHeaders(object message, IDictionary headers, ProducerSettings producerSettings) @@ -503,27 +503,27 @@ private IDictionary GetMessageHeaders(object message, IDictionar } return messageHeaders; } - - protected virtual TimeSpan GetDefaultRequestTimeout(Type requestType, ProducerSettings producerSettings) - { - if (producerSettings == null) throw new ArgumentNullException(nameof(producerSettings)); - - var timeout = producerSettings.Timeout ?? Settings.RequestResponse.Timeout; - _logger.LogDebug("Applying default timeout {MessageTimeout} for message type {MessageType}", timeout, requestType); - return timeout; - } - - public virtual async Task ProduceSend(object request, string path = null, IDictionary headers = null, TimeSpan? timeout = null, IMessageBusTarget targetBus = null, CancellationToken cancellationToken = default) - { - if (request == null) throw new ArgumentNullException(nameof(request)); - AssertActive(); + + protected virtual TimeSpan GetDefaultRequestTimeout(Type requestType, ProducerSettings producerSettings) + { + if (producerSettings == null) throw new ArgumentNullException(nameof(producerSettings)); + + var timeout = producerSettings.Timeout ?? Settings.RequestResponse.Timeout; + _logger.LogDebug("Applying default timeout {MessageTimeout} for message type {MessageType}", timeout, requestType); + return timeout; + } + + public virtual async Task ProduceSend(object request, string path = null, IDictionary headers = null, TimeSpan? timeout = null, IMessageBusTarget targetBus = null, CancellationToken cancellationToken = default) + { + if (request == null) throw new ArgumentNullException(nameof(request)); + AssertActive(); AssertRequestResponseConfigured(); - await InitTaskList.EnsureAllFinished(); - - // check if the cancellation was already requested - cancellationToken.ThrowIfCancellationRequested(); - - var requestType = request.GetType(); + await InitTaskList.EnsureAllFinished(); + + // check if the cancellation was already requested + cancellationToken.ThrowIfCancellationRequested(); + + var requestType = request.GetType(); var responseType = typeof(TResponse); var producerSettings = GetProducerSettings(requestType); @@ -573,22 +573,22 @@ public virtual async Task ProduceSend(object request, stri return await SendInternal(request, path, requestType, responseType, producerSettings, created, expires, requestId, requestHeaders, targetBus, cancellationToken); } - protected async internal virtual Task SendInternal(object request, string path, Type requestType, Type responseType, ProducerSettings producerSettings, DateTimeOffset created, DateTimeOffset expires, string requestId, IDictionary requestHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken) - { - if (request == null) throw new ArgumentNullException(nameof(request)); - if (producerSettings == null) throw new ArgumentNullException(nameof(producerSettings)); - - // record the request state - var requestState = new PendingRequestState(requestId, request, requestType, responseType, created, expires, cancellationToken); - PendingRequestStore.Add(requestState); - - if (_logger.IsEnabled(LogLevel.Trace)) - { - _logger.LogTrace("Added to PendingRequests, total is {RequestCount}", PendingRequestStore.GetCount()); - } - - try - { + protected async internal virtual Task SendInternal(object request, string path, Type requestType, Type responseType, ProducerSettings producerSettings, DateTimeOffset created, DateTimeOffset expires, string requestId, IDictionary requestHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken) + { + if (request == null) throw new ArgumentNullException(nameof(request)); + if (producerSettings == null) throw new ArgumentNullException(nameof(producerSettings)); + + // record the request state + var requestState = new PendingRequestState(requestId, request, requestType, responseType, created, expires, cancellationToken); + PendingRequestStore.Add(requestState); + + if (_logger.IsEnabled(LogLevel.Trace)) + { + _logger.LogTrace("Added to PendingRequests, total is {RequestCount}", PendingRequestStore.GetCount()); + } + + try + { _logger.LogDebug("Sending request message {MessageType} to path {Path} with reply to {ReplyTo}", requestState, path, Settings.RequestResponse.Path); if (requestHeaders != null) @@ -597,66 +597,66 @@ protected async internal virtual Task SendInternal to Task - var responseUntyped = await requestState.TaskCompletionSource.Task.ConfigureAwait(false); - return (TResponseMessage)responseUntyped; - } - - public virtual Task ProduceResponse(string requestId, object request, IReadOnlyDictionary requestHeaders, object response, Exception responseException, IMessageTypeConsumerInvokerSettings consumerInvoker, CancellationToken cancellationToken) - { - if (requestHeaders == null) throw new ArgumentNullException(nameof(requestHeaders)); - if (consumerInvoker == null) throw new ArgumentNullException(nameof(consumerInvoker)); - - var responseType = consumerInvoker.ParentSettings.ResponseType; - if (!requestHeaders.TryGetHeader(ReqRespMessageHeaders.ReplyTo, out object replyTo)) - { - _logger.LogDebug($$"""Skipping sending response {Response} of type {MessageType} as the header {{ReqRespMessageHeaders.ReplyTo}} is missing for RequestId: {RequestId}""", response, responseType, requestId); - return Task.CompletedTask; - } - - _logger.LogDebug("Sending the response {Response} of type {MessageType} for RequestId: {RequestId}...", response, responseType, requestId); - - var responseHeaders = CreateHeaders(); - responseHeaders.SetHeader(ReqRespMessageHeaders.RequestId, requestId); - if (responseException != null) - { - responseHeaders.SetHeader(ReqRespMessageHeaders.Error, responseException.Message); - } - - _headerService.AddMessageTypeHeader(response, responseHeaders); - - return ProduceToTransport(response, responseType, (string)replyTo, responseHeaders, null, cancellationToken); - } - - /// - /// Generates unique request IDs - /// - /// - protected virtual string GenerateRequestId() => Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture); - - public virtual bool IsMessageScopeEnabled(ConsumerSettings consumerSettings, IDictionary consumerContextProperties) - => consumerSettings.IsMessageScopeEnabled ?? Settings.IsMessageScopeEnabled ?? true; - - public virtual IMessageScope CreateMessageScope(ConsumerSettings consumerSettings, object message, IDictionary consumerContextProperties, IServiceProvider currentServiceProvider) - { - var createMessageScope = IsMessageScopeEnabled(consumerSettings, consumerContextProperties); - - if (createMessageScope) - { - _logger.LogDebug("Creating message scope for {Message} of type {MessageType}", message, message.GetType()); - } - return new MessageScopeWrapper(currentServiceProvider ?? Settings.ServiceProvider, createMessageScope); - } - - public virtual Task ProvisionTopology() => Task.CompletedTask; -} + await ProduceToTransport(request, producerSettings.MessageType, path, requestHeaders, targetBus, cancellationToken); + } + catch (Exception e) + { + _logger.LogDebug(e, "Publishing of request message failed"); + // remove from registry + PendingRequestStore.Remove(requestId); + throw; + } + + // convert Task to Task + var responseUntyped = await requestState.TaskCompletionSource.Task.ConfigureAwait(false); + return (TResponseMessage)responseUntyped; + } + + public virtual Task ProduceResponse(string requestId, object request, IReadOnlyDictionary requestHeaders, object response, Exception responseException, IMessageTypeConsumerInvokerSettings consumerInvoker, CancellationToken cancellationToken) + { + if (requestHeaders == null) throw new ArgumentNullException(nameof(requestHeaders)); + if (consumerInvoker == null) throw new ArgumentNullException(nameof(consumerInvoker)); + + var responseType = consumerInvoker.ParentSettings.ResponseType; + if (!requestHeaders.TryGetHeader(ReqRespMessageHeaders.ReplyTo, out object replyTo)) + { + _logger.LogDebug($$"""Skipping sending response {Response} of type {MessageType} as the header {{ReqRespMessageHeaders.ReplyTo}} is missing for RequestId: {RequestId}""", response, responseType, requestId); + return Task.CompletedTask; + } + + _logger.LogDebug("Sending the response {Response} of type {MessageType} for RequestId: {RequestId}...", response, responseType, requestId); + + var responseHeaders = CreateHeaders(); + responseHeaders.SetHeader(ReqRespMessageHeaders.RequestId, requestId); + if (responseException != null) + { + responseHeaders.SetHeader(ReqRespMessageHeaders.Error, responseException.Message); + } + + _headerService.AddMessageTypeHeader(response, responseHeaders); + + return ProduceToTransport(response, responseType, (string)replyTo, responseHeaders, null, cancellationToken); + } + + /// + /// Generates unique request IDs + /// + /// + protected virtual string GenerateRequestId() => Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture); + + public virtual bool IsMessageScopeEnabled(ConsumerSettings consumerSettings, IDictionary consumerContextProperties) + => consumerSettings.IsMessageScopeEnabled ?? Settings.IsMessageScopeEnabled ?? true; + + public virtual IMessageScope CreateMessageScope(ConsumerSettings consumerSettings, object message, IDictionary consumerContextProperties, IServiceProvider currentServiceProvider) + { + var createMessageScope = IsMessageScopeEnabled(consumerSettings, consumerContextProperties); + + if (createMessageScope) + { + _logger.LogDebug("Creating message scope for {Message} of type {MessageType}", message, message.GetType()); + } + return new MessageScopeWrapper(currentServiceProvider ?? Settings.ServiceProvider, createMessageScope); + } + + public virtual Task ProvisionTopology() => Task.CompletedTask; +} diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln index f96a2a9c..39ff2b16 100644 --- a/src/SlimMessageBus.sln +++ b/src/SlimMessageBus.sln @@ -137,16 +137,18 @@ EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docs", "Docs", "{CBE53E71-7F48-415C-BD43-B812EC207BC6}" ProjectSection(SolutionItems) = preProject ..\CONTRIBUTING.md = ..\CONTRIBUTING.md - ..\docs\intro.md = ..\docs\intro.md + ..\docs\intro.t.md = ..\docs\intro.t.md ..\docs\NuGet.md = ..\docs\NuGet.md - ..\docs\provider_amazon_sqs.md = ..\docs\provider_amazon_sqs.md + ..\docs\plugin_asyncapi.t.md = ..\docs\plugin_asyncapi.t.md + ..\docs\provider_amazon_sqs.t.md = ..\docs\provider_amazon_sqs.t.md ..\docs\provider_azure_eventhubs.md = ..\docs\provider_azure_eventhubs.md ..\docs\provider_azure_servicebus.md = ..\docs\provider_azure_servicebus.md ..\docs\provider_hybrid.md = ..\docs\provider_hybrid.md - ..\docs\provider_kafka.md = ..\docs\provider_kafka.md - ..\docs\provider_memory.md = ..\docs\provider_memory.md + ..\docs\provider_kafka.t.md = ..\docs\provider_kafka.t.md + ..\docs\provider_memory.t.md = ..\docs\provider_memory.t.md ..\docs\provider_mqtt.md = ..\docs\provider_mqtt.md - ..\docs\provider_nats.md = ..\docs\provider_nats.md + ..\docs\provider_nats.t.md = ..\docs\provider_nats.t.md + ..\docs\plugin_outbox.t.md = ..\docs\plugin_outbox.t.md ..\docs\provider_redis.md = ..\docs\provider_redis.md ..\docs\README.md = ..\docs\README.md ..\README.md = ..\README.md diff --git a/src/SlimMessageBus/IMessageBus.cs b/src/SlimMessageBus/IMessageBus.cs index 4e0e7ae2..28ab8635 100644 --- a/src/SlimMessageBus/IMessageBus.cs +++ b/src/SlimMessageBus/IMessageBus.cs @@ -1,5 +1,5 @@ -namespace SlimMessageBus; +namespace SlimMessageBus; -public interface IMessageBus : IRequestResponseBus, IPublishBus -{ +public interface IMessageBus : IRequestResponseBus, IPublishBus +{ } \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs index 7a9378ae..1e934b8b 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs @@ -429,7 +429,7 @@ public async Task When_Publish_Given_AConsumersThatThrowsException_Then_Exceptio var consumerErrorHandlerMock = new Mock>(); consumerErrorHandlerMock - .Setup(x => x.OnHandleError(It.IsAny(), It.IsAny>>(), It.IsAny(), It.IsAny())) + .Setup(x => x.OnHandleError(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .ReturnsAsync(() => errorHandlerHandlesError ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.Failure); _serviceProviderMock.ProviderMock @@ -477,7 +477,7 @@ public async Task When_Send_Given_AHandlerThatThrowsException_Then_ExceptionIsBu var consumerErrorHandlerMock = new Mock>(); consumerErrorHandlerMock - .Setup(x => x.OnHandleError(It.IsAny(), It.IsAny>>(), It.IsAny(), It.IsAny())) + .Setup(x => x.OnHandleError(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .ReturnsAsync(() => errorHandlerHandlesError ? ConsumerErrorHandlerResult.SuccessWithResponse(null) : ConsumerErrorHandlerResult.Failure); _serviceProviderMock.ProviderMock diff --git a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs index 5a70273d..3c884a16 100644 --- a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs @@ -371,7 +371,7 @@ public static Task SimulateFakeException(int counter) /// public class CustomRabbitMqConsumerErrorHandler : IRabbitMqConsumerErrorHandler { - public Task OnHandleError(T message, Func> next, IConsumerContext consumerContext, Exception exception) + public Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) { // Check if this is consumer context for RabbitMQ var isRabbitMqContext = consumerContext.GetTransportMessage() != null; @@ -388,6 +388,7 @@ public Task OnHandleError(T message, Func>(); consumerErrorHandlerMock - .Setup(x => x.OnHandleError(someMessage, It.IsAny>>(), It.IsAny(), someException)) + .Setup(x => x.OnHandleError(someMessage, It.IsAny(), someException, It.IsAny())) .ReturnsAsync(() => errorHandlerWasAbleToHandle ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.Failure); if (errorHandlerRegistered) @@ -134,7 +134,6 @@ public async Task When_DoHandle_Given_ConsumerThatThrowsExceptionAndErrorHandler var result = await subject.DoHandle(someMessage, messageHeaders, consumerInvoker: consumerInvokerMock.Object, currentServiceProvider: busMock.ServiceProviderMock.Object); // assert - result.RequestId.Should().BeNull(); result.Response.Should().BeNull(); @@ -156,7 +155,7 @@ public async Task When_DoHandle_Given_ConsumerThatThrowsExceptionAndErrorHandler { consumerErrorHandlerMock .Verify( - x => x.OnHandleError(someMessage, It.IsAny>>(), It.IsAny(), someException), + x => x.OnHandleError(someMessage, It.IsAny(), someException, It.IsAny()), Times.Once()); consumerErrorHandlerMock @@ -164,6 +163,73 @@ public async Task When_DoHandle_Given_ConsumerThatThrowsExceptionAndErrorHandler } } + [Fact] + public async Task When_DoHandle_Given_ConsumerThatThrowsExceptionAndErrorHandlerRegisteredAndRequestsARetry_Then_RetryInvocation() + { + // arrange + var someMessage = new SomeMessage(); + var someException = fixture.Create(); + var messageHeaders = fixture.Create>(); + + var consumerMock = new Mock>(); + + var consumerErrorHandlerMock = new Mock>(); + + consumerErrorHandlerMock + .Setup(x => x.OnHandleError(someMessage, It.IsAny(), someException, It.IsAny())) + .ReturnsAsync(() => ConsumerErrorHandlerResult.Retry); + + busMock.ServiceProviderMock + .Setup(x => x.GetService(typeof(IConsumerErrorHandler))) + .Returns(consumerErrorHandlerMock.Object); + + busMock.ServiceProviderMock + .Setup(x => x.GetService(typeof(IConsumer))) + .Returns(consumerMock.Object); + + consumerMethodMock + .SetupSequence(x => x(consumerMock.Object, someMessage, It.IsAny(), It.IsAny())) + .ThrowsAsync(someException) + .Returns(Task.CompletedTask); + + consumerInvokerMock + .SetupGet(x => x.ParentSettings) + .Returns(new ConsumerSettings()); + + consumerInvokerMock + .SetupGet(x => x.ConsumerType) + .Returns(typeof(IConsumer)); + + messageScopeMock + .SetupGet(x => x.ServiceProvider) + .Returns(busMock.ServiceProviderMock.Object); + + // act + var result = await subject.DoHandle(someMessage, messageHeaders, consumerInvoker: consumerInvokerMock.Object, currentServiceProvider: busMock.ServiceProviderMock.Object); + + // assert + result.RequestId.Should().BeNull(); + result.Response.Should().BeNull(); + result.ResponseException.Should().BeNull(); + + messageScopeFactoryMock + .Verify(x => x.CreateMessageScope(It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()), Times.Exactly(2)); + + consumerMethodMock + .Verify(x => x(consumerMock.Object, someMessage, It.IsAny(), It.IsAny()), Times.Exactly(2)); + + consumerMethodMock + .VerifyNoOtherCalls(); + + consumerErrorHandlerMock + .Verify( + x => x.OnHandleError(someMessage, It.IsAny(), someException, It.IsAny()), + Times.Once()); + + consumerErrorHandlerMock + .VerifyNoOtherCalls(); + } + [Fact] public async Task When_ExecuteConsumer_Given_Handler_Then_ReturnsResponse() {