Skip to content

Commit

Permalink
Add poison queues support for RabbitMQ (#648)
Browse files Browse the repository at this point in the history
## Motivation and Context (Why the change? What's the scenario?)

Avoid infinite retries when using RabbitMQ, and move poison messages after N retries.

See #408.

---------

Co-authored-by: Devis Lucato <[email protected]>
  • Loading branch information
marcominerva and dluc authored Oct 6, 2024
1 parent fb55c22 commit 759cc43
Show file tree
Hide file tree
Showing 14 changed files with 553 additions and 220 deletions.
8 changes: 7 additions & 1 deletion KernelMemory.sln
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Qdrant.TestApplication", "e
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Qdrant.FunctionalTests", "extensions\Qdrant\Qdrant.FunctionalTests\Qdrant.FunctionalTests.csproj", "{62B96766-AA6C-4CFF-A6FB-6370C89C2509}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ", "extensions\RabbitMQ\RabbitMQ.csproj", "{E3877E49-958E-4DC8-B5E8-834010F5C4B7}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ", "extensions\RabbitMQ\RabbitMQ\RabbitMQ.csproj", "{E3877E49-958E-4DC8-B5E8-834010F5C4B7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenAI", "extensions\OpenAI\OpenAI\OpenAI.csproj", "{A6AE31A1-4F60-47B0-8534-7B083D68118C}"
EndProject
Expand Down Expand Up @@ -327,6 +327,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Onnx.FunctionalTests", "ext
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Onnx", "extensions\ONNX\Onnx\Onnx.csproj", "{345DEF9B-6EE1-49DF-B46A-25E38CE9B151}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.TestApplication", "extensions\RabbitMQ\RabbitMQ.TestApplication\RabbitMQ.TestApplication.csproj", "{82670921-FDCD-4672-84BD-4353F5AC24A0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -602,6 +604,9 @@ Global
{345DEF9B-6EE1-49DF-B46A-25E38CE9B151}.Debug|Any CPU.Build.0 = Debug|Any CPU
{345DEF9B-6EE1-49DF-B46A-25E38CE9B151}.Release|Any CPU.ActiveCfg = Release|Any CPU
{345DEF9B-6EE1-49DF-B46A-25E38CE9B151}.Release|Any CPU.Build.0 = Release|Any CPU
{82670921-FDCD-4672-84BD-4353F5AC24A0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82670921-FDCD-4672-84BD-4353F5AC24A0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82670921-FDCD-4672-84BD-4353F5AC24A0}.Release|Any CPU.ActiveCfg = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -698,6 +703,7 @@ Global
{F192513B-265B-4943-A2A9-44E23B15BA18} = {155DA079-E267-49AF-973A-D1D44681970F}
{7BBD348E-CDD9-4462-B8C9-47613C5EC682} = {3C17F42B-CFC8-4900-8CFB-88936311E919}
{345DEF9B-6EE1-49DF-B46A-25E38CE9B151} = {155DA079-E267-49AF-973A-D1D44681970F}
{82670921-FDCD-4672-84BD-4353F5AC24A0} = {3C17F42B-CFC8-4900-8CFB-88936311E919}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CC136C62-115C-41D1-B414-F9473EFF6EA8}
Expand Down
90 changes: 90 additions & 0 deletions extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Text;
using Microsoft.KernelMemory;
using Microsoft.KernelMemory.Diagnostics;
using Microsoft.KernelMemory.Orchestration.RabbitMQ;
using Microsoft.KernelMemory.Pipeline.Queue;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Microsoft.RabbitMQ.TestApplication;

internal static class Program
{
private const string QueueName = "test queue";

public static async Task Main()
{
var cfg = new ConfigurationBuilder()
.AddJsonFile("appsettings.json")
.AddJsonFile("appsettings.development.json", optional: true)
.AddJsonFile("appsettings.Development.json", optional: true)
.Build();

var rabbitMQConfig = cfg.GetSection("KernelMemory:Services:RabbitMQ").Get<RabbitMQConfig>();
ArgumentNullExceptionEx.ThrowIfNull(rabbitMQConfig, nameof(rabbitMQConfig), "RabbitMQ config not found");

DefaultLogger.Factory = LoggerFactory.Create(builder =>
{
builder.AddConsole();
builder.SetMinimumLevel(LogLevel.Warning);
});

var pipeline = new RabbitMQPipeline(rabbitMQConfig, DefaultLogger.Factory);

var counter = 0;
pipeline.OnDequeue(async msg =>
{
Console.WriteLine($"{++counter} Received message: {msg}");
await Task.Delay(0);
return false;
});

await pipeline.ConnectToQueueAsync(QueueName, QueueOptions.PubSub);

ListenToDeadLetterQueue(rabbitMQConfig);

await pipeline.EnqueueAsync($"test {DateTimeOffset.Now:T}");

while (true)
{
await Task.Delay(TimeSpan.FromSeconds(2));
}
}

private static void ListenToDeadLetterQueue(RabbitMQConfig config)
{
var factory = new ConnectionFactory
{
HostName = config.Host,
Port = config.Port,
UserName = config.Username,
Password = config.Password,
VirtualHost = !string.IsNullOrWhiteSpace(config.VirtualHost) ? config.VirtualHost : "/",
DispatchConsumersAsync = true,
Ssl = new SslOption
{
Enabled = config.SslEnabled,
ServerName = config.Host,
}
};

var connection = factory.CreateConnection();
var channel = connection.CreateModel();
var consumer = new AsyncEventingBasicConsumer(channel);

consumer.Received += async (object sender, BasicDeliverEventArgs args) =>
{
byte[] body = args.Body.ToArray();
string message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Poison message received: {message}");
await Task.Delay(0);
};

channel.BasicConsume(queue: $"{QueueName}{config.PoisonQueueSuffix}",
autoAck: true,
consumer: consumer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<AssemblyName>Microsoft.RabbitMQ.TestApplication</AssemblyName>
<RootNamespace>Microsoft.RabbitMQ.TestApplication</RootNamespace>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<RollForward>LatestMajor</RollForward>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
<NoWarn>$(NoWarn);KMEXP00;KMEXP01;KMEXP02;KMEXP03;KMEXP04;</NoWarn>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\service\Core\Core.csproj" />
</ItemGroup>

</Project>
24 changes: 24 additions & 0 deletions extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"Logging": {
"LogLevel": {
"Default": "Error"
}
},
"KernelMemory": {
"Services": {
"RabbitMQ": {
"Host": "127.0.0.1",
"Port": "5672",
"Username": "user",
"Password": "password",
"VirtualHost": "/",
"MessageTTLSecs": 3600,
"SslEnabled": false,
// How many times to dequeue a messages and process before moving it to a poison queue
"MaxRetriesBeforePoisonQueue": 5,
// Suffix used for the poison queues.
"PoisonQueueSuffix": "-poison"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.KernelMemory;
/// </summary>
public static partial class KernelMemoryBuilderExtensions
{
public static IKernelMemoryBuilder WithRabbitMQOrchestration(this IKernelMemoryBuilder builder, RabbitMqConfig config)
public static IKernelMemoryBuilder WithRabbitMQOrchestration(this IKernelMemoryBuilder builder, RabbitMQConfig config)
{
builder.Services.AddRabbitMQOrchestration(config);
return builder;
Expand All @@ -26,8 +26,10 @@ public static IKernelMemoryBuilder WithRabbitMQOrchestration(this IKernelMemoryB
/// </summary>
public static partial class DependencyInjection
{
public static IServiceCollection AddRabbitMQOrchestration(this IServiceCollection services, RabbitMqConfig config)
public static IServiceCollection AddRabbitMQOrchestration(this IServiceCollection services, RabbitMQConfig config)
{
config.Validate();

IQueue QueueFactory(IServiceProvider serviceProvider)
{
return serviceProvider.GetService<RabbitMQPipeline>()
Expand All @@ -37,7 +39,7 @@ IQueue QueueFactory(IServiceProvider serviceProvider)
// The orchestrator uses multiple queue clients, each linked to a specific queue,
// so it requires a factory rather than a single queue injected to the ctor.
return services
.AddSingleton<RabbitMqConfig>(config)
.AddSingleton<RabbitMQConfig>(config)
.AddTransient<RabbitMQPipeline>()
.AddSingleton<QueueClientFactory>(serviceProvider => new QueueClientFactory(() => QueueFactory(serviceProvider)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\service\Abstractions\Abstractions.csproj" />
<ProjectReference Include="..\..\..\service\Abstractions\Abstractions.csproj" />
</ItemGroup>

<ItemGroup>
Expand All @@ -26,7 +26,7 @@
</PropertyGroup>

<ItemGroup>
<None Include="README.md" Link="README.md" Pack="true" PackagePath="." Visible="false" />
<None Include="..\README.md" Link="README.md" Pack="true" PackagePath="." Visible="false" />
</ItemGroup>

</Project>
106 changes: 106 additions & 0 deletions extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Text;

#pragma warning disable IDE0130 // reduce number of "using" statements
// ReSharper disable once CheckNamespace - reduce number of "using" statements
namespace Microsoft.KernelMemory;

public class RabbitMQConfig
{
/// <summary>
/// RabbitMQ hostname, e.g. "127.0.0.1"
/// </summary>
public string Host { get; set; } = "";

/// <summary>
/// TCP port for the connection, e.g. 5672
/// </summary>
public int Port { get; set; } = 0;

/// <summary>
/// Authentication username
/// </summary>
public string Username { get; set; } = "";

/// <summary>
/// Authentication password
/// </summary>
public string Password { get; set; } = "";

/// <summary>
/// RabbitMQ virtual host name, e.g. "/"
/// See https://www.rabbitmq.com/docs/vhosts
/// </summary>
public string VirtualHost { get; set; } = "/";

/// <summary>
/// How long to retry messages delivery, ie how long to retry, in seconds.
/// Default: 3600 second, 1 hour.
/// </summary>
public int MessageTTLSecs { get; set; } = 3600;

/// <summary>
/// Set to true if your RabbitMQ supports SSL.
/// Default: false
/// </summary>
public bool SslEnabled { get; set; } = false;

/// <summary>
/// How many times to retry processing a message before moving it to a poison queue.
/// Example: a value of 20 means that a message will be processed up to 21 times.
/// Note: this value cannot be changed after queues have been created. In such case
/// you might need to drain all queues, delete them, and restart the ingestion service(s).
/// </summary>
public int MaxRetriesBeforePoisonQueue { get; set; } = 20;

/// <summary>
/// Suffix used for the poison queues.
/// </summary>
public string PoisonQueueSuffix { get; set; } = "-poison";

/// <summary>
/// Verify that the current state is valid.
/// </summary>
public void Validate()
{
const int MinTTLSecs = 5;

if (string.IsNullOrWhiteSpace(this.Host) || this.Host != $"{this.Host}".Trim())
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.Host)} cannot be empty or have leading or trailing spaces");
}

if (this.Port < 1)
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.Port)} value {this.Port} is not valid");
}

if (this.MessageTTLSecs < MinTTLSecs)
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.MessageTTLSecs)} value {this.MessageTTLSecs} is too low, cannot be less than {MinTTLSecs}");
}

if (string.IsNullOrWhiteSpace(this.PoisonQueueSuffix) || this.PoisonQueueSuffix != $"{this.PoisonQueueSuffix}".Trim())
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} cannot be empty or have leading or trailing spaces");
}

if (this.MaxRetriesBeforePoisonQueue < 0)
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.MaxRetriesBeforePoisonQueue)} cannot be a negative number");
}

if (string.IsNullOrWhiteSpace(this.PoisonQueueSuffix))
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} is empty");
}

// Queue names can be up to 255 bytes of UTF-8 characters.
// Allow a max of 60 bytes for the suffix, so there is room for the queue name.
if (Encoding.UTF8.GetByteCount(this.PoisonQueueSuffix) > 60)
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} can be up to 60 characters length");
}
}
}
Loading

0 comments on commit 759cc43

Please sign in to comment.