From 6994af96f92a204c6179ce7f576dc0cc07ae58bd Mon Sep 17 00:00:00 2001 From: Devis Lucato Date: Sat, 5 Oct 2024 21:16:14 -0700 Subject: [PATCH] RabbitMQ pipeline improvements --- .../RabbitMQ.TestApplication/Program.cs | 7 ++- .../RabbitMQ.TestApplication/appsettings.json | 6 +++ .../RabbitMQ/RabbitMQ/RabbitMQConfig.cs | 41 +++++++++++++++++- .../RabbitMQ/RabbitMQ/RabbitMQPipeline.cs | 43 ++++++++++++++----- service/Service/appsettings.json | 7 +++ 5 files changed, 90 insertions(+), 14 deletions(-) diff --git a/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs b/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs index 2668402fa..10ece1b13 100644 --- a/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs +++ b/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs @@ -45,7 +45,12 @@ public static async Task Main() ListenToDeadLetterQueue(rabbitMQConfig); - await pipeline.EnqueueAsync($"test {DateTimeOffset.Now:T}"); + // Change ConcurrentThreads and PrefetchCount to 1 to see + // how they affect total execution time + for (int i = 1; i <= 3; i++) + { + await pipeline.EnqueueAsync($"test #{i} {DateTimeOffset.Now:T}"); + } while (true) { diff --git a/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json b/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json index 55ee25231..2afea6593 100644 --- a/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json +++ b/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json @@ -14,8 +14,14 @@ "VirtualHost": "/", "MessageTTLSecs": 3600, "SslEnabled": false, + // How many messages to process asynchronously at a time, in each queue + "ConcurrentThreads": 3, + // How many messages to fetch at a time from each queue + // The value should be higher than ConcurrentThreads + "PrefetchCount": 6, // How many times to dequeue a messages and process before moving it to a poison queue "MaxRetriesBeforePoisonQueue": 5, + "DelayBeforeRetryingMsecs": 750, // Suffix used for the poison queues. "PoisonQueueSuffix": "-poison" } diff --git a/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs b/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs index 37163dc07..cffb0cd38 100644 --- a/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs +++ b/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. using System.Text; +using Microsoft.Extensions.Logging; #pragma warning disable IDE0130 // reduce number of "using" statements // ReSharper disable once CheckNamespace - reduce number of "using" statements @@ -16,7 +17,7 @@ public class RabbitMQConfig /// /// TCP port for the connection, e.g. 5672 /// - public int Port { get; set; } = 0; + public int Port { get; set; } = 5672; /// /// Authentication username @@ -46,6 +47,22 @@ public class RabbitMQConfig /// public bool SslEnabled { get; set; } = false; + /// + /// How many messages to process asynchronously at a time, in each queue. + /// Note that this applies to each queue, and each queue is used + /// for a specific pipeline step. + /// + public ushort ConcurrentThreads { get; set; } = 2; + + /// + /// How many messages to fetch at a time from each queue. + /// The value should be higher than ConcurrentThreads to make sure each + /// thread has some work to do. + /// Note that this applies to each queue, and each queue is used + /// for a specific pipeline step. + /// + public ushort PrefetchCount { get; set; } = 3; + /// /// How many times to retry processing a message before moving it to a poison queue. /// Example: a value of 20 means that a message will be processed up to 21 times. @@ -54,6 +71,14 @@ public class RabbitMQConfig /// public int MaxRetriesBeforePoisonQueue { get; set; } = 20; + /// + /// How long to wait before putting a message back to the queue in case of failure. + /// Note: currently a basic strategy not based on RabbitMQ exchanges, potentially + /// affecting the pipeline concurrency performance: consumers hold + /// messages for N msecs, slowing down the delivery of other messages. + /// + public int DelayBeforeRetryingMsecs { get; set; } = 500; + /// /// Suffix used for the poison queues. /// @@ -62,7 +87,7 @@ public class RabbitMQConfig /// /// Verify that the current state is valid. /// - public void Validate() + public void Validate(ILogger? log = null) { const int MinTTLSecs = 5; @@ -81,6 +106,11 @@ public void Validate() throw new ConfigurationException($"RabbitMQ: {nameof(this.MessageTTLSecs)} value {this.MessageTTLSecs} is too low, cannot be less than {MinTTLSecs}"); } + if (this.ConcurrentThreads < 1) + { + throw new ConfigurationException($"RabbitMQ: {nameof(this.ConcurrentThreads)} value cannot be less than 1"); + } + if (string.IsNullOrWhiteSpace(this.PoisonQueueSuffix) || this.PoisonQueueSuffix != $"{this.PoisonQueueSuffix}".Trim()) { throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} cannot be empty or have leading or trailing spaces"); @@ -102,5 +132,12 @@ public void Validate() { throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} can be up to 60 characters length"); } + +#pragma warning disable CA2254 + if (this.PrefetchCount < this.ConcurrentThreads) + { + log?.LogWarning($"The value of {nameof(this.PrefetchCount)} ({this.PrefetchCount}) should not be lower than the value of {nameof(this.ConcurrentThreads)} ({this.ConcurrentThreads})"); + } +#pragma warning restore CA2254 } } diff --git a/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs b/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs index e51aaeba6..459c143f8 100644 --- a/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs +++ b/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs @@ -24,6 +24,8 @@ public sealed class RabbitMQPipeline : IQueue private readonly AsyncEventingBasicConsumer _consumer; private readonly RabbitMQConfig _config; private readonly int _messageTTLMsecs; + private readonly int _delayBeforeRetryingMsecs; + private readonly int _maxAttempts; private string _queueName = string.Empty; private string _poisonQueueName = string.Empty; @@ -32,11 +34,11 @@ public sealed class RabbitMQPipeline : IQueue /// public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = null) { - this._config = config; - this._config.Validate(); - this._log = (loggerFactory ?? DefaultLogger.Factory).CreateLogger(); + this._config = config; + this._config.Validate(this._log); + // see https://www.rabbitmq.com/dotnet-api-guide.html#consuming-async var factory = new ConnectionFactory { @@ -46,6 +48,7 @@ public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = n Password = config.Password, VirtualHost = !string.IsNullOrWhiteSpace(config.VirtualHost) ? config.VirtualHost : "/", DispatchConsumersAsync = true, + ConsumerDispatchConcurrency = config.ConcurrentThreads, Ssl = new SslOption { Enabled = config.SslEnabled, @@ -56,8 +59,11 @@ public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = n this._messageTTLMsecs = config.MessageTTLSecs * 1000; this._connection = factory.CreateConnection(); this._channel = this._connection.CreateModel(); - this._channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); + this._channel.BasicQos(prefetchSize: 0, prefetchCount: config.PrefetchCount, global: false); this._consumer = new AsyncEventingBasicConsumer(this._channel); + + this._delayBeforeRetryingMsecs = Math.Max(0, this._config.DelayBeforeRetryingMsecs); + this._maxAttempts = Math.Max(0, this._config.MaxRetriesBeforePoisonQueue) + 1; } /// @@ -171,7 +177,6 @@ public void OnDequeue(Func> processMessageAction) this._consumer.Received += async (object sender, BasicDeliverEventArgs args) => { // Just for logging, extract the attempt number from the message headers - var maxAttempts = this._config.MaxRetriesBeforePoisonQueue + 1; var attemptNumber = 1; if (args.BasicProperties?.Headers != null && args.BasicProperties.Headers.TryGetValue("x-delivery-count", out object? value)) { @@ -181,7 +186,7 @@ public void OnDequeue(Func> processMessageAction) try { this._log.LogDebug("Message '{0}' received, expires after {1}ms, attempt {2} of {3}", - args.BasicProperties?.MessageId, args.BasicProperties?.Expiration, attemptNumber, maxAttempts); + args.BasicProperties?.MessageId, args.BasicProperties?.Expiration, attemptNumber, this._maxAttempts); byte[] body = args.Body.ToArray(); string message = Encoding.UTF8.GetString(body); @@ -194,15 +199,19 @@ public void OnDequeue(Func> processMessageAction) } else { - if (attemptNumber < maxAttempts) + if (attemptNumber < this._maxAttempts) { this._log.LogWarning("Message '{0}' failed to process (attempt {1} of {2}), putting message back in the queue", - args.BasicProperties?.MessageId, attemptNumber, maxAttempts); + args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts); + if (this._delayBeforeRetryingMsecs > 0) + { + await Task.Delay(TimeSpan.FromMilliseconds(this._delayBeforeRetryingMsecs)).ConfigureAwait(false); + } } else { this._log.LogError("Message '{0}' failed to process (attempt {1} of {2}), moving message to dead letter queue", - args.BasicProperties?.MessageId, attemptNumber, maxAttempts); + args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts); } // Note: if "requeue == false" the message would be moved to the dead letter exchange @@ -217,8 +226,20 @@ public void OnDequeue(Func> processMessageAction) // - failed to delete message from queue // - failed to unlock message in the queue - this._log.LogWarning(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue", - args.BasicProperties?.MessageId, attemptNumber, maxAttempts); + if (attemptNumber < this._maxAttempts) + { + this._log.LogWarning(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue", + args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts); + if (this._delayBeforeRetryingMsecs > 0) + { + await Task.Delay(TimeSpan.FromMilliseconds(this._delayBeforeRetryingMsecs)).ConfigureAwait(false); + } + } + else + { + this._log.LogError(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue", + args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts); + } // TODO: verify and document what happens if this fails. RabbitMQ should automatically unlock messages. // Note: if "requeue == false" the message would be moved to the dead letter exchange diff --git a/service/Service/appsettings.json b/service/Service/appsettings.json index 100b25119..9347adfb1 100644 --- a/service/Service/appsettings.json +++ b/service/Service/appsettings.json @@ -539,10 +539,17 @@ "VirtualHost": "/", "MessageTTLSecs": 3600, "SslEnabled": false, + // How many messages to process asynchronously at a time, in each queue + "ConcurrentThreads": 4, + // How many messages to fetch at a time from each queue + // The value should be higher than ConcurrentThreads + "PrefetchCount": 8, // How many times to dequeue a messages and process before moving it to a poison queue // Note: this value cannot be changed after queues have been created. In such case // you might need to drain all queues, delete them, and restart the ingestion service(s). "MaxRetriesBeforePoisonQueue": 20, + // How long to wait before putting a message back to the queue in case of failure + "DelayBeforeRetryingMsecs": 500, // Suffix used for the poison queues. "PoisonQueueSuffix": "-poison" },