Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for Kafka Tests following the Proactor/Reactor split #3447

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a7381af
fix: broken Kafka test
iancooper Dec 30, 2024
e168be4
fix: simplify the management of kafka tests with directories
iancooper Dec 31, 2024
88c18b9
fix: make time base test more reliable
iancooper Dec 31, 2024
e66900b
chore: note issue with test and schema registry
iancooper Dec 31, 2024
5bae366
fix: issues with posting a message with delay
iancooper Dec 31, 2024
265f33d
fix: fix issues with timing
iancooper Dec 31, 2024
7b2d509
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
9a6bd5e
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
b12dfd9
chore merge branch ':master' into kafka_fixes
iancooper Dec 31, 2024
8daba2b
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
fb5dd01
chrore: merge branch 'master' into kafka_fixes
iancooper Jan 2, 2025
6955810
fix: time and date issues caused by inconsistency between invariant a…
iancooper Jan 6, 2025
b6655bf
fix: note around skip tests that breaks if Docker container does not …
iancooper Jan 6, 2025
c9e8f37
chore: merge branch 'master' into kafka_fixes
iancooper Jan 6, 2025
864af0e
fix: kafka producer should flush on close; test uses assume not create
iancooper Jan 6, 2025
90f33ed
fix: make the kafka tests more robust
iancooper Jan 8, 2025
d3dee33
chore: merge branch 'master' into kafka_fixes
iancooper Jan 8, 2025
5132b01
fix: when we don't publish on async, must manually raise deliveryreport
iancooper Jan 8, 2025
3031f44
fix: close of consumer now more reliable in flushing offsets
iancooper Jan 8, 2025
ce7f550
chore: Merge branch 'master' into kafka_fixes
iancooper Jan 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ public Headers Build(Message message)
new Header(HeaderNames.MESSAGE_ID, message.Header.MessageId.ToByteArray()),
};

if (message.Header.TimeStamp != default)
headers.Add(HeaderNames.TIMESTAMP, message.Header.TimeStamp.ToString().ToByteArray());
else
headers.Add(HeaderNames.TIMESTAMP, DateTimeOffset.UtcNow.ToString().ToByteArray());
string timeStampAsString = DateTimeOffset.UtcNow.DateTime.ToString(CultureInfo.InvariantCulture);
if (message.Header.TimeStamp.DateTime != default)
{
timeStampAsString = message.Header.TimeStamp.DateTime.ToString(CultureInfo.InvariantCulture);
}

headers.Add(HeaderNames.TIMESTAMP, timeStampAsString.ToByteArray());

if (message.Header.CorrelationId != string.Empty)
headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToByteArray());
Expand All @@ -64,7 +67,7 @@ public Headers Build(Message message)
if (!string.IsNullOrEmpty(message.Header.ReplyTo))
headers.Add(HeaderNames.REPLY_TO, message.Header.ReplyTo.ToByteArray());

headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.Delayed.ToString().ToByteArray());
headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.Delayed.TotalMilliseconds.ToString(CultureInfo.InvariantCulture).ToByteArray());

