Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for Kafka Tests following the Proactor/Reactor split #3447

Merged
merged 29 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a7381af
fix: broken Kafka test
iancooper Dec 30, 2024
e168be4
fix: simplify the management of kafka tests with directories
iancooper Dec 31, 2024
88c18b9
fix: make time base test more reliable
iancooper Dec 31, 2024
e66900b
chore: note issue with test and schema registry
iancooper Dec 31, 2024
5bae366
fix: issues with posting a message with delay
iancooper Dec 31, 2024
265f33d
fix: fix issues with timing
iancooper Dec 31, 2024
7b2d509
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
9a6bd5e
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
b12dfd9
chore merge branch ':master' into kafka_fixes
iancooper Dec 31, 2024
8daba2b
chore: merge branch 'master' into kafka_fixes
iancooper Dec 31, 2024
fb5dd01
chrore: merge branch 'master' into kafka_fixes
iancooper Jan 2, 2025
6955810
fix: time and date issues caused by inconsistency between invariant a…
iancooper Jan 6, 2025
b6655bf
fix: note around skip tests that breaks if Docker container does not …
iancooper Jan 6, 2025
c9e8f37
chore: merge branch 'master' into kafka_fixes
iancooper Jan 6, 2025
864af0e
fix: kafka producer should flush on close; test uses assume not create
iancooper Jan 6, 2025
90f33ed
fix: make the kafka tests more robust
iancooper Jan 8, 2025
d3dee33
chore: merge branch 'master' into kafka_fixes
iancooper Jan 8, 2025
5132b01
fix: when we don't publish on async, must manually raise deliveryreport
iancooper Jan 8, 2025
3031f44
fix: close of consumer now more reliable in flushing offsets
iancooper Jan 8, 2025
ce7f550
chore: Merge branch 'master' into kafka_fixes
iancooper Jan 8, 2025
47fe38a
fix: try to make more reliable, changes to receive. many proactor tes…
iancooper Jan 10, 2025
0f5dfa7
chore: skip another test impacted by timing in CI
iancooper Jan 15, 2025
779ded8
chore: move tests to fragile that don't execute well in CI - many won…
iancooper Jan 15, 2025
7da8749
chore: merge branch 'master' into kafka_fixes
iancooper Jan 15, 2025
bef2276
chore: ensure fragile tests marked as such for CI
iancooper Jan 15, 2025
22e8ffb
chore: merge branch 'master' into kafka_fixes
iancooper Jan 15, 2025
c993c0a
chore: kill Confluent tests, they are not being run ever
iancooper Jan 15, 2025
15c7837
chore: another CI fragile test
iancooper Jan 15, 2025
af5c9fb
Merge branch 'master' into kafka_fixes
iancooper Jan 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ public Headers Build(Message message)
new Header(HeaderNames.MESSAGE_ID, message.Header.MessageId.ToByteArray()),
};

if (message.Header.TimeStamp != default)
headers.Add(HeaderNames.TIMESTAMP, message.Header.TimeStamp.ToString().ToByteArray());
else
headers.Add(HeaderNames.TIMESTAMP, DateTimeOffset.UtcNow.ToString().ToByteArray());
string timeStampAsString = DateTimeOffset.UtcNow.DateTime.ToString(CultureInfo.InvariantCulture);
if (message.Header.TimeStamp.DateTime != default)
{
timeStampAsString = message.Header.TimeStamp.DateTime.ToString(CultureInfo.InvariantCulture);
}

headers.Add(HeaderNames.TIMESTAMP, timeStampAsString.ToByteArray());

if (message.Header.CorrelationId != string.Empty)
headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToByteArray());
Expand All @@ -64,7 +67,7 @@ public Headers Build(Message message)
if (!string.IsNullOrEmpty(message.Header.ReplyTo))
headers.Add(HeaderNames.REPLY_TO, message.Header.ReplyTo.ToByteArray());

headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.Delayed.ToString().ToByteArray());
headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.Delayed.TotalMilliseconds.ToString(CultureInfo.InvariantCulture).ToByteArray());

headers.Add(HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString().ToByteArray());

