Skip to content

Commit

Permalink
#347 Refactor IConsumerErrorHandler to return a 'retry' response inst…
Browse files Browse the repository at this point in the history
…ead of supplying a 'retry' delegate

Signed-off-by: Richard Pringle <[email protected]>
  • Loading branch information
EtherZa authored and zarusz committed Dec 29, 2024
1 parent 78afac0 commit 2e5662d
Show file tree
Hide file tree
Showing 31 changed files with 754 additions and 590 deletions.
51 changes: 42 additions & 9 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -1059,28 +1059,41 @@ public class LoggingConsumerInterceptor<TMessage> : IConsumerInterceptor<TMessag

## Error Handling

Message processing by consumers or handlers may result in exceptions.
Starting with version 2.3.0, SMB introduces a standard way to integrate custom error handling logic across different transports.

The interface [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) enables the definition of custom error handling for specific message types:
Message processing by consumers or handlers may result in exceptions. The [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) provides a standard way to integrate custom error handling logic across different transports.

```cs
public interface IConsumerErrorHandler<in T>
{
/// <summary>
/// Executed when the message consumer (or handler) errors out. This interface allows to intercept and handle the exception.
/// Use the consumer context to get ahold of transport specific options to proceed (acknowledge/reject message).
/// <para>
/// Executed when the message consumer (or handler) errors out. The interface allows for interception of
/// exceptions to manipulate the processing pipeline (success/fail/retry).
/// </para>
/// <para>
/// The consumer context is available to apply transport specific operations (acknowledge/reject/dead letter/etc).
/// </para>
/// <para>
/// If message execution is to be re-attempted, any delays/jitter should be applied before the method returns.
/// </para>
/// </summary>
/// <param name="message">The message that failed to process.</param>
/// <param name="retry">Performs another message processing try. The return value is relevant if the consumer was a request handler (it will be its response value). Ensure to pass the return value to the result of the error handler.</param>
/// <param name="consumerContext">The consumer context for the message processing pipeline.</param>
/// <param name="exception">Exception that occurred during message processing.</param>
/// <param name="attempts">The number of times the message has been attempted to be processed.</param>
/// <returns>The error handling result.</returns>
Task<ConsumerErrorHandlerResult> OnHandleError(T message, Func<Task<object>> retry, IConsumerContext consumerContext, Exception exception);
Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
}
```

> The `retry()` parameter allows the message processing pipeline, including consumer interceptors, to retry processing when transient errors occur and retries are desired.
The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline.
| Result | Description |
|---------|-------------|
| Failure | The message failed to be processed and should be returned to the queue |
| Success | The pipeline must treat the message as having been processed successfully |
| SuccessWithResponse | The pipeline to treat the messagage as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
| Retry | Execute the pipeline again (any delay/jitter should be applied before returning from method)[^1] |

[^1]: `Retry` will recreate the message scope on every atttempt if `PerMessageScopeEnabled` has been enabled.

To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework:

Expand All @@ -1106,6 +1119,26 @@ Transport plugins provide specialized error handling interfaces. Examples includ

This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized.

Sample retry with exponential back-off:
```cs
public class RetryHandler<T> : ConsumerErrorHandler<T>
{
private static readonly Random _random = new();

public override async Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
if (attempts < 3)
{
var delay = (attempts * 1000) + (_random.Next(1000) - 500);
await Task.Delay(delay, consumerContext.CancellationToken);
return Retry();
}

return Failure();
}
}
```

## Logging

