Skip to content

Commit

Permalink
Fixed consuming of KafkaConsumerEndpoint with multiple topics (Names)
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Dec 8, 2019
1 parent 2a2674e commit 919b3d1
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 71 deletions.
1 change: 1 addition & 0 deletions docs/_docs/0-introduction/003-releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ toc: true
### Fixes
* Fixed `OutboundQueueHealthCheck` [[#43](https://github.com/BEagle1984/silverback/issues/43)]
* The `KafkaProducer` is not disposed by default anymore when a `KafkaException` in thrown (creating too many instances of the producer over a short time span could lead to too many active TCP connections)
* Fixed the bug preventing a `KafkaConsumerEndpoint` pointing to multiple topics to be successfully subscribed

## [1.0.4](https://github.com/BEagle1984/silverback/releases/tag/1.0.4)

Expand Down
Binary file modified docs/assets/images/samples.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Configuration.cs" Link="Configuration.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Configuration.cs">
<Link>Configuration.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Consumer\ConsumerService.cs" Link="Consumer\ConsumerService.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Data\Customer.cs" Link="Data\Customer.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Data\ExamplesDbContext.cs" Link="Data\ExamplesDbContext.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\DependencyInjectionHelper.cs" Link="DependencyInjectionHelper.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\DependencyInjectionHelper.cs">
<Link>DependencyInjectionHelper.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Messages\BadIntegrationEvent.cs" Link="Messages\BadIntegrationEvent.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Messages\BatchProcessedEvent.cs" Link="Messages\BatchProcessedEvent.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Messages\CustomSerializedIntegrationEvent.cs" Link="Messages\CustomSerializedIntegrationEvent.cs" />
Expand All @@ -20,6 +24,9 @@
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Messages\IntegrationEventB.cs" Link="Messages\IntegrationEventB.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Messages\LegacyMessage.cs" Link="Messages\LegacyMessage.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Messages\MessageMoved.cs" Link="Messages\MessageMoved.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Messages\PartitionedSimpleIntegrationEvent.cs">
<Link>Messages\PartitionedSimpleIntegrationEvent.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Messages\SimpleEvent.cs" Link="Messages\SimpleEvent.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Messages\SimpleIntegrationEvent.cs" Link="Messages\SimpleIntegrationEvent.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Common\Serialization\LegacyMessageSerializer.cs" Link="Serialization\LegacyMessageSerializer.cs" />
Expand Down Expand Up @@ -47,11 +54,4 @@
<PackageReference Include="Silverback.Integration.Kafka" Version="$(BaseVersion)" />
</ItemGroup>

<ItemGroup>
<Folder Include="Consumer\" />
<Folder Include="Data\" />
<Folder Include="Messages\" />
<Folder Include="Serialization\" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,6 @@
<RootNamespace>Silverback.Examples.ConsumerA</RootNamespace>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.ConsumerA\ConsumerServiceA.cs" Link="ConsumerServiceA.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.ConsumerA\LogHeadersBehavior.cs" Link="LogHeadersBehavior.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.ConsumerA\Program.cs" Link="Program.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.ConsumerA\SubscriberService.cs" Link="SubscriberService.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.6" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.6" />
Expand All @@ -34,4 +27,19 @@
<ProjectReference Include="..\Silverback.Examples.Common\Silverback.Examples.Common.csproj" />
</ItemGroup>

<ItemGroup>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.ConsumerA\ConsumerServiceA.cs">
<Link>ConsumerServiceA.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.ConsumerA\LogHeadersBehavior.cs">
<Link>LogHeadersBehavior.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.ConsumerA\Program.cs">
<Link>Program.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.ConsumerA\SubscriberService.cs">
<Link>SubscriberService.cs</Link>
</Compile>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,6 @@
<RootNamespace>Silverback.Examples.Main</RootNamespace>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\Menu\MenuItem.cs" Link="Menu\MenuItem.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\Menu\MenuNavigator.cs" Link="Menu\MenuNavigator.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\Program.cs" Link="Program.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\AdvancedCategory.cs" Link="UseCases\Advanced\AdvancedCategory.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\BatchConsumerUseCase.cs" Link="UseCases\Advanced\BatchConsumerUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\ChunkingUseCase.cs" Link="UseCases\Advanced\ChunkingUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\HeadersUseCase.cs" Link="UseCases\Advanced\HeadersUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\InteroperableMessageUseCase.cs" Link="UseCases\Advanced\InteroperableMessageUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\MultipleOutboundConnectorsUseCase.cs" Link="UseCases\Advanced\MultipleOutboundConnectorsUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\BasicCategory.cs" Link="UseCases\Basic\BasicCategory.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\CustomSerializerSettingsUseCase.cs" Link="UseCases\Basic\CustomSerializerSettingsUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\DomainEventsUseCase.cs" Link="UseCases\Basic\DomainEventsUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\ExternalConfigUseCase.cs" Link="UseCases\Basic\ExternalConfigUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\SimplePublishUseCase.cs" Link="UseCases\Basic\SimplePublishUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\TranslatePublishSubscribe.cs" Link="UseCases\Basic\TranslatePublishSubscribe.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\EfCore\DeferredOutboundUseCase.cs" Link="UseCases\EfCore\DeferredOutboundUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\EfCore\EfCoreCategory.cs" Link="UseCases\EfCore\EfCoreCategory.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\EfCore\OutboundWorkerUseCase.cs" Link="UseCases\EfCore\OutboundWorkerUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\ErrorHandling\ErrorHandlingCategory.cs" Link="UseCases\ErrorHandling\ErrorHandlingCategory.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\ErrorHandling\RetryAndMoveErrorPolicyUseCase.cs" Link="UseCases\ErrorHandling\RetryAndMoveErrorPolicyUseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\ErrorHandling\RetryAndMoveErrorPolicyUseCase2.cs" Link="UseCases\ErrorHandling\RetryAndMoveErrorPolicyUseCase2.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\UseCase.cs" Link="UseCases\UseCase.cs" />
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\UseCaseCategory.cs" Link="UseCases\UseCaseCategory.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Autofac" Version="4.9.4" />
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="5.0.0" />
Expand All @@ -57,18 +31,100 @@
<ProjectReference Include="..\Silverback.Examples.Common\Silverback.Examples.Common.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Menu\" />
<Folder Include="UseCases\Advanced\" />
<Folder Include="UseCases\Basic\" />
<Folder Include="UseCases\EfCore\" />
<Folder Include="UseCases\ErrorHandling\" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

<ItemGroup>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\Constants.cs">
<Link>Constants.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\Menu\MenuHelper.cs">
<Link>Menu\MenuHelper.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\Menu\MenuItem.cs">
<Link>Menu\MenuItem.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\Menu\MenuNavigator.cs">
<Link>Menu\MenuNavigator.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\Program.cs">
<Link>Program.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\AdvancedCategory.cs">
<Link>UseCases\Advanced\AdvancedCategory.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\BatchConsumerUseCase.cs">
<Link>UseCases\Advanced\BatchConsumerUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\ChunkingUseCase.cs">
<Link>UseCases\Advanced\ChunkingUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\HeadersUseCase.cs">
<Link>UseCases\Advanced\HeadersUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\InteroperableMessageUseCase.cs">
<Link>UseCases\Advanced\InteroperableMessageUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\MultipleOutboundConnectorsUseCase.cs">
<Link>UseCases\Advanced\MultipleOutboundConnectorsUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\PartitioningUseCase.cs">
<Link>UseCases\Advanced\PartitioningUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Advanced\SameProcessUseCase.cs">
<Link>UseCases\Advanced\SameProcessUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\BasicCategory.cs">
<Link>UseCases\Basic\BasicCategory.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\CustomSerializerSettingsUseCase.cs">
<Link>UseCases\Basic\CustomSerializerSettingsUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\ExternalConfigUseCase.cs">
<Link>UseCases\Basic\ExternalConfigUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\SimplePublishUseCase.cs">
<Link>UseCases\Basic\SimplePublishUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\Basic\TranslatePublishSubscribe.cs">
<Link>UseCases\Basic\TranslatePublishSubscribe.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\EfCore\DeferredOutboundUseCase.cs">
<Link>UseCases\EfCore\DeferredOutboundUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\EfCore\EfCoreCategory.cs">
<Link>UseCases\EfCore\EfCoreCategory.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\EfCore\OutboundWorkerUseCase.cs">
<Link>UseCases\EfCore\OutboundWorkerUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\ErrorHandling\ErrorHandlingCategory.cs">
<Link>UseCases\ErrorHandling\ErrorHandlingCategory.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\ErrorHandling\RetryAndMoveErrorPolicyUseCase.cs">
<Link>UseCases\ErrorHandling\RetryAndMoveErrorPolicyUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\ErrorHandling\RetryAndMoveErrorPolicyUseCase2.cs">
<Link>UseCases\ErrorHandling\RetryAndMoveErrorPolicyUseCase2.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\HealthCheck\HealthCheckCategory.cs">
<Link>UseCases\HealthCheck\HealthCheckCategory.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\HealthCheck\OutboundEndpointsHealthUseCase.cs">
<Link>UseCases\HealthCheck\OutboundEndpointsHealthUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\HealthCheck\OutboundQueueHealthUseCase.cs">
<Link>UseCases\HealthCheck\OutboundQueueHealthUseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\UseCase.cs">
<Link>UseCases\UseCase.cs</Link>
</Compile>
<Compile Include="..\..\..\Examples\src\Silverback.Examples.Main\UseCases\UseCaseCategory.cs">
<Link>UseCases\UseCaseCategory.cs</Link>
</Compile>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected override void Configure(BusConfigurator configurator, IServiceProvider

var broker = configurator
.Connect(endpoints => endpoints
.AddInbound(CreateConsumerEndpoint("silverback-examples-events"))
.AddInbound(CreateConsumerEndpoint("silverback-examples-events", "silverback-examples-events-chunked", "silverback-examples-events-sp"))
.AddInbound(CreateConsumerEndpoint("silverback-examples-batch"),
settings: new InboundConnectorSettings
{
Expand Down Expand Up @@ -108,8 +108,14 @@ protected override void Configure(BusConfigurator configurator, IServiceProvider
}

private static KafkaConsumerEndpoint CreateConsumerEndpoint(string name, IMessageSerializer messageSerializer = null)
{
var endpoint = new KafkaConsumerEndpoint(name)
=> CreateConsumerEndpoint(new[] { name }, messageSerializer);

private static KafkaConsumerEndpoint CreateConsumerEndpoint(params string[] names)
=> CreateConsumerEndpoint(names, null);

private static KafkaConsumerEndpoint CreateConsumerEndpoint(string[] names, IMessageSerializer messageSerializer)
{
var endpoint = new KafkaConsumerEndpoint(names)
{
Configuration = new KafkaConsumerConfig
{
Expand Down
12 changes: 6 additions & 6 deletions samples/Examples/src/Silverback.Examples.Main/Menu/MenuHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ namespace Silverback.Examples.Main.Menu
{
public class MenuHelper
{
public static int Choice(string title, params string[] options)
public static int Choice(string title, params string[] options) =>
Choice(() => Console.WriteLine(title + Environment.NewLine), options);

public static int Choice(Action headerAction, params string[] options)
{
var selected = 0;

Expand All @@ -19,9 +22,8 @@ public static int Choice(string title, params string[] options)
{
Console.Clear();
ShowSplash();

Console.WriteLine(title);
Console.WriteLine();

headerAction?.Invoke();

for (var i = 0; i < options.Length; i++)
{
Expand Down Expand Up @@ -56,8 +58,6 @@ public static int Choice(string title, params string[] options)
selected++;
break;
}
case ConsoleKey.Backspace:
case ConsoleKey.LeftArrow:
case ConsoleKey.Escape:
{
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ public void RenderCategories()
{
var categories = MenuItem.GetAll<UseCaseCategory>();

var selected = MenuHelper.Choice("Choose a category (or press ESC to exit):",
categories.Select((category, i) => $"{i+1}. {category.Name}").ToArray());
var selected = MenuHelper.Choice("Choose a category:",
categories
.Select((category, i) => $"{i+1}. {category.Name}")
.Append("<- Exit")
.ToArray());

if (selected < 0)
if (selected < 0|| selected >= categories.Length)
return;

RenderUseCases(categories[selected]);
Expand All @@ -31,10 +34,22 @@ public void RenderUseCases(UseCaseCategory category)

while (true)
{
var selected = MenuHelper.Choice($"Choose a use cases in category '{category.Name}' (or press ESC to return to the categories list):",
useCases.Select((useCase, i) => $"{i+1}. {useCase.Name}").ToArray());
var selected = MenuHelper.Choice(
() =>
{
Console.Write("Choose a use cases in category '");
Console.ForegroundColor = Constants.PrimaryColor;
Console.Write(category.Name);
Console.ResetColor();
Console.WriteLine("':");
Console.WriteLine();
},
useCases
.Select((useCase, i) => $"{i + 1}. {useCase.Name}")
.Append("<- Back")
.ToArray());

if (selected < 0)
if (selected < 0 || selected >= useCases.Length)
return;

Console.Clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ protected override void Configure(BusConfigurator configurator, IServiceProvider
.AddOutbound<IIntegrationEvent>(CreateEndpoint()));

private KafkaEndpoint CreateEndpoint() =>
new KafkaProducerEndpoint("silverback-examples-events")
new KafkaProducerEndpoint("silverback-examples-events-chunked")
{
Configuration = new KafkaProducerConfig
{
Expand Down
Loading

0 comments on commit 919b3d1

Please sign in to comment.