From 759cc43cae6e8af98ba44d0453833b462aa53e80 Mon Sep 17 00:00:00 2001 From: Marco Minerva Date: Sun, 6 Oct 2024 04:48:19 +0200 Subject: [PATCH] Add poison queues support for RabbitMQ (#648) ## Motivation and Context (Why the change? What's the scenario?) Avoid infinite retries when using RabbitMQ, and move poison messages after N retries. See #408. --------- Co-authored-by: Devis Lucato --- KernelMemory.sln | 8 +- .../RabbitMQ.TestApplication/Program.cs | 90 ++++++ .../RabbitMQ.TestApplication.csproj | 19 ++ .../RabbitMQ.TestApplication/appsettings.json | 24 ++ .../{ => RabbitMQ}/DependencyInjection.cs | 8 +- .../RabbitMQ/{ => RabbitMQ}/RabbitMQ.csproj | 4 +- .../RabbitMQ/RabbitMQ/RabbitMQConfig.cs | 106 +++++++ .../RabbitMQ/RabbitMQ/RabbitMQPipeline.cs | 266 ++++++++++++++++++ extensions/RabbitMQ/RabbitMQPipeline.cs | 164 ----------- extensions/RabbitMQ/RabbitMqConfig.cs | 46 --- .../Diagnostics/ArgumentExceptionEx.cs | 24 ++ service/Core/Core.csproj | 2 +- service/Service/ServiceConfiguration.cs | 4 +- service/Service/appsettings.json | 8 +- 14 files changed, 553 insertions(+), 220 deletions(-) create mode 100644 extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs create mode 100644 extensions/RabbitMQ/RabbitMQ.TestApplication/RabbitMQ.TestApplication.csproj create mode 100644 extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json rename extensions/RabbitMQ/{ => RabbitMQ}/DependencyInjection.cs (88%) rename extensions/RabbitMQ/{ => RabbitMQ}/RabbitMQ.csproj (86%) create mode 100644 extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs create mode 100644 extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs delete mode 100644 extensions/RabbitMQ/RabbitMQPipeline.cs delete mode 100644 extensions/RabbitMQ/RabbitMqConfig.cs create mode 100644 service/Abstractions/Diagnostics/ArgumentExceptionEx.cs diff --git a/KernelMemory.sln b/KernelMemory.sln index 7dc6a100e..d8677eac4 100644 --- a/KernelMemory.sln +++ b/KernelMemory.sln @@ -223,7 +223,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Qdrant.TestApplication", "e EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Qdrant.FunctionalTests", "extensions\Qdrant\Qdrant.FunctionalTests\Qdrant.FunctionalTests.csproj", "{62B96766-AA6C-4CFF-A6FB-6370C89C2509}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ", "extensions\RabbitMQ\RabbitMQ.csproj", "{E3877E49-958E-4DC8-B5E8-834010F5C4B7}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ", "extensions\RabbitMQ\RabbitMQ\RabbitMQ.csproj", "{E3877E49-958E-4DC8-B5E8-834010F5C4B7}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenAI", "extensions\OpenAI\OpenAI\OpenAI.csproj", "{A6AE31A1-4F60-47B0-8534-7B083D68118C}" EndProject @@ -327,6 +327,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Onnx.FunctionalTests", "ext EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Onnx", "extensions\ONNX\Onnx\Onnx.csproj", "{345DEF9B-6EE1-49DF-B46A-25E38CE9B151}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.TestApplication", "extensions\RabbitMQ\RabbitMQ.TestApplication\RabbitMQ.TestApplication.csproj", "{82670921-FDCD-4672-84BD-4353F5AC24A0}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -602,6 +604,9 @@ Global {345DEF9B-6EE1-49DF-B46A-25E38CE9B151}.Debug|Any CPU.Build.0 = Debug|Any CPU {345DEF9B-6EE1-49DF-B46A-25E38CE9B151}.Release|Any CPU.ActiveCfg = Release|Any CPU {345DEF9B-6EE1-49DF-B46A-25E38CE9B151}.Release|Any CPU.Build.0 = Release|Any CPU + {82670921-FDCD-4672-84BD-4353F5AC24A0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {82670921-FDCD-4672-84BD-4353F5AC24A0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {82670921-FDCD-4672-84BD-4353F5AC24A0}.Release|Any CPU.ActiveCfg = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -698,6 +703,7 @@ Global {F192513B-265B-4943-A2A9-44E23B15BA18} = {155DA079-E267-49AF-973A-D1D44681970F} {7BBD348E-CDD9-4462-B8C9-47613C5EC682} = {3C17F42B-CFC8-4900-8CFB-88936311E919} {345DEF9B-6EE1-49DF-B46A-25E38CE9B151} = {155DA079-E267-49AF-973A-D1D44681970F} + {82670921-FDCD-4672-84BD-4353F5AC24A0} = {3C17F42B-CFC8-4900-8CFB-88936311E919} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {CC136C62-115C-41D1-B414-F9473EFF6EA8} diff --git a/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs b/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs new file mode 100644 index 000000000..2668402fa --- /dev/null +++ b/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs @@ -0,0 +1,90 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text; +using Microsoft.KernelMemory; +using Microsoft.KernelMemory.Diagnostics; +using Microsoft.KernelMemory.Orchestration.RabbitMQ; +using Microsoft.KernelMemory.Pipeline.Queue; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace Microsoft.RabbitMQ.TestApplication; + +internal static class Program +{ + private const string QueueName = "test queue"; + + public static async Task Main() + { + var cfg = new ConfigurationBuilder() + .AddJsonFile("appsettings.json") + .AddJsonFile("appsettings.development.json", optional: true) + .AddJsonFile("appsettings.Development.json", optional: true) + .Build(); + + var rabbitMQConfig = cfg.GetSection("KernelMemory:Services:RabbitMQ").Get(); + ArgumentNullExceptionEx.ThrowIfNull(rabbitMQConfig, nameof(rabbitMQConfig), "RabbitMQ config not found"); + + DefaultLogger.Factory = LoggerFactory.Create(builder => + { + builder.AddConsole(); + builder.SetMinimumLevel(LogLevel.Warning); + }); + + var pipeline = new RabbitMQPipeline(rabbitMQConfig, DefaultLogger.Factory); + + var counter = 0; + pipeline.OnDequeue(async msg => + { + Console.WriteLine($"{++counter} Received message: {msg}"); + await Task.Delay(0); + return false; + }); + + await pipeline.ConnectToQueueAsync(QueueName, QueueOptions.PubSub); + + ListenToDeadLetterQueue(rabbitMQConfig); + + await pipeline.EnqueueAsync($"test {DateTimeOffset.Now:T}"); + + while (true) + { + await Task.Delay(TimeSpan.FromSeconds(2)); + } + } + + private static void ListenToDeadLetterQueue(RabbitMQConfig config) + { + var factory = new ConnectionFactory + { + HostName = config.Host, + Port = config.Port, + UserName = config.Username, + Password = config.Password, + VirtualHost = !string.IsNullOrWhiteSpace(config.VirtualHost) ? config.VirtualHost : "/", + DispatchConsumersAsync = true, + Ssl = new SslOption + { + Enabled = config.SslEnabled, + ServerName = config.Host, + } + }; + + var connection = factory.CreateConnection(); + var channel = connection.CreateModel(); + var consumer = new AsyncEventingBasicConsumer(channel); + + consumer.Received += async (object sender, BasicDeliverEventArgs args) => + { + byte[] body = args.Body.ToArray(); + string message = Encoding.UTF8.GetString(body); + + Console.WriteLine($"Poison message received: {message}"); + await Task.Delay(0); + }; + + channel.BasicConsume(queue: $"{QueueName}{config.PoisonQueueSuffix}", + autoAck: true, + consumer: consumer); + } +} diff --git a/extensions/RabbitMQ/RabbitMQ.TestApplication/RabbitMQ.TestApplication.csproj b/extensions/RabbitMQ/RabbitMQ.TestApplication/RabbitMQ.TestApplication.csproj new file mode 100644 index 000000000..f6e189737 --- /dev/null +++ b/extensions/RabbitMQ/RabbitMQ.TestApplication/RabbitMQ.TestApplication.csproj @@ -0,0 +1,19 @@ + + + + Microsoft.RabbitMQ.TestApplication + Microsoft.RabbitMQ.TestApplication + Exe + net8.0 + LatestMajor + enable + enable + false + $(NoWarn);KMEXP00;KMEXP01;KMEXP02;KMEXP03;KMEXP04; + + + + + + + diff --git a/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json b/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json new file mode 100644 index 000000000..55ee25231 --- /dev/null +++ b/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json @@ -0,0 +1,24 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Error" + } + }, + "KernelMemory": { + "Services": { + "RabbitMQ": { + "Host": "127.0.0.1", + "Port": "5672", + "Username": "user", + "Password": "password", + "VirtualHost": "/", + "MessageTTLSecs": 3600, + "SslEnabled": false, + // How many times to dequeue a messages and process before moving it to a poison queue + "MaxRetriesBeforePoisonQueue": 5, + // Suffix used for the poison queues. + "PoisonQueueSuffix": "-poison" + } + } + } +} \ No newline at end of file diff --git a/extensions/RabbitMQ/DependencyInjection.cs b/extensions/RabbitMQ/RabbitMQ/DependencyInjection.cs similarity index 88% rename from extensions/RabbitMQ/DependencyInjection.cs rename to extensions/RabbitMQ/RabbitMQ/DependencyInjection.cs index c4e445928..2508cad14 100644 --- a/extensions/RabbitMQ/DependencyInjection.cs +++ b/extensions/RabbitMQ/RabbitMQ/DependencyInjection.cs @@ -14,7 +14,7 @@ namespace Microsoft.KernelMemory; /// public static partial class KernelMemoryBuilderExtensions { - public static IKernelMemoryBuilder WithRabbitMQOrchestration(this IKernelMemoryBuilder builder, RabbitMqConfig config) + public static IKernelMemoryBuilder WithRabbitMQOrchestration(this IKernelMemoryBuilder builder, RabbitMQConfig config) { builder.Services.AddRabbitMQOrchestration(config); return builder; @@ -26,8 +26,10 @@ public static IKernelMemoryBuilder WithRabbitMQOrchestration(this IKernelMemoryB /// public static partial class DependencyInjection { - public static IServiceCollection AddRabbitMQOrchestration(this IServiceCollection services, RabbitMqConfig config) + public static IServiceCollection AddRabbitMQOrchestration(this IServiceCollection services, RabbitMQConfig config) { + config.Validate(); + IQueue QueueFactory(IServiceProvider serviceProvider) { return serviceProvider.GetService() @@ -37,7 +39,7 @@ IQueue QueueFactory(IServiceProvider serviceProvider) // The orchestrator uses multiple queue clients, each linked to a specific queue, // so it requires a factory rather than a single queue injected to the ctor. return services - .AddSingleton(config) + .AddSingleton(config) .AddTransient() .AddSingleton(serviceProvider => new QueueClientFactory(() => QueueFactory(serviceProvider))); } diff --git a/extensions/RabbitMQ/RabbitMQ.csproj b/extensions/RabbitMQ/RabbitMQ/RabbitMQ.csproj similarity index 86% rename from extensions/RabbitMQ/RabbitMQ.csproj rename to extensions/RabbitMQ/RabbitMQ/RabbitMQ.csproj index d74cdfcb8..7df934414 100644 --- a/extensions/RabbitMQ/RabbitMQ.csproj +++ b/extensions/RabbitMQ/RabbitMQ/RabbitMQ.csproj @@ -9,7 +9,7 @@ - + @@ -26,7 +26,7 @@ - + diff --git a/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs b/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs new file mode 100644 index 000000000..37163dc07 --- /dev/null +++ b/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs @@ -0,0 +1,106 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text; + +#pragma warning disable IDE0130 // reduce number of "using" statements +// ReSharper disable once CheckNamespace - reduce number of "using" statements +namespace Microsoft.KernelMemory; + +public class RabbitMQConfig +{ + /// + /// RabbitMQ hostname, e.g. "127.0.0.1" + /// + public string Host { get; set; } = ""; + + /// + /// TCP port for the connection, e.g. 5672 + /// + public int Port { get; set; } = 0; + + /// + /// Authentication username + /// + public string Username { get; set; } = ""; + + /// + /// Authentication password + /// + public string Password { get; set; } = ""; + + /// + /// RabbitMQ virtual host name, e.g. "/" + /// See https://www.rabbitmq.com/docs/vhosts + /// + public string VirtualHost { get; set; } = "/"; + + /// + /// How long to retry messages delivery, ie how long to retry, in seconds. + /// Default: 3600 second, 1 hour. + /// + public int MessageTTLSecs { get; set; } = 3600; + + /// + /// Set to true if your RabbitMQ supports SSL. + /// Default: false + /// + public bool SslEnabled { get; set; } = false; + + /// + /// How many times to retry processing a message before moving it to a poison queue. + /// Example: a value of 20 means that a message will be processed up to 21 times. + /// Note: this value cannot be changed after queues have been created. In such case + /// you might need to drain all queues, delete them, and restart the ingestion service(s). + /// + public int MaxRetriesBeforePoisonQueue { get; set; } = 20; + + /// + /// Suffix used for the poison queues. + /// + public string PoisonQueueSuffix { get; set; } = "-poison"; + + /// + /// Verify that the current state is valid. + /// + public void Validate() + { + const int MinTTLSecs = 5; + + if (string.IsNullOrWhiteSpace(this.Host) || this.Host != $"{this.Host}".Trim()) + { + throw new ConfigurationException($"RabbitMQ: {nameof(this.Host)} cannot be empty or have leading or trailing spaces"); + } + + if (this.Port < 1) + { + throw new ConfigurationException($"RabbitMQ: {nameof(this.Port)} value {this.Port} is not valid"); + } + + if (this.MessageTTLSecs < MinTTLSecs) + { + throw new ConfigurationException($"RabbitMQ: {nameof(this.MessageTTLSecs)} value {this.MessageTTLSecs} is too low, cannot be less than {MinTTLSecs}"); + } + + if (string.IsNullOrWhiteSpace(this.PoisonQueueSuffix) || this.PoisonQueueSuffix != $"{this.PoisonQueueSuffix}".Trim()) + { + throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} cannot be empty or have leading or trailing spaces"); + } + + if (this.MaxRetriesBeforePoisonQueue < 0) + { + throw new ConfigurationException($"RabbitMQ: {nameof(this.MaxRetriesBeforePoisonQueue)} cannot be a negative number"); + } + + if (string.IsNullOrWhiteSpace(this.PoisonQueueSuffix)) + { + throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} is empty"); + } + + // Queue names can be up to 255 bytes of UTF-8 characters. + // Allow a max of 60 bytes for the suffix, so there is room for the queue name. + if (Encoding.UTF8.GetByteCount(this.PoisonQueueSuffix) > 60) + { + throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} can be up to 60 characters length"); + } + } +} diff --git a/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs b/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs new file mode 100644 index 000000000..e51aaeba6 --- /dev/null +++ b/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs @@ -0,0 +1,266 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.KernelMemory.Diagnostics; +using Microsoft.KernelMemory.Pipeline.Queue; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; + +namespace Microsoft.KernelMemory.Orchestration.RabbitMQ; + +[Experimental("KMEXP04")] +public sealed class RabbitMQPipeline : IQueue +{ + private readonly ILogger _log; + private readonly IConnection _connection; + private readonly IModel _channel; + private readonly AsyncEventingBasicConsumer _consumer; + private readonly RabbitMQConfig _config; + private readonly int _messageTTLMsecs; + private string _queueName = string.Empty; + private string _poisonQueueName = string.Empty; + + /// + /// Create a new RabbitMQ queue instance + /// + public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = null) + { + this._config = config; + this._config.Validate(); + + this._log = (loggerFactory ?? DefaultLogger.Factory).CreateLogger(); + + // see https://www.rabbitmq.com/dotnet-api-guide.html#consuming-async + var factory = new ConnectionFactory + { + HostName = config.Host, + Port = config.Port, + UserName = config.Username, + Password = config.Password, + VirtualHost = !string.IsNullOrWhiteSpace(config.VirtualHost) ? config.VirtualHost : "/", + DispatchConsumersAsync = true, + Ssl = new SslOption + { + Enabled = config.SslEnabled, + ServerName = config.Host, + } + }; + + this._messageTTLMsecs = config.MessageTTLSecs * 1000; + this._connection = factory.CreateConnection(); + this._channel = this._connection.CreateModel(); + this._channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); + this._consumer = new AsyncEventingBasicConsumer(this._channel); + } + + /// + /// About dead letters, see https://www.rabbitmq.com/docs/dlx + public Task ConnectToQueueAsync(string queueName, QueueOptions options = default, CancellationToken cancellationToken = default) + { + ArgumentNullExceptionEx.ThrowIfNullOrWhiteSpace(queueName, nameof(queueName), "The queue name is empty"); + ArgumentExceptionEx.ThrowIf(queueName.StartsWith("amq.", StringComparison.OrdinalIgnoreCase), nameof(queueName), "The queue name cannot start with 'amq.'"); + + var poisonExchangeName = $"{queueName}.dlx"; + var poisonQueueName = $"{queueName}{this._config.PoisonQueueSuffix}"; + + ArgumentExceptionEx.ThrowIf((Encoding.UTF8.GetByteCount(queueName) > 255), nameof(queueName), + $"The queue name '{queueName}' is too long, max 255 UTF8 bytes allowed"); + ArgumentExceptionEx.ThrowIf((Encoding.UTF8.GetByteCount(poisonExchangeName) > 255), nameof(poisonExchangeName), + $"The exchange name '{poisonExchangeName}' is too long, max 255 UTF8 bytes allowed, try using a shorter queue name"); + ArgumentExceptionEx.ThrowIf((Encoding.UTF8.GetByteCount(poisonQueueName) > 255), nameof(poisonQueueName), + $"The dead letter queue name '{poisonQueueName}' is too long, max 255 UTF8 bytes allowed, try using a shorter queue name"); + + if (!string.IsNullOrEmpty(this._queueName)) + { + throw new InvalidOperationException($"The client is already connected to `{this._queueName}`"); + } + + // Define queue where messages are sent by the orchestrator + this._queueName = queueName; + try + { + this._channel.QueueDeclare( + queue: this._queueName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: new Dictionary + { + ["x-queue-type"] = "quorum", + ["x-delivery-limit"] = this._config.MaxRetriesBeforePoisonQueue, + ["x-dead-letter-exchange"] = poisonExchangeName + }); + this._log.LogTrace("Queue name: {0}", this._queueName); + } +#pragma warning disable CA2254 + catch (OperationInterruptedException ex) + { + var err = ex.Message; + if (ex.Message.Contains("inequivalent arg 'x-delivery-limit'", StringComparison.OrdinalIgnoreCase)) + { + err = $"The queue '{this._queueName}' is already configured with a different value for 'x-delivery-limit' " + + $"({nameof(this._config.MaxRetriesBeforePoisonQueue)}), the value cannot be changed to {this._config.MaxRetriesBeforePoisonQueue}"; + } + else if (ex.Message.Contains("inequivalent arg 'x-dead-letter-exchange'", StringComparison.OrdinalIgnoreCase)) + { + err = $"The queue '{this._queueName}' is already linked to a different dead letter exchange, " + + $"it is not possible to change the 'x-dead-letter-exchange' value to {poisonExchangeName}"; + } + + this._log.LogError(ex, err); + throw new KernelMemoryException(err, ex); + } +#pragma warning restore CA2254 + + // Define poison queue where failed messages are stored + this._poisonQueueName = poisonQueueName; + this._channel.QueueDeclare( + queue: this._poisonQueueName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: null); + + // Define exchange to route failed messages to poison queue + this._channel.ExchangeDeclare(poisonExchangeName, "fanout", durable: true, autoDelete: false); + this._channel.QueueBind(this._poisonQueueName, poisonExchangeName, routingKey: string.Empty, arguments: null); + this._log.LogTrace("Poison queue name '{0}' bound to exchange '{1}' for queue '{2}'", this._poisonQueueName, poisonExchangeName, this._queueName); + + // Activate consumer + if (options.DequeueEnabled) + { + this._channel.BasicConsume(queue: this._queueName, autoAck: false, consumer: this._consumer); + this._log.LogTrace("Enabling dequeue on queue `{0}`", this._queueName); + } + + return Task.FromResult(this); + } + + /// + public Task EnqueueAsync(string message, CancellationToken cancellationToken = default) + { + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + if (string.IsNullOrEmpty(this._queueName)) + { + throw new InvalidOperationException("The client must be connected to a queue first"); + } + + this.PublishMessage( + queueName: this._queueName, + body: Encoding.UTF8.GetBytes(message), + messageId: Guid.NewGuid().ToString("N"), + expirationMsecs: this._messageTTLMsecs); + + return Task.CompletedTask; + } + + /// + public void OnDequeue(Func> processMessageAction) + { + this._consumer.Received += async (object sender, BasicDeliverEventArgs args) => + { + // Just for logging, extract the attempt number from the message headers + var maxAttempts = this._config.MaxRetriesBeforePoisonQueue + 1; + var attemptNumber = 1; + if (args.BasicProperties?.Headers != null && args.BasicProperties.Headers.TryGetValue("x-delivery-count", out object? value)) + { + attemptNumber = int.TryParse(value.ToString(), out var parsedResult) ? ++parsedResult : -1; + } + + try + { + this._log.LogDebug("Message '{0}' received, expires after {1}ms, attempt {2} of {3}", + args.BasicProperties?.MessageId, args.BasicProperties?.Expiration, attemptNumber, maxAttempts); + + byte[] body = args.Body.ToArray(); + string message = Encoding.UTF8.GetString(body); + + bool success = await processMessageAction.Invoke(message).ConfigureAwait(false); + if (success) + { + this._log.LogTrace("Message '{0}' successfully processed, deleting message", args.BasicProperties?.MessageId); + this._channel.BasicAck(args.DeliveryTag, multiple: false); + } + else + { + if (attemptNumber < maxAttempts) + { + this._log.LogWarning("Message '{0}' failed to process (attempt {1} of {2}), putting message back in the queue", + args.BasicProperties?.MessageId, attemptNumber, maxAttempts); + } + else + { + this._log.LogError("Message '{0}' failed to process (attempt {1} of {2}), moving message to dead letter queue", + args.BasicProperties?.MessageId, attemptNumber, maxAttempts); + } + + // Note: if "requeue == false" the message would be moved to the dead letter exchange + this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true); + } + } +#pragma warning disable CA1031 // Must catch all to handle queue properly + catch (Exception e) + { + // Exceptions caught by this block: + // - message processing failed with exception + // - failed to delete message from queue + // - failed to unlock message in the queue + + this._log.LogWarning(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue", + args.BasicProperties?.MessageId, attemptNumber, maxAttempts); + + // TODO: verify and document what happens if this fails. RabbitMQ should automatically unlock messages. + // Note: if "requeue == false" the message would be moved to the dead letter exchange + this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true); + } +#pragma warning restore CA1031 + }; + } + + public void Dispose() + { + this._channel.Close(); + this._connection.Close(); + + this._channel.Dispose(); + this._connection.Dispose(); + } + + private void PublishMessage( + string queueName, + ReadOnlyMemory body, + string messageId, + int? expirationMsecs) + { + var properties = this._channel.CreateBasicProperties(); + properties.Persistent = true; + properties.MessageId = messageId; + + if (expirationMsecs.HasValue) + { + properties.Expiration = $"{expirationMsecs}"; + } + + this._log.LogDebug("Sending message to {0}: {1} (TTL: {2} secs)...", + queueName, properties.MessageId, expirationMsecs.HasValue ? expirationMsecs / 1000 : "infinite"); + + this._channel.BasicPublish( + routingKey: queueName, + body: body, + exchange: string.Empty, + basicProperties: properties); + + this._log.LogDebug("Message sent: {0} (TTL: {1} secs)", properties.MessageId, expirationMsecs.HasValue ? expirationMsecs / 1000 : "infinite"); + } +} diff --git a/extensions/RabbitMQ/RabbitMQPipeline.cs b/extensions/RabbitMQ/RabbitMQPipeline.cs deleted file mode 100644 index b3feda513..000000000 --- a/extensions/RabbitMQ/RabbitMQPipeline.cs +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -using System; -using System.Diagnostics.CodeAnalysis; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -using Microsoft.KernelMemory.Diagnostics; -using Microsoft.KernelMemory.Pipeline.Queue; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; - -namespace Microsoft.KernelMemory.Orchestration.RabbitMQ; - -[Experimental("KMEXP04")] -public sealed class RabbitMQPipeline : IQueue -{ - private readonly ILogger _log; - private readonly IConnection _connection; - private readonly IModel _channel; - private readonly AsyncEventingBasicConsumer _consumer; - private string _queueName = string.Empty; - private readonly int _messageTTLMsecs; - - /// - /// Create a new RabbitMQ queue instance - /// - public RabbitMQPipeline(RabbitMqConfig config, ILoggerFactory? loggerFactory = null) - { - this._log = (loggerFactory ?? DefaultLogger.Factory).CreateLogger(); - - // see https://www.rabbitmq.com/dotnet-api-guide.html#consuming-async - var factory = new ConnectionFactory - { - HostName = config.Host, - Port = config.Port, - UserName = config.Username, - Password = config.Password, - VirtualHost = !string.IsNullOrWhiteSpace(config.VirtualHost) ? config.VirtualHost : "/", - DispatchConsumersAsync = true, - Ssl = new SslOption - { - Enabled = config.SslEnabled, - ServerName = config.Host, - } - }; - - this._messageTTLMsecs = config.MessageTTLSecs * 1000; - this._connection = factory.CreateConnection(); - this._channel = this._connection.CreateModel(); - this._channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); - this._consumer = new AsyncEventingBasicConsumer(this._channel); - } - - /// - public Task ConnectToQueueAsync(string queueName, QueueOptions options = default, CancellationToken cancellationToken = default) - { - ArgumentNullExceptionEx.ThrowIfNullOrWhiteSpace(queueName, nameof(queueName), "The queue name is empty"); - - if (!string.IsNullOrEmpty(this._queueName)) - { - throw new InvalidOperationException($"The queue is already connected to `{this._queueName}`"); - } - - this._queueName = queueName; - this._channel.QueueDeclare( - queue: queueName, - durable: true, - exclusive: false, - autoDelete: false, - arguments: null); - - if (options.DequeueEnabled) - { - this._channel.BasicConsume(queue: this._queueName, - autoAck: false, - consumer: this._consumer); - } - - return Task.FromResult(this); - } - - /// - public Task EnqueueAsync(string message, CancellationToken cancellationToken = default) - { - if (cancellationToken.IsCancellationRequested) - { - return Task.FromCanceled(cancellationToken); - } - - if (string.IsNullOrEmpty(this._queueName)) - { - throw new InvalidOperationException("The client must be connected to a queue first"); - } - - var properties = this._channel.CreateBasicProperties(); - properties.Persistent = true; - properties.MessageId = Guid.NewGuid().ToString("N"); - properties.Expiration = $"{this._messageTTLMsecs}"; - - this._log.LogDebug("Sending message: {0} (TTL: {1} secs)...", properties.MessageId, this._messageTTLMsecs / 1000); - - this._channel.BasicPublish( - routingKey: this._queueName, - body: Encoding.UTF8.GetBytes(message), - exchange: string.Empty, - basicProperties: properties); - - this._log.LogDebug("Message sent: {0} (TTL: {1} secs)", properties.MessageId, this._messageTTLMsecs / 1000); - - return Task.CompletedTask; - } - - /// - public void OnDequeue(Func> processMessageAction) - { - this._consumer.Received += async (object sender, BasicDeliverEventArgs args) => - { - try - { - this._log.LogDebug("Message '{0}' received, expires after {1}ms", args.BasicProperties.MessageId, args.BasicProperties.Expiration); - - byte[] body = args.Body.ToArray(); - string message = Encoding.UTF8.GetString(body); - - bool success = await processMessageAction.Invoke(message).ConfigureAwait(false); - if (success) - { - this._log.LogTrace("Message '{0}' successfully processed, deleting message", args.BasicProperties.MessageId); - this._channel.BasicAck(args.DeliveryTag, multiple: false); - } - else - { - this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue", args.BasicProperties.MessageId); - this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true); - } - } -#pragma warning disable CA1031 // Must catch all to handle queue properly - catch (Exception e) - { - // Exceptions caught by this block: - // - message processing failed with exception - // - failed to delete message from queue - // - failed to unlock message in the queue - - this._log.LogWarning(e, "Message '{0}' processing failed with exception, putting message back in the queue", args.BasicProperties.MessageId); - - // TODO: verify and document what happens if this fails. RabbitMQ should automatically unlock messages. - this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true); - } -#pragma warning restore CA1031 - }; - } - - public void Dispose() - { - this._channel.Close(); - this._connection.Close(); - - this._channel.Dispose(); - this._connection.Dispose(); - } -} diff --git a/extensions/RabbitMQ/RabbitMqConfig.cs b/extensions/RabbitMQ/RabbitMqConfig.cs deleted file mode 100644 index f48b66e37..000000000 --- a/extensions/RabbitMQ/RabbitMqConfig.cs +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -#pragma warning disable IDE0130 // reduce number of "using" statements -// ReSharper disable once CheckNamespace - reduce number of "using" statements -namespace Microsoft.KernelMemory; - -public class RabbitMqConfig -{ - /// - /// RabbitMQ hostname, e.g. "127.0.0.1" - /// - public string Host { get; set; } = ""; - - /// - /// TCP port for the connection, e.g. 5672 - /// - public int Port { get; set; } = 0; - - /// - /// Authentication username - /// - public string Username { get; set; } = ""; - - /// - /// Authentication password - /// - public string Password { get; set; } = ""; - - /// - /// RabbitMQ virtual host name, e.g. "/" - /// See https://www.rabbitmq.com/docs/vhosts - /// - public string VirtualHost { get; set; } = "/"; - - /// - /// How long to retry messages delivery, ie how long to retry, in seconds. - /// Default: 3600 second, 1 hour. - /// - public int MessageTTLSecs { get; set; } = 3600; - - /// - /// Set to true if your RabbitMQ supports SSL. - /// Default: false - /// - public bool SslEnabled { get; set; } = false; -} diff --git a/service/Abstractions/Diagnostics/ArgumentExceptionEx.cs b/service/Abstractions/Diagnostics/ArgumentExceptionEx.cs new file mode 100644 index 000000000..98b6dc273 --- /dev/null +++ b/service/Abstractions/Diagnostics/ArgumentExceptionEx.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; + +#pragma warning disable IDE0130 // reduce number of "using" statements +// ReSharper disable once CheckNamespace - reduce number of "using" statements +namespace Microsoft.KernelMemory; + +public static class ArgumentExceptionEx +{ + public static void ThrowIf(bool condition, string? paramName, string message) + { + if (!condition) { return; } + + throw new ArgumentException(paramName, message); + } + + public static void ThrowIfNot(bool condition, string? paramName, string message) + { + if (condition) { return; } + + throw new ArgumentException(paramName, message); + } +} diff --git a/service/Core/Core.csproj b/service/Core/Core.csproj index ad64da9a3..1e1c80bd6 100644 --- a/service/Core/Core.csproj +++ b/service/Core/Core.csproj @@ -23,7 +23,7 @@ - + diff --git a/service/Service/ServiceConfiguration.cs b/service/Service/ServiceConfiguration.cs index e3b6c27a1..4f672a67f 100644 --- a/service/Service/ServiceConfiguration.cs +++ b/service/Service/ServiceConfiguration.cs @@ -138,8 +138,8 @@ private void ConfigureQueueDependency(IKernelMemoryBuilder builder) case string y when y.Equals("RabbitMQ", StringComparison.OrdinalIgnoreCase): // Check 2 keys for backward compatibility - builder.Services.AddRabbitMQOrchestration(this.GetServiceConfig("RabbitMQ") - ?? this.GetServiceConfig("RabbitMq")); + builder.Services.AddRabbitMQOrchestration(this.GetServiceConfig("RabbitMQ") + ?? this.GetServiceConfig("RabbitMq")); break; case string y when y.Equals("SimpleQueues", StringComparison.OrdinalIgnoreCase): diff --git a/service/Service/appsettings.json b/service/Service/appsettings.json index c81a56862..100b25119 100644 --- a/service/Service/appsettings.json +++ b/service/Service/appsettings.json @@ -538,7 +538,13 @@ "Password": "", "VirtualHost": "/", "MessageTTLSecs": 3600, - "SslEnabled": false + "SslEnabled": false, + // How many times to dequeue a messages and process before moving it to a poison queue + // Note: this value cannot be changed after queues have been created. In such case + // you might need to drain all queues, delete them, and restart the ingestion service(s). + "MaxRetriesBeforePoisonQueue": 20, + // Suffix used for the poison queues. + "PoisonQueueSuffix": "-poison" }, "Redis": { // Redis connection string, e.g. "localhost:6379,password=..."