diff --git a/README.md b/README.md index a5d018f5..fcbc9a23 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,7 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i | `.Host.Outbox.Sql` | Transactional Outbox using MSSQL | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql) | | `.Host.Outbox.Sql.DbContext` | Transactional Outbox using MSSQL with EF DataContext integration | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.DbContext.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext) | | `.Host.AsyncApi` | [AsyncAPI](https://www.asyncapi.com/) specification generation via [Saunter](https://github.com/tehmantra/saunter) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AsyncApi.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AsyncApi) | +| `.Host.CircuitBreaker.HealthCheck` | Consumer circuit breaker based on [health checks](docs/intro.md#health-check-circuit-breaker) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.CircuitBreaker.HealthCheck.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.CircuitBreaker.HealthCheck) | Typically the application layers (domain model, business logic) only need to depend on `SlimMessageBus` which is the facade, and ultimately the application hosting layer (ASP.NET, Console App, Windows Service) will reference and configure the other packages (`SlimMessageBus.Host.*`) which are the messaging transport providers and additional plugins. diff --git a/build/tasks.ps1 b/build/tasks.ps1 index 649a062d..1af71ba1 100644 --- a/build/tasks.ps1 +++ b/build/tasks.ps1 @@ -43,6 +43,7 @@ $projects = @( "SlimMessageBus.Host.Outbox.Sql", "SlimMessageBus.Host.Outbox.Sql.DbContext", + "SlimMessageBus.Host.CircuitBreaker", "SlimMessageBus.Host.CircuitBreaker.HealthCheck", "SlimMessageBus.Host.AsyncApi" diff --git a/docs/NuGet.md b/docs/NuGet.md index 23b474ef..ce59ffb3 100644 --- a/docs/NuGet.md +++ b/docs/NuGet.md @@ -24,5 +24,6 @@ Plugins: - Transactional Outbox pattern (SQL, DbContext) - Serialization using JSON, Avro, ProtoBuf - AsyncAPI specification generation +- Consumer Circuit Breaker based on Health Checks Find out more [https://github.com/zarusz/SlimMessageBus](https://github.com/zarusz/SlimMessageBus). diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml index ea2f607f..a1eda2f8 100644 --- a/src/Host.Plugin.Properties.xml +++ b/src/Host.Plugin.Properties.xml @@ -4,7 +4,7 @@ <Import Project="Common.NuGet.Properties.xml" /> <PropertyGroup> - <Version>3.0.0-rc901</Version> + <Version>3.0.0-rc902</Version> </PropertyGroup> </Project> \ No newline at end of file diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs index 1da885ad..fdfca42f 100644 --- a/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs @@ -1,17 +1,10 @@ namespace Sample.CircuitBreaker.HealthCheck.Consumers; -public class AddConsumer : IConsumer<Add> -{ - private readonly ILogger<AddConsumer> _logger; - - public AddConsumer(ILogger<AddConsumer> logger) - { - _logger = logger; - } - +public class AddConsumer(ILogger<AddConsumer> logger) : IConsumer<Add> +{ public Task OnHandle(Add message, CancellationToken cancellationToken) { - _logger.LogInformation("{A} + {B} = {C}", message.a, message.b, message.a + message.b); + logger.LogInformation("{A} + {B} = {C}", message.A, message.B, message.A + message.B); return Task.CompletedTask; } } diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs index 467a30d5..0d296333 100644 --- a/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs @@ -1,17 +1,10 @@ namespace Sample.CircuitBreaker.HealthCheck.Consumers; -public class SubtractConsumer : IConsumer<Subtract> -{ - private readonly ILogger<SubtractConsumer> _logger; - - public SubtractConsumer(ILogger<SubtractConsumer> logger) - { - _logger = logger; - } - +public class SubtractConsumer(ILogger<SubtractConsumer> logger) : IConsumer<Subtract> +{ public Task OnHandle(Subtract message, CancellationToken cancellationToken) { - _logger.LogInformation("{A} - {B} = {C}", message.a, message.b, message.a - message.b); + logger.LogInformation("{A} - {B} = {C}", message.A, message.B, message.A - message.B); return Task.CompletedTask; } } diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs index 61fd0fa5..ac87444b 100644 --- a/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs @@ -14,4 +14,4 @@ global using SlimMessageBus; global using SlimMessageBus.Host; global using SlimMessageBus.Host.RabbitMQ; -global using SlimMessageBus.Host.Serialization.SystemTextJson; \ No newline at end of file +global using SlimMessageBus.Host.Serialization.SystemTextJson; diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs index b74784dd..93bc7607 100644 --- a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs @@ -1,11 +1,5 @@ namespace Sample.CircuitBreaker.HealthCheck.HealthChecks; -using Microsoft.Extensions.Logging; - -public class AddRandomHealthCheck : RandomHealthCheck +public class AddRandomHealthCheck(ILogger<AddRandomHealthCheck> logger) : RandomHealthCheck(logger) { - public AddRandomHealthCheck(ILogger<AddRandomHealthCheck> logger) - : base(logger) - { - } } diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs index cf9ccf88..a448c3ec 100644 --- a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs @@ -2,19 +2,12 @@ using Microsoft.Extensions.Diagnostics.HealthChecks; -public abstract class RandomHealthCheck : IHealthCheck -{ - private readonly ILogger _logger; - - protected RandomHealthCheck(ILogger logger) - { - _logger = logger; - } - +public abstract class RandomHealthCheck(ILogger logger) : IHealthCheck +{ public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) { var value = (HealthStatus)Random.Shared.Next(3); - _logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", this.GetType(), value); + logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", GetType(), value); return Task.FromResult(new HealthCheckResult(value, value.ToString())); } } diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs index 8a68b0b1..27ce53e5 100644 --- a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs @@ -1,11 +1,5 @@ namespace Sample.CircuitBreaker.HealthCheck.HealthChecks; -using Microsoft.Extensions.Logging; - -public class SubtractRandomHealthCheck : RandomHealthCheck +public class SubtractRandomHealthCheck(ILogger<SubtractRandomHealthCheck> logger) : RandomHealthCheck(logger) { - public SubtractRandomHealthCheck(ILogger<SubtractRandomHealthCheck> logger) - : base(logger) - { - } } diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs index 73110c15..7d887959 100644 --- a/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs @@ -1,15 +1,8 @@ namespace Sample.CircuitBreaker.HealthCheck; -public class IntermittentMessagePublisher : BackgroundService + +public class IntermittentMessagePublisher(ILogger<IntermittentMessagePublisher> logger, IMessageBus messageBus) + : BackgroundService { - private readonly ILogger _logger; - private readonly IMessageBus _messageBus; - - public IntermittentMessagePublisher(ILogger<IntermittentMessagePublisher> logger, IMessageBus messageBus) - { - _logger = logger; - _messageBus = messageBus; - } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) @@ -17,11 +10,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) var a = Random.Shared.Next(10); var b = Random.Shared.Next(10); - //_logger.LogInformation("Emitting {A} +- {B} = ?", a, b); + logger.LogInformation("Emitting {A} +- {B} = ?", a, b); await Task.WhenAll( - _messageBus.Publish(new Add(a, b)), - _messageBus.Publish(new Subtract(a, b)), + messageBus.Publish(new Add(a, b), cancellationToken: stoppingToken), + messageBus.Publish(new Subtract(a, b), cancellationToken: stoppingToken), Task.Delay(1000, stoppingToken)); } } diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs index 97c5e418..2208622f 100644 --- a/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs @@ -1,3 +1,3 @@ namespace Sample.CircuitBreaker.HealthCheck.Models; -public record Add(int a, int b); \ No newline at end of file +public record Add(int A, int B); \ No newline at end of file diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs index 51d2efc4..d491f605 100644 --- a/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs @@ -1,3 +1,3 @@ namespace Sample.CircuitBreaker.HealthCheck.Models; -public record Subtract(int a, int b); \ No newline at end of file +public record Subtract(int A, int B); \ No newline at end of file diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs index 716253f5..718af413 100644 --- a/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs +++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs @@ -1,9 +1,10 @@ namespace Sample.CircuitBreaker.HealthCheck; + using Microsoft.Extensions.Diagnostics.HealthChecks; using Sample.CircuitBreaker.HealthCheck.HealthChecks; -using SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config; +using SlimMessageBus.Host.CircuitBreaker.HealthCheck; public static class Program { diff --git a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs index c784f1b9..1b978461 100644 --- a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs +++ b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs @@ -13,7 +13,6 @@ public abstract class SqsBaseConsumer : AbstractConsumer public SqsMessageBus MessageBus { get; } protected IMessageProcessor<Message> MessageProcessor { get; } - protected string Path { get; } protected ISqsHeaderSerializer HeaderSerializer { get; } protected SqsBaseConsumer( @@ -23,11 +22,13 @@ protected SqsBaseConsumer( IMessageProcessor<Message> messageProcessor, IEnumerable<AbstractConsumerSettings> consumerSettings, ILogger logger) - : base(logger, consumerSettings) + : base(logger, + consumerSettings, + path, + messageBus.Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>()) { - MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus)); _clientProvider = clientProvider ?? throw new ArgumentNullException(nameof(clientProvider)); - Path = path ?? throw new ArgumentNullException(nameof(path)); + MessageBus = messageBus; MessageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor)); HeaderSerializer = messageBus.HeaderSerializer; T GetSingleValue<T>(Func<AbstractConsumerSettings, T> selector, string settingName, T defaultValue = default) diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs index a1bd113c..762ba2fd 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs @@ -3,6 +3,8 @@ namespace SlimMessageBus.Host.AzureEventHub; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Processor; +using Microsoft.Extensions.DependencyInjection; + public class EhGroupConsumer : AbstractConsumer { private readonly EventProcessorClient _processorClient; @@ -12,7 +14,10 @@ public class EhGroupConsumer : AbstractConsumer public EventHubMessageBus MessageBus { get; } public EhGroupConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, EventHubMessageBus messageBus, GroupPath groupPath, Func<GroupPathPartitionId, EhPartitionConsumer> partitionConsumerFactory) - : base(messageBus.LoggerFactory.CreateLogger<EhGroupConsumer>(), consumerSettings) + : base(messageBus.LoggerFactory.CreateLogger<EhGroupConsumer>(), + consumerSettings, + groupPath.Path, + messageBus.Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>()) { _groupPath = groupPath ?? throw new ArgumentNullException(nameof(groupPath)); if (partitionConsumerFactory == null) throw new ArgumentNullException(nameof(partitionConsumerFactory)); diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs index a601e50c..09a5db8f 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs @@ -1,5 +1,7 @@ namespace SlimMessageBus.Host.AzureServiceBus.Consumer; +using Microsoft.Extensions.DependencyInjection; + public abstract class AsbBaseConsumer : AbstractConsumer { private ServiceBusProcessor _serviceBusProcessor; @@ -9,11 +11,19 @@ public abstract class AsbBaseConsumer : AbstractConsumer protected IMessageProcessor<ServiceBusReceivedMessage> MessageProcessor { get; } protected TopicSubscriptionParams TopicSubscription { get; } - protected AsbBaseConsumer(ServiceBusMessageBus messageBus, ServiceBusClient serviceBusClient, TopicSubscriptionParams subscriptionFactoryParams, IMessageProcessor<ServiceBusReceivedMessage> messageProcessor, IEnumerable<AbstractConsumerSettings> consumerSettings, ILogger logger) - : base(logger ?? throw new ArgumentNullException(nameof(logger)), consumerSettings) + protected AsbBaseConsumer(ServiceBusMessageBus messageBus, + ServiceBusClient serviceBusClient, + TopicSubscriptionParams subscriptionFactoryParams, + IMessageProcessor<ServiceBusReceivedMessage> messageProcessor, + IEnumerable<AbstractConsumerSettings> consumerSettings, + ILogger logger) + : base(logger ?? throw new ArgumentNullException(nameof(logger)), + consumerSettings, + subscriptionFactoryParams.ToString(), + messageBus.Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>()) { - MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus)); - TopicSubscription = subscriptionFactoryParams ?? throw new ArgumentNullException(nameof(subscriptionFactoryParams)); + MessageBus = messageBus; + TopicSubscription = subscriptionFactoryParams; MessageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor)); T GetSingleValue<T>(Func<AbstractConsumerSettings, T> selector, string settingName) diff --git a/src/SlimMessageBus.Host.AzureServiceBus/TopicSubscriptionParams.cs b/src/SlimMessageBus.Host.AzureServiceBus/TopicSubscriptionParams.cs index cf19244c..4cd056a4 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/TopicSubscriptionParams.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/TopicSubscriptionParams.cs @@ -1,15 +1,9 @@ namespace SlimMessageBus.Host.AzureServiceBus; -public class TopicSubscriptionParams +public class TopicSubscriptionParams(string path, string subscriptionName) { - public string Path { get; set; } - public string SubscriptionName { get; set; } - - public TopicSubscriptionParams(string path, string subscriptionName) - { - Path = path; - SubscriptionName = subscriptionName; - } + public string Path { get; set; } = path; + public string SubscriptionName { get; set; } = subscriptionName; public override string ToString() => SubscriptionName == null ? Path : $"{Path}/{SubscriptionName}"; diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs index c721f5dc..cdb92ff0 100644 --- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs @@ -1,6 +1,4 @@ -namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config; - -using Microsoft.Extensions.DependencyInjection.Extensions; +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck; public static class ConsumerBuilderExtensions { @@ -32,16 +30,15 @@ public static T PauseOnDegradedHealthCheck<T>(this T builder, params string[] ta private static void RegisterHealthServices(AbstractConsumerBuilder builder) { - builder.ConsumerSettings.CircuitBreakers.TryAdd<HealthCheckCircuitBreaker>(); - builder.PostConfigurationActions.Add( - services => - { - services.TryAddSingleton<HealthCheckBackgroundService>(); - services.TryAddEnumerable(ServiceDescriptor.Singleton<IHealthCheckPublisher, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>())); - services.TryAdd(ServiceDescriptor.Singleton<IHealthCheckHostBreaker, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>())); - services.AddHostedService(sp => sp.GetRequiredService<HealthCheckBackgroundService>()); + builder.AddConsumerCircuitBreakerType<AbstractConsumerBuilder, HealthCheckCircuitBreaker>(); + builder.PostConfigurationActions.Add(services => + { + services.TryAddSingleton<HealthCheckBackgroundService>(); + services.TryAddEnumerable(ServiceDescriptor.Singleton<IHealthCheckPublisher, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>())); + services.TryAdd(ServiceDescriptor.Singleton<IHealthCheckHostBreaker, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>())); + services.AddHostedService(sp => sp.GetRequiredService<HealthCheckBackgroundService>()); - services.TryAddSingleton<HealthCheckCircuitBreaker>(); - }); + services.TryAddTransient<HealthCheckCircuitBreaker>(); + }); } } diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/SettingsExtensions.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsExtensions.cs similarity index 70% rename from src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/SettingsExtensions.cs rename to src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsExtensions.cs index a2775f10..18bebac4 100644 --- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/SettingsExtensions.cs +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsExtensions.cs @@ -1,9 +1,7 @@ namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck; -static internal class SettingsExtensions +static internal class ConsumerSettingsExtensions { - private const string _key = nameof(HealthCheckCircuitBreaker); - public static T PauseOnDegraded<T>(this T consumerSettings, params string[] tags) where T : AbstractConsumerSettings { @@ -15,7 +13,6 @@ public static T PauseOnDegraded<T>(this T consumerSettings, params string[] tags dict[tag] = HealthStatus.Degraded; } } - return consumerSettings; } @@ -30,18 +27,9 @@ public static T PauseOnUnhealthy<T>(this T consumerSettings, params string[] tag dict[tag] = HealthStatus.Unhealthy; } } - return consumerSettings; } - static internal IDictionary<string, HealthStatus> HealthBreakerTags(this AbstractConsumerSettings consumerSettings) - { - if (!consumerSettings.Properties.TryGetValue(_key, out var rawValue) || rawValue is not IDictionary<string, HealthStatus> value) - { - value = new Dictionary<string, HealthStatus>(); - consumerSettings.Properties[_key] = value; - } - - return value; - } + static internal IDictionary<string, HealthStatus> HealthBreakerTags(this AbstractConsumerSettings consumerSettings) + => consumerSettings.GetOrCreate(ConsumerSettingsProperties.HealthStatusTags, () => []); } diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsProperties.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsProperties.cs new file mode 100644 index 00000000..a9fa2e91 --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsProperties.cs @@ -0,0 +1,6 @@ +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck; + +static internal class ConsumerSettingsProperties +{ + static readonly internal ProviderExtensionProperty<Dictionary<string, HealthStatus>> HealthStatusTags = new("CircuitBreaker_HealthStatusTags"); +} diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs index 6d9be2c3..af29883e 100644 --- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs @@ -1,7 +1,7 @@ global using System; global using System.Diagnostics; - + global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.DependencyInjection.Extensions; global using Microsoft.Extensions.Diagnostics.HealthChecks; global using Microsoft.Extensions.Hosting; -global using Microsoft.Extensions.Logging; diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs index 96bd55a0..a2cd303d 100644 --- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs @@ -38,10 +38,8 @@ public async Task PublishAsync(HealthReport report, CancellationToken cancellati } } - public Task StartAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } + public Task StartAsync(CancellationToken cancellationToken) + => Task.CompletedTask; public Task StopAsync(CancellationToken cancellationToken) { diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs index 8797fc9e..52a5f33e 100644 --- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs @@ -1,5 +1,7 @@ -namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck; - +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck; + +using SlimMessageBus.Host.CircuitBreaker; + internal sealed class HealthCheckCircuitBreaker : IConsumerCircuitBreaker { private readonly IEnumerable<AbstractConsumerSettings> _settings; diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj index 611abab7..7f63ccfb 100644 --- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj +++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj @@ -15,16 +15,13 @@ </ItemGroup> <ItemGroup> + <ProjectReference Include="..\SlimMessageBus.Host.CircuitBreaker\SlimMessageBus.Host.CircuitBreaker.csproj" /> <ProjectReference Include="..\SlimMessageBus.Host\SlimMessageBus.Host.csproj" /> </ItemGroup> <ItemGroup> - <AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute"> - <_Parameter1>SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test</_Parameter1> - </AssemblyAttribute> - <AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute"> - <_Parameter1>DynamicProxyGenAssembly2</_Parameter1> - </AssemblyAttribute> + <InternalsVisibleTo Include="SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test" /> + <InternalsVisibleTo Include="DynamicProxyGenAssembly2" /> </ItemGroup> </Project> diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Circuit.cs b/src/SlimMessageBus.Host.CircuitBreaker/Circuit.cs new file mode 100644 index 00000000..349530a4 --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker/Circuit.cs @@ -0,0 +1,7 @@ +namespace SlimMessageBus.Host.CircuitBreaker; + +public enum Circuit +{ + Open, + Closed +} diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerBuilderExtensions.cs b/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerBuilderExtensions.cs new file mode 100644 index 00000000..e1258a05 --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerBuilderExtensions.cs @@ -0,0 +1,26 @@ +namespace SlimMessageBus.Host.CircuitBreaker; + +using Microsoft.Extensions.DependencyInjection.Extensions; + +public static class ConsumerBuilderExtensions +{ + public static T AddConsumerCircuitBreakerType<T, TConsumerCircuitBreaker>(this T builder) + where T : AbstractConsumerBuilder + where TConsumerCircuitBreaker : IConsumerCircuitBreaker + { + if (builder is null) + { + throw new ArgumentNullException(nameof(builder)); + } + + var breakersTypes = builder.ConsumerSettings.GetOrCreate(ConsumerSettingsProperties.CircuitBreakerTypes, () => []); + breakersTypes.TryAdd<TConsumerCircuitBreaker>(); + + builder.PostConfigurationActions.Add(services => + { + services.TryAddEnumerable(ServiceDescriptor.Singleton<IAbstractConsumerInterceptor, CircuitBreakerAbstractConsumerInterceptor>()); + }); + + return builder; + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerSettingsProperties.cs b/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerSettingsProperties.cs new file mode 100644 index 00000000..5e441547 --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerSettingsProperties.cs @@ -0,0 +1,9 @@ +namespace SlimMessageBus.Host.CircuitBreaker; + +static internal class ConsumerSettingsProperties +{ + /// <summary> + /// <see cref="IConsumerCircuitBreaker"/> to be used with the consumer. + /// </summary> + static readonly internal ProviderExtensionProperty<TypeCollection<IConsumerCircuitBreaker>> CircuitBreakerTypes = new("CircuitBreaker_Types"); +} diff --git a/src/SlimMessageBus.Host.CircuitBreaker/GlobalUsings.cs b/src/SlimMessageBus.Host.CircuitBreaker/GlobalUsings.cs new file mode 100644 index 00000000..a4d02342 --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker/GlobalUsings.cs @@ -0,0 +1,2 @@ +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Logging; diff --git a/src/SlimMessageBus/IConsumerCircuitBreaker.cs b/src/SlimMessageBus.Host.CircuitBreaker/IConsumerCircuitBreaker.cs similarity index 76% rename from src/SlimMessageBus/IConsumerCircuitBreaker.cs rename to src/SlimMessageBus.Host.CircuitBreaker/IConsumerCircuitBreaker.cs index 27154566..453d149b 100644 --- a/src/SlimMessageBus/IConsumerCircuitBreaker.cs +++ b/src/SlimMessageBus.Host.CircuitBreaker/IConsumerCircuitBreaker.cs @@ -1,5 +1,5 @@ -namespace SlimMessageBus; - +namespace SlimMessageBus.Host.CircuitBreaker; + /// <summary> /// Circuit breaker to toggle consumer status on an external event. /// </summary> @@ -9,9 +9,3 @@ public interface IConsumerCircuitBreaker Task Subscribe(Func<Circuit, Task> onChange); void Unsubscribe(); } - -public enum Circuit -{ - Open, - Closed -} diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerExtensions.cs b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerExtensions.cs new file mode 100644 index 00000000..ebbc1b2f --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerExtensions.cs @@ -0,0 +1,7 @@ +namespace SlimMessageBus.Host.CircuitBreaker; + +internal static class AbstractConsumerExtensions +{ + public static bool IsPaused(this AbstractConsumer consumer) => consumer.GetOrDefault(AbstractConsumerProperties.IsPaused, false); + public static void SetIsPaused(this AbstractConsumer consumer, bool isPaused) => AbstractConsumerProperties.IsPaused.Set(consumer, isPaused); +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerProperties.cs b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerProperties.cs new file mode 100644 index 00000000..5daffa07 --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerProperties.cs @@ -0,0 +1,7 @@ +namespace SlimMessageBus.Host.CircuitBreaker; + +static internal class AbstractConsumerProperties +{ + static readonly internal ProviderExtensionProperty<bool> IsPaused = new("CircuitBreaker_IsPaused"); + static readonly internal ProviderExtensionProperty<List<IConsumerCircuitBreaker>?> Breakers = new("CircuitBreaker_Breakers"); +} diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs new file mode 100644 index 00000000..30658f06 --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs @@ -0,0 +1,92 @@ +namespace SlimMessageBus.Host.CircuitBreaker; + +/// <summary> +/// Circuit breaker to toggle consumer status on an external events. +/// </summary> +internal sealed class CircuitBreakerAbstractConsumerInterceptor : IAbstractConsumerInterceptor +{ + public int Order => 100; + + public async Task<bool> CanStart(AbstractConsumer consumer) + { + var breakerTypes = consumer.Settings.SelectMany(x => x.GetOrDefault(ConsumerSettingsProperties.CircuitBreakerTypes, [])).ToHashSet(); + if (breakerTypes.Count == 0) + { + // no breakers, allow to pass + return true; + } + + var breakers = consumer.GetOrCreate(AbstractConsumerProperties.Breakers, () => [])!; + + async Task BreakerChanged(Circuit state) + { + if (!consumer.IsStarted) + { + return; + } + + var isPaused = consumer.IsPaused(); + var shouldPause = state == Circuit.Closed || breakers.Exists(x => x.State == Circuit.Closed); + if (shouldPause != isPaused) + { + var path = consumer.Path; + var bus = consumer.Settings[0].MessageBusSettings.Name ?? "default"; + if (shouldPause) + { + consumer.Logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus); + await consumer.DoStop().ConfigureAwait(false); + } + else + { + consumer.Logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus); + await consumer.DoStart().ConfigureAwait(false); + } + consumer.SetIsPaused(shouldPause); + } + } + + var sp = consumer.Settings.Select(x => x.MessageBusSettings.ServiceProvider).First(x => x != null); + foreach (var breakerType in breakerTypes) + { + var breaker = (IConsumerCircuitBreaker)ActivatorUtilities.CreateInstance(sp, breakerType, consumer.Settings); + breakers.Add(breaker); + + await breaker.Subscribe(BreakerChanged); + } + + var isPaused = breakers.Exists(x => x.State == Circuit.Closed); + consumer.SetIsPaused(isPaused); + return !isPaused; + } + + public async Task<bool> CanStop(AbstractConsumer consumer) + { + var breakers = consumer.GetOrDefault(AbstractConsumerProperties.Breakers, null); + if (breakers == null || breakers.Count == 0) + { + // no breakers, allow to pass + return true; + } + + foreach (var breaker in breakers) + { + breaker.Unsubscribe(); + + if (breaker is IAsyncDisposable asyncDisposable) + { + await asyncDisposable.DisposeAsync(); + } + else if (breaker is IDisposable disposable) + { + disposable.Dispose(); + } + } + breakers.Clear(); + + return !consumer.IsPaused(); + } + + public Task Started(AbstractConsumer consumer) => Task.CompletedTask; + + public Task Stopped(AbstractConsumer consumer) => Task.CompletedTask; +} diff --git a/src/SlimMessageBus.Host.CircuitBreaker/SlimMessageBus.Host.CircuitBreaker.csproj b/src/SlimMessageBus.Host.CircuitBreaker/SlimMessageBus.Host.CircuitBreaker.csproj new file mode 100644 index 00000000..5b7fe10c --- /dev/null +++ b/src/SlimMessageBus.Host.CircuitBreaker/SlimMessageBus.Host.CircuitBreaker.csproj @@ -0,0 +1,22 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <Import Project="../Host.Plugin.Properties.xml" /> + + <PropertyGroup> + <Description>Circuit breaker abstractions for SlimMessageBus</Description> + <PackageTags>Toggle consumer on or off</PackageTags> + <PackageIcon>icon.png</PackageIcon> + <PackageReleaseNotes /> + <Nullable>enable</Nullable> + </PropertyGroup> + + <ItemGroup> + <ProjectReference Include="..\SlimMessageBus.Host\SlimMessageBus.Host.csproj" /> + </ItemGroup> + + <ItemGroup> + <InternalsVisibleTo Include="SlimMessageBus.Host.CircuitBreaker.Test" /> + <InternalsVisibleTo Include="DynamicProxyGenAssembly2" /> + </ItemGroup> + +</Project> diff --git a/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs b/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs index a04fd491..71260121 100644 --- a/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs +++ b/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs @@ -23,11 +23,6 @@ public abstract class AbstractConsumerSettings : HasProviderExtensions /// </summary> public int Instances { get; set; } - /// <summary> - /// <see cref="IConsumerCircuitBreaker"/> to be used with the consumer. - /// </summary> - public TypeCollection<IConsumerCircuitBreaker> CircuitBreakers { get; } = []; - protected AbstractConsumerSettings() { Instances = 1; diff --git a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj index 36463945..b01a3b49 100644 --- a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj +++ b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj @@ -6,7 +6,7 @@ <Description>Core configuration interfaces of SlimMessageBus</Description> <PackageTags>SlimMessageBus</PackageTags> <RootNamespace>SlimMessageBus.Host</RootNamespace> - <Version>3.0.0-rc900</Version> + <Version>3.0.0-rc901</Version> </PropertyGroup> <ItemGroup> diff --git a/src/SlimMessageBus.Host.Configuration/TypeCollection.cs b/src/SlimMessageBus.Host.Configuration/TypeCollection.cs index 04246947..f936bcfb 100644 --- a/src/SlimMessageBus.Host.Configuration/TypeCollection.cs +++ b/src/SlimMessageBus.Host.Configuration/TypeCollection.cs @@ -45,34 +45,25 @@ public bool TryAdd<T>() where T : TInterface public void Clear() => _innerList.Clear(); - public bool Contains<T>() where T : TInterface - { - return _innerList.Contains(typeof(T)); - } + public bool Contains<T>() where T : TInterface + => _innerList.Contains(typeof(T)); - public void CopyTo(Type[] array, int arrayIndex) => _innerList.CopyTo(array, arrayIndex); + public void CopyTo(Type[] array, int arrayIndex) + => _innerList.CopyTo(array, arrayIndex); - public bool Remove<T>() where T : TInterface - { - return _innerList.Remove(typeof(T)); - } + public bool Remove<T>() where T : TInterface + => _innerList.Remove(typeof(T)); - public bool Remove(Type type) - { - return _innerList.Remove(type); - } + public bool Remove(Type type) + => _innerList.Remove(type); public int Count => _innerList.Count; public bool IsReadOnly => false; - public IEnumerator<Type> GetEnumerator() - { - return _innerList.GetEnumerator(); - } + public IEnumerator<Type> GetEnumerator() + => _innerList.GetEnumerator(); - IEnumerator IEnumerable.GetEnumerator() - { - return _innerList.GetEnumerator(); - } + IEnumerator IEnumerable.GetEnumerator() + => _innerList.GetEnumerator(); } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj b/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj index 97abd9cc..b98b06a5 100644 --- a/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj +++ b/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj @@ -3,7 +3,7 @@ <Import Project="../Common.NuGet.Properties.xml" /> <PropertyGroup> - <Version>3.0.0-rc900</Version> + <Version>3.0.0-rc901</Version> <Description>Core interceptor interfaces of SlimMessageBus</Description> <PackageTags>SlimMessageBus</PackageTags> </PropertyGroup> diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs index cfb70f0f..13278bf0 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs @@ -15,8 +15,17 @@ public class KafkaGroupConsumer : AbstractConsumer, IKafkaCommitController public string Group { get; } public IReadOnlyCollection<string> Topics { get; } - public KafkaGroupConsumer(ILoggerFactory loggerFactory, KafkaMessageBusSettings providerSettings, IEnumerable<AbstractConsumerSettings> consumerSettings, string group, IReadOnlyCollection<string> topics, Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory) - : base(loggerFactory.CreateLogger<KafkaGroupConsumer>(), consumerSettings) + public KafkaGroupConsumer(ILoggerFactory loggerFactory, + KafkaMessageBusSettings providerSettings, + IEnumerable<AbstractConsumerSettings> consumerSettings, + IEnumerable<IAbstractConsumerInterceptor> interceptors, + string group, + IReadOnlyCollection<string> topics, + Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory) + : base(loggerFactory.CreateLogger<KafkaGroupConsumer>(), + consumerSettings, + group, + interceptors) { ProviderSettings = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings)); Group = group ?? throw new ArgumentNullException(nameof(group)); diff --git a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs index 1c1f68ba..5bc1d1aa 100644 --- a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs +++ b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs @@ -1,5 +1,7 @@ namespace SlimMessageBus.Host.Kafka; +using Microsoft.Extensions.DependencyInjection; + using IProducer = Confluent.Kafka.IProducer<byte[], byte[]>; using Message = Confluent.Kafka.Message<byte[], byte[]>; @@ -64,7 +66,7 @@ protected override async Task CreateConsumers() void AddGroupConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, string group, IReadOnlyCollection<string> topics, Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory) { _logger.LogInformation("Creating consumer group {ConsumerGroup}", group); - AddConsumer(new KafkaGroupConsumer(LoggerFactory, ProviderSettings, consumerSettings, group, topics, processorFactory)); + AddConsumer(new KafkaGroupConsumer(LoggerFactory, ProviderSettings, consumerSettings, interceptors: Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>(), group, topics, processorFactory)); } object MessageProvider(Type messageType, ConsumeResult<Ignore, byte[]> transportMessage) diff --git a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs index 102ee6d5..2002f298 100644 --- a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs +++ b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs @@ -3,6 +3,8 @@ using System.Collections.Generic; using System.Threading; +using Microsoft.Extensions.DependencyInjection; + using MQTTnet.Extensions.ManagedClient; public class MqttMessageBus : MessageBusBase<MqttMessageBusSettings> @@ -55,7 +57,7 @@ protected override async Task CreateConsumers() void AddTopicConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, string topic, IMessageProcessor<MqttApplicationMessage> messageProcessor) { _logger.LogInformation("Creating consumer for {Path}", topic); - var consumer = new MqttTopicConsumer(LoggerFactory.CreateLogger<MqttTopicConsumer>(), consumerSettings, topic, messageProcessor); + var consumer = new MqttTopicConsumer(LoggerFactory.CreateLogger<MqttTopicConsumer>(), consumerSettings, interceptors: Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>(), topic, messageProcessor); AddConsumer(consumer); } @@ -84,7 +86,7 @@ void AddTopicConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, st AddTopicConsumer([Settings.RequestResponse], Settings.RequestResponse.Path, processor); } - var topics = Consumers.Cast<MqttTopicConsumer>().Select(x => new MqttTopicFilterBuilder().WithTopic(x.Topic).Build()).ToList(); + var topics = Consumers.Cast<MqttTopicConsumer>().Select(x => new MqttTopicFilterBuilder().WithTopic(x.Path).Build()).ToList(); await _mqttClient.SubscribeAsync(topics).ConfigureAwait(false); } @@ -101,7 +103,7 @@ protected override async Task DestroyConsumers() private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { - var consumer = Consumers.Cast<MqttTopicConsumer>().FirstOrDefault(x => x.Topic == arg.ApplicationMessage.Topic); + var consumer = Consumers.Cast<MqttTopicConsumer>().FirstOrDefault(x => x.Path == arg.ApplicationMessage.Topic); if (consumer != null) { var headers = new Dictionary<string, object>(); diff --git a/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs b/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs index 7720782c..738abb8b 100644 --- a/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs +++ b/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs @@ -3,12 +3,17 @@ public class MqttTopicConsumer : AbstractConsumer { public IMessageProcessor<MqttApplicationMessage> MessageProcessor { get; } - public string Topic { get; } - public MqttTopicConsumer(ILogger logger, IEnumerable<AbstractConsumerSettings> consumerSettings, string topic, IMessageProcessor<MqttApplicationMessage> messageProcessor) - : base(logger, consumerSettings) + public MqttTopicConsumer(ILogger logger, + IEnumerable<AbstractConsumerSettings> consumerSettings, + IEnumerable<IAbstractConsumerInterceptor> interceptors, + string topic, + IMessageProcessor<MqttApplicationMessage> messageProcessor) + : base(logger, + consumerSettings, + topic, + interceptors) { - Topic = topic; MessageProcessor = messageProcessor; } diff --git a/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs b/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs index ec81318e..4a9681a6 100644 --- a/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs +++ b/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs @@ -1,5 +1,6 @@ namespace SlimMessageBus.Host.Nats; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Primitives; public class NatsMessageBus : MessageBusBase<NatsMessageBusSettings> @@ -78,7 +79,7 @@ protected override async Task CreateConsumers() private void AddSubjectConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, string subject, IMessageProcessor<NatsMsg<byte[]>> processor) { _logger.LogInformation("Creating consumer for {Subject}", subject); - var consumer = new NatsSubjectConsumer<byte[]>(LoggerFactory.CreateLogger<NatsSubjectConsumer<byte[]>>(), consumerSettings, subject, _connection, processor); + var consumer = new NatsSubjectConsumer<byte[]>(LoggerFactory.CreateLogger<NatsSubjectConsumer<byte[]>>(), consumerSettings, interceptors: Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>(), subject, _connection, processor); AddConsumer(consumer); } diff --git a/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs b/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs index 43a9f2cf..625dec57 100644 --- a/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs +++ b/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs @@ -1,27 +1,31 @@ #nullable enable namespace SlimMessageBus.Host.Nats; -using System.Collections.Generic; - public class NatsSubjectConsumer<TType> : AbstractConsumer { - private readonly string _subject; private readonly INatsConnection _connection; private readonly IMessageProcessor<NatsMsg<TType>> _messageProcessor; private INatsSub<TType>? _subscription; private Task? _messageConsumerTask; - public NatsSubjectConsumer(ILogger logger, IEnumerable<AbstractConsumerSettings> consumerSettings, string subject, INatsConnection connection, IMessageProcessor<NatsMsg<TType>> messageProcessor) - : base(logger, consumerSettings) + public NatsSubjectConsumer(ILogger logger, + IEnumerable<AbstractConsumerSettings> consumerSettings, + IEnumerable<IAbstractConsumerInterceptor> interceptors, + string subject, + INatsConnection connection, + IMessageProcessor<NatsMsg<TType>> messageProcessor) + : base(logger, + consumerSettings, + path: subject, + interceptors) { - _subject = subject; _connection = connection; _messageProcessor = messageProcessor; } protected override async Task OnStart() { - _subscription ??= await _connection.SubscribeCoreAsync<TType>(_subject, cancellationToken: CancellationToken); + _subscription ??= await _connection.SubscribeCoreAsync<TType>(Path, cancellationToken: CancellationToken); _messageConsumerTask = Task.Factory.StartNew(OnLoop, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap(); } diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs index 47e4e630..05c40cc5 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs @@ -10,15 +10,21 @@ public abstract class AbstractRabbitMqConsumer : AbstractConsumer private AsyncEventingBasicConsumer _consumer; private string _consumerTag; - public string QueueName { get; } protected abstract RabbitMqMessageAcknowledgementMode AcknowledgementMode { get; } - protected AbstractRabbitMqConsumer(ILogger logger, IEnumerable<AbstractConsumerSettings> consumerSettings, IRabbitMqChannel channel, string queueName, IHeaderValueConverter headerValueConverter) - : base(logger, consumerSettings) + protected AbstractRabbitMqConsumer(ILogger logger, + IEnumerable<AbstractConsumerSettings> consumerSettings, + IEnumerable<IAbstractConsumerInterceptor> interceptors, + IRabbitMqChannel channel, + string queueName, + IHeaderValueConverter headerValueConverter) + : base(logger, + consumerSettings, + path: queueName, + interceptors) { _channel = channel; _headerValueConverter = headerValueConverter; - QueueName = queueName; } protected override Task OnStart() @@ -28,7 +34,7 @@ protected override Task OnStart() lock (_channel.ChannelLock) { - _consumerTag = _channel.Channel.BasicConsume(QueueName, autoAck: AcknowledgementMode == RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit, _consumer); + _consumerTag = _channel.Channel.BasicConsume(Path, autoAck: AcknowledgementMode == RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit, _consumer); } return Task.CompletedTask; @@ -54,7 +60,7 @@ protected async Task OnMessageReceived(object sender, BasicDeliverEventArgs @eve return; } - Logger.LogDebug("Message arrived on queue {QueueName} from exchange {ExchangeName} with delivery tag {DeliveryTag}", QueueName, @event.Exchange, @event.DeliveryTag); + Logger.LogDebug("Message arrived on queue {QueueName} from exchange {ExchangeName} with delivery tag {DeliveryTag}", Path, @event.Exchange, @event.DeliveryTag); Exception exception; try { @@ -76,7 +82,7 @@ protected async Task OnMessageReceived(object sender, BasicDeliverEventArgs @eve } if (exception != null) { - Logger.LogError(exception, "Error while processing message on queue {QueueName} from exchange {ExchangeName}: {ErrorMessage}", QueueName, @event.Exchange, exception.Message); + Logger.LogError(exception, "Error while processing message on queue {QueueName} from exchange {ExchangeName}: {ErrorMessage}", Path, @event.Exchange, exception.Message); } } diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs index f2f18de2..f12ad27a 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs @@ -56,7 +56,7 @@ public async Task<ProcessMessageResult> ProcessMessage(BasicDeliverEventArgs tra if (r.Exception != null) { // We rely on the IMessageProcessor to execute the ConsumerErrorHandler<T>, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost. - _logger.LogError(r.Exception, "Exchange {Exchange} - Queue {Queue}: Error processing message {Message}, delivery tag {DeliveryTag}", transportMessage.Exchange, _consumer.QueueName, transportMessage, transportMessage.DeliveryTag); + _logger.LogError(r.Exception, "Exchange {Exchange} - Queue {Queue}: Error processing message {Message}, delivery tag {DeliveryTag}", transportMessage.Exchange, _consumer.Path, transportMessage, transportMessage.DeliveryTag); } return r; } diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs index e9018b58..f6ed3ec5 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs @@ -1,8 +1,10 @@ namespace SlimMessageBus.Host.RabbitMQ; +using Microsoft.Extensions.DependencyInjection; + public interface IRabbitMqConsumer { - string QueueName { get; } + string Path { get; } void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary<string, object> properties, bool warnIfAlreadyConfirmed = false); } @@ -24,7 +26,12 @@ public RabbitMqConsumer( MessageBusBase messageBus, MessageProvider<BasicDeliverEventArgs> messageProvider, IHeaderValueConverter headerValueConverter) - : base(loggerFactory.CreateLogger<RabbitMqConsumer>(), consumers, channel, queueName, headerValueConverter) + : base(loggerFactory.CreateLogger<RabbitMqConsumer>(), + consumers, + messageBus.Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>(), + channel, + queueName, + headerValueConverter) { _acknowledgementMode = consumers.Select(x => x.GetOrDefault<RabbitMqMessageAcknowledgementMode?>(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null) ?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode @@ -99,7 +106,7 @@ public void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessa // Note: We want to makes sure the 1st message confirmation is handled if (warnIfAlreadyConfirmed) { - Logger.LogWarning("Exchange {Exchange} - Queue {Queue}: The message (delivery tag {MessageDeliveryTag}) was already confirmed, subsequent message confirmation will have no effect", transportMessage.Exchange, QueueName, transportMessage.DeliveryTag); + Logger.LogWarning("Exchange {Exchange} - Queue {Queue}: The message (delivery tag {MessageDeliveryTag}) was already confirmed, subsequent message confirmation will have no effect", transportMessage.Exchange, Path, transportMessage.DeliveryTag); } return; } @@ -139,7 +146,7 @@ protected override async Task<Exception> OnMessageReceived(Dictionary<string, ob } else { - Logger.LogDebug("Exchange {Exchange} - Queue {Queue}: No message processor found for routing key {RoutingKey}", transportMessage.Exchange, QueueName, transportMessage.RoutingKey); + Logger.LogDebug("Exchange {Exchange} - Queue {Queue}: No message processor found for routing key {RoutingKey}", transportMessage.Exchange, Path, transportMessage.RoutingKey); } // error handling happens in the message processor diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs index ca63dc73..f07ae3cf 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs @@ -1,5 +1,7 @@ namespace SlimMessageBus.Host.RabbitMQ; +using System.Collections.Generic; + public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer { private readonly IMessageProcessor<BasicDeliverEventArgs> _messageProcessor; @@ -8,6 +10,7 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer public RabbitMqResponseConsumer( ILoggerFactory loggerFactory, + IEnumerable<IAbstractConsumerInterceptor> interceptors, IRabbitMqChannel channel, string queueName, RequestResponseSettings requestResponseSettings, @@ -15,7 +18,13 @@ public RabbitMqResponseConsumer( IPendingRequestStore pendingRequestStore, ICurrentTimeProvider currentTimeProvider, IHeaderValueConverter headerValueConverter) - : base(loggerFactory.CreateLogger<RabbitMqResponseConsumer>(), [requestResponseSettings], channel, queueName, headerValueConverter) + + : base(loggerFactory.CreateLogger<RabbitMqResponseConsumer>(), + [requestResponseSettings], + interceptors, + channel, + queueName, + headerValueConverter) { _messageProcessor = new ResponseMessageProcessor<BasicDeliverEventArgs>(loggerFactory, requestResponseSettings, messageProvider, pendingRequestStore, currentTimeProvider); } diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs index 5a354453..2e5d034c 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs @@ -1,4 +1,7 @@ namespace SlimMessageBus.Host.RabbitMQ; + +using Microsoft.Extensions.DependencyInjection; + public class RabbitMqMessageBus : MessageBusBase<RabbitMqMessageBusSettings>, IRabbitMqChannel { private readonly ILogger _logger; @@ -51,6 +54,7 @@ protected override async Task CreateConsumers() if (Settings.RequestResponse != null) { AddConsumer(new RabbitMqResponseConsumer(LoggerFactory, + interceptors: Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>(), channel: this, queueName: Settings.RequestResponse.GetQueueName(), Settings.RequestResponse, diff --git a/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs b/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs index 8105bb1b..466b772a 100644 --- a/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs +++ b/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs @@ -23,8 +23,17 @@ public QueueProcessors(string name, List<IMessageProcessor<MessageWithHeaders>> } } - public RedisListCheckerConsumer(ILogger<RedisListCheckerConsumer> logger, IDatabase database, TimeSpan? pollDelay, TimeSpan maxIdle, IEnumerable<(string QueueName, IMessageProcessor<MessageWithHeaders> Processor)> queues, IMessageSerializer envelopeSerializer) - : base(logger, []) + public RedisListCheckerConsumer(ILogger<RedisListCheckerConsumer> logger, + IEnumerable<IAbstractConsumerInterceptor> interceptors, + IDatabase database, + TimeSpan? pollDelay, + TimeSpan maxIdle, + IEnumerable<(string QueueName, IMessageProcessor<MessageWithHeaders> Processor)> queues, + IMessageSerializer envelopeSerializer) + : base(logger, + [], + path: string.Join("|", queues.Select(x => x.QueueName)), + interceptors) { _database = database; _pollDelay = pollDelay; diff --git a/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs b/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs index ad79f383..a336aee4 100644 --- a/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs +++ b/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs @@ -7,12 +7,18 @@ public class RedisTopicConsumer : AbstractConsumer, IRedisConsumer private ChannelMessageQueue _channelMessageQueue; private readonly IMessageProcessor<MessageWithHeaders> _messageProcessor; - public string Path { get; } - - public RedisTopicConsumer(ILogger<RedisTopicConsumer> logger, IEnumerable<AbstractConsumerSettings> consumerSettings, string topic, ISubscriber subscriber, IMessageProcessor<MessageWithHeaders> messageProcessor, IMessageSerializer envelopeSerializer) - : base(logger, consumerSettings) + public RedisTopicConsumer(ILogger<RedisTopicConsumer> logger, + IEnumerable<AbstractConsumerSettings> consumerSettings, + IEnumerable<IAbstractConsumerInterceptor> interceptors, + string topic, + ISubscriber subscriber, + IMessageProcessor<MessageWithHeaders> messageProcessor, + IMessageSerializer envelopeSerializer) + : base(logger, + consumerSettings, + path: topic, + interceptors) { - Path = topic; _messageProcessor = messageProcessor; _envelopeSerializer = envelopeSerializer; _subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber)); diff --git a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs index 1802eb3a..d7016aec 100644 --- a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs +++ b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs @@ -1,5 +1,7 @@ namespace SlimMessageBus.Host.Redis; +using Microsoft.Extensions.DependencyInjection; + public class RedisMessageBus : MessageBusBase<RedisMessageBusSettings> { private readonly ILogger _logger; @@ -95,6 +97,7 @@ void AddTopicConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, st var consumer = new RedisTopicConsumer( LoggerFactory.CreateLogger<RedisTopicConsumer>(), consumerSettings, + interceptors: Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>(), topic, subscriber, messageProcessor, @@ -152,7 +155,7 @@ void AddTopicConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, st if (queues.Count > 0) { - AddConsumer(new RedisListCheckerConsumer(LoggerFactory.CreateLogger<RedisListCheckerConsumer>(), Database, ProviderSettings.QueuePollDelay, ProviderSettings.QueuePollMaxIdle, queues, ProviderSettings.EnvelopeSerializer)); + AddConsumer(new RedisListCheckerConsumer(LoggerFactory.CreateLogger<RedisListCheckerConsumer>(), Settings.ServiceProvider.GetServices<IAbstractConsumerInterceptor>(), Database, ProviderSettings.QueuePollDelay, ProviderSettings.QueuePollMaxIdle, queues, ProviderSettings.EnvelopeSerializer)); } } diff --git a/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj b/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj index 16df001a..4a23fc6e 100644 --- a/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj +++ b/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj @@ -3,7 +3,7 @@ <Import Project="../Common.NuGet.Properties.xml" /> <PropertyGroup> - <Version>3.0.0-rc900</Version> + <Version>3.0.0-rc901</Version> <Description>Core serialization interfaces of SlimMessageBus</Description> <PackageTags>SlimMessageBus</PackageTags> </PropertyGroup> diff --git a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs index 28745d69..0106d1e1 100644 --- a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs +++ b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs @@ -1,48 +1,99 @@ namespace SlimMessageBus.Host; -public abstract class AbstractConsumer : IAsyncDisposable, IConsumerControl +public abstract class AbstractConsumer : HasProviderExtensions, IAsyncDisposable, IConsumerControl { private readonly SemaphoreSlim _semaphore; - private readonly List<IConsumerCircuitBreaker> _circuitBreakers; - + private readonly IReadOnlyList<IAbstractConsumerInterceptor> _interceptors; private CancellationTokenSource _cancellationTokenSource; private bool _starting; private bool _stopping; - public bool IsPaused { get; private set; } public bool IsStarted { get; private set; } - protected ILogger Logger { get; } - protected IReadOnlyList<AbstractConsumerSettings> Settings { get; } + public string Path { get; } + public ILogger Logger { get; } + public IReadOnlyList<AbstractConsumerSettings> Settings { get; } protected CancellationToken CancellationToken => _cancellationTokenSource.Token; - protected AbstractConsumer(ILogger logger, IEnumerable<AbstractConsumerSettings> consumerSettings) + protected AbstractConsumer(ILogger logger, + IEnumerable<AbstractConsumerSettings> consumerSettings, + string path, + IEnumerable<IAbstractConsumerInterceptor> interceptors) { _semaphore = new(1, 1); - _circuitBreakers = []; - + _interceptors = [.. interceptors.OrderBy(x => x.Order)]; Logger = logger; - Settings = consumerSettings.ToList(); + Settings = [.. consumerSettings]; + Path = path; } - public async Task Start() + private async Task<bool> CallInterceptor(Func<IAbstractConsumerInterceptor, Task<bool>> func) { - async Task StartCircuitBreakers() + foreach (var interceptor in _interceptors) { - var types = Settings.SelectMany(x => x.CircuitBreakers).Distinct(); - if (!types.Any()) + try { - return; + if (!await func(interceptor)) + { + return false; + } } - - var sp = Settings.Select(x => x.MessageBusSettings.ServiceProvider).FirstOrDefault(x => x != null); - foreach (var type in types.Distinct()) + catch (Exception e) { - var breaker = (IConsumerCircuitBreaker)ActivatorUtilities.CreateInstance(sp, type, Settings); - _circuitBreakers.Add(breaker); - await breaker.Subscribe(BreakerChanged); + Logger.LogError(e, "Interceptor {Interceptor} failed with error: {Error}", interceptor.GetType().Name, e.Message); } } + return true; + } + + /// <summary> + /// Starts the underyling transport consumer (synchronized). + /// </summary> + /// <returns></returns> + public async Task DoStart() + { + await _semaphore.WaitAsync(); + try + { + await InternalOnStart(); + } + finally + { + _semaphore.Release(); + } + } + private async Task InternalOnStart() + { + await OnStart(); + await CallInterceptor(async x => { await x.Started(this); return true; }); + } + + private async Task InternalOnStop() + { + await OnStop().ConfigureAwait(false); + await CallInterceptor(async x => { await x.Stopped(this); return true; }); + } + + /// <summary> + /// Stops the underyling transport consumer (synchronized). + /// </summary> + /// <returns></returns> + public async Task DoStop() + { + await _semaphore.WaitAsync().ConfigureAwait(false); + try + { + await OnStop().ConfigureAwait(false); + await CallInterceptor(async x => { await x.Stopped(this); return true; }); + } + finally + { + _semaphore.Release(); + } + } + + public async Task Start() + { if (IsStarted || _starting) { return; @@ -58,11 +109,9 @@ async Task StartCircuitBreakers() _cancellationTokenSource = new CancellationTokenSource(); } - await StartCircuitBreakers(); - IsPaused = _circuitBreakers.Exists(x => x.State == Circuit.Closed); - if (!IsPaused) + if (await CallInterceptor(x => x.CanStart(this))) { - await OnStart().ConfigureAwait(false); + await InternalOnStart(); } IsStarted = true; @@ -76,25 +125,6 @@ async Task StartCircuitBreakers() public async Task Stop() { - async Task StopCircuitBreakers() - { - foreach (var breaker in _circuitBreakers) - { - breaker.Unsubscribe(); - - if (breaker is IAsyncDisposable asyncDisposable) - { - await asyncDisposable.DisposeAsync(); - } - else if (breaker is IDisposable disposable) - { - disposable.Dispose(); - } - } - - _circuitBreakers.Clear(); - } - if (!IsStarted || _stopping) { return; @@ -106,10 +136,9 @@ async Task StopCircuitBreakers() { await _cancellationTokenSource.CancelAsync(); - await StopCircuitBreakers(); - if (!IsPaused) + if (await CallInterceptor(x => x.CanStop(this))) { - await OnStop().ConfigureAwait(false); + await InternalOnStop(); } IsStarted = false; @@ -121,8 +150,17 @@ async Task StopCircuitBreakers() } } - protected abstract Task OnStart(); - protected abstract Task OnStop(); + /// <summary> + /// Initializes the transport specific consumer loop after the consumer has been started. + /// </summary> + /// <returns></returns> + internal protected abstract Task OnStart(); + + /// <summary> + /// Destroys the transport specific consumer loop before the consumer is stopped. + /// </summary> + /// <returns></returns> + internal protected abstract Task OnStop(); #region IAsyncDisposable @@ -141,40 +179,4 @@ protected async virtual ValueTask DisposeAsyncCore() } #endregion - - async internal Task BreakerChanged(Circuit state) - { - await _semaphore.WaitAsync(); - try - { - if (!IsStarted) - { - return; - } - - var shouldPause = state == Circuit.Closed || _circuitBreakers.Exists(x => x.State == Circuit.Closed); - if (shouldPause != IsPaused) - { - var settings = Settings.Count > 0 ? Settings[0] : null; - var path = settings?.Path ?? "[unknown path]"; - var bus = settings?.MessageBusSettings?.Name ?? "default"; - if (shouldPause) - { - Logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus); - await OnStop().ConfigureAwait(false); - } - else - { - Logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus); - await OnStart().ConfigureAwait(false); - } - - IsPaused = shouldPause; - } - } - finally - { - _semaphore.Release(); - } - } } diff --git a/src/SlimMessageBus.Host/Consumer/IAbstractConsumerInterceptor.cs b/src/SlimMessageBus.Host/Consumer/IAbstractConsumerInterceptor.cs new file mode 100644 index 00000000..953be4d9 --- /dev/null +++ b/src/SlimMessageBus.Host/Consumer/IAbstractConsumerInterceptor.cs @@ -0,0 +1,33 @@ +namespace SlimMessageBus.Host; + +/// <summary> +/// Interceptor for consumers that are of type <see cref="AbstractConsumer"/>. +/// </summary> +public interface IAbstractConsumerInterceptor : IInterceptorWithOrder +{ + /// <summary> + /// Called to check if the consumer can be started. + /// </summary> + /// <param name="consumer"></param> + /// <returns>True if the start is allowed</returns> + Task<bool> CanStart(AbstractConsumer consumer); + + /// <summary> + /// Called to check if the consumer can be stopped. + /// </summary> + /// <param name="consumer"></param> + /// <returns>True if the stop is allowed</returns> + Task<bool> CanStop(AbstractConsumer consumer); + + /// <summary> + /// Called when the consumer is started. + /// </summary> + /// <returns></returns> + Task Started(AbstractConsumer consumer); + + /// <summary> + /// Called when the consumer is stopped. + /// </summary> + /// <returns></returns> + Task Stopped(AbstractConsumer consumer); +} diff --git a/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs b/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs index 7378e100..8e3a4ae8 100644 --- a/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs +++ b/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs @@ -26,7 +26,7 @@ public static IServiceCollection AddSlimMessageBus(this IServiceCollection servi configure(mbb); // Execute post config actions for the master bus and its children - foreach (var postConfigure in mbb.PostConfigurationActions.Concat(mbb.Children.Values.SelectMany(x => x.PostConfigurationActions))) + foreach (var postConfigure in mbb.GetPostConfigurationActions()) { postConfigure(services); } diff --git a/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj b/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj index 1bbb7a7a..b07b9317 100644 --- a/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj +++ b/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj @@ -28,7 +28,7 @@ </ItemGroup> <ItemGroup> - <InternalsVisibleTo Include="SlimMessageBus.Host.Test" /> + <InternalsVisibleTo Include="SlimMessageBus.Host.Test" /> </ItemGroup> </Project> diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln index 7c230bbb..ae2b40a4 100644 --- a/src/SlimMessageBus.sln +++ b/src/SlimMessageBus.sln @@ -259,7 +259,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.Circuit EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test", "Tests\SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test\SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj", "{CA02D82E-DACC-4AB5-BD6B-071341E9E664}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.CircuitBreaker.HealthCheck", "Samples\Sample.CircuitBreaker.HealthCheck\Sample.CircuitBreaker.HealthCheck.csproj", "{226FC4F3-01EF-4214-9566-942CA0FB71B0}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.CircuitBreaker.HealthCheck", "Samples\Sample.CircuitBreaker.HealthCheck\Sample.CircuitBreaker.HealthCheck.csproj", "{226FC4F3-01EF-4214-9566-942CA0FB71B0}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.Outbox.Sql.Test", "Tests\SlimMessageBus.Host.Outbox.Sql.Test\SlimMessageBus.Host.Outbox.Sql.Test.csproj", "{CDF578D6-FE85-4A44-A99A-32490F047FDA}" EndProject @@ -282,6 +282,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.AmazonS EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.AmazonSQS.Test", "Tests\SlimMessageBus.Host.AmazonSQS.Test\SlimMessageBus.Host.AmazonSQS.Test.csproj", "{9255A33D-9697-4E69-9418-AD31656FF8AC}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.CircuitBreaker", "SlimMessageBus.Host.CircuitBreaker\SlimMessageBus.Host.CircuitBreaker.csproj", "{2FC8813B-D882-4B08-A886-5C6C6F8CB334}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.CircuitBreaker.Test", "Tests\SlimMessageBus.Host.CircuitBreaker.Test\SlimMessageBus.Host.CircuitBreaker.Test.csproj", "{B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -890,6 +894,22 @@ Global {9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|Any CPU.Build.0 = Release|Any CPU {9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|x86.ActiveCfg = Release|Any CPU {9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|x86.Build.0 = Release|Any CPU + {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Debug|x86.ActiveCfg = Debug|Any CPU + {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Debug|x86.Build.0 = Debug|Any CPU + {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Release|Any CPU.Build.0 = Release|Any CPU + {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Release|x86.ActiveCfg = Release|Any CPU + {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Release|x86.Build.0 = Release|Any CPU + {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Debug|x86.ActiveCfg = Debug|Any CPU + {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Debug|x86.Build.0 = Debug|Any CPU + {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Release|Any CPU.Build.0 = Release|Any CPU + {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Release|x86.ActiveCfg = Release|Any CPU + {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -982,6 +1002,8 @@ Global {9FCBF788-1F0C-43E2-909D-1F96B2685F38} = {9F005B5C-A856-4351-8C0C-47A8B785C637} {4DF4BC7C-5EE3-4310-BC40-054C1494444E} = {9291D340-B4FA-44A3-8060-C14743FB1712} {9255A33D-9697-4E69-9418-AD31656FF8AC} = {9F005B5C-A856-4351-8C0C-47A8B785C637} + {2FC8813B-D882-4B08-A886-5C6C6F8CB334} = {FE36338C-0DA2-499E-92CA-F9D5CE594B9F} + {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA} = {9F005B5C-A856-4351-8C0C-47A8B785C637} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {435A0D65-610C-4B84-B1AA-2C7FBE72DB80} diff --git a/src/SlimMessageBus/SlimMessageBus.csproj b/src/SlimMessageBus/SlimMessageBus.csproj index c8a0c17f..569adb31 100644 --- a/src/SlimMessageBus/SlimMessageBus.csproj +++ b/src/SlimMessageBus/SlimMessageBus.csproj @@ -3,7 +3,7 @@ <Import Project="../Common.NuGet.Properties.xml" /> <PropertyGroup> - <Version>3.0.0-rc900</Version> + <Version>3.0.0-rc902</Version> <Description> This library provides a lightweight, easy-to-use message bus interface for .NET, offering a simplified facade for working with messaging brokers. It supports multiple transport providers for popular messaging systems, as well as in-memory (in-process) messaging for efficient local communication. diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs index 9434d91d..6bd34e2a 100644 --- a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs @@ -1,12 +1,7 @@ -global using System; -global using System.Collections.Generic; -global using System.Threading.Tasks; - -global using FluentAssertions; - -global using Microsoft.Extensions.Diagnostics.HealthChecks; -global using Microsoft.Extensions.Logging.Abstractions; - -global using Moq; - +global using FluentAssertions; + +global using Microsoft.Extensions.Diagnostics.HealthChecks; + +global using Moq; + global using Xunit; diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs index 35986e86..b83e0ab3 100644 --- a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs @@ -1,5 +1,5 @@ -namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test; - +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test; + public static class HealthCheckBackgroundServiceTests { public class AreEqualTests diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs index 48499879..d7ce443a 100644 --- a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs @@ -1,4 +1,7 @@ -namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test; +namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test; + +using Microsoft.Extensions.Diagnostics.HealthChecks; + public class HealthCheckCircuitBreakerTests { private readonly Mock<IHealthCheckHostBreaker> _hostMock; diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj index 909dcfbe..74f0b935 100644 --- a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj @@ -12,6 +12,7 @@ <ItemGroup> <ProjectReference Include="..\..\SlimMessageBus.Host.CircuitBreaker.HealthCheck\SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj" /> + <ProjectReference Include="..\SlimMessageBus.Host.Test.Common\SlimMessageBus.Host.Test.Common.csproj" /> </ItemGroup> <ItemGroup> diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/GlobalUsings.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/GlobalUsings.cs new file mode 100644 index 00000000..820b49b3 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/GlobalUsings.cs @@ -0,0 +1,8 @@ +global using FluentAssertions; + +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.DependencyInjection.Extensions; +global using Microsoft.Extensions.Logging; +global using Microsoft.Extensions.Logging.Abstractions; + +global using Xunit; diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs new file mode 100644 index 00000000..8003c373 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs @@ -0,0 +1,140 @@ +namespace SlimMessageBus.Host.CircuitBreaker.Test; + +public class CircuitBreakerAbstractConsumerInterceptorTests +{ + private class TestConsumer(ILogger logger, IEnumerable<AbstractConsumerSettings> settings, IEnumerable<IAbstractConsumerInterceptor> interceptors) + : AbstractConsumer(logger, settings, "path", interceptors) + { + protected override Task OnStart() => Task.CompletedTask; + protected override Task OnStop() => Task.CompletedTask; + } + + private class TestConsumerSettings : AbstractConsumerSettings; + + public class CircuitBreakerAccessor + { + public Circuit State { get; set; } + public int SubscribeCallCount { get; set; } = 0; + public int UnsubscribeCallCount { get; set; } = 0; + public Func<Circuit, Task>? OnChange { get; set; } + } + + private class TestCircuitBreaker : IConsumerCircuitBreaker + { + private readonly CircuitBreakerAccessor _accessor; + + public TestCircuitBreaker(CircuitBreakerAccessor accessor, IEnumerable<AbstractConsumerSettings> settings) + { + _accessor = accessor; + Settings = settings; + State = Circuit.Open; + } + + public Circuit State + { + get => _accessor.State; + set => _accessor.State = value; + } + public IEnumerable<AbstractConsumerSettings> Settings { get; } + + public Task Subscribe(Func<Circuit, Task> onChange) + { + _accessor.SubscribeCallCount++; + _accessor.OnChange = onChange; + + return Task.CompletedTask; + } + + public void Unsubscribe() + { + _accessor.UnsubscribeCallCount++; + } + } + + private readonly List<AbstractConsumerSettings> _settings; + private readonly TestConsumer _target; + private readonly CircuitBreakerAccessor accessor; + + public CircuitBreakerAbstractConsumerInterceptorTests() + { + accessor = new CircuitBreakerAccessor(); + + var h = new CircuitBreakerAbstractConsumerInterceptor(); + + var serviceCollection = new ServiceCollection(); + serviceCollection.TryAddSingleton(accessor); + serviceCollection.TryAddTransient<TestCircuitBreaker>(); + serviceCollection.TryAddEnumerable(ServiceDescriptor.Singleton<IAbstractConsumerInterceptor>(h)); + + var testSettings = new TestConsumerSettings + { + MessageBusSettings = new MessageBusSettings { ServiceProvider = serviceCollection.BuildServiceProvider() } + }; + + var breakers = testSettings.GetOrCreate(ConsumerSettingsProperties.CircuitBreakerTypes, () => []); + breakers.Add<TestCircuitBreaker>(); + + _settings = [testSettings]; + + _target = new TestConsumer(NullLogger.Instance, _settings, [h]); + } + + [Fact] + public async Task When_Start_ShouldStartCircuitBreakers_WhenNotStarted() + { + // Arrange + + // Act + await _target.Start(); + + // Assert + _target.IsStarted.Should().BeTrue(); + accessor.SubscribeCallCount.Should().Be(1); + } + + [Fact] + public async Task When_Stop_ShouldStopCircuitBreakers_WhenStarted() + { + // Arrange + await _target.Start(); + + // Act + await _target.Stop(); + + // Assert + _target.IsStarted.Should().BeFalse(); + accessor.UnsubscribeCallCount.Should().Be(1); + } + + [Fact] + public async Task When_BreakerChanged_Should_PauseConsumer_Given_BreakerClosed() + { + // Arrange + await _target.Start(); + + // Act + _target.GetOrDefault(AbstractConsumerProperties.IsPaused, false).Should().BeFalse(); + accessor.State = Circuit.Closed; + await accessor.OnChange!(Circuit.Closed); + + // Assert + _target.GetOrDefault(AbstractConsumerProperties.IsPaused, false).Should().BeTrue(); + } + + [Fact] + public async Task When_BreakerChanged_Should_ResumeConsumer_Given_BreakerOpen() + { + // Arrange + await _target.Start(); + accessor.State = Circuit.Closed; + await accessor.OnChange!(Circuit.Open); + + // Act + _target.GetOrDefault(AbstractConsumerProperties.IsPaused, false).Should().BeTrue(); + accessor.State = Circuit.Open; + await accessor.OnChange(Circuit.Open); + + // Assert + _target.GetOrDefault(AbstractConsumerProperties.IsPaused, false).Should().BeFalse(); + } +} diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/SlimMessageBus.Host.CircuitBreaker.Test.csproj b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/SlimMessageBus.Host.CircuitBreaker.Test.csproj new file mode 100644 index 00000000..0d44b542 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/SlimMessageBus.Host.CircuitBreaker.Test.csproj @@ -0,0 +1,20 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <Import Project="../Host.Test.Properties.xml" /> + + <PropertyGroup> + <Nullable>enable</Nullable> + </PropertyGroup> + + <ItemGroup> + <ProjectReference Include="..\..\SlimMessageBus.Host.CircuitBreaker\SlimMessageBus.Host.CircuitBreaker.csproj" /> + <ProjectReference Include="..\SlimMessageBus.Host.Test.Common\SlimMessageBus.Host.Test.Common.csproj" /> + </ItemGroup> + + <ItemGroup> + <None Update="xunit.runner.json"> + <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> + </None> + </ItemGroup> + +</Project> diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs index 258fd280..32e70db3 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs @@ -15,7 +15,7 @@ public KafkaGroupConsumerTests() var providerSettings = new KafkaMessageBusSettings("host"); var consumerSettings = Array.Empty<AbstractConsumerSettings>(); - var subjectMock = new Mock<KafkaGroupConsumer>(loggerFactoryMock.Object, providerSettings, consumerSettings, "group", new List<string> { "topic" }, processorFactoryMock.Object) { CallBase = true }; + var subjectMock = new Mock<KafkaGroupConsumer>(loggerFactoryMock.Object, providerSettings, consumerSettings, Array.Empty<IAbstractConsumerInterceptor>(), "group", new List<string> { "topic" }, processorFactoryMock.Object) { CallBase = true }; _subject = subjectMock.Object; } diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs index 5d49be44..c4a08511 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs @@ -2,138 +2,84 @@ public class AbstractConsumerTests { - private class TestConsumer : AbstractConsumer - { - public TestConsumer(ILogger logger, IEnumerable<AbstractConsumerSettings> settings) - : base(logger, settings) { } - - protected override Task OnStart() => Task.CompletedTask; - protected override Task OnStop() => Task.CompletedTask; + private class TestConsumer(ILogger logger, IEnumerable<AbstractConsumerSettings> settings, IEnumerable<IAbstractConsumerInterceptor> interceptors) + : AbstractConsumer(logger, settings, path: "path", interceptors) + { + internal protected override Task OnStart() => Task.CompletedTask; + internal protected override Task OnStop() => Task.CompletedTask; } private class TestConsumerSettings : AbstractConsumerSettings; - public class CircuitBreakerAccessor - { - public Circuit State { get; set; } - public int SubscribeCallCount { get; set; } = 0; - public int UnsubscribeCallCount { get; set; } = 0; - public IEnumerable<AbstractConsumerSettings> Settings { get; set; } - public Func<Circuit, Task> OnChange { get; set; } - } - - private class TestCircuitBreaker : IConsumerCircuitBreaker - { - private readonly CircuitBreakerAccessor _accessor; - - public TestCircuitBreaker(CircuitBreakerAccessor accessor, IEnumerable<AbstractConsumerSettings> settings) - { - _accessor = accessor; - Settings = settings; - State = Circuit.Open; - } - - public Circuit State - { - get => _accessor.State; - set => _accessor.State = value; - } - public IEnumerable<AbstractConsumerSettings> Settings { get; } - - public Task Subscribe(Func<Circuit, Task> onChange) - { - _accessor.SubscribeCallCount++; - _accessor.OnChange = onChange; - - return Task.CompletedTask; - } - - public void Unsubscribe() - { - _accessor.UnsubscribeCallCount++; - } - } - - private readonly List<AbstractConsumerSettings> _settings; - private readonly TestConsumer _target; - private readonly CircuitBreakerAccessor accessor; + private readonly List<AbstractConsumerSettings> _settings; + private readonly Mock<AbstractConsumer> _targetMock; + private readonly AbstractConsumer _target; + private readonly Mock<IAbstractConsumerInterceptor> _interceptor; public AbstractConsumerTests() { - accessor = new CircuitBreakerAccessor(); + _interceptor = new Mock<IAbstractConsumerInterceptor>(); var serviceCollection = new ServiceCollection(); - serviceCollection.TryAddSingleton(accessor); - serviceCollection.TryAddTransient<TestCircuitBreaker>(); + serviceCollection.TryAddEnumerable(ServiceDescriptor.Singleton(_interceptor.Object)); var testSettings = new TestConsumerSettings { MessageBusSettings = new MessageBusSettings { ServiceProvider = serviceCollection.BuildServiceProvider() } }; - testSettings.CircuitBreakers.Add<TestCircuitBreaker>(); - _settings = [testSettings]; - _target = new TestConsumer(NullLogger.Instance, _settings); + _targetMock = new Mock<AbstractConsumer>(NullLogger.Instance, _settings, "path", new IAbstractConsumerInterceptor[] { _interceptor.Object }) { CallBase = true }; + _target = _targetMock.Object; } - [Fact] - public async Task Start_ShouldStartCircuitBreakers_WhenNotStarted() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task When_Start_Then_Interceptor_CanStartIsCalled(bool canStart) { // Arrange + _interceptor.Setup(x => x.CanStart(_target)).ReturnsAsync(canStart); // Act await _target.Start(); // Assert - _target.IsStarted.Should().BeTrue(); - accessor.SubscribeCallCount.Should().Be(1); - } - - [Fact] - public async Task Stop_ShouldStopCircuitBreakers_WhenStarted() + _target.IsStarted.Should().BeTrue(); + + _interceptor.Verify(x => x.CanStart(_target), Times.Once); + _interceptor.VerifyGet(x => x.Order, Times.Once); + _interceptor.VerifyNoOtherCalls(); + + _targetMock.Verify(x => x.OnStart(), canStart ? Times.Once : Times.Never); + _targetMock.VerifyNoOtherCalls(); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task When_Stop_Then_Interceptor_CanStopIsCalled(bool canStop) { // Arrange - await _target.Start(); + _interceptor.Setup(x => x.CanStart(_target)).ReturnsAsync(true); + _interceptor.Setup(x => x.CanStop(_target)).ReturnsAsync(canStop); + + await _target.Start(); // Act await _target.Stop(); // Assert - _target.IsStarted.Should().BeFalse(); - accessor.UnsubscribeCallCount.Should().Be(1); - } - - [Fact] - public async Task BreakerChanged_ShouldPauseConsumer_WhenBreakerClosed() - { - // Arrange - await _target.Start(); - - // Act - _target.IsPaused.Should().BeFalse(); - accessor.State = Circuit.Closed; - await _target.BreakerChanged(Circuit.Closed); - - // Assert - _target.IsPaused.Should().BeTrue(); - } - - [Fact] - public async Task BreakerChanged_ShouldResumeConsumer_WhenBreakerOpen() - { - // Arrange - await _target.Start(); - accessor.State = Circuit.Closed; - await _target.BreakerChanged(Circuit.Open); - - // Act - _target.IsPaused.Should().BeTrue(); - accessor.State = Circuit.Open; - await _target.BreakerChanged(Circuit.Open); - - // Assert - _target.IsPaused.Should().BeFalse(); + _target.IsStarted.Should().BeFalse(); + + _interceptor.Verify(x => x.CanStart(_target), Times.Once); + _interceptor.Verify(x => x.CanStop(_target), Times.Once); + _interceptor.VerifyGet(x => x.Order, Times.Once); + _interceptor.VerifyNoOtherCalls(); + + _targetMock.Verify(x => x.OnStart(), Times.Once); + _targetMock.Verify(x => x.OnStop(), canStop ? Times.Once : Times.Never); + _targetMock.VerifyNoOtherCalls(); } } diff --git a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs index a1aeba2e..4217e9a7 100644 --- a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs +++ b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs @@ -81,10 +81,10 @@ public void TriggerPendingRequestCleanup() PendingRequestManager.CleanPendingRequests(); } - public class MessageBusTestedConsumer(ILogger logger) : AbstractConsumer(logger, []) + public class MessageBusTestedConsumer(ILogger logger) : AbstractConsumer(logger, [], "path", []) { - protected override Task OnStart() => Task.CompletedTask; + internal protected override Task OnStart() => Task.CompletedTask; - protected override Task OnStop() => Task.CompletedTask; + internal protected override Task OnStop() => Task.CompletedTask; } }