Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add poison queues support for RabbitMQ #648

Merged
merged 37 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1e0f47a
Add support for Dependency Injection with IMemoryDb
marcominerva May 27, 2024
ab05158
Merge branch 'microsoft:main' into main
marcominerva Jun 4, 2024
da55406
Merge branch 'microsoft:main' into main
marcominerva Jun 5, 2024
58eea74
Add poison queue support for RabbitMQ #408
marcominerva Jun 5, 2024
f683514
Merge branch 'main' into rabbitmq-queues
dluc Jun 5, 2024
01c600f
Update extensions/RabbitMQ/RabbitMqConfig.cs
marcominerva Jun 6, 2024
c93bd88
Refactor queue name validation
marcominerva Jun 6, 2024
6dc29c9
Update extensions/RabbitMQ/RabbitMqConfig.cs
dluc Jun 6, 2024
1925306
Misc minor fixes
dluc Jun 8, 2024
2b23ca1
Move private method down
dluc Jun 9, 2024
e597fad
Implement Quorum Queues and Dead Letter Exchange
marcominerva Jun 11, 2024
e713465
Merge commit
marcominerva Jun 11, 2024
1b082ae
Fix logging
marcominerva Jun 11, 2024
482f649
Fix a typo
marcominerva Jun 11, 2024
17b2ccc
Merge branch 'main' of https://github.com/marcominerva/kernel-memory
marcominerva Jun 11, 2024
9f972cc
Merge branch 'main' into rabbitmq-queues
marcominerva Jun 11, 2024
036bb91
Merge branch 'main' of https://github.com/marcominerva/kernel-memory
marcominerva Jun 17, 2024
27c9c88
Merge branch 'main' into rabbitmq-queues
marcominerva Jun 17, 2024
98c9945
Merge branch 'main' of https://github.com/marcominerva/kernel-memory
marcominerva Jun 18, 2024
c97db91
Merge branch 'main' into rabbitmq-queues
marcominerva Jun 18, 2024
0f58b02
Remove unnecessary using
marcominerva Jun 18, 2024
df7300d
Merge branch 'microsoft:main' into rabbitmq-queues
marcominerva Jun 18, 2024
0698574
Merge branch 'microsoft:main' into rabbitmq-queues
marcominerva Jun 19, 2024
70be144
Merge branch 'main' into rabbitmq-queues
dluc Jun 21, 2024
6085129
Fix parameter type error
marcominerva Jun 21, 2024
0a72d87
Merge branch 'microsoft:main' into rabbitmq-queues
marcominerva Jul 9, 2024
c2f2c63
Add exception for x-delivery-limit change
marcominerva Jul 9, 2024
e3f055a
Merge commit
marcominerva Jul 16, 2024
8656653
Restore new properties
marcominerva Jul 16, 2024
1bda22f
Fix casing
marcominerva Jul 16, 2024
d2742a2
Delete duplicated file
marcominerva Jul 16, 2024
c2c0e21
Merge branch 'main' into rabbitmq-queues
dluc Oct 5, 2024
80bb1a0
Fix warnings
dluc Oct 5, 2024
c6d37e1
Fix suffix validation, add more validations
dluc Oct 5, 2024
0f8809e
Misc fixes to pipeline class
dluc Oct 6, 2024
01771e2
Add test application
dluc Oct 6, 2024
65500fd
Improve logging and exceptions handling
dluc Oct 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion KernelMemory.sln
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Qdrant.TestApplication", "e
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Qdrant.FunctionalTests", "extensions\Qdrant\Qdrant.FunctionalTests\Qdrant.FunctionalTests.csproj", "{62B96766-AA6C-4CFF-A6FB-6370C89C2509}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ", "extensions\RabbitMQ\RabbitMQ.csproj", "{E3877E49-958E-4DC8-B5E8-834010F5C4B7}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ", "extensions\RabbitMQ\RabbitMQ\RabbitMQ.csproj", "{E3877E49-958E-4DC8-B5E8-834010F5C4B7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenAI", "extensions\OpenAI\OpenAI\OpenAI.csproj", "{A6AE31A1-4F60-47B0-8534-7B083D68118C}"
EndProject
Expand Down Expand Up @@ -327,6 +327,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Onnx.FunctionalTests", "ext
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Onnx", "extensions\ONNX\Onnx\Onnx.csproj", "{345DEF9B-6EE1-49DF-B46A-25E38CE9B151}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.TestApplication", "extensions\RabbitMQ\RabbitMQ.TestApplication\RabbitMQ.TestApplication.csproj", "{82670921-FDCD-4672-84BD-4353F5AC24A0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -602,6 +604,9 @@ Global
{345DEF9B-6EE1-49DF-B46A-25E38CE9B151}.Debug|Any CPU.Build.0 = Debug|Any CPU
{345DEF9B-6EE1-49DF-B46A-25E38CE9B151}.Release|Any CPU.ActiveCfg = Release|Any CPU
{345DEF9B-6EE1-49DF-B46A-25E38CE9B151}.Release|Any CPU.Build.0 = Release|Any CPU
{82670921-FDCD-4672-84BD-4353F5AC24A0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82670921-FDCD-4672-84BD-4353F5AC24A0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82670921-FDCD-4672-84BD-4353F5AC24A0}.Release|Any CPU.ActiveCfg = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -698,6 +703,7 @@ Global
{F192513B-265B-4943-A2A9-44E23B15BA18} = {155DA079-E267-49AF-973A-D1D44681970F}
{7BBD348E-CDD9-4462-B8C9-47613C5EC682} = {3C17F42B-CFC8-4900-8CFB-88936311E919}
{345DEF9B-6EE1-49DF-B46A-25E38CE9B151} = {155DA079-E267-49AF-973A-D1D44681970F}
{82670921-FDCD-4672-84BD-4353F5AC24A0} = {3C17F42B-CFC8-4900-8CFB-88936311E919}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CC136C62-115C-41D1-B414-F9473EFF6EA8}
Expand Down
90 changes: 90 additions & 0 deletions extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Text;
using Microsoft.KernelMemory;
using Microsoft.KernelMemory.Diagnostics;
using Microsoft.KernelMemory.Orchestration.RabbitMQ;
using Microsoft.KernelMemory.Pipeline.Queue;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Microsoft.RabbitMQ.TestApplication;

internal static class Program
{
private const string QueueName = "test queue";

public static async Task Main()
{
var cfg = new ConfigurationBuilder()
.AddJsonFile("appsettings.json")
.AddJsonFile("appsettings.development.json", optional: true)
.AddJsonFile("appsettings.Development.json", optional: true)
.Build();

var rabbitMQConfig = cfg.GetSection("KernelMemory:Services:RabbitMQ").Get<RabbitMQConfig>();
ArgumentNullExceptionEx.ThrowIfNull(rabbitMQConfig, nameof(rabbitMQConfig), "RabbitMQ config not found");

DefaultLogger.Factory = LoggerFactory.Create(builder =>
{
builder.AddConsole();
builder.SetMinimumLevel(LogLevel.Warning);
});

var pipeline = new RabbitMQPipeline(rabbitMQConfig, DefaultLogger.Factory);

var counter = 0;
pipeline.OnDequeue(async msg =>
{
Console.WriteLine($"{++counter} Received message: {msg}");
await Task.Delay(0);
return false;
});

await pipeline.ConnectToQueueAsync(QueueName, QueueOptions.PubSub);

ListenToDeadLetterQueue(rabbitMQConfig);

await pipeline.EnqueueAsync($"test {DateTimeOffset.Now:T}");

while (true)
{
await Task.Delay(TimeSpan.FromSeconds(2));
}
}

private static void ListenToDeadLetterQueue(RabbitMQConfig config)
{
var factory = new ConnectionFactory
{
HostName = config.Host,
Port = config.Port,
UserName = config.Username,
Password = config.Password,
VirtualHost = !string.IsNullOrWhiteSpace(config.VirtualHost) ? config.VirtualHost : "/",
DispatchConsumersAsync = true,
Ssl = new SslOption
{
Enabled = config.SslEnabled,
ServerName = config.Host,
}
};

var connection = factory.CreateConnection();
var channel = connection.CreateModel();
var consumer = new AsyncEventingBasicConsumer(channel);

consumer.Received += async (object sender, BasicDeliverEventArgs args) =>
{
byte[] body = args.Body.ToArray();
string message = Encoding.UTF8.GetString(body);

Console.WriteLine($"Poison message received: {message}");
await Task.Delay(0);
};

channel.BasicConsume(queue: $"{QueueName}{config.PoisonQueueSuffix}",
autoAck: true,
consumer: consumer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<AssemblyName>Microsoft.RabbitMQ.TestApplication</AssemblyName>
<RootNamespace>Microsoft.RabbitMQ.TestApplication</RootNamespace>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<RollForward>LatestMajor</RollForward>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
<NoWarn>$(NoWarn);KMEXP00;KMEXP01;KMEXP02;KMEXP03;KMEXP04;</NoWarn>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\service\Core\Core.csproj" />
</ItemGroup>

</Project>
24 changes: 24 additions & 0 deletions extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"Logging": {
"LogLevel": {
"Default": "Error"
}
},
"KernelMemory": {
"Services": {
"RabbitMQ": {
"Host": "127.0.0.1",
"Port": "5672",
"Username": "user",
"Password": "password",
"VirtualHost": "/",
"MessageTTLSecs": 3600,
"SslEnabled": false,
// How many times to dequeue a messages and process before moving it to a poison queue
"MaxRetriesBeforePoisonQueue": 5,
// Suffix used for the poison queues.
"PoisonQueueSuffix": "-poison"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.KernelMemory;
/// </summary>
public static partial class KernelMemoryBuilderExtensions
{
public static IKernelMemoryBuilder WithRabbitMQOrchestration(this IKernelMemoryBuilder builder, RabbitMqConfig config)
public static IKernelMemoryBuilder WithRabbitMQOrchestration(this IKernelMemoryBuilder builder, RabbitMQConfig config)
{
builder.Services.AddRabbitMQOrchestration(config);
return builder;
Expand All @@ -26,8 +26,10 @@ public static IKernelMemoryBuilder WithRabbitMQOrchestration(this IKernelMemoryB
/// </summary>
public static partial class DependencyInjection
{
public static IServiceCollection AddRabbitMQOrchestration(this IServiceCollection services, RabbitMqConfig config)
public static IServiceCollection AddRabbitMQOrchestration(this IServiceCollection services, RabbitMQConfig config)
{
config.Validate();

IQueue QueueFactory(IServiceProvider serviceProvider)
{
return serviceProvider.GetService<RabbitMQPipeline>()
Expand All @@ -37,7 +39,7 @@ IQueue QueueFactory(IServiceProvider serviceProvider)
// The orchestrator uses multiple queue clients, each linked to a specific queue,
// so it requires a factory rather than a single queue injected to the ctor.
return services
.AddSingleton<RabbitMqConfig>(config)
.AddSingleton<RabbitMQConfig>(config)
.AddTransient<RabbitMQPipeline>()
.AddSingleton<QueueClientFactory>(serviceProvider => new QueueClientFactory(() => QueueFactory(serviceProvider)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\service\Abstractions\Abstractions.csproj" />
<ProjectReference Include="..\..\..\service\Abstractions\Abstractions.csproj" />
</ItemGroup>

<ItemGroup>
Expand All @@ -26,7 +26,7 @@
</PropertyGroup>

<ItemGroup>
<None Include="README.md" Link="README.md" Pack="true" PackagePath="." Visible="false" />
<None Include="..\README.md" Link="README.md" Pack="true" PackagePath="." Visible="false" />
</ItemGroup>

</Project>
106 changes: 106 additions & 0 deletions extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Text;

#pragma warning disable IDE0130 // reduce number of "using" statements
// ReSharper disable once CheckNamespace - reduce number of "using" statements
namespace Microsoft.KernelMemory;

public class RabbitMQConfig
{
/// <summary>
/// RabbitMQ hostname, e.g. "127.0.0.1"
/// </summary>
public string Host { get; set; } = "";

/// <summary>
/// TCP port for the connection, e.g. 5672
/// </summary>
public int Port { get; set; } = 0;

/// <summary>
/// Authentication username
/// </summary>
public string Username { get; set; } = "";

/// <summary>
/// Authentication password
/// </summary>
public string Password { get; set; } = "";

/// <summary>
/// RabbitMQ virtual host name, e.g. "/"
/// See https://www.rabbitmq.com/docs/vhosts
/// </summary>
public string VirtualHost { get; set; } = "/";

/// <summary>
/// How long to retry messages delivery, ie how long to retry, in seconds.
/// Default: 3600 second, 1 hour.
/// </summary>
public int MessageTTLSecs { get; set; } = 3600;

/// <summary>
/// Set to true if your RabbitMQ supports SSL.
/// Default: false
/// </summary>
public bool SslEnabled { get; set; } = false;

/// <summary>
/// How many times to retry processing a message before moving it to a poison queue.
/// Example: a value of 20 means that a message will be processed up to 21 times.
/// Note: this value cannot be changed after queues have been created. In such case
/// you might need to drain all queues, delete them, and restart the ingestion service(s).
/// </summary>
public int MaxRetriesBeforePoisonQueue { get; set; } = 20;

/// <summary>
/// Suffix used for the poison queues.
/// </summary>
public string PoisonQueueSuffix { get; set; } = "-poison";

/// <summary>
/// Verify that the current state is valid.
/// </summary>
public void Validate()
{
const int MinTTLSecs = 5;

if (string.IsNullOrWhiteSpace(this.Host) || this.Host != $"{this.Host}".Trim())
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.Host)} cannot be empty or have leading or trailing spaces");
}

if (this.Port < 1)
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.Port)} value {this.Port} is not valid");
}

if (this.MessageTTLSecs < MinTTLSecs)
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.MessageTTLSecs)} value {this.MessageTTLSecs} is too low, cannot be less than {MinTTLSecs}");
}

if (string.IsNullOrWhiteSpace(this.PoisonQueueSuffix) || this.PoisonQueueSuffix != $"{this.PoisonQueueSuffix}".Trim())
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} cannot be empty or have leading or trailing spaces");
}

if (this.MaxRetriesBeforePoisonQueue < 0)
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.MaxRetriesBeforePoisonQueue)} cannot be a negative number");
}

if (string.IsNullOrWhiteSpace(this.PoisonQueueSuffix))
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} is empty");
}

// Queue names can be up to 255 bytes of UTF-8 characters.
// Allow a max of 60 bytes for the suffix, so there is room for the queue name.
if (Encoding.UTF8.GetByteCount(this.PoisonQueueSuffix) > 60)
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} can be up to 60 characters length");
}
}
}
Loading
Loading