Skip to content

Commit

Permalink
Health check circuit breaker
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Jan 9, 2025
1 parent 4f8b32b commit a1fce9d
Show file tree
Hide file tree
Showing 69 changed files with 759 additions and 406 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i
| `.Host.Outbox.Sql` | Transactional Outbox using MSSQL | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql) |
| `.Host.Outbox.Sql.DbContext` | Transactional Outbox using MSSQL with EF DataContext integration | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.DbContext.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext) |
| `.Host.AsyncApi` | [AsyncAPI](https://www.asyncapi.com/) specification generation via [Saunter](https://github.com/tehmantra/saunter) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AsyncApi.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AsyncApi) |
| `.Host.CircuitBreaker.HealthCheck` | Consumer circuit breaker based on [health checks](docs/intro.md#health-check-circuit-breaker) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.CircuitBreaker.HealthCheck.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.CircuitBreaker.HealthCheck) |

Typically the application layers (domain model, business logic) only need to depend on `SlimMessageBus` which is the facade, and ultimately the application hosting layer (ASP.NET, Console App, Windows Service) will reference and configure the other packages (`SlimMessageBus.Host.*`) which are the messaging transport providers and additional plugins.

Expand Down
1 change: 1 addition & 0 deletions build/tasks.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ $projects = @(
"SlimMessageBus.Host.Outbox.Sql",
"SlimMessageBus.Host.Outbox.Sql.DbContext",

"SlimMessageBus.Host.CircuitBreaker",
"SlimMessageBus.Host.CircuitBreaker.HealthCheck",

"SlimMessageBus.Host.AsyncApi"
Expand Down
1 change: 1 addition & 0 deletions docs/NuGet.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ Plugins:
- Transactional Outbox pattern (SQL, DbContext)
- Serialization using JSON, Avro, ProtoBuf
- AsyncAPI specification generation
- Consumer Circuit Breaker based on Health Checks

Find out more [https://github.com/zarusz/SlimMessageBus](https://github.com/zarusz/SlimMessageBus).
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>3.0.0-rc901</Version>
<Version>3.0.0-rc902</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck.Consumers;

public class AddConsumer : IConsumer<Add>
{
private readonly ILogger<AddConsumer> _logger;

public AddConsumer(ILogger<AddConsumer> logger)
{
_logger = logger;
}

public class AddConsumer(ILogger<AddConsumer> logger) : IConsumer<Add>
{
public Task OnHandle(Add message, CancellationToken cancellationToken)
{
_logger.LogInformation("{A} + {B} = {C}", message.a, message.b, message.a + message.b);
logger.LogInformation("{A} + {B} = {C}", message.A, message.B, message.A + message.B);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck.Consumers;

public class SubtractConsumer : IConsumer<Subtract>
{
private readonly ILogger<SubtractConsumer> _logger;

public SubtractConsumer(ILogger<SubtractConsumer> logger)
{
_logger = logger;
}

public class SubtractConsumer(ILogger<SubtractConsumer> logger) : IConsumer<Subtract>
{
public Task OnHandle(Subtract message, CancellationToken cancellationToken)
{
_logger.LogInformation("{A} - {B} = {C}", message.a, message.b, message.a - message.b);
logger.LogInformation("{A} - {B} = {C}", message.A, message.B, message.A - message.B);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
global using SlimMessageBus;
global using SlimMessageBus.Host;
global using SlimMessageBus.Host.RabbitMQ;
global using SlimMessageBus.Host.Serialization.SystemTextJson;
global using SlimMessageBus.Host.Serialization.SystemTextJson;
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;

using Microsoft.Extensions.Logging;

public class AddRandomHealthCheck : RandomHealthCheck
public class AddRandomHealthCheck(ILogger<AddRandomHealthCheck> logger) : RandomHealthCheck(logger)
{
public AddRandomHealthCheck(ILogger<AddRandomHealthCheck> logger)
: base(logger)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,12 @@

using Microsoft.Extensions.Diagnostics.HealthChecks;

public abstract class RandomHealthCheck : IHealthCheck
{
private readonly ILogger _logger;

protected RandomHealthCheck(ILogger logger)
{
_logger = logger;
}

public abstract class RandomHealthCheck(ILogger logger) : IHealthCheck
{
public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
var value = (HealthStatus)Random.Shared.Next(3);
_logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", this.GetType(), value);
logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", GetType(), value);
return Task.FromResult(new HealthCheckResult(value, value.ToString()));
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;

using Microsoft.Extensions.Logging;

public class SubtractRandomHealthCheck : RandomHealthCheck
public class SubtractRandomHealthCheck(ILogger<SubtractRandomHealthCheck> logger) : RandomHealthCheck(logger)
{
public SubtractRandomHealthCheck(ILogger<SubtractRandomHealthCheck> logger)
: base(logger)
{
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,20 @@
namespace Sample.CircuitBreaker.HealthCheck;
public class IntermittentMessagePublisher : BackgroundService

public class IntermittentMessagePublisher(ILogger<IntermittentMessagePublisher> logger, IMessageBus messageBus)
: BackgroundService
{
private readonly ILogger _logger;
private readonly IMessageBus _messageBus;

public IntermittentMessagePublisher(ILogger<IntermittentMessagePublisher> logger, IMessageBus messageBus)
{
_logger = logger;
_messageBus = messageBus;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var a = Random.Shared.Next(10);
var b = Random.Shared.Next(10);

//_logger.LogInformation("Emitting {A} +- {B} = ?", a, b);
logger.LogInformation("Emitting {A} +- {B} = ?", a, b);

await Task.WhenAll(
_messageBus.Publish(new Add(a, b)),
_messageBus.Publish(new Subtract(a, b)),
messageBus.Publish(new Add(a, b), cancellationToken: stoppingToken),
messageBus.Publish(new Subtract(a, b), cancellationToken: stoppingToken),
Task.Delay(1000, stoppingToken));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
namespace Sample.CircuitBreaker.HealthCheck.Models;

public record Add(int a, int b);
public record Add(int A, int B);
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
namespace Sample.CircuitBreaker.HealthCheck.Models;

public record Subtract(int a, int b);
public record Subtract(int A, int B);
3 changes: 2 additions & 1 deletion src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck;

using Microsoft.Extensions.Diagnostics.HealthChecks;

using Sample.CircuitBreaker.HealthCheck.HealthChecks;

using SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config;
using SlimMessageBus.Host.CircuitBreaker.HealthCheck;

public static class Program
{
Expand Down
9 changes: 5 additions & 4 deletions src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public abstract class SqsBaseConsumer : AbstractConsumer

public SqsMessageBus MessageBus { get; }
protected IMessageProcessor<Message> MessageProcessor { get; }
protected string Path { get; }
protected ISqsHeaderSerializer HeaderSerializer { get; }

protected SqsBaseConsumer(
Expand All @@ -23,11 +22,13 @@ protected SqsBaseConsumer(
IMessageProcessor<Message> messageProcessor,
IEnumerable<AbstractConsumerSettings> consumerSettings,
ILogger logger)
: base(logger, consumerSettings)
: base(logger,
consumerSettings,
path,
messageBus.Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>())
{
MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
_clientProvider = clientProvider ?? throw new ArgumentNullException(nameof(clientProvider));
Path = path ?? throw new ArgumentNullException(nameof(path));
MessageBus = messageBus;
MessageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor));
HeaderSerializer = messageBus.HeaderSerializer;
T GetSingleValue<T>(Func<AbstractConsumerSettings, T> selector, string settingName, T defaultValue = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ namespace SlimMessageBus.Host.AzureEventHub;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;

using Microsoft.Extensions.DependencyInjection;

public class EhGroupConsumer : AbstractConsumer
{
private readonly EventProcessorClient _processorClient;
Expand All @@ -12,7 +14,10 @@ public class EhGroupConsumer : AbstractConsumer
public EventHubMessageBus MessageBus { get; }

public EhGroupConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, EventHubMessageBus messageBus, GroupPath groupPath, Func<GroupPathPartitionId, EhPartitionConsumer> partitionConsumerFactory)
: base(messageBus.LoggerFactory.CreateLogger<EhGroupConsumer>(), consumerSettings)
: base(messageBus.LoggerFactory.CreateLogger<EhGroupConsumer>(),
consumerSettings,
groupPath.Path,
messageBus.Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>())
{
_groupPath = groupPath ?? throw new ArgumentNullException(nameof(groupPath));
if (partitionConsumerFactory == null) throw new ArgumentNullException(nameof(partitionConsumerFactory));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace SlimMessageBus.Host.AzureServiceBus.Consumer;

using Microsoft.Extensions.DependencyInjection;

public abstract class AsbBaseConsumer : AbstractConsumer
{
private ServiceBusProcessor _serviceBusProcessor;
Expand All @@ -9,11 +11,19 @@ public abstract class AsbBaseConsumer : AbstractConsumer
protected IMessageProcessor<ServiceBusReceivedMessage> MessageProcessor { get; }
protected TopicSubscriptionParams TopicSubscription { get; }

protected AsbBaseConsumer(ServiceBusMessageBus messageBus, ServiceBusClient serviceBusClient, TopicSubscriptionParams subscriptionFactoryParams, IMessageProcessor<ServiceBusReceivedMessage> messageProcessor, IEnumerable<AbstractConsumerSettings> consumerSettings, ILogger logger)
: base(logger ?? throw new ArgumentNullException(nameof(logger)), consumerSettings)
protected AsbBaseConsumer(ServiceBusMessageBus messageBus,
ServiceBusClient serviceBusClient,
TopicSubscriptionParams subscriptionFactoryParams,
IMessageProcessor<ServiceBusReceivedMessage> messageProcessor,
IEnumerable<AbstractConsumerSettings> consumerSettings,
ILogger logger)
: base(logger ?? throw new ArgumentNullException(nameof(logger)),
consumerSettings,
subscriptionFactoryParams.ToString(),
messageBus.Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>())
{
MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
TopicSubscription = subscriptionFactoryParams ?? throw new ArgumentNullException(nameof(subscriptionFactoryParams));
MessageBus = messageBus;
TopicSubscription = subscriptionFactoryParams;
MessageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor));

T GetSingleValue<T>(Func<AbstractConsumerSettings, T> selector, string settingName)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public class TopicSubscriptionParams
public class TopicSubscriptionParams(string path, string subscriptionName)
{
public string Path { get; set; }
public string SubscriptionName { get; set; }

public TopicSubscriptionParams(string path, string subscriptionName)
{
Path = path;
SubscriptionName = subscriptionName;
}
public string Path { get; set; } = path;
public string SubscriptionName { get; set; } = subscriptionName;

public override string ToString()
=> SubscriptionName == null ? Path : $"{Path}/{SubscriptionName}";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config;

using Microsoft.Extensions.DependencyInjection.Extensions;
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

public static class ConsumerBuilderExtensions
{
Expand Down Expand Up @@ -32,16 +30,15 @@ public static T PauseOnDegradedHealthCheck<T>(this T builder, params string[] ta

private static void RegisterHealthServices(AbstractConsumerBuilder builder)
{
builder.ConsumerSettings.CircuitBreakers.TryAdd<HealthCheckCircuitBreaker>();
builder.PostConfigurationActions.Add(
services =>
{
services.TryAddSingleton<HealthCheckBackgroundService>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHealthCheckPublisher, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>()));
services.TryAdd(ServiceDescriptor.Singleton<IHealthCheckHostBreaker, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>()));
services.AddHostedService(sp => sp.GetRequiredService<HealthCheckBackgroundService>());
builder.AddConsumerCircuitBreakerType<AbstractConsumerBuilder, HealthCheckCircuitBreaker>();
builder.PostConfigurationActions.Add(services =>
{
services.TryAddSingleton<HealthCheckBackgroundService>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHealthCheckPublisher, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>()));
services.TryAdd(ServiceDescriptor.Singleton<IHealthCheckHostBreaker, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>()));
services.AddHostedService(sp => sp.GetRequiredService<HealthCheckBackgroundService>());

services.TryAddSingleton<HealthCheckCircuitBreaker>();
});
services.TryAddTransient<HealthCheckCircuitBreaker>();
});
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

static internal class SettingsExtensions
static internal class ConsumerSettingsExtensions
{
private const string _key = nameof(HealthCheckCircuitBreaker);

public static T PauseOnDegraded<T>(this T consumerSettings, params string[] tags)
where T : AbstractConsumerSettings
{
Expand All @@ -15,7 +13,6 @@ public static T PauseOnDegraded<T>(this T consumerSettings, params string[] tags
dict[tag] = HealthStatus.Degraded;
}
}

return consumerSettings;
}

Expand All @@ -30,18 +27,9 @@ public static T PauseOnUnhealthy<T>(this T consumerSettings, params string[] tag
dict[tag] = HealthStatus.Unhealthy;
}
}

return consumerSettings;
}

static internal IDictionary<string, HealthStatus> HealthBreakerTags(this AbstractConsumerSettings consumerSettings)
{
if (!consumerSettings.Properties.TryGetValue(_key, out var rawValue) || rawValue is not IDictionary<string, HealthStatus> value)
{
value = new Dictionary<string, HealthStatus>();
consumerSettings.Properties[_key] = value;
}

return value;
}
static internal IDictionary<string, HealthStatus> HealthBreakerTags(this AbstractConsumerSettings consumerSettings)
=> consumerSettings.GetOrCreate(ConsumerSettingsProperties.HealthStatusTags, () => []);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

static internal class ConsumerSettingsProperties
{
static readonly internal ProviderExtensionProperty<Dictionary<string, HealthStatus>> HealthStatusTags = new("CircuitBreaker_HealthStatusTags");
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
global using System;
global using System.Diagnostics;


global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Diagnostics.HealthChecks;
global using Microsoft.Extensions.Hosting;
global using Microsoft.Extensions.Logging;
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ public async Task PublishAsync(HealthReport report, CancellationToken cancellati
}
}

public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task StartAsync(CancellationToken cancellationToken)
=> Task.CompletedTask;

public Task StopAsync(CancellationToken cancellationToken)
{
Expand Down
Loading

0 comments on commit a1fce9d

Please sign in to comment.