Fixes for Kafka Tests following the Proactor/Reactor split (#3447)
* fix: broken Kafka test

* fix: simplify the management of kafka tests with directories

* fix: make time-based test more reliable

* chore: note issue with test and schema registry

* fix: issues with posting a message with delay

* fix: fix issues with timing

* fix: time and date issues caused by inconsistency between invariant and local culture decisions. Use invariant

* fix: note around skipped tests that break if the Docker container does not set auto-create for Kafka topics to false.

* fix: kafka producer should flush on close; test uses assume not create

* fix: make the kafka tests more robust

* fix: when we don't publish on async, we must manually raise the delivery report

* fix: close of consumer now more reliable in flushing offsets

* fix: try to make more reliable; changes to receive. Many proactor tests can only run locally, due to timing issues

* chore: skip another test impacted by timing in CI

* chore: move tests that don't execute well in CI to fragile; many won't run locally outside debug, but better to note these than never run them

* chore: ensure fragile tests marked as such for CI

* chore: kill Confluent tests, they are not being run ever

* chore: another CI fragile test
iancooper authored Jan 15, 2025
1 parent de05cfe commit d1484fe
Showing 33 changed files with 650 additions and 1,607 deletions.
@@ -47,10 +47,13 @@ public Headers Build(Message message)
             new Header(HeaderNames.MESSAGE_ID, message.Header.MessageId.ToByteArray()),
         };

-        if (message.Header.TimeStamp != default)
-            headers.Add(HeaderNames.TIMESTAMP, message.Header.TimeStamp.ToString().ToByteArray());
-        else
-            headers.Add(HeaderNames.TIMESTAMP, DateTimeOffset.UtcNow.ToString().ToByteArray());
+        string timeStampAsString = DateTimeOffset.UtcNow.DateTime.ToString(CultureInfo.InvariantCulture);
+        if (message.Header.TimeStamp.DateTime != default)
+        {
+            timeStampAsString = message.Header.TimeStamp.DateTime.ToString(CultureInfo.InvariantCulture);
+        }
+
+        headers.Add(HeaderNames.TIMESTAMP, timeStampAsString.ToByteArray());

         if (message.Header.CorrelationId != string.Empty)
             headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToByteArray());
@@ -64,7 +67,7 @@ public Headers Build(Message message)
         if (!string.IsNullOrEmpty(message.Header.ReplyTo))
             headers.Add(HeaderNames.REPLY_TO, message.Header.ReplyTo.ToByteArray());

-        headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.Delayed.ToString().ToByteArray());
+        headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.Delayed.TotalMilliseconds.ToString(CultureInfo.InvariantCulture).ToByteArray());

         headers.Add(HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString().ToByteArray());
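This change matters because the TIMESTAMP header is parsed back on the consuming side (see the KafkaMessageCreator hunk below): a current-culture ToString cannot be round-tripped reliably when producer and consumer machines run different locales. A minimal sketch of the failure mode, with an illustrative locale (not code from this commit):

    using System;
    using System.Globalization;

    class CultureRoundTrip
    {
        static void Main()
        {
            var stamp = new DateTime(2025, 1, 5, 13, 30, 0, DateTimeKind.Utc);

            // Under a German locale this renders as "05.01.2025 13:30:00"; invariant
            // parsing then either rejects it or swaps day and month.
            string localised = stamp.ToString(new CultureInfo("de-DE"));
            DateTime.TryParse(localised, DateTimeFormatInfo.InvariantInfo,
                DateTimeStyles.AdjustToUniversal, out DateTime wrong);
            Console.WriteLine($"{localised} -> {wrong:o}");

            // Formatting and parsing with the invariant culture round-trips the same
            // way on every machine, which is what the header now relies on.
            string invariant = stamp.ToString(CultureInfo.InvariantCulture);
            DateTime.TryParse(invariant, DateTimeFormatInfo.InvariantInfo,
                DateTimeStyles.AdjustToUniversal, out DateTime right);
            Console.WriteLine($"{invariant} -> {right:o}");
        }
    }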
121 changes: 87 additions & 34 deletions src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs
@@ -46,17 +46,18 @@ namespace Paramore.Brighter.MessagingGateway.Kafka
     /// </summary>
     public class KafkaMessageConsumer : KafkaMessagingGateway, IAmAMessageConsumerSync, IAmAMessageConsumerAsync
     {
-        private IConsumer<string, byte[]> _consumer;
+        private readonly IConsumer<string, byte[]> _consumer;
         private readonly KafkaMessageCreator _creator;
         private readonly ConsumerConfig _consumerConfig;
-        private List<TopicPartition> _partitions = new List<TopicPartition>();
-        private readonly ConcurrentBag<TopicPartitionOffset> _offsetStorage = new();
+        private List<TopicPartition> _partitions = [];
+        private readonly ConcurrentBag<TopicPartitionOffset> _offsetStorage = [];
         private readonly long _maxBatchSize;
         private readonly TimeSpan _readCommittedOffsetsTimeout;
         private DateTime _lastFlushAt = DateTime.UtcNow;
         private readonly TimeSpan _sweepUncommittedInterval;
         private readonly SemaphoreSlim _flushToken = new(1, 1);
         private bool _hasFatalError;
+        private bool _isClosed;

         /// <summary>
         /// Constructs a KafkaMessageConsumer using Confluent's Consumer Builder. We set up callbacks to handle assigned, revoked or lost partitions as
Expand Down Expand Up @@ -206,7 +207,7 @@ public KafkaMessageConsumer(
.Build();

s_logger.LogInformation("Kafka consumer subscribing to {Topic}", Topic);
_consumer.Subscribe(new []{ Topic.Value });
_consumer.Subscribe([Topic.Value]);

_creator = new KafkaMessageCreator();

@@ -219,6 +220,14 @@ public KafkaMessageConsumer(
             EnsureTopic();
         }

+        /// <summary>
+        /// Destroys the consumer
+        /// </summary>
+        ~KafkaMessageConsumer()
+        {
+            Dispose(false);
+        }
+
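The finalizer pairs with the Dispose(bool) pattern at the bottom of this file. For orientation, the standard .NET idiom it follows, as a sketch rather than the class's exact code (whether the real Dispose calls GC.SuppressFinalize is cut off by the diff view):

    public class ExampleConsumer : IDisposable
    {
        // GC path: do not touch managed objects, they may already be finalized
        ~ExampleConsumer() => Dispose(false);

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this); // disposed deterministically; skip the finalizer
        }

        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                // release managed resources: the Confluent consumer, the semaphore
            }
        }
    }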
Expand Down Expand Up @@ -285,6 +294,35 @@ public void Acknowledge(Message message)
Acknowledge(message);
return Task.CompletedTask;
}

/// <summary>
/// Close the consumer
/// - Commit any outstanding offsets
/// - Surrender any assignments
/// </summary>
/// <remarks>Use this before disposing of the consumer, to ensure an orderly shutdown</remarks>
public void Close()
{
//we will be called twice if explicitly disposed as well as closed, so just skip in that case
if (_isClosed) return;

try
{
_flushToken.Wait(TimeSpan.Zero);
//this will release the semaphore
CommitAllOffsets(DateTime.UtcNow);
}
catch (Exception ex)
{
//Close anyway, we just will get replay of those messages
s_logger.LogDebug("Error committing the current offset to Kafka before closing: {ErrorMessage}", ex.Message);
}
finally
{
_consumer.Close();
_isClosed = true;
}
}

/// <summary>
/// Purges the specified queue name.
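The shutdown order this enables, sketched for a consumer instance built elsewhere (construction elided):

    try
    {
        // ... receive, process, acknowledge ...
    }
    finally
    {
        consumer.Close();   // commit outstanding offsets and leave the group
        consumer.Dispose(); // Dispose also calls Close, which the _isClosed flag makes idempotent
    }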
Expand Down Expand Up @@ -315,9 +353,25 @@ public void Purge()
/// in production code
/// </remarks>
/// <param name="cancellationToken"></param>
public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))
public async Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))
{
return Task.Run(this.Purge, cancellationToken);
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

var purgeTask = Task.Run(() =>
{
try
{
Purge();
tcs.SetResult(null);
}
catch (Exception e)
{
tcs.SetException(e);
}
}, cancellationToken);

await tcs.Task;
await purgeTask;
}

/// <summary>
Expand Down Expand Up @@ -399,16 +453,33 @@ public Message[] Receive(TimeSpan? timeOut = null)
/// We consume the next offset from the stream, and turn it into a Brighter message; we store the offset in the partition into the Brighter message
/// headers for use in storing and committing offsets. If the stream is EOF or we are not allocated partitions, returns an empty message.
/// Kafka does not support an async consumer, and probably never will. See <a href="https://github.com/confluentinc/confluent-kafka-dotnet/issues/487">Confluent Kafka</a>
/// As a result we use Task.Run to encapsulate the call. This will cost a thread, and be slower than the sync version. However, given our pump characeristics this would not result
/// in thread pool exhaustion.
/// As a result we use TimeSpan.Zero to run the recieve loop, which will stop it blocking
/// </remarks>
/// <param name="timeOut">The timeout for receiving a message. Defaults to 300ms</param>
/// <param name="timeOut">The timeout for receiving a message. For async always treated as zero</param>
/// <param name="cancellationToken">The cancellation token - not used as this is async over sync</param>
/// <returns>A Brighter message wrapping the payload from the Kafka stream</returns>
/// <exception cref="ChannelFailureException">We catch Kafka consumer errors and rethrow as a ChannelFailureException </exception>
public async Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))
{
return await Task.Run(() => Receive(timeOut), cancellationToken);
var tcs = new TaskCompletionSource<Message[]>(TaskCreationOptions.RunContinuationsAsynchronously);

var recieveTask = Task.Run(() =>
{
try
{
var messages = Receive(TimeSpan.Zero);
tcs.SetResult(messages);
}
catch (Exception e)
{
tcs.SetException(e);
}
}, cancellationToken);

var messages = await tcs.Task;
await recieveTask;

return messages;
}

/// <summary>
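Why the TaskCompletionSource shape rather than returning Task.Run directly, here and in PurgeAsync above: TaskCreationOptions.RunContinuationsAsynchronously keeps the awaiter's continuation off the blocking worker thread, and awaiting the worker task as well means its faults and cancellations are observed. Reduced to a generic sketch (names illustrative, not part of the commit):

    static async Task<T> RunSyncAsAsync<T>(Func<T> body, CancellationToken ct)
    {
        var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

        var worker = Task.Run(() =>
        {
            try { tcs.SetResult(body()); }               // hand the result to awaiters
            catch (Exception e) { tcs.SetException(e); } // surface sync failures as task faults
        }, ct);

        T result = await tcs.Task; // resume off the worker thread
        await worker;              // observe any fault or cancellation of the wrapper
        return result;
    }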
Expand Down Expand Up @@ -540,6 +611,11 @@ private void CommitOffsets()

_consumer.Commit(listOffsets);
}
catch(Exception ex)
{
//may happen if the consumer is not valid when the thread runs
s_logger.LogWarning("KafkaMessageConsumer: Error Committing Offsets: {ErrorMessage}", ex.Message);
}
finally
{
_flushToken.Release(1);
Expand Down Expand Up @@ -678,39 +754,16 @@ private void SweepOffsets()
}
}

private void Close()
{
try
{
_consumer.Commit();

var committedOffsets = _consumer.Committed(_partitions, _readCommittedOffsetsTimeout);
foreach (var committedOffset in committedOffsets)
s_logger.LogInformation("Committed offset: {Offset} on partition: {ChannelName} for topic: {Topic}", committedOffset.Offset.Value.ToString(), committedOffset.Partition.Value.ToString(), committedOffset.Topic);

}
catch (Exception ex)
{
//this may happen if the offset is already committed
s_logger.LogDebug("Error committing the current offset to Kafka before closing: {ErrorMessage}", ex.Message);
}
}

private void Dispose(bool disposing)
{
if (disposing)
{
Close();
_consumer?.Dispose();
_flushToken?.Dispose();
}
}


~KafkaMessageConsumer()
{
Dispose(false);
}

public void Dispose()
{
Dispose(true);
Expand Down
@@ -73,29 +73,7 @@ public IAmAMessageConsumerSync Create(Subscription subscription)
             );
         }

-        public IAmAMessageConsumerAsync CreateAsync(Subscription subscription)
-        {
-            KafkaSubscription kafkaSubscription = subscription as KafkaSubscription;
-            if (kafkaSubscription == null)
-                throw new ConfigurationException("We expect an SQSConnection or SQSConnection<T> as a parameter");
-
-            return new KafkaMessageConsumer(
-                configuration: _configuration,
-                routingKey: kafkaSubscription.RoutingKey, //topic
-                groupId: kafkaSubscription.GroupId,
-                offsetDefault: kafkaSubscription.OffsetDefault,
-                sessionTimeout: kafkaSubscription.SessionTimeout,
-                maxPollInterval: kafkaSubscription.MaxPollInterval,
-                isolationLevel: kafkaSubscription.IsolationLevel,
-                commitBatchSize: kafkaSubscription.CommitBatchSize,
-                sweepUncommittedOffsetsInterval: kafkaSubscription.SweepUncommittedOffsetsInterval,
-                readCommittedOffsetsTimeout: kafkaSubscription.ReadCommittedOffsetsTimeOut,
-                numPartitions: kafkaSubscription.NumPartitions,
-                partitionAssignmentStrategy: kafkaSubscription.PartitionAssignmentStrategy,
-                replicationFactor: kafkaSubscription.ReplicationFactor,
-                topicFindTimeout: kafkaSubscription.TopicFindTimeout,
-                makeChannels: kafkaSubscription.MakeChannels
-            );
-        }
+        public IAmAMessageConsumerAsync CreateAsync(Subscription subscription) => (IAmAMessageConsumerAsync)Create(subscription);

     }
 }
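KafkaMessageConsumer implements both IAmAMessageConsumerSync and IAmAMessageConsumerAsync (see its class declaration in the hunk above), so the async factory method can reuse the sync construction path and cast, retiring the duplicated argument list and the stale SQSConnection error message it carried over. In use (factory and subscription instances assumed):

    IAmAMessageConsumerSync syncConsumer = factory.Create(subscription);
    IAmAMessageConsumerAsync asyncConsumer = factory.CreateAsync(subscription); // same construction, cast to the async facade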
@@ -208,8 +208,7 @@ private HeaderResult<DateTime> ReadTimeStamp(Headers headers)
             if (headers.TryGetLastBytesIgnoreCase(HeaderNames.TIMESTAMP, out byte[] lastHeader))
             {
                 //Additional testing for a non unixtimestamp string
-                if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.CurrentInfo,
-                    DateTimeStyles.AdjustToUniversal, out DateTime timestamp))
+                if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.InvariantInfo, DateTimeStyles.AdjustToUniversal, out DateTime timestamp))
                 {
                     return new HeaderResult<DateTime>(timestamp, true);
                 }
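This is the consuming half of the header-builder change in the first hunk: both sides must agree on a culture. The pairing in miniature (variables illustrative):

    string wire = timestamp.ToString(CultureInfo.InvariantCulture);  // producer writes the header
    DateTime.TryParse(wire, DateTimeFormatInfo.InvariantInfo,
        DateTimeStyles.AdjustToUniversal, out DateTime parsed);      // consumer reads it back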
@@ -30,7 +30,7 @@ THE SOFTWARE. */

 namespace Paramore.Brighter.MessagingGateway.Kafka
 {
-    internal class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation
+    public class KafkaMessageProducer : KafkaMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation
     {
         /// <summary>
         /// Action taken when a message is published, following receipt of a confirmation from the broker
Expand Down Expand Up @@ -152,6 +152,15 @@ public void ConfigHook(Action<ProducerConfig> configHook)
{
configHook(_producerConfig);
}

/// <summary>
/// Flushes the producer to ensure all messages in the internal buffer have been sent
/// </summary>
/// <param name="cancellationToken">Used to timeout the flush operation</param>
public void Flush(CancellationToken cancellationToken = default)
{
_producer?.Flush(cancellationToken);
}

/// <summary>
/// Initialize the producer => two stage construction to allow for a hook if needed
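Confluent's producer buffers messages in librdkafka, so dropping the handle without a flush can lose anything still queued; that is the motive for both this method and the Flush call added to Dispose below. A usage sketch, assuming a constructed producer (not code from this commit):

    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
    try
    {
        producer.Flush(cts.Token); // drain the in-flight buffer, bounded so shutdown cannot hang
    }
    catch (OperationCanceledException)
    {
        // messages may remain undelivered; log and decide whether to retry
    }
    producer.Dispose(); // Dispose now also flushes as a safety net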
Expand Down Expand Up @@ -240,8 +249,6 @@ public void Send(Message message)
throw new ChannelFailureException("Error connecting to Kafka, see inner exception for details", kafkaException);
}
}



/// <summary>
/// Sends the specified message.
Expand Down Expand Up @@ -336,6 +343,7 @@ private void Dispose(bool disposing)
{
if (disposing)
{
Flush();
_producer?.Dispose();
}
}
Expand Down
@@ -52,9 +52,24 @@ public void PublishMessage(Message message, Action<DeliveryReport<string, byte[]
         public async Task PublishMessageAsync(Message message, Action<DeliveryResult<string, byte[]>> deliveryReport, CancellationToken cancellationToken = default)
         {
             var kafkaMessage = BuildMessage(message);

-            var deliveryResult = await _producer.ProduceAsync(message.Header.Topic, kafkaMessage, cancellationToken);
-            deliveryReport(deliveryResult);
+            try
+            {
+                var deliveryResult = await _producer.ProduceAsync(message.Header.Topic, kafkaMessage, cancellationToken);
+                deliveryReport(deliveryResult);
+            }
+            catch (ProduceException<string, byte[]>)
+            {
+                //unlike the sync path, async will throw if it can't write. We want to capture that exception and raise the event
+                //so that our flows match
+                DeliveryResult<string, byte[]> deliveryResult = new();
+                deliveryResult.Status = PersistenceStatus.NotPersisted;
+                deliveryResult.Message = new Message<string, byte[]>();
+                deliveryResult.Message.Headers = [];
+                deliveryResult.Headers.Add(HeaderNames.MESSAGE_ID, message.Id.ToByteArray());
+                deliveryReport(deliveryResult);
+            }

         }

         private Message<string, byte[]> BuildMessage(Message message)
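The asymmetry being patched: Confluent's sync Produce reports broker failures through the delivery handler, while ProduceAsync surfaces them as a thrown ProduceException, so without this catch the publish-confirmation event would never fire on the async path. Contrasted in a sketch (producer, topic name, and msg, a Message<string, byte[]>, are assumed):

    // Sync path: failures arrive inside the report; the handler always runs.
    producer.Produce("my.topic", msg, report =>
        Console.WriteLine($"status: {report.Status}"));

    // Async path: failures are thrown, so mirror them back into a result to
    // keep both paths raising the same confirmation event.
    try
    {
        var result = await producer.ProduceAsync("my.topic", msg);
        Console.WriteLine($"status: {result.Status}");
    }
    catch (ProduceException<string, byte[]> e)
    {
        Console.WriteLine($"failed: {e.Error.Reason}"); // synthesize a NotPersisted result here
    }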