From 846f7263521e6f55f01a0b62e0cc265e31fb65aa Mon Sep 17 00:00:00 2001 From: Marco Minerva Date: Mon, 6 May 2024 18:37:31 +0000 Subject: [PATCH] Add missing properties to RabbitMQ messages (#454) ## Motivation and Context (Why the change? What's the scenario?) See https://github.com/microsoft/kernel-memory/issues/408#issuecomment-2063342815. This PR sets the `MessageId` and `Expiration` properties for RabbitMQ messages. --------- Co-authored-by: Devis Lucato --- extensions/RabbitMQ/RabbitMQPipeline.cs | 15 ++++++++++---- extensions/RabbitMQ/RabbitMqConfig.cs | 26 +++++++++++++++++++++++++ service/Service/appsettings.json | 3 ++- 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/extensions/RabbitMQ/RabbitMQPipeline.cs b/extensions/RabbitMQ/RabbitMQPipeline.cs index d8db93472..8f7090e73 100644 --- a/extensions/RabbitMQ/RabbitMQPipeline.cs +++ b/extensions/RabbitMQ/RabbitMQPipeline.cs @@ -21,6 +21,7 @@ public sealed class RabbitMQPipeline : IQueue private readonly IModel _channel; private readonly AsyncEventingBasicConsumer _consumer; private string _queueName = string.Empty; + private readonly int _messageTTLMsecs; /// /// Create a new RabbitMQ queue instance @@ -40,6 +41,7 @@ public RabbitMQPipeline(RabbitMqConfig config, ILogger? log = DispatchConsumersAsync = true }; + this._messageTTLMsecs = config.MessageTTLSecs * 1000; this._connection = factory.CreateConnection(); this._channel = this._connection.CreateModel(); this._channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); @@ -87,15 +89,20 @@ public Task EnqueueAsync(string message, CancellationToken cancellationToken = d throw new InvalidOperationException("The client must be connected to a queue first"); } - this._log.LogDebug("Sending message..."); + var properties = this._channel.CreateBasicProperties(); + properties.Persistent = true; + properties.MessageId = Guid.NewGuid().ToString("N"); + properties.Expiration = $"{this._messageTTLMsecs}"; + + this._log.LogDebug("Sending message: {0} (TTL: {1} secs)...", properties.MessageId, this._messageTTLMsecs / 1000); this._channel.BasicPublish( routingKey: this._queueName, body: Encoding.UTF8.GetBytes(message), exchange: string.Empty, - basicProperties: null); + basicProperties: properties); - this._log.LogDebug("Message sent"); + this._log.LogDebug("Message sent: {0} (TTL: {1} secs)", properties.MessageId, this._messageTTLMsecs / 1000); return Task.CompletedTask; } @@ -107,7 +114,7 @@ public void OnDequeue(Func> processMessageAction) { try { - this._log.LogDebug("Message '{0}' received, expires at {1}", args.BasicProperties.MessageId, args.BasicProperties.Expiration); + this._log.LogDebug("Message '{0}' received, expires after {1}ms", args.BasicProperties.MessageId, args.BasicProperties.Expiration); byte[] body = args.Body.ToArray(); string message = Encoding.UTF8.GetString(body); diff --git a/extensions/RabbitMQ/RabbitMqConfig.cs b/extensions/RabbitMQ/RabbitMqConfig.cs index 020b974a0..ff7a2ac1a 100644 --- a/extensions/RabbitMQ/RabbitMqConfig.cs +++ b/extensions/RabbitMQ/RabbitMqConfig.cs @@ -6,9 +6,35 @@ namespace Microsoft.KernelMemory; public class RabbitMqConfig { + /// + /// RabbitMQ hostname, e.g. "127.0.0.1" + /// public string Host { get; set; } = ""; + + /// + /// TCP port for the connection, e.g. 5672 + /// public int Port { get; set; } = 0; + + /// + /// Authentication username + /// public string Username { get; set; } = ""; + + /// + /// Authentication password + /// public string Password { get; set; } = ""; + + /// + /// RabbitMQ virtual host name, e.g. "/" + /// See https://www.rabbitmq.com/docs/vhosts + /// public string VirtualHost { get; set; } = "/"; + + /// + /// How long to retry messages delivery, ie how long to retry, in seconds. + /// Default: 3600 second, 1 hour. + /// + public int MessageTTLSecs { get; set; } = 3600; } diff --git a/service/Service/appsettings.json b/service/Service/appsettings.json index f536cf1fc..e316f00fa 100644 --- a/service/Service/appsettings.json +++ b/service/Service/appsettings.json @@ -359,7 +359,8 @@ "Port": "5672", "Username": "user", "Password": "", - "VirtualHost": "/" + "VirtualHost": "/", + "MessageTTLSecs": 3600 }, "Redis": { // Redis connection string, e.g. "localhost:6379,password=..."