Fixes for Kafka Tests following the Proactor/Reactor split #3447

Draft · iancooper wants to merge 20 commits into master from kafka_fixes
Changes from 16 commits

Commits (20):
a7381af
fix: broken Kafka test
iancooper Dec 30, 2024
e168be4
fix: simplify the management of kafka tests with directories
iancooper Dec 31, 2024
88c18b9
fix: make time base test more reliable
iancooper Dec 31, 2024
e66900b
chore: note issue with test and schema registry
iancooper Dec 31, 2024
5bae366
fix: issues with posting a message with delay
iancooper Dec 31, 2024
265f33d
fix: fix issues with timing
iancooper Dec 31, 2024
7b2d509
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
9a6bd5e
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
b12dfd9
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
8daba2b
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
fb5dd01
chore: merge branch 'master' into kafka_fixes
iancooper Jan 2, 2025
6955810
fix: time and date issues caused by inconsistency between invariant a…
iancooper Jan 6, 2025
b6655bf
fix: note around skip tests that breaks if Docker container does not …
iancooper Jan 6, 2025
c9e8f37
chore: merge branch 'master' into kafka_fixes
iancooper Jan 6, 2025
864af0e
fix: kafka producer should flush on close; test uses assume not create
iancooper Jan 6, 2025
90f33ed
fix: make the kafka tests more robust
iancooper Jan 8, 2025
d3dee33
chore: merge branch 'master' into kafka_fixes
iancooper Jan 8, 2025
5132b01
fix: when we don't publish on async, must manually raise deliveryreport
iancooper Jan 8, 2025
3031f44
fix: close of consumer now more reliable in flushing offsets
iancooper Jan 8, 2025
ce7f550
chore: Merge branch 'master' into kafka_fixes
iancooper Jan 8, 2025
Files changed
@@ -47,10 +47,13 @@ public Headers Build(Message message)
new Header(HeaderNames.MESSAGE_ID, message.Header.MessageId.ToByteArray()),
};

- if (message.Header.TimeStamp != default)
- headers.Add(HeaderNames.TIMESTAMP, message.Header.TimeStamp.ToString().ToByteArray());
- else
- headers.Add(HeaderNames.TIMESTAMP, DateTimeOffset.UtcNow.ToString().ToByteArray());
+ string timeStampAsString = DateTimeOffset.UtcNow.DateTime.ToString(CultureInfo.InvariantCulture);
+ if (message.Header.TimeStamp.DateTime != default)
+ {
+ timeStampAsString = message.Header.TimeStamp.DateTime.ToString(CultureInfo.InvariantCulture);
+ }
+
+ headers.Add(HeaderNames.TIMESTAMP, timeStampAsString.ToByteArray());

if (message.Header.CorrelationId != string.Empty)
headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToByteArray());
@@ -64,7 +67,7 @@ public Headers Build(Message message)
if (!string.IsNullOrEmpty(message.Header.ReplyTo))
headers.Add(HeaderNames.REPLY_TO, message.Header.ReplyTo.ToByteArray());

- headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.Delayed.ToString().ToByteArray());
+ headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.Delayed.TotalMilliseconds.ToString(CultureInfo.InvariantCulture).ToByteArray());

headers.Add(HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString().ToByteArray());

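Note: the two hunks above move the timestamp and delay headers onto the invariant culture and a raw millisecond count. A standalone sketch, not code from this PR (the class name and console output are illustrative only), of the two pitfalls the old code had: TimeSpan.ToString() is not a millisecond count, and culture-sensitive DateTime text may not round-trip.

using System;
using System.Globalization;

class HeaderSerializationSketch
{
    static void Main()
    {
        // Pitfall 1: TimeSpan.ToString() is "[-][d.]hh:mm:ss[.fffffff]",
        // not a millisecond count, so a consumer reading DELAYED_MILLISECONDS
        // as a number would choke on it.
        var delay = TimeSpan.FromSeconds(1.5);
        Console.WriteLine(delay.ToString()); // 00:00:01.5000000
        Console.WriteLine(delay.TotalMilliseconds.ToString(CultureInfo.InvariantCulture)); // 1500

        // Pitfall 2: DateTime.ToString() without a culture uses the host's
        // current culture, so the wire format changes from machine to machine.
        // Writing and reading with the invariant culture is stable everywhere.
        var now = DateTimeOffset.UtcNow.DateTime;
        string wireValue = now.ToString(CultureInfo.InvariantCulture);
        bool roundTrips = DateTime.TryParse(wireValue, DateTimeFormatInfo.InvariantInfo,
            DateTimeStyles.AdjustToUniversal, out DateTime parsed);
        Console.WriteLine($"{wireValue} round-trips: {roundTrips}"); // True
    }
}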
@@ -206,7 +206,7 @@ public KafkaMessageConsumer(
.Build();

s_logger.LogInformation("Kafka consumer subscribing to {Topic}", Topic);
- _consumer.Subscribe(new []{ Topic.Value });
+ _consumer.Subscribe([Topic.Value]);

_creator = new KafkaMessageCreator();

@@ -540,6 +540,11 @@ private void CommitOffsets()

_consumer.Commit(listOffsets);
}
+ catch(Exception ex)
+ {
+ //may happen if the consumer is not valid when the thread runs
+ s_logger.LogWarning("KafkaMessageConsumer: Error Committing Offsets: {ErrorMessage}", ex.Message);
+ }
finally
{
_flushToken.Release(1);
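Note: by the time the offset sweep runs, the consumer may already have been closed, so Commit can throw. A minimal sketch of the guard shape introduced above, assuming the flush token is a SemaphoreSlim that the sweep acquires before committing (the hunk only shows it being released in finally):

using System;
using System.Threading;

class GuardedCommitSketch
{
    // Stands in for the semaphore the consumer uses to serialise flushes
    private readonly SemaphoreSlim _flushToken = new SemaphoreSlim(1, 1);

    public void SweepOffsets(Action commitOffsets)
    {
        if (!_flushToken.Wait(TimeSpan.Zero))
            return; // another thread is mid-flush; skip this sweep

        try
        {
            commitOffsets(); // e.g. _consumer.Commit(listOffsets)
        }
        catch (Exception ex)
        {
            // the sweep timer can outlive the consumer; log and carry on
            Console.WriteLine($"Error committing offsets: {ex.Message}");
        }
        finally
        {
            _flushToken.Release(1); // always release, or a later flush deadlocks
        }
    }
}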
@@ -73,29 +73,7 @@ public IAmAMessageConsumerSync Create(Subscription subscription)
);
}

- public IAmAMessageConsumerAsync CreateAsync(Subscription subscription)
- {
- KafkaSubscription kafkaSubscription = subscription as KafkaSubscription;
- if (kafkaSubscription == null)
- throw new ConfigurationException("We expect an SQSConnection or SQSConnection<T> as a parameter");
-
- return new KafkaMessageConsumer(
- configuration: _configuration,
- routingKey:kafkaSubscription.RoutingKey, //topic
- groupId: kafkaSubscription.GroupId,
- offsetDefault: kafkaSubscription.OffsetDefault,
- sessionTimeout: kafkaSubscription.SessionTimeout,
- maxPollInterval: kafkaSubscription.MaxPollInterval,
- isolationLevel: kafkaSubscription.IsolationLevel,
- commitBatchSize: kafkaSubscription.CommitBatchSize,
- sweepUncommittedOffsetsInterval: kafkaSubscription.SweepUncommittedOffsetsInterval,
- readCommittedOffsetsTimeout: kafkaSubscription.ReadCommittedOffsetsTimeOut,
- numPartitions: kafkaSubscription.NumPartitions,
- partitionAssignmentStrategy: kafkaSubscription.PartitionAssignmentStrategy,
- replicationFactor: kafkaSubscription.ReplicationFactor,
- topicFindTimeout: kafkaSubscription.TopicFindTimeout,
- makeChannels: kafkaSubscription.MakeChannels
- );
- }
+ public IAmAMessageConsumerAsync CreateAsync(Subscription subscription) => (IAmAMessageConsumerAsync)Create(subscription);

}
}
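Note: CreateAsync now delegates to Create, which is safe only because, as the cast implies, KafkaMessageConsumer implements both consumer interfaces; it also removes a duplicated parameter list whose error message still said "SQSConnection". A minimal sketch of the pattern, with the interfaces stubbed out as hypothetical empty types:

// Hypothetical, trimmed-down types to show the shape of the delegation
interface IAmAMessageConsumerSync { }
interface IAmAMessageConsumerAsync { }

// The concrete consumer implements both views
class KafkaMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync { }

class KafkaMessageConsumerFactory
{
    public IAmAMessageConsumerSync Create() => new KafkaMessageConsumer();

    // One construction path: the async factory reuses the sync one, so the
    // long argument list above can never drift between the two overloads.
    // The cast only holds because the concrete type implements both interfaces.
    public IAmAMessageConsumerAsync CreateAsync() => (IAmAMessageConsumerAsync)Create();
}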
@@ -208,8 +208,7 @@ private HeaderResult<DateTime> ReadTimeStamp(Headers headers)
if (headers.TryGetLastBytesIgnoreCase(HeaderNames.TIMESTAMP, out byte[] lastHeader))
{
//Additional testing for a non unixtimestamp string
- if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.CurrentInfo,
- DateTimeStyles.AdjustToUniversal, out DateTime timestamp))
+ if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.InvariantInfo, DateTimeStyles.AdjustToUniversal, out DateTime timestamp))
{
return new HeaderResult<DateTime>(timestamp, true);
}
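Note: this makes the read side agree with the write side changed earlier; both now use the invariant culture. A standalone sketch, not code from this PR, of the failure mode a current-culture parse allowed:

using System;
using System.Globalization;

class TimestampParseSketch
{
    static void Main()
    {
        // Written by the producer with the invariant culture: "01/30/2025 12:00:00"
        string header = new DateTime(2025, 1, 30, 12, 0, 0)
            .ToString(CultureInfo.InvariantCulture);

        // A German current culture expects dd.MM.yyyy, so "30" lands in the
        // month position and the parse typically fails
        bool currentOk = DateTime.TryParse(header, new CultureInfo("de-DE"),
            DateTimeStyles.AdjustToUniversal, out _);

        // The invariant culture always reads what the invariant culture wrote
        bool invariantOk = DateTime.TryParse(header, DateTimeFormatInfo.InvariantInfo,
            DateTimeStyles.AdjustToUniversal, out _);

        Console.WriteLine($"current (de-DE): {currentOk}, invariant: {invariantOk}");
    }
}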
@@ -30,7 +30,7 @@ THE SOFTWARE. */

namespace Paramore.Brighter.MessagingGateway.Kafka
{
- internal class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation
+ public class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation
{
/// <summary>
/// Action taken when a message is published, following receipt of a confirmation from the broker
@@ -152,6 +152,15 @@ public void ConfigHook(Action<ProducerConfig> configHook)
{
configHook(_producerConfig);
}

+ /// <summary>
+ /// Flushes the producer to ensure all messages in the internal buffer have been sent
+ /// </summary>
+ /// <param name="cancellationToken">Used to timeout the flush operation</param>
+ public void Flush(CancellationToken cancellationToken = default)
+ {
+ _producer?.Flush(cancellationToken);
+ }

/// <summary>
/// Initialize the producer => two stage construction to allow for a hook if needed
@@ -240,8 +249,6 @@ public void Send(Message message)
throw new ChannelFailureException("Error connecting to Kafka, see inner exception for details", kafkaException);
}
}



/// <summary>
/// Sends the specified message.
@@ -336,6 +343,7 @@ private void Dispose(bool disposing)
{
if (disposing)
{
+ Flush();
_producer?.Dispose();
}
}
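Note: a Confluent.Kafka producer's Produce call only enqueues to an in-memory buffer; delivery happens on a background thread, so disposing without a flush can drop messages that were never sent. That is what the Flush-in-Dispose above guards against. A minimal sketch against the Confluent.Kafka client directly, not Brighter's wrapper (broker address and topic are placeholders):

using System;
using System.Threading;
using Confluent.Kafka;

class FlushOnCloseSketch
{
    static void Main()
    {
        // Placeholder broker address
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using var producer = new ProducerBuilder<Null, string>(config).Build();

        // Produce() returns as soon as the message is queued locally;
        // delivery to the broker has not necessarily happened yet.
        producer.Produce("my-topic", new Message<Null, string> { Value = "hello" });

        // Block until the buffer drains; the token bounds the wait. Without
        // this, Dispose can tear the producer down with messages still queued.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        producer.Flush(cts.Token);
    } // producer.Dispose() runs here
}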
@@ -10,7 +10,7 @@
using Xunit;
using Xunit.Abstractions;

- namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
+ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -131,15 +131,17 @@ private async Task<string> SendMessageAsync()

private async Task<IEnumerable<Message>> ConsumeMessagesAsync(IAmAMessageConsumerAsync consumer)
{
- var messages = new Message[0];
+ var messages = Array.Empty<Message>();
int maxTries = 0;
do
{
try
{
maxTries++;
- await Task.Delay(500); //Let topic propagate in the broker
- messages = await consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
+ //Let topic propagate in the broker
+ await Task.Delay(500);
+ //use TimeSpan.Zero to avoid blocking
+ messages = await consumer.ReceiveAsync(TimeSpan.Zero);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
break;
@@ -1,17 +1,11 @@
using System;
- using System.Linq;
- using System.Runtime.InteropServices;
using System.Threading.Tasks;
- using Confluent.Kafka;
using FluentAssertions;
- using Paramore.Brighter.Kafka.Tests.TestDoubles;
using Paramore.Brighter.MessagingGateway.Kafka;
using Xunit;
using Xunit.Abstractions;
- using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
- using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

- namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
+ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -9,7 +9,7 @@
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

- namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
+ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -9,7 +9,7 @@
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

- namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
+ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -10,7 +10,7 @@
using Xunit;
using Xunit.Abstractions;

- namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
+ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Reactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -129,14 +129,15 @@ private string SendMessage()

private IEnumerable<Message> ConsumeMessages(IAmAMessageConsumerSync consumer)
{
- var messages = new Message[0];
+ var messages = Array.Empty<Message>();
int maxTries = 0;
do
{
try
{
maxTries++;
- Task.Delay(500).Wait(); //Let topic propagate in the broker
+ //Let topic propagate in the broker
+ Task.Delay(500).Wait();
messages = consumer.Receive(TimeSpan.FromMilliseconds(1000));

if (messages[0].Header.MessageType != MessageType.MT_NONE)
@@ -23,19 +23,13 @@ THE SOFTWARE. */
#endregion

using System;
- using System.Linq;
- using System.Runtime.InteropServices;
using System.Threading.Tasks;
- using Confluent.Kafka;
using FluentAssertions;
- using Paramore.Brighter.Kafka.Tests.TestDoubles;
using Paramore.Brighter.MessagingGateway.Kafka;
using Xunit;
using Xunit.Abstractions;
- using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
- using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

- namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
+ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Reactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -9,7 +9,7 @@
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

- namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
+ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Reactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -9,7 +9,7 @@
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

- namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
+ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Reactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
@@ -8,7 +8,7 @@
using Xunit;
using Xunit.Abstractions;

- namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
+ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Local.Proactor;

[Trait("Category", "Kafka")]
[Trait("Fragile", "CI")]
@@ -46,16 +46,31 @@ public KafkaMessageConsumerUpdateOffsetAsync(ITestOutputHelper output)
[Fact]
public async Task When_a_message_is_acknowldgede_update_offset()
{
+ //Let topic propagate in the broker
+ await Task.Delay(500);
+
var groupId = Guid.NewGuid().ToString();

+ //send x messages to Kafka
+ var routingKey = new RoutingKey(_topic);
+ var producerAsync = ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey));
+
- //send x messages to Kafka
var sentMessages = new string[10];
for (int i = 0; i < 10; i++)
{
var msgId = Guid.NewGuid().ToString();
- await SendMessageAsync(msgId);
+
+ await producerAsync.SendAsync(
+ new Message(
+ new MessageHeader(msgId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey},
+ new MessageBody($"test content [{_queueName}]")
+ )
+ );
sentMessages[i] = msgId;
}

+ //We should not need to flush, as the async does not queue work - but in case this changes
+ ((KafkaMessageProducer)producerAsync).Flush();
+
//This will create, then delete the consumer
Message[] messages = await ConsumeMessagesAsync(groupId: groupId, batchLimit: 5);
@@ -68,7 +83,7 @@ public async Task When_a_message_is_acknowldgede_update_offset()
}

//yield to broker to catch up
- await Task.Delay(TimeSpan.FromSeconds(5));
+ await Task.Delay(TimeSpan.FromMilliseconds(500));

//This will create a new consumer
Message[] newMessages = await ConsumeMessagesAsync(groupId, batchLimit: 5);
@@ -80,18 +95,6 @@ public async Task When_a_message_is_acknowldgede_update_offset()
}
}

- private async Task SendMessageAsync(string messageId)
- {
- var routingKey = new RoutingKey(_topic);
-
- await ((IAmAMessageProducerAsync)_producerRegistry.LookupBy(routingKey)).SendAsync(
- new Message(
- new MessageHeader(messageId, routingKey, MessageType.MT_COMMAND) {PartitionKey = _partitionKey},
- new MessageBody($"test content [{_queueName}]")
- )
- );
- }
-
private async Task<Message[]> ConsumeMessagesAsync(string groupId, int batchLimit)
{
var consumedMessages = new List<Message>();
@@ -107,15 +110,15 @@ private async Task<Message[]> ConsumeMessagesAsync(string groupId, int batchLimit)

async Task<Message> ConsumeMessageAsync(IAmAMessageConsumerAsync consumer)
{
- Message[] messages = new []{new Message()};
+ Message[] messages = [new Message()];
int maxTries = 0;
do
{
try
{
maxTries++;
- await Task.Delay(500); //Let topic propagate in the broker
- messages = await consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
+ //use TimeSpan.Zero to avoid blocking
+ messages = await consumer.ReceiveAsync(TimeSpan.Zero);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
{