Skip to content

Commit

Permalink
fix: make the kafka tests more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
iancooper committed Jan 8, 2025
1 parent 864af0e commit 90f33ed
Show file tree
Hide file tree
Showing 16 changed files with 165 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,7 @@ public IAmAMessageConsumerSync Create(Subscription subscription)
);
}

public IAmAMessageConsumerAsync CreateAsync(Subscription subscription)
{
KafkaSubscription kafkaSubscription = subscription as KafkaSubscription;
if (kafkaSubscription == null)
throw new ConfigurationException("We expect an SQSConnection or SQSConnection<T> as a parameter");

return new KafkaMessageConsumer(
configuration: _configuration,
routingKey:kafkaSubscription.RoutingKey, //topic
groupId: kafkaSubscription.GroupId,
offsetDefault: kafkaSubscription.OffsetDefault,
sessionTimeout: kafkaSubscription.SessionTimeout,
maxPollInterval: kafkaSubscription.MaxPollInterval,
isolationLevel: kafkaSubscription.IsolationLevel,
commitBatchSize: kafkaSubscription.CommitBatchSize,
sweepUncommittedOffsetsInterval: kafkaSubscription.SweepUncommittedOffsetsInterval,
readCommittedOffsetsTimeout: kafkaSubscription.ReadCommittedOffsetsTimeOut,
numPartitions: kafkaSubscription.NumPartitions,
partitionAssignmentStrategy: kafkaSubscription.PartitionAssignmentStrategy,
replicationFactor: kafkaSubscription.ReplicationFactor,
topicFindTimeout: kafkaSubscription.TopicFindTimeout,
makeChannels: kafkaSubscription.MakeChannels
);
}
public IAmAMessageConsumerAsync CreateAsync(Subscription subscription) => (IAmAMessageConsumerAsync)Create(subscription);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,31 @@ public KafkaMessageConsumerUpdateOffsetAsync(ITestOutputHelper output)
[Fact]
public async Task When_a_message_is_acknowldgede_update_offset()
{
//Let topic propagate in the broker
await Task.Delay(500);

var groupId = Guid.NewGuid().ToString();

//send x messages to Kafka
var routingKey = new RoutingKey(_topic);
var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey));

//send x messages to Kafka
var sentMessages = new string[10];
for (int i = 0; i < 10; i++)
{
var msgId = Guid.NewGuid().ToString();
await SendMessageAsync(msgId);

await producerAsync.SendAsync(
new Message(
new MessageHeader(msgId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey},
new MessageBody($"test content [{_queueName}]")
)
);
sentMessages[i] = msgId;
}

//We should not need to flush, as the async does not queue work - but in case this changes
((KafkaMessageProducer)producerAsync).Flush();

//This will create, then delete the consumer
Message[] messages = await ConsumeMessagesAsync(groupId: groupId, batchLimit: 5);
Expand All @@ -80,18 +95,6 @@ public async Task When_a_message_is_acknowldgede_update_offset()
}
}

private async Task SendMessageAsync(string messageId)
{
var routingKey = new RoutingKey(_topic);

await ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey)).SendAsync(
new Message(
new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey},
new MessageBody($"test content [{_queueName}]")
)
);
}

private async Task<Message[]> ConsumeMessagesAsync(string groupId, int batchLimit)
{
var consumedMessages = new List<Message>();
Expand All @@ -114,9 +117,7 @@ async Task<Message> ConsumeMessageAsync(IAmAMessageConsumerAsync consumer)
try
{
maxTries++;
//Let topic propagate in the broker
await Task.Delay(500);
//use TimeSpan.Zero to avoid blocking
//use TimeSpan.Zero to avoid blocking
messages = await consumer.ReceiveAsync(TimeSpan.Zero);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,24 @@ public KafkaMessageConsumerPreservesOrderAsync(ITestOutputHelper output)
[Fact]
public async Task When_a_message_is_sent_keep_order()
{
//Let topic propagate in the broker
await Task.Delay(500);

IAmAMessageConsumerAsync consumer = null;

var routingKey = new RoutingKey(_topic);

var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey));
try
{
//Send a sequence of messages to Kafka
var msgId = await SendMessageAsync();
var msgId2 = await SendMessageAsync();
var msgId3 = await SendMessageAsync();
var msgId4 = await SendMessageAsync();
var msgId = await SendMessageAsync(producerAsync, routingKey);
var msgId2 = await SendMessageAsync(producerAsync, routingKey);
var msgId3 = await SendMessageAsync(producerAsync, routingKey);
var msgId4 = await SendMessageAsync(producerAsync, routingKey);

//We should not need to flush, as the async does not queue work - but in case this changes
((KafkaMessageProducer)producerAsync).Flush();

consumer = CreateConsumer();

Expand Down Expand Up @@ -90,13 +100,11 @@ public async Task When_a_message_is_sent_keep_order()
}
}