Expand Down
121 changes: 87 additions & 34 deletions src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,18 @@ namespace Paramore.Brighter.MessagingGateway.Kafka
/// </summary>
public class KafkaMessageConsumer : KafkaMessagingGateway, IAmAMessageConsumerSync, IAmAMessageConsumerAsync
{
private IConsumer<string, byte[]> _consumer;
private readonly IConsumer<string, byte[]> _consumer;
private readonly KafkaMessageCreator _creator;
private readonly ConsumerConfig _consumerConfig;
private List<TopicPartition> _partitions = new List<TopicPartition>();
private readonly ConcurrentBag<TopicPartitionOffset> _offsetStorage = new();
private List<TopicPartition> _partitions = [];
private readonly ConcurrentBag<TopicPartitionOffset> _offsetStorage = [];
private readonly long _maxBatchSize;
private readonly TimeSpan _readCommittedOffsetsTimeout;
private DateTime _lastFlushAt = DateTime.UtcNow;
private readonly TimeSpan _sweepUncommittedInterval;
private readonly SemaphoreSlim _flushToken = new(1, 1);
private bool _hasFatalError;
private bool _isClosed;

/// <summary>
/// Constructs a KafkaMessageConsumer using Confluent's Consumer Builder. We set up callbacks to handle assigned, revoked or lost partitions as
Expand Down Expand Up @@ -206,7 +207,7 @@ public KafkaMessageConsumer(
.Build();

s_logger.LogInformation("Kafka consumer subscribing to {Topic}", Topic);
_consumer.Subscribe(new []{ Topic.Value });
_consumer.Subscribe([Topic.Value]);

_creator = new KafkaMessageCreator();

Expand All @@ -219,6 +220,14 @@ public KafkaMessageConsumer(
EnsureTopic();
}

/// <summary>
/// Destroys the consumer
/// </summary>
~KafkaMessageConsumer()
{
Dispose(false);
}

/// <summary>
/// Acknowledges the specified message.
/// </summary>
Expand Down Expand Up @@ -285,6 +294,35 @@ public void Acknowledge(Message message)
Acknowledge(message);
return Task.CompletedTask;
}

/// <summary>
/// Close the consumer
/// - Commit any outstanding offsets
/// - Surrender any assignments
/// </summary>
/// <remarks>Use this before disposing of the consumer, to ensure an orderly shutdown</remarks>
public void Close()
{
//we will be called twice if explicitly disposed as well as closed, so just skip in that case
if (_isClosed) return;

try
{
_flushToken.Wait(TimeSpan.Zero);
//this will release the semaphore
CommitAllOffsets(DateTime.UtcNow);
}
catch (Exception ex)
{
//Close anyway, we just will get replay of those messages
s_logger.LogDebug("Error committing the current offset to Kafka before closing: {ErrorMessage}", ex.Message);
}
finally
{
_consumer.Close();
_isClosed = true;
}
}

/// <summary>
/// Purges the specified queue name.
Expand Down Expand Up @@ -315,9 +353,25 @@ public void Purge()
/// in production code
/// </remarks>
/// <param name="cancellationToken"></param>
public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))
public async Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Code Duplication
The module contains 4 functions with similar structure: CommitAllOffsets,CommitOffsets,PurgeAsync,ReceiveAsync

Suppress

{
return Task.Run(this.Purge, cancellationToken);
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

var purgeTask = Task.Run(() =>
{
try
{
Purge();
tcs.SetResult(null);
}
catch (Exception e)
{
tcs.SetException(e);
}
}, cancellationToken);

await tcs.Task;
await purgeTask;
}

