Skip to content

Commit

Permalink
fix: kafka producer should flush on close; test uses assume not create
Browse files Browse the repository at this point in the history
  • Loading branch information
iancooper committed Jan 6, 2025
1 parent c9e8f37 commit 864af0e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ THE SOFTWARE. */

namespace Paramore.Brighter.MessagingGateway.Kafka
{
internal class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation
public class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation
{
/// <summary>
/// Action taken when a message is published, following receipt of a confirmation from the broker
Expand Down Expand Up @@ -152,6 +152,15 @@ public void ConfigHook(Action<ProducerConfig> configHook)
{
configHook(_producerConfig);
}

/// <summary>
/// Flushes the producer to ensure all messages in the internal buffer have been sent
/// </summary>
/// <param name="cancellationToken">Used to timeout the flush operation</param>
public void Flush(CancellationToken cancellationToken = default)
{
_producer?.Flush(cancellationToken);
}

/// <summary>
/// Initialize the producer => two stage construction to allow for a hook if needed
Expand Down Expand Up @@ -240,8 +249,6 @@ public void Send(Message message)
throw new ChannelFailureException("Error connecting to Kafka, see inner exception for details", kafkaException);
}
}



/// <summary>
/// Sends the specified message.
Expand Down Expand Up @@ -336,6 +343,7 @@ private void Dispose(bool disposing)
{
if (disposing)
{
Flush();
_producer?.Dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class KafkaMessageConsumerPreservesOrder : IDisposable
{
private readonly ITestOutputHelper _output;
private readonly string _queueName = Guid.NewGuid().ToString();
private readonly string _topic = Guid.NewGuid().ToString();
private readonly RoutingKey _topic = new(Guid.NewGuid().ToString());
private readonly IAmAProducerRegistry _producerRegistry;
private readonly string _partitionKey = Guid.NewGuid().ToString();
private readonly string _kafkaGroupId = Guid.NewGuid().ToString();
Expand Down Expand Up @@ -94,11 +94,9 @@ private string SendMessage()
{
var messageId = Guid.NewGuid().ToString();

var routingKey = new RoutingKey(_topic);

((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)).Send(
((IAmAMessageProducerSync)_producerRegistry.LookupBy(_topic)).Send(
new Message(
new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND)
new MessageHeader(messageId, _topic, MessageType.MT_COMMAND)
{
PartitionKey = _partitionKey
},
Expand Down Expand Up @@ -139,7 +137,7 @@ private IAmAMessageConsumerSync CreateConsumer()
new KafkaMessagingGatewayConfiguration
{
Name = "Kafka Consumer Test",
BootStrapServers = new[] { "localhost:9092" }
BootStrapServers = ["localhost:9092"]
})
.Create(new KafkaSubscription<MyCommand>(
name: new SubscriptionName("Paramore.Brighter.Tests"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public KafkaProducerAssumeTests(ITestOutputHelper output)
Name = "Kafka Producer Send Test",
BootStrapServers = new[] {"localhost:9092"}
},
new KafkaPublication[] {new KafkaPublication
[
new KafkaPublication
{
Topic = new RoutingKey(_topic),
NumPartitions = 1,
Expand All @@ -35,8 +36,9 @@ public KafkaProducerAssumeTests(ITestOutputHelper output)
//your production values ought to be lower
MessageTimeoutMs = 2000,
RequestTimeoutMs = 2000,
MakeChannels = OnMissingChannel.Create
}}).Create();
MakeChannels = OnMissingChannel.Assume
}
]).Create();

}

Expand Down Expand Up @@ -69,8 +71,7 @@ public async Task When_a_consumer_declares_topics()

((IAmAMessageProducerSync)producer).Send(message);

//Give this a chance to succeed - will fail
await Task.Delay(5000);
((KafkaMessageProducer)producer).Flush();

messagePublished.Should().BeFalse();
}
Expand Down

0 comments on commit 864af0e

Please sign in to comment.