headers.Add(HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString().ToByteArray());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,18 @@ namespace Paramore.Brighter.MessagingGateway.Kafka
/// </summary>
public class KafkaMessageConsumer : KafkaMessagingGateway, IAmAMessageConsumerSync, IAmAMessageConsumerAsync
{
private IConsumer<string, byte[]> _consumer;
private readonly IConsumer<string, byte[]> _consumer;
private readonly KafkaMessageCreator _creator;
private readonly ConsumerConfig _consumerConfig;
private List<TopicPartition> _partitions = new List<TopicPartition>();
private readonly ConcurrentBag<TopicPartitionOffset> _offsetStorage = new();
private List<TopicPartition> _partitions = [];
private readonly ConcurrentBag<TopicPartitionOffset> _offsetStorage = [];
private readonly long _maxBatchSize;
private readonly TimeSpan _readCommittedOffsetsTimeout;
private DateTime _lastFlushAt = DateTime.UtcNow;
private readonly TimeSpan _sweepUncommittedInterval;
private readonly SemaphoreSlim _flushToken = new(1, 1);
private bool _hasFatalError;
private bool _isClosed;

/// <summary>
/// Constructs a KafkaMessageConsumer using Confluent's Consumer Builder. We set up callbacks to handle assigned, revoked or lost partitions as
Expand Down Expand Up @@ -206,7 +207,7 @@ public KafkaMessageConsumer(
.Build();

s_logger.LogInformation("Kafka consumer subscribing to {Topic}", Topic);
_consumer.Subscribe(new []{ Topic.Value });
_consumer.Subscribe([Topic.Value]);

_creator = new KafkaMessageCreator();

Expand All @@ -219,6 +220,14 @@ public KafkaMessageConsumer(
EnsureTopic();
}

/// <summary>
/// Destroys the consumer
/// </summary>
~KafkaMessageConsumer()
{
Dispose(false);
}

/// <summary>
/// Acknowledges the specified message.
/// </summary>
Expand Down Expand Up @@ -285,6 +294,35 @@ public void Acknowledge(Message message)
Acknowledge(message);
return Task.CompletedTask;
}

/// <summary>
/// Close the consumer
/// - Commit any outstanding offsets
/// - Surrender any assignments
/// </summary>
/// <remarks>Use this before disposing of the consumer, to ensure an orderly shutdown</remarks>
public void Close()
{
//we will be called twice if explicitly disposed as well as closed, so just skip in that case
if (_isClosed) return;

try
{
_flushToken.Wait(TimeSpan.Zero);
//this will release the semaphore
CommitAllOffsets(DateTime.UtcNow);
}
catch (Exception ex)
{
//Close anyway, we just will get replay of those messages
s_logger.LogDebug("Error committing the current offset to Kafka before closing: {ErrorMessage}", ex.Message);
}
finally
{
_consumer.Close();
_isClosed = true;
}
}

/// <summary>
/// Purges the specified queue name.
Expand Down Expand Up @@ -540,6 +578,11 @@ private void CommitOffsets()

_consumer.Commit(listOffsets);
}
catch(Exception ex)
{
//may happen if the consumer is not valid when the thread runs
s_logger.LogWarning("KafkaMessageConsumer: Error Committing Offsets: {ErrorMessage}", ex.Message);
}
finally
{
_flushToken.Release(1);
Expand Down Expand Up @@ -678,39 +721,16 @@ private void SweepOffsets()
}
}

private void Close()
{
try
{
_consumer.Commit();

var committedOffsets = _consumer.Committed(_partitions, _readCommittedOffsetsTimeout);
foreach (var committedOffset in committedOffsets)
s_logger.LogInformation("Committed offset: {Offset} on partition: {ChannelName} for topic: {Topic}", committedOffset.Offset.Value.ToString(), committedOffset.Partition.Value.ToString(), committedOffset.Topic);

}
catch (Exception ex)
{
//this may happen if the offset is already committed
s_logger.LogDebug("Error committing the current offset to Kafka before closing: {ErrorMessage}", ex.Message);
}
}

private void Dispose(bool disposing)
{
if (disposing)
{
Close();
_consumer?.Dispose();
_flushToken?.Dispose();
}
}


~KafkaMessageConsumer()
{
Dispose(false);
}

public void Dispose()
{
Dispose(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,7 @@ public IAmAMessageConsumerSync Create(Subscription subscription)
);
}

