Skip to content

Commit

Permalink
Add missing properties to RabbitMQ messages (#454)
Browse files Browse the repository at this point in the history
## Motivation and Context (Why the change? What's the scenario?)
See
#408 (comment).
This PR sets the `MessageId` and `Expiration` properties for RabbitMQ
messages.

---------

Co-authored-by: Devis Lucato <[email protected]>
  • Loading branch information
marcominerva and dluc authored May 6, 2024
1 parent b6b8ecf commit 846f726
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 5 deletions.
15 changes: 11 additions & 4 deletions extensions/RabbitMQ/RabbitMQPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public sealed class RabbitMQPipeline : IQueue
private readonly IModel _channel;
private readonly AsyncEventingBasicConsumer _consumer;
private string _queueName = string.Empty;
private readonly int _messageTTLMsecs;

/// <summary>
/// Create a new RabbitMQ queue instance
Expand All @@ -40,6 +41,7 @@ public RabbitMQPipeline(RabbitMqConfig config, ILogger<RabbitMQPipeline>? log =
DispatchConsumersAsync = true
};

this._messageTTLMsecs = config.MessageTTLSecs * 1000;
this._connection = factory.CreateConnection();
this._channel = this._connection.CreateModel();
this._channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Expand Down Expand Up @@ -87,15 +89,20 @@ public Task EnqueueAsync(string message, CancellationToken cancellationToken = d
throw new InvalidOperationException("The client must be connected to a queue first");
}

this._log.LogDebug("Sending message...");
var properties = this._channel.CreateBasicProperties();
properties.Persistent = true;
properties.MessageId = Guid.NewGuid().ToString("N");
properties.Expiration = $"{this._messageTTLMsecs}";

this._log.LogDebug("Sending message: {0} (TTL: {1} secs)...", properties.MessageId, this._messageTTLMsecs / 1000);

this._channel.BasicPublish(
routingKey: this._queueName,
body: Encoding.UTF8.GetBytes(message),
exchange: string.Empty,
basicProperties: null);
basicProperties: properties);

this._log.LogDebug("Message sent");
this._log.LogDebug("Message sent: {0} (TTL: {1} secs)", properties.MessageId, this._messageTTLMsecs / 1000);

return Task.CompletedTask;
}
Expand All @@ -107,7 +114,7 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
{
try
{
this._log.LogDebug("Message '{0}' received, expires at {1}", args.BasicProperties.MessageId, args.BasicProperties.Expiration);
this._log.LogDebug("Message '{0}' received, expires after {1}ms", args.BasicProperties.MessageId, args.BasicProperties.Expiration);
byte[] body = args.Body.ToArray();
string message = Encoding.UTF8.GetString(body);
Expand Down
26 changes: 26 additions & 0 deletions extensions/RabbitMQ/RabbitMqConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,35 @@ namespace Microsoft.KernelMemory;

public class RabbitMqConfig
{
/// <summary>
/// RabbitMQ hostname, e.g. "127.0.0.1"
/// </summary>
public string Host { get; set; } = "";

/// <summary>
/// TCP port for the connection, e.g. 5672
/// </summary>
public int Port { get; set; } = 0;

/// <summary>
/// Authentication username
/// </summary>
public string Username { get; set; } = "";

/// <summary>
/// Authentication password
/// </summary>
public string Password { get; set; } = "";

/// <summary>
/// RabbitMQ virtual host name, e.g. "/"
/// See https://www.rabbitmq.com/docs/vhosts
/// </summary>
public string VirtualHost { get; set; } = "/";

/// <summary>
/// How long to retry messages delivery, ie how long to retry, in seconds.
/// Default: 3600 second, 1 hour.
/// </summary>
public int MessageTTLSecs { get; set; } = 3600;
}
3 changes: 2 additions & 1 deletion service/Service/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@
"Port": "5672",
"Username": "user",
"Password": "",
"VirtualHost": "/"
"VirtualHost": "/",
"MessageTTLSecs": 3600
},
"Redis": {
// Redis connection string, e.g. "localhost:6379,password=..."
Expand Down

0 comments on commit 846f726

Please sign in to comment.