/// <summary>
Expand Down Expand Up @@ -399,16 +453,33 @@ public Message[] Receive(TimeSpan? timeOut = null)
/// We consume the next offset from the stream, and turn it into a Brighter message; we store the offset in the partition into the Brighter message
/// headers for use in storing and committing offsets. If the stream is EOF or we are not allocated partitions, returns an empty message.
/// Kafka does not support an async consumer, and probably never will. See <a href="https://github.com/confluentinc/confluent-kafka-dotnet/issues/487">Confluent Kafka</a>
/// As a result we use Task.Run to encapsulate the call. This will cost a thread, and be slower than the sync version. However, given our pump characeristics this would not result
/// in thread pool exhaustion.
/// As a result we use TimeSpan.Zero to run the recieve loop, which will stop it blocking
/// </remarks>
/// <param name="timeOut">The timeout for receiving a message. Defaults to 300ms</param>
/// <param name="timeOut">The timeout for receiving a message. For async always treated as zero</param>
/// <param name="cancellationToken">The cancellation token - not used as this is async over sync</param>
/// <returns>A Brighter message wrapping the payload from the Kafka stream</returns>
/// <exception cref="ChannelFailureException">We catch Kafka consumer errors and rethrow as a ChannelFailureException </exception>
public async Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))
{
return await Task.Run(() => Receive(timeOut), cancellationToken);
var tcs = new TaskCompletionSource<Message[]>(TaskCreationOptions.RunContinuationsAsynchronously);

var recieveTask = Task.Run(() =>
{
try
{
var messages = Receive(TimeSpan.Zero);
tcs.SetResult(messages);
}
catch (Exception e)
{
tcs.SetException(e);
}
}, cancellationToken);

var messages = await tcs.Task;
await recieveTask;

return messages;
}

/// <summary>
Expand Down Expand Up @@ -540,6 +611,11 @@ private void CommitOffsets()

_consumer.Commit(listOffsets);
}
catch(Exception ex)
{
//may happen if the consumer is not valid when the thread runs
s_logger.LogWarning("KafkaMessageConsumer: Error Committing Offsets: {ErrorMessage}", ex.Message);
}
finally
{
_flushToken.Release(1);
Expand Down Expand Up @@ -678,39 +754,16 @@ private void SweepOffsets()
}
}

private void Close()
{
try
{
_consumer.Commit();

var committedOffsets = _consumer.Committed(_partitions, _readCommittedOffsetsTimeout);
foreach (var committedOffset in committedOffsets)
s_logger.LogInformation("Committed offset: {Offset} on partition: {ChannelName} for topic: {Topic}", committedOffset.Offset.Value.ToString(), committedOffset.Partition.Value.ToString(), committedOffset.Topic);

}
catch (Exception ex)
{
//this may happen if the offset is already committed
s_logger.LogDebug("Error committing the current offset to Kafka before closing: {ErrorMessage}", ex.Message);
}
}

private void Dispose(bool disposing)
{
if (disposing)
{
Close();
_consumer?.Dispose();
_flushToken?.Dispose();
}
}


~KafkaMessageConsumer()
{
Dispose(false);
}

public void Dispose()
{
Dispose(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,7 @@ public IAmAMessageConsumerSync Create(Subscription subscription)
);
}

