Skip to content

Commit

Permalink
RabbitMQ pipeline improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
dluc committed Oct 6, 2024
1 parent 8f9a165 commit 6994af9
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 14 deletions.
7 changes: 6 additions & 1 deletion extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ public static async Task Main()

ListenToDeadLetterQueue(rabbitMQConfig);

await pipeline.EnqueueAsync($"test {DateTimeOffset.Now:T}");
// Change ConcurrentThreads and PrefetchCount to 1 to see
// how they affect total execution time
for (int i = 1; i <= 3; i++)
{
await pipeline.EnqueueAsync($"test #{i} {DateTimeOffset.Now:T}");
}

while (true)
{
Expand Down
6 changes: 6 additions & 0 deletions extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@
"VirtualHost": "/",
"MessageTTLSecs": 3600,
"SslEnabled": false,
// How many messages to process asynchronously at a time, in each queue
"ConcurrentThreads": 3,
// How many messages to fetch at a time from each queue
// The value should be higher than ConcurrentThreads
"PrefetchCount": 6,
// How many times to dequeue a messages and process before moving it to a poison queue
"MaxRetriesBeforePoisonQueue": 5,
"DelayBeforeRetryingMsecs": 750,
// Suffix used for the poison queues.
"PoisonQueueSuffix": "-poison"
}
Expand Down
41 changes: 39 additions & 2 deletions extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Text;
using Microsoft.Extensions.Logging;

#pragma warning disable IDE0130 // reduce number of "using" statements
// ReSharper disable once CheckNamespace - reduce number of "using" statements
Expand All @@ -16,7 +17,7 @@ public class RabbitMQConfig
/// <summary>
/// TCP port for the connection, e.g. 5672
/// </summary>
public int Port { get; set; } = 0;
public int Port { get; set; } = 5672;

/// <summary>
/// Authentication username
Expand Down Expand Up @@ -46,6 +47,22 @@ public class RabbitMQConfig
/// </summary>
public bool SslEnabled { get; set; } = false;

/// <summary>
/// How many messages to process asynchronously at a time, in each queue.
/// Note that this applies to each queue, and each queue is used
/// for a specific pipeline step.
/// </summary>
public ushort ConcurrentThreads { get; set; } = 2;

/// <summary>
/// How many messages to fetch at a time from each queue.
/// The value should be higher than ConcurrentThreads to make sure each
/// thread has some work to do.
/// Note that this applies to each queue, and each queue is used
/// for a specific pipeline step.
/// </summary>
public ushort PrefetchCount { get; set; } = 3;

/// <summary>
/// How many times to retry processing a message before moving it to a poison queue.
/// Example: a value of 20 means that a message will be processed up to 21 times.
Expand All @@ -54,6 +71,14 @@ public class RabbitMQConfig
/// </summary>
public int MaxRetriesBeforePoisonQueue { get; set; } = 20;

/// <summary>
/// How long to wait before putting a message back to the queue in case of failure.
/// Note: currently a basic strategy not based on RabbitMQ exchanges, potentially
/// affecting the pipeline concurrency performance: consumers hold
/// messages for N msecs, slowing down the delivery of other messages.
/// </summary>
public int DelayBeforeRetryingMsecs { get; set; } = 500;

/// <summary>
/// Suffix used for the poison queues.
/// </summary>
Expand All @@ -62,7 +87,7 @@ public class RabbitMQConfig
/// <summary>
/// Verify that the current state is valid.
/// </summary>
public void Validate()
public void Validate(ILogger? log = null)
{
const int MinTTLSecs = 5;

Expand All @@ -81,6 +106,11 @@ public void Validate()
throw new ConfigurationException($"RabbitMQ: {nameof(this.MessageTTLSecs)} value {this.MessageTTLSecs} is too low, cannot be less than {MinTTLSecs}");
}

if (this.ConcurrentThreads < 1)
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.ConcurrentThreads)} value cannot be less than 1");
}

if (string.IsNullOrWhiteSpace(this.PoisonQueueSuffix) || this.PoisonQueueSuffix != $"{this.PoisonQueueSuffix}".Trim())
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} cannot be empty or have leading or trailing spaces");
Expand All @@ -102,5 +132,12 @@ public void Validate()
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} can be up to 60 characters length");
}

#pragma warning disable CA2254
if (this.PrefetchCount < this.ConcurrentThreads)
{
log?.LogWarning($"The value of {nameof(this.PrefetchCount)} ({this.PrefetchCount}) should not be lower than the value of {nameof(this.ConcurrentThreads)} ({this.ConcurrentThreads})");
}
#pragma warning restore CA2254
}
}
43 changes: 32 additions & 11 deletions extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public sealed class RabbitMQPipeline : IQueue
private readonly AsyncEventingBasicConsumer _consumer;
private readonly RabbitMQConfig _config;
private readonly int _messageTTLMsecs;
private readonly int _delayBeforeRetryingMsecs;
private readonly int _maxAttempts;
private string _queueName = string.Empty;
private string _poisonQueueName = string.Empty;

