Skip to content

Commit

Permalink
Improve partition support when producing/consuming message (#42)
Browse files Browse the repository at this point in the history
- Introduce Partitioner for Raw and non-raw topic producer
- Replace int partitionId with struct Partition
- Add Multiple partition support to raw and non-raw topic consumers
- Improve on consumer connection detection to avoid unnecessary wait times
  • Loading branch information
peter-quix authored Apr 8, 2024
1 parent a0e5cae commit 43c4dbe
Show file tree
Hide file tree
Showing 17 changed files with 842 additions and 257 deletions.
4 changes: 2 additions & 2 deletions builds/csharp/nuget/build_nugets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from typing import List

version = "0.6.4.0"
informal_version = "0.6.4.0-dev5"
nuget_version = "0.6.4.0-dev5"
informal_version = "0.6.4.0-dev8"
nuget_version = "0.6.4.0-dev8"


def updatecsproj(projfilepath):
Expand Down
8 changes: 5 additions & 3 deletions src/QuixStreams.Kafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage)
}
}

this.consumer = consumerBuilder.Build();
this.consumer = consumerBuilder.Build();

if (partitions != null)
{
Expand Down Expand Up @@ -356,7 +356,7 @@ private void AutomaticOffsetsCommittedHandler(IConsumer<byte[], byte[]> consumer

private void ConsumerLogHandler(IConsumer<byte[], byte[]> consumer, LogMessage msg)
{
if (!this.connectionEstablishedEvent.IsSet && KafkaHelper.TryParseWakeup(msg, out var ready) && ready)
if (this.VerifyBrokerConnection && !this.connectionEstablishedEvent.IsSet && KafkaHelper.TryParseWakeup(msg, out var ready) && ready)
{
this.connectionEstablishedEvent.Set();
}
Expand Down Expand Up @@ -478,7 +478,7 @@ private void PartitionsAssignedHandler(IConsumer<byte[], byte[]> consumer, List<
this.lastRevokeCancelAction?.Invoke();

var assignedPartitions = topicPartitions.ToList(); // Just in case source doesn't like us modifying this list
if (!this.connectionEstablishedEvent.IsSet && assignedPartitions.Count > 0)
if (this.VerifyBrokerConnection && !this.connectionEstablishedEvent.IsSet && assignedPartitions.Count > 0)
{
this.connectionEstablishedEvent.Set();
}
Expand Down Expand Up @@ -733,6 +733,8 @@ private async Task PollingWork(CancellationToken workerCt)

logger.LogTrace("[{0}] Polled for msg", this.configId);
if (cr == null) continue;

if (this.VerifyBrokerConnection && !this.connectionEstablishedEvent.IsSet) connectionEstablishedEvent.Set();
currentPackageProcessTime.Restart();
await this.AddMessage(cr);
currentPackageProcessTime.Stop();
Expand Down
3 changes: 2 additions & 1 deletion src/QuixStreams.Kafka/KafkaHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public static bool TryParseWakeup(LogMessage logMessage, out bool readyToFetch)
// Example: [WAKEUP] [thrd:main]: 127.0.0.1:53631/0: Wake-up: ready to fetch
if (logMessage.Level != SyslogLevel.Debug) return false;
if (!string.Equals("WAKEUP", logMessage.Facility, StringComparison.InvariantCultureIgnoreCase)) return false;
if (!logMessage.Message.Contains("Wake-up: ready to fetch")) return false;
if (!logMessage.Message.Contains("Wake-up: ready to fetch")
&& !logMessage.Message.Contains("Wake-up: fetch start")) return false;
readyToFetch = true;
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/QuixStreams.Kafka/KafkaMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class KafkaMessage
public int HeaderSize { get; protected set; }

/// <summary>
/// The topic partition offset associated with the message. Can be null.
/// The topic partition offset associated with the message. Null when the message is not read from broker.
/// </summary>
public TopicPartitionOffset TopicPartitionOffset { get; protected set; }

Expand Down
131 changes: 99 additions & 32 deletions src/QuixStreams.Kafka/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class KafkaProducer : IKafkaProducer
/// </summary>
private const int DefaultMaxMessageSize = 1000000;

private readonly ProduceDelegate produce;
private ProducerDelegate internalProducer;
private Dictionary<string, int> topicPartitionCount = new Dictionary<string, int>();

private long lastFlush = -1;
private IProducer<byte[], byte[]> producer;
Expand All @@ -62,36 +63,61 @@ public KafkaProducer(ProducerConfiguration producerConfiguration, ProducerTopicC
this.config = this.GetKafkaProducerConfig(producerConfiguration);
this.producerConfiguration = producerConfiguration;
SetConfigId(topicConfiguration);
if (topicConfiguration.Partition == Partition.Any)
ConfigureProduceDelegate();

Open();
}

private void ConfigureProduceDelegate()
{
// When we have a delegate to decide the partition
if (topicConfiguration.Partitioner != null)
{
this.produce = (msg, handler, _) =>
this.internalProducer = (msg, handler, _) =>
{
this.producer.Produce(topicConfiguration.Topic,
new Message<byte[], byte[]>
{
Key = msg.Key,
Value = msg.Value,
Headers = msg.ConfluentHeaders,
Timestamp = (Timestamp)msg.Timestamp
}, handler);
var maxPartition = GetPartitionCount(topicConfiguration.Topic);
var partition = topicConfiguration.Partitioner(topicConfiguration.Topic, maxPartition, msg);
this.producer.Produce(new TopicPartition(topicConfiguration.Topic, partition),
new Message<byte[], byte[]>
{
Key = msg.Key,
Value = msg.Value,
Headers = msg.ConfluentHeaders,
Timestamp = (Timestamp)msg.Timestamp
}, handler);
};
return;
}
else

// When Any partition is good
if (topicConfiguration.Partition == Partition.Any)
{
var topicPartition = new TopicPartition(topicConfiguration.Topic, topicConfiguration.Partition);
this.produce = (msg, handler, _) => this.producer.Produce(topicPartition,
new Message<byte[], byte[]>
{
Key = msg.Key,
Value = msg.Value,
Headers = msg.ConfluentHeaders,
Timestamp = (Timestamp)msg.Timestamp
}, handler);
this.internalProducer = (msg, handler, _) =>
{
this.producer.Produce(topicConfiguration.Topic,
new Message<byte[], byte[]>
{
Key = msg.Key,
Value = msg.Value,
Headers = msg.ConfluentHeaders,
Timestamp = (Timestamp)msg.Timestamp
}, handler);
};
return;
}

Open();

// When must be a specific partition
var topicPartition = new TopicPartition(topicConfiguration.Topic, topicConfiguration.Partition);
this.internalProducer = (msg, handler, _) => this.producer.Produce(topicPartition,
new Message<byte[], byte[]>
{
Key = msg.Key,
Value = msg.Value,
Headers = msg.ConfluentHeaders,
Timestamp = (Timestamp)msg.Timestamp
}, handler);
}

private ProducerConfig GetKafkaProducerConfig(ProducerConfiguration producerConfiguration)
{
var config = producerConfiguration.ToProducerConfig();
Expand All @@ -112,6 +138,45 @@ private ProducerConfig GetKafkaProducerConfig(ProducerConfiguration producerConf
return config;
}

private int GetPartitionCount(string topic)
{
if (this.topicPartitionCount.TryGetValue(topic, out var partitionCount)) return partitionCount;
lock (this.topicPartitionCount)
{
if (this.topicPartitionCount.TryGetValue(topic, out partitionCount)) return partitionCount;

this.logger.LogTrace("[{0}] Creating admin client to retrieve partition count for topic {1}", this.configId, topic);

void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage)
{
// Log nothing
}

using var adminClient = new AdminClientBuilder(this.config).SetLogHandler(NullLoggerForAdminLogs).Build();

var metadata = adminClient.GetMetadata(topic, TimeSpan.FromSeconds(10));
if (metadata == null)
{
throw new OperationCanceledException($"[{this.configId}] Topic '{topic}' metadata timed out while retrieving maximum partition count"); // Maybe a more specific exception ?
}

this.logger.LogTrace("[{0}] Retrieved metadata for topic {1}", this.configId, topic);
var topicMetaData = metadata.Topics.FirstOrDefault(x => x.Topic == topic);
if (topicMetaData == null)
{
throw new Exception($"[{this.configId}] Failed to retrieve metadata for topic '{topic}' to determine maximum partition count."); // Maybe a more specific exception ?
}

if (topicMetaData.Partitions.Count == 0)
{
throw new OperationCanceledException($"[{this.configId}] Found no partition information for topic '{topic}'. The topic may not exist or lacking permission to use it"); // Maybe a more specific exception ?
}

this.topicPartitionCount[topic] = topicMetaData.Partitions.Count;
return topicMetaData.Partitions.Count;
}
}

private async Task UpdateMaxMessageSize(TimeSpan maxWait)
{
var max = DateTime.UtcNow.Add(maxWait);
Expand Down Expand Up @@ -214,7 +279,9 @@ private void SetConfigId(ProducerTopicConfiguration topicConfiguration)
logBuilder.AppendLine();
logBuilder.AppendLine("=================== Kafka Producer Configuration =====================");
logBuilder.AppendLine("= Configuration Id: " + this.configId);
logBuilder.AppendLine($"= Topic: {topicConfiguration.Topic}{topicConfiguration.Partition}");
logBuilder.AppendLine(topicConfiguration.Partitioner != null
? $"= Topic: {topicConfiguration.Topic} with partitioner"
: $"= Topic: {topicConfiguration.Topic}{topicConfiguration.Partition}");
foreach (var keyValuePair in this.config)
{
if (keyValuePair.Key?.IndexOf("password", StringComparison.InvariantCultureIgnoreCase) > -1 ||
Expand Down Expand Up @@ -257,10 +324,10 @@ private void Open()
lastReportedBrokerDownMessage = string.Empty;
checkBrokerStateBeforeSend = false;

this.producer = new ProducerBuilder<byte[], byte[]>(this.config)
var builder = new ProducerBuilder<byte[], byte[]>(this.config)
.SetErrorHandler(this.ErrorHandler)
.SetLogHandler(this.ProducerLogHandler)
.Build();
.SetLogHandler(this.ProducerLogHandler);
this.producer = builder.Build();
}
}

Expand Down Expand Up @@ -386,7 +453,7 @@ private void Close()
/// <inheritdoc/>
public Task Publish(KafkaMessage message, CancellationToken cancellationToken = default)
{
return this.SendInternal(message, this.produce, cancellationToken);
return this.SendInternal(message, this.internalProducer, cancellationToken);
}

/// <inheritdoc/>
Expand All @@ -398,15 +465,15 @@ public Task Publish(IEnumerable<KafkaMessage> messages, CancellationToken cancel
foreach (var kafkaMessage in messages)
{
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled(cancellationToken);
lastTask = this.SendInternal(kafkaMessage, this.produce, cancellationToken);
lastTask = this.SendInternal(kafkaMessage, this.internalProducer, cancellationToken);
}
}

return lastTask;
}


private Task SendInternal(KafkaMessage message, ProduceDelegate handler, CancellationToken cancellationToken = default, object state = null)
private Task SendInternal(KafkaMessage message, ProducerDelegate handler, CancellationToken cancellationToken = default, object state = null)
{
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled(cancellationToken);
if (this.producer == null)
Expand Down Expand Up @@ -527,6 +594,6 @@ public void Dispose()
this.Close();
}

private delegate void ProduceDelegate(KafkaMessage message, Action<DeliveryReport<byte[], byte[]>> deliveryHandler, object state);
private delegate void ProducerDelegate(KafkaMessage message, Action<DeliveryReport<byte[], byte[]>> deliveryHandler, object state);
}
}
46 changes: 44 additions & 2 deletions src/QuixStreams.Kafka/TopicConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public ProducerTopicConfiguration(string topic, Partition partition)
{
if (string.IsNullOrWhiteSpace(topic))
{
throw new ArgumentOutOfRangeException(nameof(topic), "Cannot be null or empty");
throw new ArgumentOutOfRangeException(nameof(topic), "Topic cannot be null or empty");
}

this.Topic = topic;
Expand All @@ -35,6 +35,16 @@ public ProducerTopicConfiguration(string topic) : this(topic, Partition.Any)
{
}

/// <summary>
/// Initializes a new instance of <see cref="ProducerTopicConfiguration"/>
/// </summary>
/// <param name="topic">The topic to write to</param>
/// <param name="partitioner">The partitioner to use when sending a message</param>
public ProducerTopicConfiguration(string topic, QuixPartitionerDelegate partitioner) : this(topic, Partition.Any)
{
Partitioner = partitioner ?? throw new ArgumentOutOfRangeException(nameof(partitioner), "Partitioner cannot be null or empty");
}

/// <summary>
/// The topic to write to
/// </summary>
Expand All @@ -44,6 +54,11 @@ public ProducerTopicConfiguration(string topic) : this(topic, Partition.Any)
/// The partition to write to
/// </summary>
public Partition Partition { get; }

/// <summary>
/// The partition to select the partition for the message
/// </summary>
public QuixPartitionerDelegate Partitioner { get; }
}

public sealed class ConsumerTopicConfiguration
Expand Down Expand Up @@ -175,7 +190,7 @@ public ConsumerTopicConfiguration(ICollection<TopicPartitionOffset> topicPartiti
/// <param name="topic">The topic to set the partitions for</param>
/// <param name="partitions">The partitions to set the offset for</param>
/// <param name="offset">The offset</param>
public ConsumerTopicConfiguration(string topic, ICollection<Partition> partitions, Offset offset) : this(topic, partitions.Select(p => new PartitionOffset(p.Value, offset)).ToList())
public ConsumerTopicConfiguration(string topic, ICollection<Partition> partitions, Offset offset) : this(topic, partitions?.Select(p => new PartitionOffset(p.Value, offset)).ToList() ?? new List<PartitionOffset> { new PartitionOffset(Partition.Any, Offset.Unset)})
{
}

Expand Down Expand Up @@ -225,4 +240,31 @@ public PartitionOffset(Partition partition, Offset offset)
/// </summary>
public Offset Offset { get; }
}



/// <summary>
/// Calculate a partition number given a <paramref name="partitionCount" />
/// and <paramref name="message" />. The <paramref name="topic" />
/// is also provided, but is typically not used.
/// </summary>
/// <remarks>
/// A partitioner instance may be called in any thread at any time and
/// may be called multiple times for the same message/key.
///
/// A partitioner:
/// - MUST NOT block or execute for prolonged periods of time.
/// - MUST return a value between 0 and partitionCount-1.
/// - MUST NOT throw any exception.
/// </remarks>
/// <param name="topic">The topic.</param>
/// <param name="partitionCount">
/// The number of partitions in <paramref name="topic" />.
/// </param>
/// <param name="message">The message to select partition for.</param>
/// <returns>
/// The calculated <seealso cref="T:Confluent.Kafka.Partition" />, possibly
/// <seealso cref="F:Confluent.Kafka.Partition.Any" />.
/// </returns>
public delegate Partition QuixPartitionerDelegate(string topic, int partitionCount, KafkaMessage message);
}
Loading

0 comments on commit 43c4dbe

Please sign in to comment.