Fixes for Kafka Tests following the Proactor/Reactor split #3447

Draft
wants to merge 20 commits into master
Changes from 5 commits
20 commits
a7381af
fix: broken Kafka test
iancooper Dec 30, 2024
e168be4
fix: simplify the management of kafka tests with directories
iancooper Dec 31, 2024
88c18b9
fix: make time-based test more reliable
iancooper Dec 31, 2024
e66900b
chore: note issue with test and schema registry
iancooper Dec 31, 2024
5bae366
fix: issues with posting a message with delay
iancooper Dec 31, 2024
265f33d
fix: fix issues with timing
iancooper Dec 31, 2024
7b2d509
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
9a6bd5e
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
b12dfd9
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
8daba2b
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
fb5dd01
chore: merge branch 'master' into kafka_fixes
iancooper Jan 2, 2025
6955810
fix: time and date issues caused by inconsistency between invariant a…
iancooper Jan 6, 2025
b6655bf
fix: note around skip tests that breaks if Docker container does not …
iancooper Jan 6, 2025
c9e8f37
chore: merge branch 'master' into kafka_fixes
iancooper Jan 6, 2025
864af0e
fix: kafka producer should flush on close; test uses assume not create
iancooper Jan 6, 2025
90f33ed
fix: make the kafka tests more robust
iancooper Jan 8, 2025
d3dee33
chore: merge branch 'master' into kafka_fixes
iancooper Jan 8, 2025
5132b01
fix: when we don't publish on async, must manually raise delivery report
iancooper Jan 8, 2025
3031f44
fix: close of consumer now more reliable in flushing offsets
iancooper Jan 8, 2025
ce7f550
chore: Merge branch 'master' into kafka_fixes
iancooper Jan 8, 2025
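
The diff below is easier to follow with the producer shutdown change in mind: commit 864af0e makes the Kafka producer flush on close so queued messages are not lost when a test tears the producer down. A minimal sketch of that pattern, written directly against Confluent.Kafka's IProducer API rather than Brighter's wrapper (broker address and timeout are illustrative, not taken from the PR):

using System;
using Confluent.Kafka;

// Sketch only: flush pending deliveries before disposing the producer.
public sealed class FlushingProducer : IDisposable
{
    private readonly IProducer<string, string> _producer =
        new ProducerBuilder<string, string>(
            new ProducerConfig { BootstrapServers = "localhost:9092" }).Build();

    public void Send(string topic, string key, string value)
    {
        // Produce is fire-and-forget; delivery happens on a background poll thread.
        _producer.Produce(topic, new Message<string, string> { Key = key, Value = value });
    }

    public void Dispose()
    {
        // Flush blocks until queued messages are delivered or the timeout expires,
        // so a close immediately after a send does not silently drop the message.
        _producer.Flush(TimeSpan.FromSeconds(10));
        _producer.Dispose();
    }
}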
@@ -47,10 +47,13 @@ public Headers Build(Message message)
new Header(HeaderNames.MESSAGE_ID, message.Header.MessageId.ToByteArray()),
};

if (message.Header.TimeStamp != default)
headers.Add(HeaderNames.TIMESTAMP, message.Header.TimeStamp.ToString().ToByteArray());
else
headers.Add(HeaderNames.TIMESTAMP, DateTimeOffset.UtcNow.ToString().ToByteArray());
string timeStampAsString = DateTimeOffset.UtcNow.DateTime.ToString(CultureInfo.InvariantCulture);
if (message.Header.TimeStamp.DateTime != default)
{
timeStampAsString = message.Header.TimeStamp.DateTime.ToString(CultureInfo.InvariantCulture);
}

headers.Add(HeaderNames.TIMESTAMP, timeStampAsString.ToByteArray());

if (message.Header.CorrelationId != string.Empty)
headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToByteArray());
@@ -64,7 +67,7 @@ public Headers Build(Message message)
if (!string.IsNullOrEmpty(message.Header.ReplyTo))
headers.Add(HeaderNames.REPLY_TO, message.Header.ReplyTo.ToByteArray());

headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.Delayed.ToString().ToByteArray());
headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.Delayed.TotalMilliseconds.ToString(CultureInfo.InvariantCulture).ToByteArray());

headers.Add(HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString().ToByteArray());
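
The hunk above formats both the TIMESTAMP and DELAYED_MILLISECONDS headers with CultureInfo.InvariantCulture, so the values round-trip identically regardless of the host's locale (decimal separator, date ordering). A small standalone sketch of that round trip; UTF-8 encoding stands in here for Brighter's ToByteArray() helper, and the header plumbing is omitted:

using System;
using System.Globalization;
using System.Text;

class HeaderRoundTrip
{
    static void Main()
    {
        DateTime timeStamp = DateTimeOffset.UtcNow.DateTime;
        TimeSpan delay = TimeSpan.FromMilliseconds(1500);

        // Write: always format with the invariant culture.
        byte[] timeStampBytes = Encoding.UTF8.GetBytes(
            timeStamp.ToString(CultureInfo.InvariantCulture));
        byte[] delayBytes = Encoding.UTF8.GetBytes(
            delay.TotalMilliseconds.ToString(CultureInfo.InvariantCulture));

        // Read: parse with the invariant culture too, so producer and consumer
        // agree even when their machines use different regional settings.
        DateTime parsedTimeStamp = DateTime.Parse(
            Encoding.UTF8.GetString(timeStampBytes), CultureInfo.InvariantCulture);
        TimeSpan parsedDelay = TimeSpan.FromMilliseconds(
            double.Parse(Encoding.UTF8.GetString(delayBytes), CultureInfo.InvariantCulture));

        Console.WriteLine($"{parsedTimeStamp:u} / {parsedDelay.TotalMilliseconds} ms");
    }
}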

@@ -206,7 +206,7 @@ public KafkaMessageConsumer(
.Build();

s_logger.LogInformation("Kafka consumer subscribing to {Topic}", Topic);
_consumer.Subscribe(new []{ Topic.Value });
_consumer.Subscribe([Topic.Value]);

_creator = new KafkaMessageCreator();
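
The [Topic.Value] argument above and the [new Message()] initialiser in a later test hunk use C# 12 collection expressions, while new Message[0] becomes Array.Empty<Message>(); behaviour is unchanged in all three cases. A tiny standalone sketch of the equivalence (the topic string is a placeholder):

using System;

class CollectionExpressionSketch
{
    static void Main()
    {
        string topic = "my.topic"; // placeholder value

        // C# 12 collection expression...
        string[] topics = [topic];
        // ...builds the same array as the older syntax:
        string[] topicsOld = new[] { topic };

        // An empty collection expression, which the compiler can lower to Array.Empty<T>():
        string[] none = [];

        Console.WriteLine($"{topics.Length} {topicsOld.Length} {none.Length}");
    }
}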

@@ -10,7 +10,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -131,15 +131,17 @@ private async Task<string> SendMessageAsync()

private async Task<IEnumerable<Message>> ConsumeMessagesAsync(IAmAMessageConsumerAsync consumer)
{
var messages = new Message[0];
var messages = Array.Empty<Message>();
int maxTries = 0;
do
{
try
{
maxTries++;
await Task.Delay(500); //Let topic propagate in the broker
messages = await consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
//Let topic propagate in the broker
await Task.Delay(500);
//use TimeSpan.Zero to avoid blocking
messages = await consumer.ReceiveAsync(TimeSpan.Zero);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
break;
@@ -1,17 +1,11 @@
using System;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Confluent.Kafka;
using FluentAssertions;
using Paramore.Brighter.Kafka.Tests.TestDoubles;
using Paramore.Brighter.MessagingGateway.Kafka;
using Xunit;
using Xunit.Abstractions;
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -9,7 +9,7 @@
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -9,7 +9,7 @@
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -10,7 +10,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Reactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -129,14 +129,15 @@ private string SendMessage()

private IEnumerable<Message> ConsumeMessages(IAmAMessageConsumerSync consumer)
{
var messages = new Message[0];
var messages = Array.Empty<Message>();
int maxTries = 0;
do
{
try
{
maxTries++;
Task.Delay(500).Wait(); //Let topic propagate in the broker
//Let topic propagate in the broker
Task.Delay(500).Wait();
messages = consumer.Receive(TimeSpan.FromMilliseconds(1000));

if (messages[0].Header.MessageType != MessageType.MT_NONE)
@@ -23,19 +23,13 @@ THE SOFTWARE. */
#endregion

using System;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Confluent.Kafka;
using FluentAssertions;
using Paramore.Brighter.Kafka.Tests.TestDoubles;
using Paramore.Brighter.MessagingGateway.Kafka;
using Xunit;
using Xunit.Abstractions;
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Reactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -9,7 +9,7 @@
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Reactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -9,7 +9,7 @@
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Reactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -8,7 +8,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Local.Proactor;

[Trait("Category", "Kafka")]
[Trait("Fragile", "CI")]
@@ -68,7 +68,7 @@ public async Task When_a_message_is_acknowldgede_update_offset()
}

//yield to broker to catch up
await Task.Delay(TimeSpan.FromSeconds(5));
await Task.Delay(TimeSpan.FromMilliseconds(500));

//This will create a new consumer
Message[] newMessages = await ConsumeMessagesAsync(groupId, batchLimit: 5);
@@ -107,15 +107,17 @@ private async Task<Message[]> ConsumeMessagesAsync(string groupId, int batchLimit)

async Task<Message> ConsumeMessageAsync(IAmAMessageConsumerAsync consumer)
{
Message[] messages = new []{new Message()};
Message[] messages = [new Message()];
int maxTries = 0;
do
{
try
{
maxTries++;
await Task.Delay(500); //Let topic propagate in the broker
messages = await consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
//Let topic propagate in the broker
await Task.Delay(500);
//use TimeSpan.Zero to avoid blocking
messages = await consumer.ReceiveAsync(TimeSpan.Zero);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
{
@@ -9,7 +9,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Local.Proactor;

[Trait("Category", "Kafka")]
[Collection("Kafka")] //Kafka doesn't like multiple consumers of a partition
@@ -111,15 +111,17 @@ private async Task<string> SendMessageAsync()

private async Task<IEnumerable<Message>> ConsumeMessagesAsync(IAmAMessageConsumerAsync consumer)
{
var messages = new Message[0];
var messages = Array.Empty<Message>();
int maxTries = 0;
do
{
try
{
maxTries++;
await Task.Delay(500); //Let topic propagate in the broker
messages = await consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
//Let topic propagate in the broker
await Task.Delay(500);
//use TimeSpan.Zero to avoid blocking
messages = await consumer.ReceiveAsync(TimeSpan.Zero);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
break;
@@ -5,7 +5,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Local.Proactor;

[Trait("Category", "Kafka")]
[Collection("Kafka")] //Kafka doesn't like multiple consumers of a partition
@@ -26,7 +26,8 @@ public KafkaProducerAssumeTestsAsync(ITestOutputHelper output)
Name = "Kafka Producer Send Test",
BootStrapServers = new[] {"localhost:9092"}
},
new KafkaPublication[] {new KafkaPublication
[
new KafkaPublication
{
Topic = new RoutingKey(_topic),
NumPartitions = 1,
@@ -36,7 +37,8 @@ public KafkaProducerAssumeTestsAsync(ITestOutputHelper output)
MessageTimeoutMs = 2000,
RequestTimeoutMs = 2000,
MakeChannels = OnMissingChannel.Create
}}).Create();
}
]).Create();
}

//[Fact(Skip = "Does not fail on docker container as has topic creation set to true")]
@@ -6,7 +6,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Local.Proactor;

[Trait("Category", "Kafka")]
[Collection("Kafka")] //Kafka doesn't like multiple consumers of a partition
@@ -82,8 +82,10 @@ public async Task When_a_consumer_declares_topics()
try
{
maxTries++;
await Task.Delay(500); //Let topic propagate in the broker
messages = await _consumer.ReceiveAsync(TimeSpan.FromMilliseconds(10000));
//Let topic propagate in the broker
await Task.Delay(500);
//use TimeSpan.Zero to avoid blocking
messages = await _consumer.ReceiveAsync(TimeSpan.Zero);
await _consumer.AcknowledgeAsync(messages[0]);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
@@ -7,7 +7,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Local.Proactor;

[Trait("Category", "Kafka")]
[Collection("Kafka")] //Kafka doesn't like multiple consumers of a partition
@@ -8,7 +8,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Local.Proactor;

[Trait("Category", "Kafka")]
[Collection("Kafka")] //Kafka doesn't like multiple consumers of a partition
@@ -96,8 +96,8 @@ public async Task When_posting_a_message()
receivedMessage.Header.PartitionKey.Should().Be(_partitionKey);
receivedMessage.Body.Bytes.Should().Equal(message.Body.Bytes);
receivedMessage.Body.Value.Should().Be(message.Body.Value);
receivedMessage.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ssZ")
.Should().Be(message.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ssZ"));
receivedMessage.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:Z")
.Should().Be(message.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:Z"));
receivedCommand.Id.Should().Be(command.Id);
receivedCommand.Value.Should().Be(command.Value);
}
@@ -111,8 +111,10 @@ private async Task<Message> GetMessageAsync()
try
{
maxTries++;
await Task.Delay(500); //Let topic propagate in the broker
messages = await _consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
//Let topic propagate in the broker
await Task.Delay(500);
//set timespan to zero so that we will not block
messages = await _consumer.ReceiveAsync(TimeSpan.Zero);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
{
@@ -11,7 +11,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Local.Proactor;

[Trait("Category", "Kafka")]
[Collection("Kafka")] //Kafka doesn't like multiple consumers of a partition
@@ -10,7 +10,7 @@
using Xunit.Abstractions;
using Acks = Confluent.Kafka.Acks;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Local.Proactor;

public class KafkaMessageProducerMissingHeaderTestsAsync : IAsyncDisposable
{
@@ -40,7 +40,7 @@ public KafkaMessageProducerMissingHeaderTestsAsync(ITestOutputHelper output)
LingerMs = 5,
MessageTimeoutMs = 5000,
MessageSendMaxRetries = 3,
Partitioner = Confluent.Kafka.Partitioner.ConsistentRandom,
Partitioner = global::Confluent.Kafka.Partitioner.ConsistentRandom,
QueueBufferingMaxMessages = 10,
QueueBufferingMaxKbytes = 1048576,
RequestTimeoutMs = 500,