public IAmAMessageConsumerAsync CreateAsync(Subscription subscription)
{
KafkaSubscription kafkaSubscription = subscription as KafkaSubscription;
if (kafkaSubscription == null)
throw new ConfigurationException("We expect an SQSConnection or SQSConnection<T> as a parameter");

return new KafkaMessageConsumer(
configuration: _configuration,
routingKey:kafkaSubscription.RoutingKey, //topic
groupId: kafkaSubscription.GroupId,
offsetDefault: kafkaSubscription.OffsetDefault,
sessionTimeout: kafkaSubscription.SessionTimeout,
maxPollInterval: kafkaSubscription.MaxPollInterval,
isolationLevel: kafkaSubscription.IsolationLevel,
commitBatchSize: kafkaSubscription.CommitBatchSize,
sweepUncommittedOffsetsInterval: kafkaSubscription.SweepUncommittedOffsetsInterval,
readCommittedOffsetsTimeout: kafkaSubscription.ReadCommittedOffsetsTimeOut,
numPartitions: kafkaSubscription.NumPartitions,
partitionAssignmentStrategy: kafkaSubscription.PartitionAssignmentStrategy,
replicationFactor: kafkaSubscription.ReplicationFactor,
topicFindTimeout: kafkaSubscription.TopicFindTimeout,
makeChannels: kafkaSubscription.MakeChannels
);
}
public IAmAMessageConsumerAsync CreateAsync(Subscription subscription) => (IAmAMessageConsumerAsync)Create(subscription);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ private HeaderResult<DateTime> ReadTimeStamp(Headers headers)
if (headers.TryGetLastBytesIgnoreCase(HeaderNames.TIMESTAMP, out byte[] lastHeader))
{
//Additional testing for a non unixtimestamp string
if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.CurrentInfo,
DateTimeStyles.AdjustToUniversal, out DateTime timestamp))
if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.InvariantInfo, DateTimeStyles.AdjustToUniversal, out DateTime timestamp))
{
return new HeaderResult<DateTime>(timestamp, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ THE SOFTWARE. */

namespace Paramore.Brighter.MessagingGateway.Kafka
{
internal class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation
public class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation
{
/// <summary>
/// Action taken when a message is published, following receipt of a confirmation from the broker
Expand Down Expand Up @@ -152,6 +152,15 @@ public void ConfigHook(Action<ProducerConfig> configHook)
{
configHook(_producerConfig);
}

/// <summary>
/// Flushes the producer to ensure all messages in the internal buffer have been sent
/// </summary>
/// <param name="cancellationToken">Used to timeout the flush operation</param>
public void Flush(CancellationToken cancellationToken = default)
{
_producer?.Flush(cancellationToken);
}

/// <summary>
/// Initialize the producer => two stage construction to allow for a hook if needed
Expand Down Expand Up @@ -240,8 +249,6 @@ public void Send(Message message)
throw new ChannelFailureException("Error connecting to Kafka, see inner exception for details", kafkaException);
}
}



/// <summary>
/// Sends the specified message.
Expand Down Expand Up @@ -336,6 +343,7 @@ private void Dispose(bool disposing)
{
if (disposing)
{
Flush();
_producer?.Dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,24 @@ public void PublishMessage(Message message, Action<DeliveryReport<string, byte[]
public async Task PublishMessageAsync(Message message, Action<DeliveryResult<string, byte[]>> deliveryReport, CancellationToken cancellationToken = default)
{
var kafkaMessage = BuildMessage(message);

var deliveryResult = await _producer.ProduceAsync(message.Header.Topic, kafkaMessage, cancellationToken);
deliveryReport(deliveryResult);

try
{
var deliveryResult = await _producer.ProduceAsync(message.Header.Topic, kafkaMessage, cancellationToken);
deliveryReport(deliveryResult);
}
catch (ProduceException<string, byte[]>)
{
//unlike the sync path, async will throw if it can't write. We want to capture that exception and raise the event
//so that our flows match
DeliveryResult<string, byte[]> deliveryResult = new();
deliveryResult.Status = PersistenceStatus.NotPersisted;
deliveryResult.Message = new Message<string, byte[]>();
deliveryResult.Message.Headers = [];
deliveryResult.Headers.Add(HeaderNames.MESSAGE_ID, message.Id.ToByteArray());
deliveryReport(deliveryResult);
}

}

private Message<string, byte[]> BuildMessage(Message message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
Expand Down Expand Up @@ -131,15 +131,17 @@ private async Task<string> SendMessageAsync()

private async Task<IEnumerable<Message>> ConsumeMessagesAsync(IAmAMessageConsumerAsync consumer)
{
var messages = new Message[0];
var messages = Array.Empty<Message>();
int maxTries = 0;
do
{
try
{
maxTries++;
await Task.Delay(500); //Let topic propagate in the broker
messages = await consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
//Let topic propagate in the broker
await Task.Delay(500);
//use TimeSpan.Zero to avoid blocking
messages = await consumer.ReceiveAsync(TimeSpan.Zero);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
using System;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Confluent.Kafka;
using FluentAssertions;
using Paramore.Brighter.Kafka.Tests.TestDoubles;
using Paramore.Brighter.MessagingGateway.Kafka;
using Xunit;
using Xunit.Abstractions;
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
Expand Down Expand Up @@ -63,7 +57,7 @@ string SupplyCertificateLocation()
//your production values ought to be lower
MessageTimeoutMs = 10000,
RequestTimeoutMs = 10000,
MakeChannels = OnMissingChannel.Create //This will not make the topic
MakeChannels = OnMissingChannel.Assume //This will not make the topic
}
]).Create();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Reactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
Expand Down Expand Up @@ -129,14 +129,15 @@ private string SendMessage()

private IEnumerable<Message> ConsumeMessages(IAmAMessageConsumerSync consumer)
{
var messages = new Message[0];
var messages = Array.Empty<Message>();
int maxTries = 0;
do
{
try
{
maxTries++;
Task.Delay(500).Wait(); //Let topic propagate in the broker
//Let topic propagate in the broker
Task.Delay(500).Wait();
messages = consumer.Receive(TimeSpan.FromMilliseconds(1000));

if (messages[0].Header.MessageType != MessageType.MT_NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,13 @@ THE SOFTWARE. */
#endregion

using System;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Confluent.Kafka;
using FluentAssertions;
using Paramore.Brighter.Kafka.Tests.TestDoubles;
using Paramore.Brighter.MessagingGateway.Kafka;
using Xunit;
using Xunit.Abstractions;
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Reactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
Expand Down
Loading
Loading