SlimMessageBus uses [Microsoft.Extensions.Logging.Abstractions](https://www.nuget.org/packages/Microsoft.Extensions.Logging.Abstractions):
Expand Down
35 changes: 30 additions & 5 deletions docs/intro.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -1059,14 +1059,19 @@ public class LoggingConsumerInterceptor<TMessage> : IConsumerInterceptor<TMessag

## Error Handling

Message processing by consumers or handlers may result in exceptions.
Starting with version 2.3.0, SMB introduces a standard way to integrate custom error handling logic across different transports.

The interface [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) enables the definition of custom error handling for specific message types:
Message processing by consumers or handlers may result in exceptions. The [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) provides a standard way to integrate custom error handling logic across different transports.

@[:cs](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs,Interface)

> The `retry()` parameter allows the message processing pipeline, including consumer interceptors, to retry processing when transient errors occur and retries are desired.
The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline.
| Result | Description |
|---------|-------------|
| Failure | The message failed to be processed and should be returned to the queue |
| Success | The pipeline must treat the message as having been processed successfully |
| SuccessWithResponse | The pipeline to treat the messagage as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
| Retry | Execute the pipeline again (any delay/jitter should be applied before returning from method)[^1] |

[^1]: `Retry` will recreate the message scope on every atttempt if `PerMessageScopeEnabled` has been enabled.

To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework:

Expand All @@ -1092,6 +1097,26 @@ Transport plugins provide specialized error handling interfaces. Examples includ

This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized.

Sample retry with exponential back-off:
```cs
public class RetryHandler<T> : ConsumerErrorHandler<T>
{
private static readonly Random _random = new();

public override async Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
if (attempts < 3)
{
var delay = (attempts * 1000) + (_random.Next(1000) - 500);
await Task.Delay(delay, consumerContext.CancellationToken);
return Retry();
}

return Failure();
}
}
```

## Logging

SlimMessageBus uses [Microsoft.Extensions.Logging.Abstractions](https://www.nuget.org/packages/Microsoft.Extensions.Logging.Abstractions):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.AmazonSQS;

public interface ISqsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface ISqsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class SqsConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.AzureEventHub;

public interface IEventHubConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IEventHubConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class EventHubConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
12 changes: 6 additions & 6 deletions src/SlimMessageBus.Host.AzureEventHub/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
global using Microsoft.Extensions.Logging;
global using Azure.Messaging.EventHubs;
global using Azure.Messaging.EventHubs.Producer;
global using Azure.Storage.Blobs;

global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;

global using Azure.Messaging.EventHubs;
global using Azure.Messaging.EventHubs.Producer;
global using Azure.Storage.Blobs;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public interface IServiceBusConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IServiceBusConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class ServiceBusConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
10 changes: 5 additions & 5 deletions src/SlimMessageBus.Host.AzureServiceBus/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
global using Microsoft.Extensions.Logging;
global using Azure.Messaging.ServiceBus;
global using Azure.Messaging.ServiceBus.Administration;

global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;

global using Azure.Messaging.ServiceBus;
global using Azure.Messaging.ServiceBus.Administration;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

public interface IKafkaConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IKafkaConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class KafkaConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.Kafka/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Services;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Memory;

public interface IMemoryConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IMemoryConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class MemoryConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
3 changes: 2 additions & 1 deletion src/SlimMessageBus.Host.Memory/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Serialization;
3 changes: 2 additions & 1 deletion src/SlimMessageBus.Host.Mqtt/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
global using MQTTnet.Client;
global using MQTTnet.Extensions.ManagedClient;

global using SlimMessageBus.Host.Services;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;
6 changes: 3 additions & 3 deletions src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Mqtt;

public interface IMqttConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IMqttConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class MqttConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
5 changes: 3 additions & 2 deletions src/SlimMessageBus.Host.Nats/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Services;
global using NATS.Client.Core;

global using NATS.Client.Core;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;
6 changes: 3 additions & 3 deletions src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Nats;

public interface INatsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface INatsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class NatsConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.RabbitMQ;

public interface IRabbitMqConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IRabbitMqConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class RabbitMqConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.RabbitMQ/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
global using RabbitMQ.Client;
global using RabbitMQ.Client.Events;

global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Services;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Redis;

public interface IRedisConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IRedisConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class RedisConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
3 changes: 2 additions & 1 deletion src/SlimMessageBus.Host.Redis/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Services;

global using StackExchange.Redis;
global using StackExchange.Redis;
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host/Collections/RuntimeTypeCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class RuntimeTypeCache : IRuntimeTypeCache
public IGenericTypeCache<Func<object, object, Func<Task<object>>, IConsumerContext, Task<object>>> ConsumerInterceptorType { get; }
public IGenericTypeCache2<Func<object, object, object, IConsumerContext, Task>> HandlerInterceptorType { get; }

public IGenericTypeCache<Func<object, object, Func<Task<object>>, IConsumerContext, Exception, Task<ConsumerErrorHandlerResult>>> ConsumerErrorHandlerType { get; }
public IGenericTypeCache<Func<object, object, IConsumerContext, Exception, int, Task<ConsumerErrorHandlerResult>>> ConsumerErrorHandlerType { get; }

public RuntimeTypeCache()
{
Expand Down Expand Up @@ -78,7 +78,7 @@ public RuntimeTypeCache()
typeof(IRequestHandlerInterceptor<,>),
nameof(IRequestHandlerInterceptor<object, object>.OnHandle));

ConsumerErrorHandlerType = new GenericTypeCache<Func<object, object, Func<Task<object>>, IConsumerContext, Exception, Task<ConsumerErrorHandlerResult>>>(
ConsumerErrorHandlerType = new GenericTypeCache<Func<object, object, IConsumerContext, Exception, int, Task<ConsumerErrorHandlerResult>>>(
typeof(IConsumerErrorHandler<>),
nameof(IConsumerErrorHandler<object>.OnHandleError));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace SlimMessageBus.Host.Consumer.ErrorHandling;

public abstract class ConsumerErrorHandler<T> : BaseConsumerErrorHandler, IConsumerErrorHandler<T>
{
public abstract Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
}

public abstract class BaseConsumerErrorHandler
{
public static ConsumerErrorHandlerResult Failure() => ConsumerErrorHandlerResult.Failure;
public static ConsumerErrorHandlerResult Retry() => ConsumerErrorHandlerResult.Retry;
public static ConsumerErrorHandlerResult Success(object response = null) => response == null ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.SuccessWithResponse(response);
}
Loading

0 comments on commit 2e5662d

Please sign in to comment.