private async Task<string> SendMessageAsync()
private async Task<string> SendMessageAsync(IAmAMessageProducerAsync producerAsync, RoutingKey routingKey)
{
var messageId = Guid.NewGuid().ToString();

var routingKey = new RoutingKey(_topic);

await ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey)).SendAsync(
await producerAsync.SendAsync(
new Message(
new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND)
{
Expand All @@ -118,8 +126,6 @@ private async Task<IEnumerable<Message>> ConsumeMessagesAsync(IAmAMessageConsume
try
{
maxTries++;
//Let topic propagate in the broker
await Task.Delay(500);
//use TimeSpan.Zero to avoid blocking
messages = await consumer.ReceiveAsync(TimeSpan.Zero);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public KafkaProducerAssumeTestsAsync(ITestOutputHelper output)
[Fact]
public async Task When_a_consumer_declares_topics()
{
//Let topic propagate in the broker
await Task.Delay(500);

var routingKey = new RoutingKey(_topic);

var message = new Message(
Expand All @@ -64,9 +67,9 @@ public async Task When_a_consumer_declares_topics()
};

await ((IAmAMessageProducerAsync)producer).SendAsync(message);

//Give this a chance to succeed - will fail
await Task.Delay(5000);
//We should not need to flush, as the async does not queue work - but in case this changes
((KafkaMessageProducer)producer).Flush();

messagePublished.Should().BeFalse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public KafkaConsumerDeclareTestsAsync(ITestOutputHelper output)
Name = "Kafka Producer Send Test",
BootStrapServers = new[] {"localhost:9092"}
},
new[] {new KafkaPublication
[
new KafkaPublication
{
Topic = new RoutingKey(_topic),
NumPartitions = 1,
Expand All @@ -38,8 +39,9 @@ public KafkaConsumerDeclareTestsAsync(ITestOutputHelper output)
//your production values ought to be lower
MessageTimeoutMs = 2000,
RequestTimeoutMs = 2000,
MakeChannels = OnMissingChannel.Create
}}).CreateAsync().Result;
MakeChannels = OnMissingChannel.Assume
}
]).Create();

_consumer = new KafkaMessageConsumerFactory(
new KafkaMessagingGatewayConfiguration
Expand All @@ -62,6 +64,9 @@ public KafkaConsumerDeclareTestsAsync(ITestOutputHelper output)
[Fact]
public async Task When_a_consumer_declares_topics()
{
//Let topic propagate in the broker
await Task.Delay(1000);

var routingKey = new RoutingKey(_topic);

var message = new Message(
Expand All @@ -72,8 +77,11 @@ public async Task When_a_consumer_declares_topics()
new MessageBody($"test content [{_queueName}]")
);

//This should fail, if consumer can't create the topic as set to Assume
await ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey)).SendAsync(message);
var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey));
await producerAsync.SendAsync(message);

//We should not need to flush, as the async does not queue work - but in case this changes
((KafkaMessageProducer)producerAsync).Flush();

Message[] messages = [];
int maxTries = 0;
Expand All @@ -82,8 +90,6 @@ public async Task When_a_consumer_declares_topics()
try
{
maxTries++;
//Let topic propagate in the broker
await Task.Delay(500);
//use TimeSpan.Zero to avoid blocking
messages = await _consumer.ReceiveAsync(TimeSpan.Zero);
await _consumer.AcknowledgeAsync(messages[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,23 @@ public KafkaMessageConsumerSweepOffsetsAsync(ITestOutputHelper output)
[Fact]
public async Task When_a_message_is_acknowledged_but_no_batch_sent_sweep_offsets()
{
var routingKey = new RoutingKey(_topic);
var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey));

//send x messages to Kafka
var sentMessages = new string[10];
for (int i = 0; i < 10; i++)
{
var msgId = Guid.NewGuid().ToString();
await SendMessageAsync(msgId);

await producerAsync.SendAsync(new Message(
new MessageHeader(msgId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey},
new MessageBody($"test content [{_queueName}]")));
sentMessages[i] = msgId;
}

//We should not need to flush, as the async does not queue work - but in case this changes
((KafkaMessageProducer)producerAsync).Flush();

var consumedMessages = new List<Message>();
for (int j = 0; j < 9; j++)
Expand All @@ -85,7 +94,7 @@ public async Task When_a_message_is_acknowledged_but_no_batch_sent_sweep_offsets
//Let time elapse with no activity
await Task.Delay(10000);

//This should trigger a sweeper run (can be fragile when non scheduled in containers etc)
//This should trigger a sweeper run (can be fragile when non-scheduled in containers etc)
consumedMessages.Add(await ReadMessageAsync());

//Let the sweeper run, can be slow in CI environments to run the thread
Expand Down Expand Up @@ -124,15 +133,6 @@ async Task<Message> ReadMessageAsync()
}
}