public IAmAMessageConsumerAsync CreateAsync(Subscription subscription)
{
KafkaSubscription kafkaSubscription = subscription as KafkaSubscription;
if (kafkaSubscription == null)
throw new ConfigurationException("We expect an SQSConnection or SQSConnection<T> as a parameter");

return new KafkaMessageConsumer(
configuration: _configuration,
routingKey:kafkaSubscription.RoutingKey, //topic
groupId: kafkaSubscription.GroupId,
offsetDefault: kafkaSubscription.OffsetDefault,
sessionTimeout: kafkaSubscription.SessionTimeout,
maxPollInterval: kafkaSubscription.MaxPollInterval,
isolationLevel: kafkaSubscription.IsolationLevel,
commitBatchSize: kafkaSubscription.CommitBatchSize,
sweepUncommittedOffsetsInterval: kafkaSubscription.SweepUncommittedOffsetsInterval,
readCommittedOffsetsTimeout: kafkaSubscription.ReadCommittedOffsetsTimeOut,
numPartitions: kafkaSubscription.NumPartitions,
partitionAssignmentStrategy: kafkaSubscription.PartitionAssignmentStrategy,
replicationFactor: kafkaSubscription.ReplicationFactor,
topicFindTimeout: kafkaSubscription.TopicFindTimeout,
makeChannels: kafkaSubscription.MakeChannels
);
}
public IAmAMessageConsumerAsync CreateAsync(Subscription subscription) => (IAmAMessageConsumerAsync)Create(subscription);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ private HeaderResult<DateTime> ReadTimeStamp(Headers headers)
if (headers.TryGetLastBytesIgnoreCase(HeaderNames.TIMESTAMP, out byte[] lastHeader))
{
//Additional testing for a non unixtimestamp string
if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.CurrentInfo,
DateTimeStyles.AdjustToUniversal, out DateTime timestamp))
if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.InvariantInfo, DateTimeStyles.AdjustToUniversal, out DateTime timestamp))
{
return new HeaderResult<DateTime>(timestamp, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ THE SOFTWARE. */

namespace Paramore.Brighter.MessagingGateway.Kafka
{
internal class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation
public class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation
{
/// <summary>
/// Action taken when a message is published, following receipt of a confirmation from the broker
Expand Down Expand Up @@ -152,6 +152,15 @@ public void ConfigHook(Action<ProducerConfig> configHook)
{
configHook(_producerConfig);
}

/// <summary>
/// Flushes the producer to ensure all messages in the internal buffer have been sent
/// </summary>
/// <param name="cancellationToken">Used to timeout the flush operation</param>
public void Flush(CancellationToken cancellationToken = default)
{
_producer?.Flush(cancellationToken);
}

/// <summary>
/// Initialize the producer => two stage construction to allow for a hook if needed
Expand Down Expand Up @@ -240,8 +249,6 @@ public void Send(Message message)
throw new ChannelFailureException("Error connecting to Kafka, see inner exception for details", kafkaException);
}
}



/// <summary>
/// Sends the specified message.
Expand Down Expand Up @@ -336,6 +343,7 @@ private void Dispose(bool disposing)
{
if (disposing)
{
Flush();
_producer?.Dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,24 @@ public void PublishMessage(Message message, Action<DeliveryReport<string, byte[]
public async Task PublishMessageAsync(Message message, Action<DeliveryResult<string, byte[]>> deliveryReport, CancellationToken cancellationToken = default)
{
var kafkaMessage = BuildMessage(message);

var deliveryResult = await _producer.ProduceAsync(message.Header.Topic, kafkaMessage, cancellationToken);
deliveryReport(deliveryResult);

try
{
var deliveryResult = await _producer.ProduceAsync(message.Header.Topic, kafkaMessage, cancellationToken);
deliveryReport(deliveryResult);
}
catch (ProduceException<string, byte[]>)
{
//unlike the sync path, async will throw if it can't write. We want to capture that exception and raise the event
//so that our flows match
DeliveryResult<string, byte[]> deliveryResult = new();
deliveryResult.Status = PersistenceStatus.NotPersisted;
deliveryResult.Message = new Message<string, byte[]>();
deliveryResult.Message.Headers = [];
deliveryResult.Headers.Add(HeaderNames.MESSAGE_ID, message.Id.ToByteArray());
deliveryReport(deliveryResult);
}

}

private Message<string, byte[]> BuildMessage(Message message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
using Xunit;
using Xunit.Abstractions;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
Expand Down Expand Up @@ -131,15 +131,17 @@ private async Task<string> SendMessageAsync()

private async Task<IEnumerable<Message>> ConsumeMessagesAsync(IAmAMessageConsumerAsync consumer)
{
var messages = new Message[0];
var messages = Array.Empty<Message>();
int maxTries = 0;
do
{
try
{
maxTries++;
await Task.Delay(500); //Let topic propagate in the broker
messages = await consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
//Let topic propagate in the broker
await Task.Delay(500);
//use TimeSpan.Zero to avoid blocking
messages = await consumer.ReceiveAsync(TimeSpan.Zero);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
using System;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Confluent.Kafka;
using FluentAssertions;
using Paramore.Brighter.Kafka.Tests.TestDoubles;
using Paramore.Brighter.MessagingGateway.Kafka;
using Xunit;
using Xunit.Abstractions;
using SaslMechanism = Paramore.Brighter.MessagingGateway.Kafka.SaslMechanism;
using SecurityProtocol = Paramore.Brighter.MessagingGateway.Kafka.SecurityProtocol;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway.Confluent.Proactor;

[Trait("Category", "Kafka")]
[Trait("Category", "Confluent")]
Expand Down Expand Up @@ -63,7 +57,7 @@ string SupplyCertificateLocation()
//your production values ought to be lower
MessageTimeoutMs = 10000,
RequestTimeoutMs = 10000,
MakeChannels = OnMissingChannel.Create //This will not make the topic
MakeChannels = OnMissingChannel.Assume //This will not make the topic
}
]).Create();

Expand Down
Loading
Loading