From 90f33edf7e0a123bcee5c5b23de9667c8c1c671b Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Wed, 8 Jan 2025 09:45:41 +0000 Subject: [PATCH] fix: make the kafka tests more robust --- .../KafkaMessageConsumerFactory.cs | 26 ++------------ ...age_is_acknowledged_update_offset_async.cs | 35 ++++++++++--------- ...f_messages_is_sent_preserve_order_async.cs | 26 ++++++++------ ...onsumer_assumes_topic_but_missing_async.cs | 9 +++-- .../When_consumer_declares_topic_async.cs | 20 +++++++---- ...iting_next_acknowledge_sweep_them_async.cs | 22 ++++++------ .../Proactor/When_posting_a_message_async.cs | 12 +++++-- ...sting_a_message_with_header_bytes_async.cs | 12 +++++-- ...sage_without_partition_key_header_async.cs | 8 ++++- ...a_message_is_acknowledged_update_offset.cs | 31 ++++++++-------- ..._set_of_messages_is_sent_preserve_order.cs | 17 +++++---- .../Reactor/When_consumer_declares_topic.cs | 16 ++++++--- ...ts_awaiting_next_acknowledge_sweep_them.cs | 24 +++++++------ .../Local/Reactor/When_posting_a_message.cs | 12 +++++-- ...hen_posting_a_message_with_header_bytes.cs | 14 +++++--- ..._a_message_without_partition_key_header.cs | 8 +++-- 16 files changed, 165 insertions(+), 127 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumerFactory.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumerFactory.cs index 5f79e52827..b827ffe831 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumerFactory.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumerFactory.cs @@ -73,29 +73,7 @@ public IAmAMessageConsumerSync Create(Subscription subscription) ); } - public IAmAMessageConsumerAsync CreateAsync(Subscription subscription) - { - KafkaSubscription kafkaSubscription = subscription as KafkaSubscription; - if (kafkaSubscription == null) - throw new ConfigurationException("We expect an SQSConnection or SQSConnection as a parameter"); - - return new KafkaMessageConsumer( - configuration: _configuration, - routingKey:kafkaSubscription.RoutingKey, //topic - groupId: kafkaSubscription.GroupId, - offsetDefault: kafkaSubscription.OffsetDefault, - sessionTimeout: kafkaSubscription.SessionTimeout, - maxPollInterval: kafkaSubscription.MaxPollInterval, - isolationLevel: kafkaSubscription.IsolationLevel, - commitBatchSize: kafkaSubscription.CommitBatchSize, - sweepUncommittedOffsetsInterval: kafkaSubscription.SweepUncommittedOffsetsInterval, - readCommittedOffsetsTimeout: kafkaSubscription.ReadCommittedOffsetsTimeOut, - numPartitions: kafkaSubscription.NumPartitions, - partitionAssignmentStrategy: kafkaSubscription.PartitionAssignmentStrategy, - replicationFactor: kafkaSubscription.ReplicationFactor, - topicFindTimeout: kafkaSubscription.TopicFindTimeout, - makeChannels: kafkaSubscription.MakeChannels - ); - } + public IAmAMessageConsumerAsync CreateAsync(Subscription subscription) => (IAmAMessageConsumerAsync)Create(subscription); + } } diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_a_message_is_acknowledged_update_offset_async.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_a_message_is_acknowledged_update_offset_async.cs index 054e73352a..53189b097f 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_a_message_is_acknowledged_update_offset_async.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_a_message_is_acknowledged_update_offset_async.cs @@ -46,16 +46,31 @@ public KafkaMessageConsumerUpdateOffsetAsync(ITestOutputHelper output) [Fact] public async Task When_a_message_is_acknowldgede_update_offset() { + //Let topic propagate in the broker + await Task.Delay(500); + var groupId = Guid.NewGuid().ToString(); - //send x messages to Kafka + var routingKey = new RoutingKey(_topic); + var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey)); + + //send x messages to Kafka var sentMessages = new string[10]; for (int i = 0; i < 10; i++) { var msgId = Guid.NewGuid().ToString(); - await SendMessageAsync(msgId); + + await producerAsync.SendAsync( + new Message( + new MessageHeader(msgId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey}, + new MessageBody($"test content [{_queueName}]") + ) + ); sentMessages[i] = msgId; } + + //We should not need to flush, as the async does not queue work - but in case this changes + ((KafkaMessageProducer)producerAsync).Flush(); //This will create, then delete the consumer Message[] messages = await ConsumeMessagesAsync(groupId: groupId, batchLimit: 5); @@ -80,18 +95,6 @@ public async Task When_a_message_is_acknowldgede_update_offset() } } - private async Task SendMessageAsync(string messageId) - { - var routingKey = new RoutingKey(_topic); - - await ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey)).SendAsync( - new Message( - new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey}, - new MessageBody($"test content [{_queueName}]") - ) - ); - } - private async Task ConsumeMessagesAsync(string groupId, int batchLimit) { var consumedMessages = new List(); @@ -114,9 +117,7 @@ async Task ConsumeMessageAsync(IAmAMessageConsumerAsync consumer) try { maxTries++; - //Let topic propagate in the broker - await Task.Delay(500); - //use TimeSpan.Zero to avoid blocking + //use TimeSpan.Zero to avoid blocking messages = await consumer.ReceiveAsync(TimeSpan.Zero); if (messages[0].Header.MessageType != MessageType.MT_NONE) diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_a_set_of_messages_is_sent_preserve_order_async.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_a_set_of_messages_is_sent_preserve_order_async.cs index 703731ce2d..a8fc31c157 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_a_set_of_messages_is_sent_preserve_order_async.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_a_set_of_messages_is_sent_preserve_order_async.cs @@ -47,14 +47,24 @@ public KafkaMessageConsumerPreservesOrderAsync(ITestOutputHelper output) [Fact] public async Task When_a_message_is_sent_keep_order() { + //Let topic propagate in the broker + await Task.Delay(500); + IAmAMessageConsumerAsync consumer = null; + + var routingKey = new RoutingKey(_topic); + + var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey)); try { //Send a sequence of messages to Kafka - var msgId = await SendMessageAsync(); - var msgId2 = await SendMessageAsync(); - var msgId3 = await SendMessageAsync(); - var msgId4 = await SendMessageAsync(); + var msgId = await SendMessageAsync(producerAsync, routingKey); + var msgId2 = await SendMessageAsync(producerAsync, routingKey); + var msgId3 = await SendMessageAsync(producerAsync, routingKey); + var msgId4 = await SendMessageAsync(producerAsync, routingKey); + + //We should not need to flush, as the async does not queue work - but in case this changes + ((KafkaMessageProducer)producerAsync).Flush(); consumer = CreateConsumer(); @@ -90,13 +100,11 @@ public async Task When_a_message_is_sent_keep_order() } } - private async Task SendMessageAsync() + private async Task SendMessageAsync(IAmAMessageProducerAsync producerAsync, RoutingKey routingKey) { var messageId = Guid.NewGuid().ToString(); - var routingKey = new RoutingKey(_topic); - - await ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey)).SendAsync( + await producerAsync.SendAsync( new Message( new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND) { @@ -118,8 +126,6 @@ private async Task> ConsumeMessagesAsync(IAmAMessageConsume try { maxTries++; - //Let topic propagate in the broker - await Task.Delay(500); //use TimeSpan.Zero to avoid blocking messages = await consumer.ReceiveAsync(TimeSpan.Zero); diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_consumer_assumes_topic_but_missing_async.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_consumer_assumes_topic_but_missing_async.cs index e4fef9b912..353c6577d7 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_consumer_assumes_topic_but_missing_async.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_consumer_assumes_topic_but_missing_async.cs @@ -45,6 +45,9 @@ public KafkaProducerAssumeTestsAsync(ITestOutputHelper output) [Fact] public async Task When_a_consumer_declares_topics() { + //Let topic propagate in the broker + await Task.Delay(500); + var routingKey = new RoutingKey(_topic); var message = new Message( @@ -64,9 +67,9 @@ public async Task When_a_consumer_declares_topics() }; await ((IAmAMessageProducerAsync)producer).SendAsync(message); - - //Give this a chance to succeed - will fail - await Task.Delay(5000); + + //We should not need to flush, as the async does not queue work - but in case this changes + ((KafkaMessageProducer)producer).Flush(); messagePublished.Should().BeFalse(); } diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_consumer_declares_topic_async.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_consumer_declares_topic_async.cs index ca63c8e790..a4039d02cf 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_consumer_declares_topic_async.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_consumer_declares_topic_async.cs @@ -29,7 +29,8 @@ public KafkaConsumerDeclareTestsAsync(ITestOutputHelper output) Name = "Kafka Producer Send Test", BootStrapServers = new[] {"localhost:9092"} }, - new[] {new KafkaPublication + [ + new KafkaPublication { Topic = new RoutingKey(_topic), NumPartitions = 1, @@ -38,8 +39,9 @@ public KafkaConsumerDeclareTestsAsync(ITestOutputHelper output) //your production values ought to be lower MessageTimeoutMs = 2000, RequestTimeoutMs = 2000, - MakeChannels = OnMissingChannel.Create - }}).CreateAsync().Result; + MakeChannels = OnMissingChannel.Assume + } + ]).Create(); _consumer = new KafkaMessageConsumerFactory( new KafkaMessagingGatewayConfiguration @@ -62,6 +64,9 @@ public KafkaConsumerDeclareTestsAsync(ITestOutputHelper output) [Fact] public async Task When_a_consumer_declares_topics() { + //Let topic propagate in the broker + await Task.Delay(1000); + var routingKey = new RoutingKey(_topic); var message = new Message( @@ -72,8 +77,11 @@ public async Task When_a_consumer_declares_topics() new MessageBody($"test content [{_queueName}]") ); - //This should fail, if consumer can't create the topic as set to Assume - await ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey)).SendAsync(message); + var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey)); + await producerAsync.SendAsync(message); + + //We should not need to flush, as the async does not queue work - but in case this changes + ((KafkaMessageProducer)producerAsync).Flush(); Message[] messages = []; int maxTries = 0; @@ -82,8 +90,6 @@ public async Task When_a_consumer_declares_topics() try { maxTries++; - //Let topic propagate in the broker - await Task.Delay(500); //use TimeSpan.Zero to avoid blocking messages = await _consumer.ReceiveAsync(TimeSpan.Zero); await _consumer.AcknowledgeAsync(messages[0]); diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_offsets_awaiting_next_acknowledge_sweep_them_async.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_offsets_awaiting_next_acknowledge_sweep_them_async.cs index 5af58c9533..4bcc9077c9 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_offsets_awaiting_next_acknowledge_sweep_them_async.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_offsets_awaiting_next_acknowledge_sweep_them_async.cs @@ -64,14 +64,23 @@ public KafkaMessageConsumerSweepOffsetsAsync(ITestOutputHelper output) [Fact] public async Task When_a_message_is_acknowledged_but_no_batch_sent_sweep_offsets() { + var routingKey = new RoutingKey(_topic); + var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey)); + //send x messages to Kafka var sentMessages = new string[10]; for (int i = 0; i < 10; i++) { var msgId = Guid.NewGuid().ToString(); - await SendMessageAsync(msgId); + + await producerAsync.SendAsync(new Message( + new MessageHeader(msgId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey}, + new MessageBody($"test content [{_queueName}]"))); sentMessages[i] = msgId; } + + //We should not need to flush, as the async does not queue work - but in case this changes + ((KafkaMessageProducer)producerAsync).Flush(); var consumedMessages = new List(); for (int j = 0; j < 9; j++) @@ -85,7 +94,7 @@ public async Task When_a_message_is_acknowledged_but_no_batch_sent_sweep_offsets //Let time elapse with no activity await Task.Delay(10000); - //This should trigger a sweeper run (can be fragile when non scheduled in containers etc) + //This should trigger a sweeper run (can be fragile when non-scheduled in containers etc) consumedMessages.Add(await ReadMessageAsync()); //Let the sweeper run, can be slow in CI environments to run the thread @@ -124,15 +133,6 @@ async Task ReadMessageAsync() } } - private async Task SendMessageAsync(string messageId) - { - var routingKey = new RoutingKey(_topic); - - await ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey)).SendAsync(new Message( - new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey}, - new MessageBody($"test content [{_queueName}]"))); - } - public void Dispose() { _producerRegistry?.Dispose(); diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_posting_a_message_async.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_posting_a_message_async.cs index 4b8cd36cb2..8c077b69c7 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_posting_a_message_async.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_posting_a_message_async.cs @@ -65,6 +65,9 @@ public KafkaMessageProducerSendTestsAsync(ITestOutputHelper output) [Fact] public async Task When_posting_a_message() { + //Let topic propagate in the broker + await Task.Delay(500); + var command = new MyCommand { Value = "Test Content" }; //vanilla i.e. no Kafka specific bytes at the beginning @@ -86,7 +89,11 @@ public async Task When_posting_a_message() }, new MessageBody(body)); - await ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey)).SendAsync(message); + var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey)); + await producerAsync.SendAsync(message); + + //We should not need to flush, as the async does not queue work - but in case this changes + ((KafkaMessageProducer)producerAsync).Flush(); var receivedMessage = await GetMessageAsync(); @@ -111,8 +118,7 @@ private async Task GetMessageAsync() try { maxTries++; - //Let topic propagate in the broker - await Task.Delay(500); + //set timespan to zero so that we will not block messages = await _consumer.ReceiveAsync(TimeSpan.Zero); diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_posting_a_message_with_header_bytes_async.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_posting_a_message_with_header_bytes_async.cs index fd3740835c..b167b686d5 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_posting_a_message_with_header_bytes_async.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_posting_a_message_with_header_bytes_async.cs @@ -61,7 +61,7 @@ public KafkaMessageProducerHeaderBytesSendTestsAsync(ITestOutputHelper output) groupId: groupId, numOfPartitions: 1, replicationFactor: 1, - messagePumpType: MessagePumpType.Reactor, + messagePumpType: MessagePumpType.Proactor, makeChannels: OnMissingChannel.Create )); @@ -77,6 +77,9 @@ public KafkaMessageProducerHeaderBytesSendTestsAsync(ITestOutputHelper output) [Fact] public async Task When_posting_a_message_via_the_messaging_gateway() { + //Let topic propagate in the broker + await Task.Delay(500); + //arrange var myCommand = new MyKafkaCommand{ Value = "Hello World"}; @@ -98,7 +101,11 @@ public async Task When_posting_a_message_via_the_messaging_gateway() //act - await ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey)).SendAsync(sent); + var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey)); + await producerAsync.SendAsync(sent); + + //We should not need to flush, as the async does not queue work - but in case this changes + ((KafkaMessageProducer)producerAsync).Flush(); var received = await GetMessageAsync(); @@ -127,7 +134,6 @@ private async Task GetMessageAsync() try { maxTries++; - await Task.Delay(500); //Let topic propagate in the broker messages = await _consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000)); if (messages[0].Header.MessageType != MessageType.MT_NONE) diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_recieving_a_message_without_partition_key_header_async.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_recieving_a_message_without_partition_key_header_async.cs index 45301e6088..062fffff9e 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_recieving_a_message_without_partition_key_header_async.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Proactor/When_recieving_a_message_without_partition_key_header_async.cs @@ -73,6 +73,8 @@ public KafkaMessageProducerMissingHeaderTestsAsync(ITestOutputHelper output) [Fact] public async Task When_recieving_a_message_without_partition_key_header() { + await Task.Delay(500); //Let topic propagate in the broker + var command = new MyCommand { Value = "Test Content" }; //vanilla i.e. no Kafka specific bytes at the beginning @@ -85,6 +87,10 @@ public async Task When_recieving_a_message_without_partition_key_header() }; await _producer.ProduceAsync(_topic, kafkaMessage); + + //We should not need to flush, as the async does not queue work - but in case this changes + ((KafkaMessageProducer)_producer).Flush(); + var receivedMessage = await GetMessageAsync(); @@ -102,7 +108,7 @@ private async Task GetMessageAsync() try { maxTries++; - await Task.Delay(500); //Let topic propagate in the broker + messages = await _consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000)); if (messages[0].Header.MessageType != MessageType.MT_NONE) diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_message_is_acknowledged_update_offset.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_message_is_acknowledged_update_offset.cs index 2634538db5..1cd5376f5f 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_message_is_acknowledged_update_offset.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_message_is_acknowledged_update_offset.cs @@ -46,16 +46,29 @@ public KafkaMessageConsumerUpdateOffset(ITestOutputHelper output) [Fact(Skip = "Fragile as commit thread needs to be scheduled to run")] public async Task When_a_message_is_acknowldgede_update_offset() { + // let topic propogate in the broker + await Task.Delay(500); + var groupId = Guid.NewGuid().ToString(); + + var routingKey = new RoutingKey(_topic); + var producer = ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)); //send x messages to Kafka var sentMessages = new string[10]; for (int i = 0; i < 10; i++) { var msgId = Guid.NewGuid().ToString(); - SendMessage(msgId); + producer.Send( + new Message( + new MessageHeader(msgId, routingKey, MessageType.MT_COMMAND) { PartitionKey = _partitionKey }, + new MessageBody($"test content [{_queueName}]") + )); sentMessages[i] = msgId; } + + //ensure the messages are actually sent + ((KafkaMessageProducer)producer).Flush(); //This will create, then delete the consumer Message[] messages = await ConsumeMessages(groupId: groupId, batchLimit: 5); @@ -78,27 +91,11 @@ public async Task When_a_message_is_acknowldgede_update_offset() } } - private void SendMessage(string messageId) - { - var routingKey = new RoutingKey(_topic); - - ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)).Send( - new Message( - new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey}, - new MessageBody($"test content [{_queueName}]") - ) - ); - } - private async Task ConsumeMessages(string groupId, int batchLimit) { var consumedMessages = new List(); using (IAmAMessageConsumerSync consumer = CreateConsumer(groupId)) { - //Let topic propagate in the broker - await Task.Delay(1000); - - for (int i = 0; i < batchLimit; i++) { consumedMessages.Add(ConsumeMessage(consumer)); diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_set_of_messages_is_sent_preserve_order.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_set_of_messages_is_sent_preserve_order.cs index 46f35f046f..c62cd0f911 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_set_of_messages_is_sent_preserve_order.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_a_set_of_messages_is_sent_preserve_order.cs @@ -55,10 +55,15 @@ public async Task When_a_message_is_sent_keep_order() try { //Send a sequence of messages to Kafka - var msgId = SendMessage(); - var msgId2 = SendMessage(); - var msgId3 = SendMessage(); - var msgId4 = SendMessage(); + var routingKey = new RoutingKey(_topic); + var producer = ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)); + var msgId = SendMessage(producer); + var msgId2 = SendMessage(producer); + var msgId3 = SendMessage(producer); + var msgId4 = SendMessage(producer); + + //ensure the messages are sent + ((KafkaMessageProducer)producer).Flush(); consumer = CreateConsumer(); @@ -90,11 +95,11 @@ public async Task When_a_message_is_sent_keep_order() } } - private string SendMessage() + private string SendMessage(IAmAMessageProducerSync producer) { var messageId = Guid.NewGuid().ToString(); - ((IAmAMessageProducerSync)_producerRegistry.LookupBy(_topic)).Send( + producer.Send( new Message( new MessageHeader(messageId, _topic, MessageType.MT_COMMAND) { diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_consumer_declares_topic.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_consumer_declares_topic.cs index d002082fb8..ff809f46bc 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_consumer_declares_topic.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_consumer_declares_topic.cs @@ -29,7 +29,8 @@ public KafkaConsumerDeclareTests (ITestOutputHelper output) Name = "Kafka Producer Send Test", BootStrapServers = new[] {"localhost:9092"} }, - new[] {new KafkaPublication + [ + new KafkaPublication { Topic = new RoutingKey(_topic), NumPartitions = 1, @@ -38,8 +39,9 @@ public KafkaConsumerDeclareTests (ITestOutputHelper output) //your production values ought to be lower MessageTimeoutMs = 2000, RequestTimeoutMs = 2000, - MakeChannels = OnMissingChannel.Create - }}).Create(); + MakeChannels = OnMissingChannel.Assume + } + ]).Create(); _consumer = new KafkaMessageConsumerFactory( new KafkaMessagingGatewayConfiguration @@ -75,8 +77,12 @@ public async Task When_a_consumer_declares_topics() }, new MessageBody($"test content [{_queueName}]") ); - - ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)).Send(message); + + var producer = ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)); + producer.Send(message); + + //ensure the messages are sent + ((KafkaMessageProducer)producer).Flush(); Message messages = ConsumeMessage(); diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_offsets_awaiting_next_acknowledge_sweep_them.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_offsets_awaiting_next_acknowledge_sweep_them.cs index 2672e9ccb5..7593533ee6 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_offsets_awaiting_next_acknowledge_sweep_them.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_offsets_awaiting_next_acknowledge_sweep_them.cs @@ -65,14 +65,26 @@ public KafkaMessageConsumerSweepOffsets(ITestOutputHelper output) [Fact] public async Task When_a_message_is_acknowldeged_but_no_batch_sent_sweep_offsets() { + //allow topic to propogate on the broker + await Task.Delay(500); + + var routingKey = new RoutingKey(_topic); + + var producer = ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)); + //send x messages to Kafka var sentMessages = new string[10]; for (int i = 0; i < 10; i++) { var msgId = Guid.NewGuid().ToString(); - SendMessage(msgId); + producer.Send(new Message( + new MessageHeader(msgId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey}, + new MessageBody($"test content [{_queueName}]"))); sentMessages[i] = msgId; } + + //ensure messages are sent + ((KafkaMessageProducer)producer).Flush(); var consumedMessages = new List(); for (int j = 0; j < 9; j++) @@ -105,7 +117,6 @@ async Task ReadMessageAsync() try { maxTries++; - await Task.Delay(500); //Let topic propagate in the broker messages = _consumer.Receive(TimeSpan.FromMilliseconds(1000)); if (messages[0].Header.MessageType != MessageType.MT_NONE) @@ -126,15 +137,6 @@ async Task ReadMessageAsync() } } - private void SendMessage(string messageId) - { - var routingKey = new RoutingKey(_topic); - - ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)).Send(new Message( - new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey}, - new MessageBody($"test content [{_queueName}]"))); - } - public void Dispose() { diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_posting_a_message.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_posting_a_message.cs index 623b8230ed..1160484bc2 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_posting_a_message.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_posting_a_message.cs @@ -63,8 +63,11 @@ public KafkaMessageProducerSendTests(ITestOutputHelper output) } [Fact] - public void When_posting_a_message() + public async Task When_posting_a_message() { + //Let topic propagate in the broker + await Task.Delay(500); + var command = new MyCommand { Value = "Test Content" }; //vanilla i.e. no Kafka specific bytes at the beginning @@ -86,7 +89,11 @@ public void When_posting_a_message() }, new MessageBody(body)); - ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)).Send(message); + var producer = ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)); + producer.Send(message); + + //ensure that the messages have flushed + ((KafkaMessageProducer)producer).Flush(); var receivedMessage = GetMessage(); @@ -111,7 +118,6 @@ private Message GetMessage() try { maxTries++; - Task.Delay(500).Wait(); //Let topic propagate in the broker messages = _consumer.Receive(TimeSpan.FromMilliseconds(1000)); if (messages[0].Header.MessageType != MessageType.MT_NONE) diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_posting_a_message_with_header_bytes.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_posting_a_message_with_header_bytes.cs index e98580d8f6..f283d00f00 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_posting_a_message_with_header_bytes.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_posting_a_message_with_header_bytes.cs @@ -81,8 +81,11 @@ public KafkaMessageProducerHeaderBytesSendTests (ITestOutputHelper output) /// may cause this test to fail, so check them if in doubt /// [Fact] - public void When_posting_a_message_via_the_messaging_gateway() + public async Task When_posting_a_message_via_the_messaging_gateway() { + + await Task.Delay(500); //Let topic propagate in the broker + //arrange var myCommand = new MyKafkaCommand{ Value = "Hello World"}; @@ -103,8 +106,12 @@ public void When_posting_a_message_via_the_messaging_gateway() new MessageBody(body)); //act - - ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)).Send(sent); + + var producer = ((IAmAMessageProducerSync)_producerRegistry.LookupBy(routingKey)); + producer.Send(sent); + + //ensure that the messages are all sent + ((KafkaMessageProducer) producer).Flush(); var received = GetMessage(); @@ -133,7 +140,6 @@ private Message GetMessage() try { maxTries++; - Task.Delay(500).Wait(); //Let topic propagate in the broker messages = _consumer.Receive(TimeSpan.FromMilliseconds(1000)); if (messages[0].Header.MessageType != MessageType.MT_NONE) diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_recieving_a_message_without_partition_key_header.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_recieving_a_message_without_partition_key_header.cs index 9db01a508d..89195cd0a5 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_recieving_a_message_without_partition_key_header.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/Local/Reactor/When_recieving_a_message_without_partition_key_header.cs @@ -73,8 +73,10 @@ public KafkaMessageProducerMissingHeaderTests(ITestOutputHelper output) } [Fact] - public void When_recieving_a_message_without_partition_key_header() + public async Task When_recieving_a_message_without_partition_key_header() { + await Task.Delay(500); //Let topic propagate in the broker + var command = new MyCommand { Value = "Test Content" }; //vanilla i.e. no Kafka specific bytes at the beginning @@ -87,6 +89,9 @@ public void When_recieving_a_message_without_partition_key_header() }; _producer.Produce(_topic, kafkaMessage, report => _output.WriteLine(report.ToString()) ); + + //ensure any messages are flushed + _producer.Flush(); var receivedMessage = GetMessage(); @@ -104,7 +109,6 @@ private Message GetMessage() try { maxTries++; - Task.Delay(500).Wait(); //Let topic propagate in the broker messages = _consumer.Receive(TimeSpan.FromMilliseconds(1000)); if (messages[0].Header.MessageType != MessageType.MT_NONE)