diff --git a/docs/_docs/0-introduction/003-releases.md b/docs/_docs/0-introduction/003-releases.md index 3d6b97815..49cc441d8 100644 --- a/docs/_docs/0-introduction/003-releases.md +++ b/docs/_docs/0-introduction/003-releases.md @@ -12,6 +12,7 @@ toc: true ### Fixes * Fixed `OutboundQueueHealthCheck` [[#43](https://github.com/BEagle1984/silverback/issues/43)] * The `KafkaProducer` is not disposed by default anymore when a `KafkaException` in thrown (creating too many instances of the producer over a short time span could lead to too many active TCP connections) +* Fixed the bug preventing a `KafkaConsumerEndpoint` pointing to multiple topics to be successfully subscribed ## [1.0.4](https://github.com/BEagle1984/silverback/releases/tag/1.0.4) diff --git a/docs/assets/images/samples.png b/docs/assets/images/samples.png index 580fb631c..9899810e7 100644 Binary files a/docs/assets/images/samples.png and b/docs/assets/images/samples.png differ diff --git a/samples/Examples-2.2/src/Silverback.Examples.Common/Silverback.Examples.Common.csproj b/samples/Examples-2.2/src/Silverback.Examples.Common/Silverback.Examples.Common.csproj index 6dc355251..91a78a97b 100644 --- a/samples/Examples-2.2/src/Silverback.Examples.Common/Silverback.Examples.Common.csproj +++ b/samples/Examples-2.2/src/Silverback.Examples.Common/Silverback.Examples.Common.csproj @@ -7,11 +7,15 @@ - + + Configuration.cs + - + + DependencyInjectionHelper.cs + @@ -20,6 +24,9 @@ + + Messages\PartitionedSimpleIntegrationEvent.cs + @@ -47,11 +54,4 @@ - - - - - - - diff --git a/samples/Examples-2.2/src/Silverback.Examples.ConsumerA/Silverback.Examples.ConsumerA.csproj b/samples/Examples-2.2/src/Silverback.Examples.ConsumerA/Silverback.Examples.ConsumerA.csproj index 1b43d3d04..fd2ea95d7 100644 --- a/samples/Examples-2.2/src/Silverback.Examples.ConsumerA/Silverback.Examples.ConsumerA.csproj +++ b/samples/Examples-2.2/src/Silverback.Examples.ConsumerA/Silverback.Examples.ConsumerA.csproj @@ -7,13 +7,6 @@ Silverback.Examples.ConsumerA - - - - - - - @@ -34,4 +27,19 @@ + + + ConsumerServiceA.cs + + + LogHeadersBehavior.cs + + + Program.cs + + + SubscriberService.cs + + + diff --git a/samples/Examples-2.2/src/Silverback.Examples.Main/Silverback.Examples.Main.csproj b/samples/Examples-2.2/src/Silverback.Examples.Main/Silverback.Examples.Main.csproj index c78f685f5..e7f78be87 100644 --- a/samples/Examples-2.2/src/Silverback.Examples.Main/Silverback.Examples.Main.csproj +++ b/samples/Examples-2.2/src/Silverback.Examples.Main/Silverback.Examples.Main.csproj @@ -7,32 +7,6 @@ Silverback.Examples.Main - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -57,18 +31,100 @@ - - - - - - - - PreserveNewest + + + Constants.cs + + + Menu\MenuHelper.cs + + + Menu\MenuItem.cs + + + Menu\MenuNavigator.cs + + + Program.cs + + + UseCases\Advanced\AdvancedCategory.cs + + + UseCases\Advanced\BatchConsumerUseCase.cs + + + UseCases\Advanced\ChunkingUseCase.cs + + + UseCases\Advanced\HeadersUseCase.cs + + + UseCases\Advanced\InteroperableMessageUseCase.cs + + + UseCases\Advanced\MultipleOutboundConnectorsUseCase.cs + + + UseCases\Advanced\PartitioningUseCase.cs + + + UseCases\Advanced\SameProcessUseCase.cs + + + UseCases\Basic\BasicCategory.cs + + + UseCases\Basic\CustomSerializerSettingsUseCase.cs + + + UseCases\Basic\ExternalConfigUseCase.cs + + + UseCases\Basic\SimplePublishUseCase.cs + + + UseCases\Basic\TranslatePublishSubscribe.cs + + + UseCases\EfCore\DeferredOutboundUseCase.cs + + + UseCases\EfCore\EfCoreCategory.cs + + + UseCases\EfCore\OutboundWorkerUseCase.cs + + + UseCases\ErrorHandling\ErrorHandlingCategory.cs + + + UseCases\ErrorHandling\RetryAndMoveErrorPolicyUseCase.cs + + + UseCases\ErrorHandling\RetryAndMoveErrorPolicyUseCase2.cs + + + UseCases\HealthCheck\HealthCheckCategory.cs + + + UseCases\HealthCheck\OutboundEndpointsHealthUseCase.cs + + + UseCases\HealthCheck\OutboundQueueHealthUseCase.cs + + + UseCases\UseCase.cs + + + UseCases\UseCaseCategory.cs + + + diff --git a/samples/Examples/src/Silverback.Examples.ConsumerA/ConsumerServiceA.cs b/samples/Examples/src/Silverback.Examples.ConsumerA/ConsumerServiceA.cs index 93c913ff3..35d54244a 100644 --- a/samples/Examples/src/Silverback.Examples.ConsumerA/ConsumerServiceA.cs +++ b/samples/Examples/src/Silverback.Examples.ConsumerA/ConsumerServiceA.cs @@ -42,7 +42,7 @@ protected override void Configure(BusConfigurator configurator, IServiceProvider var broker = configurator .Connect(endpoints => endpoints - .AddInbound(CreateConsumerEndpoint("silverback-examples-events")) + .AddInbound(CreateConsumerEndpoint("silverback-examples-events", "silverback-examples-events-chunked", "silverback-examples-events-sp")) .AddInbound(CreateConsumerEndpoint("silverback-examples-batch"), settings: new InboundConnectorSettings { @@ -108,8 +108,14 @@ protected override void Configure(BusConfigurator configurator, IServiceProvider } private static KafkaConsumerEndpoint CreateConsumerEndpoint(string name, IMessageSerializer messageSerializer = null) - { - var endpoint = new KafkaConsumerEndpoint(name) + => CreateConsumerEndpoint(new[] { name }, messageSerializer); + + private static KafkaConsumerEndpoint CreateConsumerEndpoint(params string[] names) + => CreateConsumerEndpoint(names, null); + + private static KafkaConsumerEndpoint CreateConsumerEndpoint(string[] names, IMessageSerializer messageSerializer) + { + var endpoint = new KafkaConsumerEndpoint(names) { Configuration = new KafkaConsumerConfig { diff --git a/samples/Examples/src/Silverback.Examples.Main/Menu/MenuHelper.cs b/samples/Examples/src/Silverback.Examples.Main/Menu/MenuHelper.cs index b54bafa6d..46c7b209d 100644 --- a/samples/Examples/src/Silverback.Examples.Main/Menu/MenuHelper.cs +++ b/samples/Examples/src/Silverback.Examples.Main/Menu/MenuHelper.cs @@ -7,7 +7,10 @@ namespace Silverback.Examples.Main.Menu { public class MenuHelper { - public static int Choice(string title, params string[] options) + public static int Choice(string title, params string[] options) => + Choice(() => Console.WriteLine(title + Environment.NewLine), options); + + public static int Choice(Action headerAction, params string[] options) { var selected = 0; @@ -19,9 +22,8 @@ public static int Choice(string title, params string[] options) { Console.Clear(); ShowSplash(); - - Console.WriteLine(title); - Console.WriteLine(); + + headerAction?.Invoke(); for (var i = 0; i < options.Length; i++) { @@ -56,8 +58,6 @@ public static int Choice(string title, params string[] options) selected++; break; } - case ConsoleKey.Backspace: - case ConsoleKey.LeftArrow: case ConsoleKey.Escape: { return -1; diff --git a/samples/Examples/src/Silverback.Examples.Main/Menu/MenuNavigator.cs b/samples/Examples/src/Silverback.Examples.Main/Menu/MenuNavigator.cs index 47d8332cc..41e8ad0e9 100644 --- a/samples/Examples/src/Silverback.Examples.Main/Menu/MenuNavigator.cs +++ b/samples/Examples/src/Silverback.Examples.Main/Menu/MenuNavigator.cs @@ -15,10 +15,13 @@ public void RenderCategories() { var categories = MenuItem.GetAll(); - var selected = MenuHelper.Choice("Choose a category (or press ESC to exit):", - categories.Select((category, i) => $"{i+1}. {category.Name}").ToArray()); + var selected = MenuHelper.Choice("Choose a category:", + categories + .Select((category, i) => $"{i+1}. {category.Name}") + .Append("<- Exit") + .ToArray()); - if (selected < 0) + if (selected < 0|| selected >= categories.Length) return; RenderUseCases(categories[selected]); @@ -31,10 +34,22 @@ public void RenderUseCases(UseCaseCategory category) while (true) { - var selected = MenuHelper.Choice($"Choose a use cases in category '{category.Name}' (or press ESC to return to the categories list):", - useCases.Select((useCase, i) => $"{i+1}. {useCase.Name}").ToArray()); + var selected = MenuHelper.Choice( + () => + { + Console.Write("Choose a use cases in category '"); + Console.ForegroundColor = Constants.PrimaryColor; + Console.Write(category.Name); + Console.ResetColor(); + Console.WriteLine("':"); + Console.WriteLine(); + }, + useCases + .Select((useCase, i) => $"{i + 1}. {useCase.Name}") + .Append("<- Back") + .ToArray()); - if (selected < 0) + if (selected < 0 || selected >= useCases.Length) return; Console.Clear(); diff --git a/samples/Examples/src/Silverback.Examples.Main/UseCases/Advanced/ChunkingUseCase.cs b/samples/Examples/src/Silverback.Examples.Main/UseCases/Advanced/ChunkingUseCase.cs index abdf75d37..2d9b236f1 100644 --- a/samples/Examples/src/Silverback.Examples.Main/UseCases/Advanced/ChunkingUseCase.cs +++ b/samples/Examples/src/Silverback.Examples.Main/UseCases/Advanced/ChunkingUseCase.cs @@ -30,7 +30,7 @@ protected override void Configure(BusConfigurator configurator, IServiceProvider .AddOutbound(CreateEndpoint())); private KafkaEndpoint CreateEndpoint() => - new KafkaProducerEndpoint("silverback-examples-events") + new KafkaProducerEndpoint("silverback-examples-events-chunked") { Configuration = new KafkaProducerConfig { diff --git a/samples/Examples/src/Silverback.Examples.Main/UseCases/Advanced/SameProcessUseCase.cs b/samples/Examples/src/Silverback.Examples.Main/UseCases/Advanced/SameProcessUseCase.cs index 7c66cbd8a..71ce573df 100644 --- a/samples/Examples/src/Silverback.Examples.Main/UseCases/Advanced/SameProcessUseCase.cs +++ b/samples/Examples/src/Silverback.Examples.Main/UseCases/Advanced/SameProcessUseCase.cs @@ -32,19 +32,19 @@ protected override void Configure(BusConfigurator configurator, IServiceProvider configurator .Subscribe((SimpleIntegrationEvent message) => logger.LogInformation($"Received SimpleIntegrationEvent '{message.Content}")) .Connect(endpoints => endpoints - .AddOutbound(new KafkaProducerEndpoint("silverback-examples-events") + .AddOutbound(new KafkaProducerEndpoint("silverback-examples-events-sp") { Configuration = new KafkaProducerConfig { BootstrapServers = "PLAINTEXT://localhost:9092" } }) - .AddInbound(new KafkaConsumerEndpoint("silverback-examples-events") + .AddInbound(new KafkaConsumerEndpoint("silverback-examples-events-sp") { Configuration = new KafkaConsumerConfig { BootstrapServers = "PLAINTEXT://localhost:9092", - GroupId = "SameProcessUseCase", + GroupId = "same-process-uc", AutoOffsetReset = AutoOffsetReset.Earliest } })); diff --git a/src/Silverback.Integration.Kafka/Messaging/Broker/InnerConsumerWrapper.cs b/src/Silverback.Integration.Kafka/Messaging/Broker/InnerConsumerWrapper.cs index 64c47eb27..13a3804a8 100644 --- a/src/Silverback.Integration.Kafka/Messaging/Broker/InnerConsumerWrapper.cs +++ b/src/Silverback.Integration.Kafka/Messaging/Broker/InnerConsumerWrapper.cs @@ -87,7 +87,7 @@ private void InitInnerConsumer() Subscribe(); } - private void Subscribe() => _innerConsumer.Subscribe(_endpoints.Select(e => e.Name)); + private void Subscribe() => _innerConsumer.Subscribe(_endpoints.SelectMany(e => e.Names)); private Confluent.Kafka.IConsumer BuildConfluentConsumer() { diff --git a/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaConsumer.cs b/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaConsumer.cs index 25e3ac93a..52c17b7ee 100644 --- a/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaConsumer.cs +++ b/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaConsumer.cs @@ -67,7 +67,7 @@ private async Task OnMessageReceived(Confluent.Kafka.Message mes { // Checking if the message was sent to the subscribed topic is necessary // when reusing the same consumer for multiple topics. - if (!tpo.Topic.Equals(Endpoint.Name, StringComparison.InvariantCultureIgnoreCase)) + if (!Endpoint.Names.Any(endpointName => tpo.Topic.Equals(endpointName, StringComparison.InvariantCultureIgnoreCase))) return; await TryHandleMessage(message, tpo);