From 18b6ab91743985ae652d5fdf49034321c15d033b Mon Sep 17 00:00:00 2001 From: golanbz Date: Mon, 2 Sep 2024 14:47:32 +0300 Subject: [PATCH 1/7] * Added support for consumer strategies that do not act as "stop the world" scenarios (e.g., cooperative sticky). * Enabled automatic committing with `confluent auto commit: true` instead of relying solely on manual commits, but only when the consumer strategy is cooperative sticky. (Refer to the open librdkafka issue at https://github.com/confluentinc/librdkafka/issues/4059). --- .../ConsumerConfigurationBuilder.cs | 4 +- src/KafkaFlow/Consumers/Consumer.cs | 70 +++++++++++++++---- src/KafkaFlow/Consumers/ConsumerManager.cs | 41 +++++++++-- .../Extensions/StrategyExtensions.cs | 17 +++++ 4 files changed, 112 insertions(+), 20 deletions(-) create mode 100644 src/KafkaFlow/Extensions/StrategyExtensions.cs diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 304fe2d34..a87f58be8 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading.Tasks; using KafkaFlow.Consumers.DistributionStrategies; +using KafkaFlow.Extensions; namespace KafkaFlow.Configuration; @@ -251,7 +252,8 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) consumerConfigCopy.StatisticsIntervalMs = _consumerConfig.StatisticsIntervalMs ?? _statisticsInterval; consumerConfigCopy.EnableAutoOffsetStore = false; - consumerConfigCopy.EnableAutoCommit = false; + consumerConfigCopy.EnableAutoCommit = _consumerConfig.PartitionAssignmentStrategy.IsStopTheWorldStrategy() is false; + consumerConfigCopy.AutoCommitIntervalMs = _consumerConfig.AutoCommitIntervalMs ?? 
5000; consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration); diff --git a/src/KafkaFlow/Consumers/Consumer.cs b/src/KafkaFlow/Consumers/Consumer.cs index 7f478f6e5..77f74e07b 100644 --- a/src/KafkaFlow/Consumers/Consumer.cs +++ b/src/KafkaFlow/Consumers/Consumer.cs @@ -7,6 +7,7 @@ using Confluent.Kafka; using KafkaFlow.Authentication; using KafkaFlow.Configuration; +using KafkaFlow.Extensions; namespace KafkaFlow.Consumers; @@ -14,6 +15,7 @@ internal class Consumer : IConsumer { private readonly IDependencyResolver _dependencyResolver; private readonly ILogHandler _logHandler; + private readonly bool _stopTheWorldStrategy; private readonly List, List>> _partitionsAssignedHandlers = new(); @@ -40,6 +42,7 @@ public Consumer( this.Configuration = configuration; _flowManager = new ConsumerFlowManager(this, _logHandler); _maxPollIntervalExceeded = new(_logHandler); + _stopTheWorldStrategy = Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy(); foreach (var handler in this.Configuration.StatisticsHandlers) { @@ -148,7 +151,17 @@ public void Commit(IReadOnlyCollection off return; } - _consumer.Commit(validOffsets); + if (_stopTheWorldStrategy) + { + _consumer.Commit(validOffsets); + } + else + { + foreach (var topicPartitionOffset in validOffsets) + { + _consumer.StoreOffset(topicPartitionOffset); + } + } foreach (var offset in validOffsets) { @@ -237,17 +250,8 @@ private void EnsureConsumer() var kafkaConfig = this.Configuration.GetKafkaConfig(); var consumerBuilder = new ConsumerBuilder(kafkaConfig) - .SetPartitionsAssignedHandler( - (consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions)) - .SetPartitionsRevokedHandler( - (consumer, partitions) => - { - _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); - this.Assignment = new List(); - this.Subscription = new List(); - _currentPartitionsOffsets.Clear(); - _flowManager.Stop(); - }) + 
.SetPartitionsAssignedHandler(FirePartitionsAssignedHandlers) + .SetPartitionsRevokedHandler(FirePartitionRevokedHandlers) .SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error))) .SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics))); @@ -293,13 +297,51 @@ private void FirePartitionsAssignedHandlers( IConsumer consumer, List partitions) { - this.Assignment = partitions; + if (_stopTheWorldStrategy) + { + this.Assignment = partitions; + this.Subscription = consumer.Subscription; + _flowManager.Start(consumer); + _partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); + return; + } + + if (partitions.Count == 0) + { + return; + } + + this.Assignment = this.Assignment.Union(partitions).ToArray(); this.Subscription = consumer.Subscription; + _flowManager.Stop(); _flowManager.Start(consumer); - _partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); } + private void FirePartitionRevokedHandlers(IConsumer consumer, List partitions) + { + if (_stopTheWorldStrategy) + { + _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); + this.Assignment = new List(); + this.Subscription = new List(); + _currentPartitionsOffsets.Clear(); + _flowManager.Stop(); + return; + } + + this.Assignment = this.Assignment.Except(partitions.Select(x => x.TopicPartition)).ToArray(); + this.Subscription = consumer.Subscription; + foreach (var partition in partitions) + { + _currentPartitionsOffsets.TryRemove(partition.TopicPartition, out _); + } + + _flowManager.Stop(); + _flowManager.Start(consumer); + _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); + } + private void InvalidateConsumer() { _consumer?.Close(); diff --git a/src/KafkaFlow/Consumers/ConsumerManager.cs b/src/KafkaFlow/Consumers/ConsumerManager.cs index 
9d627cbcd..0dacccbfc 100644 --- a/src/KafkaFlow/Consumers/ConsumerManager.cs +++ b/src/KafkaFlow/Consumers/ConsumerManager.cs @@ -3,7 +3,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Confluent.Kafka; using KafkaFlow.Configuration; +using KafkaFlow.Extensions; namespace KafkaFlow.Consumers; @@ -11,6 +13,7 @@ internal class ConsumerManager : IConsumerManager { private readonly IDependencyResolver _dependencyResolver; private readonly ILogHandler _logHandler; + private readonly bool _stopTheWorldStrategy; private Timer _evaluateWorkersCountTimer; @@ -26,7 +29,7 @@ public ConsumerManager( this.Consumer = consumer; this.WorkerPool = consumerWorkerPool; this.Feeder = feeder; - + _stopTheWorldStrategy = consumer.Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy(); this.Consumer.OnPartitionsAssigned((_, _, partitions) => this.OnPartitionAssigned(partitions)); this.Consumer.OnPartitionsRevoked((_, _, partitions) => this.OnPartitionRevoked(partitions)); } @@ -104,9 +107,23 @@ private void OnPartitionRevoked(IEnumerable x.TopicPartition))); + this.GetConsumerLogInfo(topicPartitions?.Select(x => x.TopicPartition).ToArray() + ?? 
Array.Empty())); this.WorkerPool.StopAsync().GetAwaiter().GetResult(); + + if (_stopTheWorldStrategy) + { + return; + } + + var assignedPartitions = Consumer.Assignment; + var workersCount = this.CalculateWorkersCount(assignedPartitions).GetAwaiter().GetResult(); + + this.WorkerPool + .StartAsync(assignedPartitions, workersCount) + .GetAwaiter() + .GetResult(); } private void OnPartitionAssigned(IReadOnlyCollection partitions) @@ -115,10 +132,16 @@ private void OnPartitionAssigned(IReadOnlyCollection x.Topic) .Select( x => new @@ -136,6 +159,14 @@ private void OnPartitionAssigned(IReadOnlyCollection y.Partition.Value), }), + CurrentTopics = Consumer.Assignment.GroupBy(x => x.Topic) + .Select( + x => new + { + x.First().Topic, + PartitionsCount = x.Count(), + Partitions = x.Select(y => y.Partition.Value), + }), }; private async Task CalculateWorkersCount(IEnumerable partitions) diff --git a/src/KafkaFlow/Extensions/StrategyExtensions.cs b/src/KafkaFlow/Extensions/StrategyExtensions.cs new file mode 100644 index 000000000..82e5057e3 --- /dev/null +++ b/src/KafkaFlow/Extensions/StrategyExtensions.cs @@ -0,0 +1,17 @@ +using Confluent.Kafka; + +namespace KafkaFlow.Extensions; + +/// +/// Strategy extension methods. +/// +public static class StrategyExtensions +{ + /// + /// Determine if the strategy is a "stop the world" behavior. + /// + /// Strategy + /// + public static bool IsStopTheWorldStrategy(this PartitionAssignmentStrategy? strategy) => + strategy is null or PartitionAssignmentStrategy.Range or PartitionAssignmentStrategy.RoundRobin; +} From 08366615152f2bb091b9e67115abb61450046c84 Mon Sep 17 00:00:00 2001 From: golanbz Date: Mon, 2 Sep 2024 14:50:12 +0300 Subject: [PATCH 2/7] Introduced a new sample project designed specifically to evaluate the support for the cooperative sticky strategy, allowing for more thorough testing and analysis. 
--- KafkaFlow.sln | 7 +++ .../HostedService.cs | 44 +++++++++++++++ .../KafkaFlow.Sample.CooperativeSticky.csproj | 33 ++++++++++++ .../PrintConsoleHandler.cs | 18 +++++++ .../Program.cs | 53 +++++++++++++++++++ .../README.md | 28 ++++++++++ .../TestMessage.cs | 10 ++++ 7 files changed, 193 insertions(+) create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/HostedService.cs create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/KafkaFlow.Sample.CooperativeSticky.csproj create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/PrintConsoleHandler.cs create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/Program.cs create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/README.md create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/TestMessage.cs diff --git a/KafkaFlow.sln b/KafkaFlow.sln index a591122dc..9c5d79494 100644 --- a/KafkaFlow.sln +++ b/KafkaFlow.sln @@ -96,6 +96,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.OpenTelemetry", "samples\KafkaFlow.Sample.OpenTelemetry\KafkaFlow.Sample.OpenTelemetry.csproj", "{E9E8B374-4165-45F2-8DF5-F141E141AC1D}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.CooperativeSticky", "samples\KafkaFlow.Sample.CooperativeSticky\KafkaFlow.Sample.CooperativeSticky.csproj", "{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -222,6 +224,10 @@ Global {E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Debug|Any CPU.Build.0 = Debug|Any CPU {E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.ActiveCfg = Release|Any CPU {E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.Build.0 = Release|Any CPU + {DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Debug|Any CPU.Build.0 = Debug|Any CPU + 
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -265,6 +271,7 @@ Global {1755E8DB-970C-4A24-8B7C-A2BEC1410BEE} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068} {80080C1D-579E-4AB2-935D-5CFFC51843D8} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068} {E9E8B374-4165-45F2-8DF5-F141E141AC1D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} + {DBF7B091-11AE-402F-9F36-7E7EB3901B0B} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB} diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/HostedService.cs b/samples/KafkaFlow.Sample.CooperativeSticky/HostedService.cs new file mode 100644 index 000000000..a4bdaafa6 --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/HostedService.cs @@ -0,0 +1,44 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using KafkaFlow.Producers; +using Microsoft.Extensions.Hosting; + +namespace KafkaFlow.Sample.CooperativeSticky; + +public class HostedService : IHostedService +{ + private IMessageProducer _producer; + const string producerName = "PrintConsole"; + const string topicName = "sample-topic"; + + + public HostedService(IProducerAccessor producerAccessor) + { + _producer = producerAccessor.GetProducer(producerName); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + try + { + while (true) + { + await _producer.ProduceAsync( + topicName, + Guid.NewGuid().ToString(), + new TestMessage { Text = $"Message: {Guid.NewGuid()}" }); + await Task.Delay(500, cancellationToken); + } + } + catch (Exception e) + { + Console.WriteLine(e); + } + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } +} \ No newline at end of file diff --git 
a/samples/KafkaFlow.Sample.CooperativeSticky/KafkaFlow.Sample.CooperativeSticky.csproj b/samples/KafkaFlow.Sample.CooperativeSticky/KafkaFlow.Sample.CooperativeSticky.csproj new file mode 100644 index 000000000..44c01e12a --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/KafkaFlow.Sample.CooperativeSticky.csproj @@ -0,0 +1,33 @@ + + + + Exe + net6.0 + false + false + true + + + + 1701;1702;CS1591;SA1600 + + + + 1701;1702;CS1591;SA1600 + + + + + + + + + + + + + + + + + diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/PrintConsoleHandler.cs b/samples/KafkaFlow.Sample.CooperativeSticky/PrintConsoleHandler.cs new file mode 100644 index 000000000..eb745195d --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/PrintConsoleHandler.cs @@ -0,0 +1,18 @@ +using System; +using System.Threading.Tasks; + +namespace KafkaFlow.Sample.CooperativeSticky; + +public class PrintConsoleHandler : IMessageHandler +{ + public Task Handle(IMessageContext context, TestMessage message) + { + Console.WriteLine( + "Partition: {0} | Offset: {1} | Message: {2}", + context.ConsumerContext.Partition, + context.ConsumerContext.Offset, + message.Text); + + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/Program.cs b/samples/KafkaFlow.Sample.CooperativeSticky/Program.cs new file mode 100644 index 000000000..27bd15eed --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/Program.cs @@ -0,0 +1,53 @@ +using Confluent.Kafka; +using KafkaFlow; +using KafkaFlow.Sample.CooperativeSticky; +using KafkaFlow.Serializer; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using AutoOffsetReset = KafkaFlow.AutoOffsetReset; + +const string producerName = "PrintConsole"; +const string topicName = "sample-topic"; +var hostBuilder = new HostBuilder(); +hostBuilder.ConfigureServices(services => + services.AddHostedService().AddKafka( + kafka => kafka + .UseConsoleLog() + 
.AddCluster( + cluster => cluster + .WithBrokers(new[] { "localhost:9092" }) + .CreateTopicIfNotExists(topicName, 6, 1) + .AddProducer( + producerName, + producer => producer + .DefaultTopic(topicName) + .AddMiddlewares(m => m.AddSerializer()) + ) + .AddConsumer( + consumer => consumer + .WithConsumerConfig(new ConsumerConfig + { + PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky, + AutoCommitIntervalMs = 100 + }) + .Topic(topicName) + .WithGroupId("print-console-handler") + .WithBufferSize(100) + .WithWorkersCount(3) + .WithAutoCommitIntervalMs(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddDeserializer() + .AddTypedHandlers(h => h.AddHandler()) + ) + ) + ) + )); + +var build = hostBuilder.Build(); +var kafkaBus = build.Services.CreateKafkaBus(); +await kafkaBus.StartAsync(); + +await build.RunAsync(); +await kafkaBus.StopAsync(); \ No newline at end of file diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/README.md b/samples/KafkaFlow.Sample.CooperativeSticky/README.md new file mode 100644 index 000000000..1439c5195 --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/README.md @@ -0,0 +1,28 @@ +# KafkaFlow.Sample + +This is a simple sample that shows how to produce and consume messages. + +## How to run + +### Requirements + +- [.NET 6.0 SDK](https://dotnet.microsoft.com/en-us/download/dotnet/6.0) +- [Docker Desktop](https://www.docker.com/products/docker-desktop/) + +### Start the cluster + +Using your terminal of choice, start the cluster. +You can find a docker-compose file at the root of this repository. +Position the terminal in that folder and run the following command. + +```bash +docker-compose up -d +``` + +### Run the Sample + +Using your terminal of choice, start the sample for the sample folder. 
+ +```bash +dotnet run +``` diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/TestMessage.cs b/samples/KafkaFlow.Sample.CooperativeSticky/TestMessage.cs new file mode 100644 index 000000000..775eebe49 --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/TestMessage.cs @@ -0,0 +1,10 @@ +using System.Runtime.Serialization; + +namespace KafkaFlow.Sample.CooperativeSticky; + +[DataContract] +public class TestMessage +{ + [DataMember(Order = 1)] + public string Text { get; set; } +} \ No newline at end of file From 7b6f0ee75ef10f05c2bb635cb96c418b84285500 Mon Sep 17 00:00:00 2001 From: golanbz Date: Mon, 2 Sep 2024 14:47:32 +0300 Subject: [PATCH 3/7] * Added support for consumer strategies that do not act as "stop the world" scenarios (e.g., cooperative sticky). Fixes issue #557 and Fixes issue #456 * Enabled automatic committing with `confluent auto commit: true` instead of relying solely on manual commits, but only when the consumer strategy is cooperative sticky. (Refer to the open librdkafka issue at https://github.com/confluentinc/librdkafka/issues/4059). 
--- .../ConsumerConfigurationBuilder.cs | 4 +- src/KafkaFlow/Consumers/Consumer.cs | 70 +++++++++++++++---- src/KafkaFlow/Consumers/ConsumerManager.cs | 41 +++++++++-- .../Extensions/StrategyExtensions.cs | 17 +++++ 4 files changed, 112 insertions(+), 20 deletions(-) create mode 100644 src/KafkaFlow/Extensions/StrategyExtensions.cs diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 304fe2d34..a87f58be8 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading.Tasks; using KafkaFlow.Consumers.DistributionStrategies; +using KafkaFlow.Extensions; namespace KafkaFlow.Configuration; @@ -251,7 +252,8 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) consumerConfigCopy.StatisticsIntervalMs = _consumerConfig.StatisticsIntervalMs ?? _statisticsInterval; consumerConfigCopy.EnableAutoOffsetStore = false; - consumerConfigCopy.EnableAutoCommit = false; + consumerConfigCopy.EnableAutoCommit = _consumerConfig.PartitionAssignmentStrategy.IsStopTheWorldStrategy() is false; + consumerConfigCopy.AutoCommitIntervalMs = _consumerConfig.AutoCommitIntervalMs ?? 
5000; consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration); diff --git a/src/KafkaFlow/Consumers/Consumer.cs b/src/KafkaFlow/Consumers/Consumer.cs index 7f478f6e5..77f74e07b 100644 --- a/src/KafkaFlow/Consumers/Consumer.cs +++ b/src/KafkaFlow/Consumers/Consumer.cs @@ -7,6 +7,7 @@ using Confluent.Kafka; using KafkaFlow.Authentication; using KafkaFlow.Configuration; +using KafkaFlow.Extensions; namespace KafkaFlow.Consumers; @@ -14,6 +15,7 @@ internal class Consumer : IConsumer { private readonly IDependencyResolver _dependencyResolver; private readonly ILogHandler _logHandler; + private readonly bool _stopTheWorldStrategy; private readonly List, List>> _partitionsAssignedHandlers = new(); @@ -40,6 +42,7 @@ public Consumer( this.Configuration = configuration; _flowManager = new ConsumerFlowManager(this, _logHandler); _maxPollIntervalExceeded = new(_logHandler); + _stopTheWorldStrategy = Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy(); foreach (var handler in this.Configuration.StatisticsHandlers) { @@ -148,7 +151,17 @@ public void Commit(IReadOnlyCollection off return; } - _consumer.Commit(validOffsets); + if (_stopTheWorldStrategy) + { + _consumer.Commit(validOffsets); + } + else + { + foreach (var topicPartitionOffset in validOffsets) + { + _consumer.StoreOffset(topicPartitionOffset); + } + } foreach (var offset in validOffsets) { @@ -237,17 +250,8 @@ private void EnsureConsumer() var kafkaConfig = this.Configuration.GetKafkaConfig(); var consumerBuilder = new ConsumerBuilder(kafkaConfig) - .SetPartitionsAssignedHandler( - (consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions)) - .SetPartitionsRevokedHandler( - (consumer, partitions) => - { - _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); - this.Assignment = new List(); - this.Subscription = new List(); - _currentPartitionsOffsets.Clear(); - _flowManager.Stop(); - }) + 
.SetPartitionsAssignedHandler(FirePartitionsAssignedHandlers) + .SetPartitionsRevokedHandler(FirePartitionRevokedHandlers) .SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error))) .SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics))); @@ -293,13 +297,51 @@ private void FirePartitionsAssignedHandlers( IConsumer consumer, List partitions) { - this.Assignment = partitions; + if (_stopTheWorldStrategy) + { + this.Assignment = partitions; + this.Subscription = consumer.Subscription; + _flowManager.Start(consumer); + _partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); + return; + } + + if (partitions.Count == 0) + { + return; + } + + this.Assignment = this.Assignment.Union(partitions).ToArray(); this.Subscription = consumer.Subscription; + _flowManager.Stop(); _flowManager.Start(consumer); - _partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); } + private void FirePartitionRevokedHandlers(IConsumer consumer, List partitions) + { + if (_stopTheWorldStrategy) + { + _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); + this.Assignment = new List(); + this.Subscription = new List(); + _currentPartitionsOffsets.Clear(); + _flowManager.Stop(); + return; + } + + this.Assignment = this.Assignment.Except(partitions.Select(x => x.TopicPartition)).ToArray(); + this.Subscription = consumer.Subscription; + foreach (var partition in partitions) + { + _currentPartitionsOffsets.TryRemove(partition.TopicPartition, out _); + } + + _flowManager.Stop(); + _flowManager.Start(consumer); + _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); + } + private void InvalidateConsumer() { _consumer?.Close(); diff --git a/src/KafkaFlow/Consumers/ConsumerManager.cs b/src/KafkaFlow/Consumers/ConsumerManager.cs index 
9d627cbcd..0dacccbfc 100644 --- a/src/KafkaFlow/Consumers/ConsumerManager.cs +++ b/src/KafkaFlow/Consumers/ConsumerManager.cs @@ -3,7 +3,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Confluent.Kafka; using KafkaFlow.Configuration; +using KafkaFlow.Extensions; namespace KafkaFlow.Consumers; @@ -11,6 +13,7 @@ internal class ConsumerManager : IConsumerManager { private readonly IDependencyResolver _dependencyResolver; private readonly ILogHandler _logHandler; + private readonly bool _stopTheWorldStrategy; private Timer _evaluateWorkersCountTimer; @@ -26,7 +29,7 @@ public ConsumerManager( this.Consumer = consumer; this.WorkerPool = consumerWorkerPool; this.Feeder = feeder; - + _stopTheWorldStrategy = consumer.Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy(); this.Consumer.OnPartitionsAssigned((_, _, partitions) => this.OnPartitionAssigned(partitions)); this.Consumer.OnPartitionsRevoked((_, _, partitions) => this.OnPartitionRevoked(partitions)); } @@ -104,9 +107,23 @@ private void OnPartitionRevoked(IEnumerable x.TopicPartition))); + this.GetConsumerLogInfo(topicPartitions?.Select(x => x.TopicPartition).ToArray() + ?? 
Array.Empty())); this.WorkerPool.StopAsync().GetAwaiter().GetResult(); + + if (_stopTheWorldStrategy) + { + return; + } + + var assignedPartitions = Consumer.Assignment; + var workersCount = this.CalculateWorkersCount(assignedPartitions).GetAwaiter().GetResult(); + + this.WorkerPool + .StartAsync(assignedPartitions, workersCount) + .GetAwaiter() + .GetResult(); } private void OnPartitionAssigned(IReadOnlyCollection partitions) @@ -115,10 +132,16 @@ private void OnPartitionAssigned(IReadOnlyCollection x.Topic) .Select( x => new @@ -136,6 +159,14 @@ private void OnPartitionAssigned(IReadOnlyCollection y.Partition.Value), }), + CurrentTopics = Consumer.Assignment.GroupBy(x => x.Topic) + .Select( + x => new + { + x.First().Topic, + PartitionsCount = x.Count(), + Partitions = x.Select(y => y.Partition.Value), + }), }; private async Task CalculateWorkersCount(IEnumerable partitions) diff --git a/src/KafkaFlow/Extensions/StrategyExtensions.cs b/src/KafkaFlow/Extensions/StrategyExtensions.cs new file mode 100644 index 000000000..82e5057e3 --- /dev/null +++ b/src/KafkaFlow/Extensions/StrategyExtensions.cs @@ -0,0 +1,17 @@ +using Confluent.Kafka; + +namespace KafkaFlow.Extensions; + +/// +/// Strategy extension methods. +/// +public static class StrategyExtensions +{ + /// + /// Determine if the strategy is a "stop the world" behavior. + /// + /// Strategy + /// + public static bool IsStopTheWorldStrategy(this PartitionAssignmentStrategy? strategy) => + strategy is null or PartitionAssignmentStrategy.Range or PartitionAssignmentStrategy.RoundRobin; +} From 0fb5d4de40318927e80102fcaefa116ff1a6caff Mon Sep 17 00:00:00 2001 From: golanbz Date: Mon, 2 Sep 2024 14:50:12 +0300 Subject: [PATCH 4/7] Introduced a new sample project designed specifically to evaluate the support for the cooperative sticky strategy, allowing for more thorough testing and analysis. 
--- KafkaFlow.sln | 7 +++ .../HostedService.cs | 44 +++++++++++++++ .../KafkaFlow.Sample.CooperativeSticky.csproj | 33 ++++++++++++ .../PrintConsoleHandler.cs | 18 +++++++ .../Program.cs | 53 +++++++++++++++++++ .../README.md | 28 ++++++++++ .../TestMessage.cs | 10 ++++ 7 files changed, 193 insertions(+) create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/HostedService.cs create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/KafkaFlow.Sample.CooperativeSticky.csproj create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/PrintConsoleHandler.cs create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/Program.cs create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/README.md create mode 100644 samples/KafkaFlow.Sample.CooperativeSticky/TestMessage.cs diff --git a/KafkaFlow.sln b/KafkaFlow.sln index a591122dc..9c5d79494 100644 --- a/KafkaFlow.sln +++ b/KafkaFlow.sln @@ -96,6 +96,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.OpenTelemetry", "samples\KafkaFlow.Sample.OpenTelemetry\KafkaFlow.Sample.OpenTelemetry.csproj", "{E9E8B374-4165-45F2-8DF5-F141E141AC1D}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.CooperativeSticky", "samples\KafkaFlow.Sample.CooperativeSticky\KafkaFlow.Sample.CooperativeSticky.csproj", "{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -222,6 +224,10 @@ Global {E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Debug|Any CPU.Build.0 = Debug|Any CPU {E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.ActiveCfg = Release|Any CPU {E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.Build.0 = Release|Any CPU + {DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Debug|Any CPU.Build.0 = Debug|Any CPU + 
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -265,6 +271,7 @@ Global {1755E8DB-970C-4A24-8B7C-A2BEC1410BEE} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068} {80080C1D-579E-4AB2-935D-5CFFC51843D8} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068} {E9E8B374-4165-45F2-8DF5-F141E141AC1D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} + {DBF7B091-11AE-402F-9F36-7E7EB3901B0B} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB} diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/HostedService.cs b/samples/KafkaFlow.Sample.CooperativeSticky/HostedService.cs new file mode 100644 index 000000000..a4bdaafa6 --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/HostedService.cs @@ -0,0 +1,44 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using KafkaFlow.Producers; +using Microsoft.Extensions.Hosting; + +namespace KafkaFlow.Sample.CooperativeSticky; + +public class HostedService : IHostedService +{ + private IMessageProducer _producer; + const string producerName = "PrintConsole"; + const string topicName = "sample-topic"; + + + public HostedService(IProducerAccessor producerAccessor) + { + _producer = producerAccessor.GetProducer(producerName); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + try + { + while (true) + { + await _producer.ProduceAsync( + topicName, + Guid.NewGuid().ToString(), + new TestMessage { Text = $"Message: {Guid.NewGuid()}" }); + await Task.Delay(500, cancellationToken); + } + } + catch (Exception e) + { + Console.WriteLine(e); + } + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } +} \ No newline at end of file diff --git 
a/samples/KafkaFlow.Sample.CooperativeSticky/KafkaFlow.Sample.CooperativeSticky.csproj b/samples/KafkaFlow.Sample.CooperativeSticky/KafkaFlow.Sample.CooperativeSticky.csproj new file mode 100644 index 000000000..44c01e12a --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/KafkaFlow.Sample.CooperativeSticky.csproj @@ -0,0 +1,33 @@ + + + + Exe + net6.0 + false + false + true + + + + 1701;1702;CS1591;SA1600 + + + + 1701;1702;CS1591;SA1600 + + + + + + + + + + + + + + + + + diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/PrintConsoleHandler.cs b/samples/KafkaFlow.Sample.CooperativeSticky/PrintConsoleHandler.cs new file mode 100644 index 000000000..eb745195d --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/PrintConsoleHandler.cs @@ -0,0 +1,18 @@ +using System; +using System.Threading.Tasks; + +namespace KafkaFlow.Sample.CooperativeSticky; + +public class PrintConsoleHandler : IMessageHandler +{ + public Task Handle(IMessageContext context, TestMessage message) + { + Console.WriteLine( + "Partition: {0} | Offset: {1} | Message: {2}", + context.ConsumerContext.Partition, + context.ConsumerContext.Offset, + message.Text); + + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/Program.cs b/samples/KafkaFlow.Sample.CooperativeSticky/Program.cs new file mode 100644 index 000000000..27bd15eed --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/Program.cs @@ -0,0 +1,53 @@ +using Confluent.Kafka; +using KafkaFlow; +using KafkaFlow.Sample.CooperativeSticky; +using KafkaFlow.Serializer; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using AutoOffsetReset = KafkaFlow.AutoOffsetReset; + +const string producerName = "PrintConsole"; +const string topicName = "sample-topic"; +var hostBuilder = new HostBuilder(); +hostBuilder.ConfigureServices(services => + services.AddHostedService().AddKafka( + kafka => kafka + .UseConsoleLog() + 
.AddCluster( + cluster => cluster + .WithBrokers(new[] { "localhost:9092" }) + .CreateTopicIfNotExists(topicName, 6, 1) + .AddProducer( + producerName, + producer => producer + .DefaultTopic(topicName) + .AddMiddlewares(m => m.AddSerializer()) + ) + .AddConsumer( + consumer => consumer + .WithConsumerConfig(new ConsumerConfig + { + PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky, + AutoCommitIntervalMs = 100 + }) + .Topic(topicName) + .WithGroupId("print-console-handler") + .WithBufferSize(100) + .WithWorkersCount(3) + .WithAutoCommitIntervalMs(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddDeserializer() + .AddTypedHandlers(h => h.AddHandler()) + ) + ) + ) + )); + +var build = hostBuilder.Build(); +var kafkaBus = build.Services.CreateKafkaBus(); +await kafkaBus.StartAsync(); + +await build.RunAsync(); +await kafkaBus.StopAsync(); \ No newline at end of file diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/README.md b/samples/KafkaFlow.Sample.CooperativeSticky/README.md new file mode 100644 index 000000000..1439c5195 --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/README.md @@ -0,0 +1,28 @@ +# KafkaFlow.Sample.CooperativeSticky + +This is a simple sample that shows how to produce and consume messages. + +## How to run + +### Requirements + +- [.NET 6.0 SDK](https://dotnet.microsoft.com/en-us/download/dotnet/6.0) +- [Docker Desktop](https://www.docker.com/products/docker-desktop/) + +### Start the cluster + +Using your terminal of choice, start the cluster. +You can find a docker-compose file at the root of this repository. +Position the terminal in that folder and run the following command. + +```bash +docker-compose up -d +``` + +### Run the Sample + +Using your terminal of choice, start the sample from the sample folder. 
+ +```bash +dotnet run +``` diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/TestMessage.cs b/samples/KafkaFlow.Sample.CooperativeSticky/TestMessage.cs new file mode 100644 index 000000000..775eebe49 --- /dev/null +++ b/samples/KafkaFlow.Sample.CooperativeSticky/TestMessage.cs @@ -0,0 +1,10 @@ +using System.Runtime.Serialization; + +namespace KafkaFlow.Sample.CooperativeSticky; + +[DataContract] +public class TestMessage +{ + [DataMember(Order = 1)] + public string Text { get; set; } +} \ No newline at end of file From f4997c2fe1e6ed622e4ea861a8bba1fa7e60d7f9 Mon Sep 17 00:00:00 2001 From: golanbz Date: Tue, 17 Sep 2024 20:38:07 +0300 Subject: [PATCH 5/7] Added unit tests and fixed a bug in ClusterConfiguration - Added 3 new test files to improve coverage: - `ConsumerConfigurationBuilderTests.cs` - `KafkaConfigTests.cs` - `PartitionAssignmentStrategyTests.cs` - Fixed a bug in `ClusterConfiguration` related to the `AutoCommitInterval` initialization. --- .../ConsumerConfigurationBuilder.cs | 2 +- .../ConsumerConfigurationBuilderTests.cs | 62 ++++++ .../ConsumerManagerCooperativeStickyTests.cs | 195 ++++++++++++++++++ .../Consumer/ConsumerManagerTests.cs | 17 +- 4 files changed, 273 insertions(+), 3 deletions(-) create mode 100644 tests/KafkaFlow.UnitTests/Consumer/ConsumerConfigurationBuilderTests.cs create mode 100644 tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerCooperativeStickyTests.cs diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index a87f58be8..00de17e6d 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -253,7 +253,7 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) consumerConfigCopy.EnableAutoOffsetStore = false; consumerConfigCopy.EnableAutoCommit = _consumerConfig.PartitionAssignmentStrategy.IsStopTheWorldStrategy() is false; - 
consumerConfigCopy.AutoCommitIntervalMs = _consumerConfig.AutoCommitIntervalMs ?? 5000; + consumerConfigCopy.AutoCommitIntervalMs = (int?)_autoCommitInterval.TotalMilliseconds; consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration); diff --git a/tests/KafkaFlow.UnitTests/Consumer/ConsumerConfigurationBuilderTests.cs b/tests/KafkaFlow.UnitTests/Consumer/ConsumerConfigurationBuilderTests.cs new file mode 100644 index 000000000..3a20c0b02 --- /dev/null +++ b/tests/KafkaFlow.UnitTests/Consumer/ConsumerConfigurationBuilderTests.cs @@ -0,0 +1,62 @@ +using System; +using AutoFixture; +using AutoFixture.AutoMoq; +using Confluent.Kafka; +using FluentAssertions; +using KafkaFlow.Configuration; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; + +namespace KafkaFlow.UnitTests.Consumer; + +[TestClass] +public class ConsumerConfigurationBuilderTests +{ + private readonly Fixture _fixture = new(); + + [TestInitialize] + public void Setup() + { + _fixture.Customize(new AutoMoqCustomization()); + } + + [TestMethod] + public void ConfigurationBuild_CallBuild_WithSticky_EnableAutoCommit_True() + { + // Arrange + var consumerConfigurationBuilder = _fixture.Create(); + consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig() + { + PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky, + GroupId = "Test", + }).WithAutoCommitIntervalMs(500) + .WithBufferSize(3); + + // Act + var consumerConfiguration = consumerConfigurationBuilder.Build(_fixture.Create()); + + // Assert + var consumerConfig = consumerConfiguration.GetKafkaConfig(); + consumerConfig.EnableAutoCommit.Should().BeTrue(); + consumerConfig.AutoCommitIntervalMs.Should().Be(500); + consumerConfiguration.AutoCommitInterval.Should().Be(TimeSpan.FromMilliseconds(500)); + } + + [TestMethod] + public void ConfigurationBuild_CallBuild_WithSRoundRobin_EnableAutoCommit_False() + { + // Arrange + var consumerConfigurationBuilder = new ConsumerConfigurationBuilder(Mock.Of()); 
+ consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig() + { + PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin, + GroupId = "Test" + }).WithAutoCommitIntervalMs(500).WithBufferSize(3); + // Act + var consumerConfiguration = consumerConfigurationBuilder.Build(_fixture.Create()); + + // Assert + consumerConfiguration.GetKafkaConfig().EnableAutoCommit.Should().BeFalse(); + consumerConfiguration.AutoCommitInterval.Should().Be(TimeSpan.FromMilliseconds(500)); + } +} \ No newline at end of file diff --git a/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerCooperativeStickyTests.cs b/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerCooperativeStickyTests.cs new file mode 100644 index 000000000..508e4dad7 --- /dev/null +++ b/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerCooperativeStickyTests.cs @@ -0,0 +1,195 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using AutoFixture; +using Confluent.Kafka; +using FluentAssertions; +using KafkaFlow.Configuration; +using KafkaFlow.Consumers; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; + +namespace KafkaFlow.UnitTests.Consumer; + +[TestClass] +public class ConsumerManagerCooperativeStickyTests +{ + private readonly Fixture _fixture = new(); + + private ConsumerManager _target; + + private Mock _consumerMock; + private Mock _workerPoolMock; + private Mock _feederMock; + private Mock _logHandlerMock; + private Mock _dependencyResolver; + + private Action, List> _onPartitionAssignedHandler; + private Action, List> _onPartitionRevokedHandler; + + [TestInitialize] + public void Setup() + { + _consumerMock = new Mock(); + _workerPoolMock = new Mock(); + _feederMock = new Mock(); + _logHandlerMock = new Mock(); + _dependencyResolver = new Mock(); + + _consumerMock + .Setup( + x => x.OnPartitionsAssigned(It.IsAny, List>>())) + .Callback( + (Action, List> value) => + _onPartitionAssignedHandler = value); + + _consumerMock + 
.Setup( + x => x.OnPartitionsRevoked( + It.IsAny, List>>())) + .Callback( + (Action, List> value) => + _onPartitionRevokedHandler = value); + + var configurationMock= new Mock(); + + configurationMock + .Setup(x => x.GetKafkaConfig()) + .Returns(() => new ConsumerConfig { PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky }); + + configurationMock + .SetupGet(x => x.WorkersCountCalculator) + .Returns((_, _) => Task.FromResult(10)); + + configurationMock + .SetupGet(x => x.WorkersCountEvaluationInterval) + .Returns(TimeSpan.FromMinutes(5)); + + _consumerMock + .SetupGet(x => x.Configuration) + .Returns(configurationMock.Object); + + _consumerMock + .SetupGet(x => x.Assignment) + .Returns(Array.Empty()); + + _target = new ConsumerManager( + _consumerMock.Object, + _workerPoolMock.Object, + _feederMock.Object, + _dependencyResolver.Object, + _logHandlerMock.Object); + } + + [TestMethod] + public void ConstructorCalled_InitializeProperties() + { + // Assert + _target.Consumer.Should().Be(_consumerMock.Object); + _target.WorkerPool.Should().Be(_workerPoolMock.Object); + _target.Feeder.Should().Be(_feederMock.Object); + } + + [TestMethod] + public async Task StartAsync_StartDependencies() + { + // Arrange + _feederMock + .Setup(x => x.Start()); + + // Act + await _target.StartAsync(); + + // Assert + _feederMock.VerifyAll(); + } + + [TestMethod] + public async Task StopAsync_StopDependencies() + { + // Arrange + _feederMock + .Setup(x => x.StopAsync()) + .Returns(Task.CompletedTask); + + _workerPoolMock + .Setup(x => x.StopAsync()) + .Returns(Task.CompletedTask); + + // Act + await _target.StopAsync(); + + // Assert + _feederMock.VerifyAll(); + _workerPoolMock.VerifyAll(); + _workerPoolMock.VerifyNoOtherCalls(); + _consumerMock.Verify(x => x.Dispose(), Times.Once()); + } + + [TestMethod] + public void OnPartitionsAssigned_StartWorkerPool() + { + // Arrange + var currentPartitions = _fixture.Create>(); + var newAssignedPartitions = 
_fixture.Create>(); + var allPartitions = currentPartitions.Concat(newAssignedPartitions).ToArray(); + + _workerPoolMock + .Setup(x => x.StopAsync()) + .Returns(Task.CompletedTask); + + _workerPoolMock + .Setup(x => x.StartAsync(allPartitions, It.IsAny())) + .Returns(Task.CompletedTask); + + _logHandlerMock + .Setup(x => x.Info(It.IsAny(), It.IsAny())); + + _consumerMock.SetupGet(x=>x.Assignment).Returns(allPartitions.ToArray()); + + // Act + _onPartitionAssignedHandler(_dependencyResolver.Object, Mock.Of>(), newAssignedPartitions); + + // Assert + _workerPoolMock.VerifyAll(); + _workerPoolMock.VerifyNoOtherCalls(); + _logHandlerMock.VerifyAll(); + } + + [TestMethod] + public void OnPartitionsRevoked_StopWorkerPool() + { + // Arrange + var currentPartitions = _fixture.CreateMany(6).ToList(); + var revokedPartitions = currentPartitions.Take(3).ToArray(); + var leftPartitions = currentPartitions.Except(revokedPartitions).ToArray(); + + var partitions = _fixture.Create>(); + + _workerPoolMock + .Setup(x => x.StopAsync()) + .Returns(Task.CompletedTask); + + _workerPoolMock + .Setup(x => x.StartAsync(leftPartitions, It.IsAny())) + .Returns(Task.CompletedTask); + + _consumerMock + .SetupGet(x => x.Configuration) + .Returns(new Mock().Object); + + _logHandlerMock + .Setup(x => x.Warning(It.IsAny(), It.IsAny())); + + _consumerMock.SetupGet(x=>x.Assignment).Returns(leftPartitions); + + // Act + _onPartitionRevokedHandler(_dependencyResolver.Object, Mock.Of>(), revokedPartitions.Select(x=>new Confluent.Kafka.TopicPartitionOffset(x,123)).ToList()); + + // Assert + _workerPoolMock.VerifyAll(); + _workerPoolMock.VerifyNoOtherCalls(); + _logHandlerMock.VerifyAll(); + } +} \ No newline at end of file diff --git a/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs b/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs index dd92901f9..178eff9c0 100644 --- a/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs +++ 
b/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using AutoFixture; +using Confluent.Kafka; using FluentAssertions; using KafkaFlow.Configuration; using KafkaFlow.Consumers; @@ -50,7 +51,11 @@ public void Setup() (Action, List> value) => _onPartitionRevokedHandler = value); - var configurationMock = new Mock(); + var configurationMock= new Mock(); + + configurationMock + .Setup(x => x.GetKafkaConfig()) + .Returns(() => new ConsumerConfig { PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin }); configurationMock .SetupGet(x => x.WorkersCountCalculator) @@ -63,7 +68,11 @@ public void Setup() _consumerMock .SetupGet(x => x.Configuration) .Returns(configurationMock.Object); - + + _consumerMock + .SetupGet(x => x.Assignment) + .Returns(Array.Empty()); + _target = new ConsumerManager( _consumerMock.Object, _workerPoolMock.Object, @@ -113,6 +122,7 @@ public async Task StopAsync_StopDependencies() // Assert _feederMock.VerifyAll(); _workerPoolMock.VerifyAll(); + _workerPoolMock.VerifyNoOtherCalls(); _consumerMock.Verify(x => x.Dispose(), Times.Once()); } @@ -128,6 +138,8 @@ public void OnPartitionsAssigned_StartWorkerPool() _logHandlerMock .Setup(x => x.Info(It.IsAny(), It.IsAny())); + + _consumerMock.SetupGet(x=>x.Assignment).Returns(partitions.ToArray()); // Act _onPartitionAssignedHandler(_dependencyResolver.Object, Mock.Of>(), partitions); @@ -160,6 +172,7 @@ public void OnPartitionsRevoked_StopWorkerPool() // Assert _workerPoolMock.VerifyAll(); + _workerPoolMock.VerifyNoOtherCalls(); _logHandlerMock.VerifyAll(); } } From 2db29f9641ccb86d89eca045014225e126e9a00c Mon Sep 17 00:00:00 2001 From: golanbz Date: Tue, 17 Sep 2024 20:46:46 +0300 Subject: [PATCH 6/7] Updated `AutoCommitInterval` in Cooperative-sticky sample `Program.cs` to 100ms. 
--- samples/KafkaFlow.Sample.CooperativeSticky/Program.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/samples/KafkaFlow.Sample.CooperativeSticky/Program.cs b/samples/KafkaFlow.Sample.CooperativeSticky/Program.cs index 27bd15eed..0754e6388 100644 --- a/samples/KafkaFlow.Sample.CooperativeSticky/Program.cs +++ b/samples/KafkaFlow.Sample.CooperativeSticky/Program.cs @@ -27,14 +27,12 @@ consumer => consumer .WithConsumerConfig(new ConsumerConfig { - PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky, - AutoCommitIntervalMs = 100 - }) + PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky}) .Topic(topicName) .WithGroupId("print-console-handler") .WithBufferSize(100) .WithWorkersCount(3) - .WithAutoCommitIntervalMs(10) + .WithAutoCommitIntervalMs(100) .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares From 08161134d98b6ab401ef625e14aa52711fc4fc06 Mon Sep 17 00:00:00 2001 From: golanbz Date: Wed, 18 Sep 2024 14:17:34 +0300 Subject: [PATCH 7/7] Fix Codacy issues --- .../ConsumerConfigurationBuilderTests.cs | 6 ++--- .../ConsumerManagerCooperativeStickyTests.cs | 24 +++++++++---------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/tests/KafkaFlow.UnitTests/Consumer/ConsumerConfigurationBuilderTests.cs b/tests/KafkaFlow.UnitTests/Consumer/ConsumerConfigurationBuilderTests.cs index 3a20c0b02..1877c137b 100644 --- a/tests/KafkaFlow.UnitTests/Consumer/ConsumerConfigurationBuilderTests.cs +++ b/tests/KafkaFlow.UnitTests/Consumer/ConsumerConfigurationBuilderTests.cs @@ -25,8 +25,8 @@ public void ConfigurationBuild_CallBuild_WithSticky_EnableAutoCommit_True() { // Arrange var consumerConfigurationBuilder = _fixture.Create(); - consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig() - { + consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig + { PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky, 
GroupId = "Test", }).WithAutoCommitIntervalMs(500) @@ -47,7 +47,7 @@ public void ConfigurationBuild_CallBuild_WithSRoundRobin_EnableAutoCommit_False( { // Arrange var consumerConfigurationBuilder = new ConsumerConfigurationBuilder(Mock.Of()); - consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig() + consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig { PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin, GroupId = "Test" diff --git a/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerCooperativeStickyTests.cs b/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerCooperativeStickyTests.cs index 508e4dad7..6ee03acc9 100644 --- a/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerCooperativeStickyTests.cs +++ b/tests/KafkaFlow.UnitTests/Consumer/ConsumerManagerCooperativeStickyTests.cs @@ -52,7 +52,7 @@ public void Setup() (Action, List> value) => _onPartitionRevokedHandler = value); - var configurationMock= new Mock(); + var configurationMock = new Mock(); configurationMock .Setup(x => x.GetKafkaConfig()) @@ -69,11 +69,11 @@ public void Setup() _consumerMock .SetupGet(x => x.Configuration) .Returns(configurationMock.Object); - + _consumerMock .SetupGet(x => x.Assignment) .Returns(Array.Empty()); - + _target = new ConsumerManager( _consumerMock.Object, _workerPoolMock.Object, @@ -134,19 +134,19 @@ public void OnPartitionsAssigned_StartWorkerPool() var currentPartitions = _fixture.Create>(); var newAssignedPartitions = _fixture.Create>(); var allPartitions = currentPartitions.Concat(newAssignedPartitions).ToArray(); - + _workerPoolMock .Setup(x => x.StopAsync()) .Returns(Task.CompletedTask); - + _workerPoolMock .Setup(x => x.StartAsync(allPartitions, It.IsAny())) .Returns(Task.CompletedTask); _logHandlerMock .Setup(x => x.Info(It.IsAny(), It.IsAny())); - - _consumerMock.SetupGet(x=>x.Assignment).Returns(allPartitions.ToArray()); + + _consumerMock.SetupGet(x => x.Assignment).Returns(allPartitions.ToArray()); // Act 
_onPartitionAssignedHandler(_dependencyResolver.Object, Mock.Of>(), newAssignedPartitions); @@ -164,13 +164,11 @@ public void OnPartitionsRevoked_StopWorkerPool() var currentPartitions = _fixture.CreateMany(6).ToList(); var revokedPartitions = currentPartitions.Take(3).ToArray(); var leftPartitions = currentPartitions.Except(revokedPartitions).ToArray(); - - var partitions = _fixture.Create>(); _workerPoolMock .Setup(x => x.StopAsync()) .Returns(Task.CompletedTask); - + _workerPoolMock .Setup(x => x.StartAsync(leftPartitions, It.IsAny())) .Returns(Task.CompletedTask); @@ -181,11 +179,11 @@ public void OnPartitionsRevoked_StopWorkerPool() _logHandlerMock .Setup(x => x.Warning(It.IsAny(), It.IsAny())); - - _consumerMock.SetupGet(x=>x.Assignment).Returns(leftPartitions); + + _consumerMock.SetupGet(x => x.Assignment).Returns(leftPartitions); // Act - _onPartitionRevokedHandler(_dependencyResolver.Object, Mock.Of>(), revokedPartitions.Select(x=>new Confluent.Kafka.TopicPartitionOffset(x,123)).ToList()); + _onPartitionRevokedHandler(_dependencyResolver.Object, Mock.Of>(), revokedPartitions.Select(x => new Confluent.Kafka.TopicPartitionOffset(x, 123)).ToList()); // Assert _workerPoolMock.VerifyAll();