Expand All @@ -32,11 +34,11 @@ public sealed class RabbitMQPipeline : IQueue
/// </summary>
public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = null)
{
this._config = config;
this._config.Validate();

this._log = (loggerFactory ?? DefaultLogger.Factory).CreateLogger<RabbitMQPipeline>();

this._config = config;
this._config.Validate(this._log);

// see https://www.rabbitmq.com/dotnet-api-guide.html#consuming-async
var factory = new ConnectionFactory
{
Expand All @@ -46,6 +48,7 @@ public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = n
Password = config.Password,
VirtualHost = !string.IsNullOrWhiteSpace(config.VirtualHost) ? config.VirtualHost : "/",
DispatchConsumersAsync = true,
ConsumerDispatchConcurrency = config.ConcurrentThreads,
Ssl = new SslOption
{
Enabled = config.SslEnabled,
Expand All @@ -56,8 +59,11 @@ public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = n
this._messageTTLMsecs = config.MessageTTLSecs * 1000;
this._connection = factory.CreateConnection();
this._channel = this._connection.CreateModel();
this._channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
this._channel.BasicQos(prefetchSize: 0, prefetchCount: config.PrefetchCount, global: false);
this._consumer = new AsyncEventingBasicConsumer(this._channel);

this._delayBeforeRetryingMsecs = Math.Max(0, this._config.DelayBeforeRetryingMsecs);
this._maxAttempts = Math.Max(0, this._config.MaxRetriesBeforePoisonQueue) + 1;
}

/// <inheritdoc />
Expand Down Expand Up @@ -171,7 +177,6 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
this._consumer.Received += async (object sender, BasicDeliverEventArgs args) =>
{
// Just for logging, extract the attempt number from the message headers
var maxAttempts = this._config.MaxRetriesBeforePoisonQueue + 1;
var attemptNumber = 1;
if (args.BasicProperties?.Headers != null && args.BasicProperties.Headers.TryGetValue("x-delivery-count", out object? value))
{
Expand All @@ -181,7 +186,7 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
try
{
this._log.LogDebug("Message '{0}' received, expires after {1}ms, attempt {2} of {3}",
args.BasicProperties?.MessageId, args.BasicProperties?.Expiration, attemptNumber, maxAttempts);
args.BasicProperties?.MessageId, args.BasicProperties?.Expiration, attemptNumber, this._maxAttempts);
byte[] body = args.Body.ToArray();
string message = Encoding.UTF8.GetString(body);
Expand All @@ -194,15 +199,19 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
}
else
{
if (attemptNumber < maxAttempts)
if (attemptNumber < this._maxAttempts)
{
this._log.LogWarning("Message '{0}' failed to process (attempt {1} of {2}), putting message back in the queue",
args.BasicProperties?.MessageId, attemptNumber, maxAttempts);
args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
if (this._delayBeforeRetryingMsecs > 0)
{
await Task.Delay(TimeSpan.FromMilliseconds(this._delayBeforeRetryingMsecs)).ConfigureAwait(false);
}
}
else
{
this._log.LogError("Message '{0}' failed to process (attempt {1} of {2}), moving message to dead letter queue",
args.BasicProperties?.MessageId, attemptNumber, maxAttempts);
args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
}
// Note: if "requeue == false" the message would be moved to the dead letter exchange
Expand All @@ -217,8 +226,20 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
// - failed to delete message from queue
// - failed to unlock message in the queue
this._log.LogWarning(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue",
args.BasicProperties?.MessageId, attemptNumber, maxAttempts);
if (attemptNumber < this._maxAttempts)
{
this._log.LogWarning(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue",
args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
if (this._delayBeforeRetryingMsecs > 0)
{
await Task.Delay(TimeSpan.FromMilliseconds(this._delayBeforeRetryingMsecs)).ConfigureAwait(false);
}
}
else
{
this._log.LogError(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue",
args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
}
// TODO: verify and document what happens if this fails. RabbitMQ should automatically unlock messages.
// Note: if "requeue == false" the message would be moved to the dead letter exchange
Expand Down
7 changes: 7 additions & 0 deletions service/Service/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -539,10 +539,17 @@
"VirtualHost": "/",
"MessageTTLSecs": 3600,
"SslEnabled": false,
// How many messages to process asynchronously at a time, in each queue
"ConcurrentThreads": 4,
// How many messages to fetch at a time from each queue
// The value should be higher than ConcurrentThreads
"PrefetchCount": 8,
// How many times to dequeue a messages and process before moving it to a poison queue
// Note: this value cannot be changed after queues have been created. In such case
// you might need to drain all queues, delete them, and restart the ingestion service(s).
"MaxRetriesBeforePoisonQueue": 20,
// How long to wait before putting a message back to the queue in case of failure
"DelayBeforeRetryingMsecs": 500,
// Suffix used for the poison queues.
"PoisonQueueSuffix": "-poison"
},
Expand Down

0 comments on commit 6994af9

Please sign in to comment.