From 864af0e45ec481dffef1669b611158a4c4a0dcd0 Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Mon, 6 Jan 2025 16:51:29 +0000 Subject: [PATCH] fix: kafka producer should flush on close; test uses assume not create --- .../KafkaMessageProducer.cs | 14 +++++++++++--- ...hen_a_set_of_messages_is_sent_preserve_order.cs | 10 ++++------ .../When_consumer_assumes_topic_but_missing.cs | 11 ++++++----- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageProducer.cs index af1b8c14e4..65c8b89078 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageProducer.cs @@ -30,7 +30,7 @@ THE SOFTWARE. */ namespace Paramore.Brighter.MessagingGateway.Kafka { - internal class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation + public class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation { /// /// Action taken when a message is published, following receipt of a confirmation from the broker @@ -152,6 +152,15 @@ public void ConfigHook(Action configHook) { configHook(_producerConfig); } + + /// + /// Flushes the producer to ensure all messages in the internal buffer have been sent + /// + /// Used to timeout the flush operation + public void Flush(CancellationToken cancellationToken = default) + { + _producer?.Flush(cancellationToken); + } /// /// Initialize the producer => two stage construction to allow for a hook if needed @@ -240,8 +249,6 @@ public void Send(Message message) throw new ChannelFailureException("Error connecting to Kafka, see inner exception for details", kafkaException); } } - - /// /// Sends the specified message. @@ -336,6 +343,7 @@ private void Dispose(bool disposing) { if (disposing) { + Flush(); _producer?.Dispose(); } } diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_set_of_messages_is_sent_preserve_order.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_set_of_messages_is_sent_preserve_order.cs index 42d479f2b4..46f35f046f 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_set_of_messages_is_sent_preserve_order.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_set_of_messages_is_sent_preserve_order.cs @@ -17,7 +17,7 @@ public class KafkaMessageConsumerPreservesOrder : IDisposable { private readonly ITestOutputHelper _output; private readonly string _queueName = Guid.NewGuid().ToString(); - private readonly string _topic = Guid.NewGuid().ToString(); + private readonly RoutingKey _topic = new(Guid.NewGuid().ToString()); private readonly IAmAProducerRegistry _producerRegistry; private readonly string _partitionKey = Guid.NewGuid().ToString(); private readonly string _kafkaGroupId = Guid.NewGuid().ToString(); @@ -94,11 +94,9 @@ private string SendMessage() { var messageId = Guid.NewGuid().ToString(); - var routingKey = new RoutingKey(_topic); - - ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)).Send( + ((IAmAMessageProducerSync)_producerRegistry.LookupBy(_topic)).Send( new Message( - new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND) + new MessageHeader(messageId, _topic, MessageType.MT_COMMAND) { PartitionKey = _partitionKey }, @@ -139,7 +137,7 @@ private IAmAMessageConsumerSync CreateConsumer() new KafkaMessagingGatewayConfiguration { Name = "Kafka Consumer Test", - BootStrapServers = new[] { "localhost:9092" } + BootStrapServers = ["localhost:9092"] }) .Create(new KafkaSubscription( name: new SubscriptionName("Paramore.Brighter.Tests"), diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_consumer_assumes_topic_but_missing.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_consumer_assumes_topic_but_missing.cs index 1526b89069..a8cc52ff7a 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_consumer_assumes_topic_but_missing.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_consumer_assumes_topic_but_missing.cs @@ -26,7 +26,8 @@ public KafkaProducerAssumeTests(ITestOutputHelper output) Name = "Kafka Producer Send Test", BootStrapServers = new[] {"localhost:9092"} }, - new KafkaPublication[] {new KafkaPublication + [ + new KafkaPublication { Topic = new RoutingKey(_topic), NumPartitions = 1, @@ -35,8 +36,9 @@ public KafkaProducerAssumeTests(ITestOutputHelper output) //your production values ought to be lower MessageTimeoutMs = 2000, RequestTimeoutMs = 2000, - MakeChannels = OnMissingChannel.Create - }}).Create(); + MakeChannels = OnMissingChannel.Assume + } + ]).Create(); } @@ -69,8 +71,7 @@ public async Task When_a_consumer_declares_topics() ((IAmAMessageProducerSync)producer).Send(message); - //Give this a chance to succeed - will fail - await Task.Delay(5000); + ((KafkaMessageProducer)producer).Flush(); messagePublished.Should().BeFalse(); }