private async Task SendMessageAsync(string messageId)
{
var routingKey = new RoutingKey(_topic);

await ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey)).SendAsync(new Message(
new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey},
new MessageBody($"test content [{_queueName}]")));
}

public void Dispose()
{
_producerRegistry?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public KafkaMessageProducerSendTestsAsync(ITestOutputHelper output)
[Fact]
public async Task When_posting_a_message()
{
//Let topic propagate in the broker
await Task.Delay(500);

var command = new MyCommand { Value = "Test Content" };

//vanilla i.e. no Kafka specific bytes at the beginning
Expand All @@ -86,7 +89,11 @@ public async Task When_posting_a_message()
},
new MessageBody(body));

await ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey)).SendAsync(message);
var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey));
await producerAsync.SendAsync(message);

//We should not need to flush, as the async does not queue work - but in case this changes
((KafkaMessageProducer)producerAsync).Flush();

var receivedMessage = await GetMessageAsync();

Expand All @@ -111,8 +118,7 @@ private async Task<Message> GetMessageAsync()
try
{
maxTries++;
//Let topic propagate in the broker
await Task.Delay(500);

//set timespan to zero so that we will not block
messages = await _consumer.ReceiveAsync(TimeSpan.Zero);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public KafkaMessageProducerHeaderBytesSendTestsAsync(ITestOutputHelper output)
groupId: groupId,
numOfPartitions: 1,
replicationFactor: 1,
messagePumpType: MessagePumpType.Reactor,
messagePumpType: MessagePumpType.Proactor,
makeChannels: OnMissingChannel.Create
));

Expand All @@ -77,6 +77,9 @@ public KafkaMessageProducerHeaderBytesSendTestsAsync(ITestOutputHelper output)
[Fact]
public async Task When_posting_a_message_via_the_messaging_gateway()
{
//Let topic propagate in the broker
await Task.Delay(500);

//arrange

var myCommand = new MyKafkaCommand{ Value = "Hello World"};
Expand All @@ -98,7 +101,11 @@ public async Task When_posting_a_message_via_the_messaging_gateway()

//act

await ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey)).SendAsync(sent);
var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupAsyncBy(routingKey));
await producerAsync.SendAsync(sent);

//We should not need to flush, as the async does not queue work - but in case this changes
((KafkaMessageProducer)producerAsync).Flush();

var received = await GetMessageAsync();

Expand Down Expand Up @@ -127,7 +134,6 @@ private async Task<Message> GetMessageAsync()
try
{
maxTries++;
await Task.Delay(500); //Let topic propagate in the broker
messages = await _consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000));

if (messages[0].Header.MessageType != MessageType.MT_NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public KafkaMessageProducerMissingHeaderTestsAsync(ITestOutputHelper output)
[Fact]
public async Task When_recieving_a_message_without_partition_key_header()
{
await Task.Delay(500); //Let topic propagate in the broker

var command = new MyCommand { Value = "Test Content" };

//vanilla i.e. no Kafka specific bytes at the beginning
Expand All @@ -85,6 +87,10 @@ public async Task When_recieving_a_message_without_partition_key_header()
};

await _producer.ProduceAsync(_topic, kafkaMessage);

//We should not need to flush, as the async does not queue work - but in case this changes
((KafkaMessageProducer)_producer).Flush();


var receivedMessage = await GetMessageAsync();

Expand All @@ -102,7 +108,7 @@ private async Task<Message> GetMessageAsync()
try
{
maxTries++;
await Task.Delay(500); //Let topic propagate in the broker

messages = await _consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000));

if (messages[0].Header.MessageType != MessageType.MT_NONE)
Expand Down
Loading

0 comments on commit 90f33ed

Please sign in to comment.