-
Notifications
You must be signed in to change notification settings - Fork 14
RedisPoisonMessages
By default, the Redis transport stores poison messages inside another redis table. This is not ideal, as this consumes resources. However, the default transport has no other realistic options. You can override this behavior and move the bad messges to another storage system for analysis.
- Create a new class to handle poison messages
Here is a basic, working example. It will take poison messages and write them to the file system, and then remove the message from Redis.
public class MoveMessageToFileSystem : IReceivePoisonMessage
{
//root folder to save bad messages
private const string _fileLocation = @"c:\Redis-Bad-Messages";
//the command that removes the message
private readonly ICommandHandlerWithOutput<DeleteMessageCommand, bool> _commandDeleteRecord;
/// <summary>
/// Initializes a new instance of the <see cref="MoveMessageToFileSystem"/> class.
/// </summary>
/// <param name="commandDeleteRecord">The command that deletes records from redis</param>
public MoveMessageToFileSystem(ICommandHandlerWithOutput<DeleteMessageCommand, bool> commandDeleteRecord)
{
_commandDeleteRecord = commandDeleteRecord;
}
public void Handle(IMessageContext context, PoisonMessageException exception)
{
//obtain the message information from the exception
var messageID = exception.MessageId; //should never be null
var correlationID = exception.CorrelationId; //could be null
var messageData = exception.MessagePayload; //could be null
var headerData = exception.HeaderPayload; //could be null
//NOTE - may want to verify that the message has an ID, though it should never get this far without one
var id = messageID.Id.Value.ToString();
var messageFolder = Path.Combine(_fileLocation, id);
if(!Directory.Exists(messageFolder))
{
Directory.CreateDirectory(messageFolder);
}
try
{
HandleMessageData(messageFolder, id, correlationID, headerData, messageData);
}
catch
{
//log error here or handle it
}
//delete posion message - don't leave it in the queue, even if saving it failed.
//You could leave it if you
//belive the error above to be transient - i.e. no disk space, etc...
var result = _commandDeleteRecord.Handle(new DeleteMessageCommand(new RedisQueueId(id)));
if(!result)
{
//log failure, retry, etc...
}
}
private void HandleMessageData(string folderLocation, string messageID, ICorrelationId correlationID, byte[] headers, byte[] message)
{
//write all data to folder
File.WriteAllText(Path.Combine(folderLocation, "ID"), messageID);
if (correlationID.HasValue)
{
var id = correlationID.Id.Value.ToString();
File.WriteAllText(Path.Combine(folderLocation, "CorrelationID"), id);
}
if (message != null)
{
File.WriteAllBytes(Path.Combine(folderLocation, "Message"), message);
}
if (headers != null)
{
File.WriteAllBytes(Path.Combine(folderLocation, "Headers"), headers);
}
}
}
- Inject your new class into the queue container and replace the default implementation
You replace default implemenations of the interfaces as part of the queue container. Here is how you would inject the new class above. The 'registerService' delegate lets you inject overrides or your own classes into the internal IoC container of the queue. You do need to be careful to specify the correct lifetime. This can be found by looking at the base queue or transport init module. Most 'services' are singletons, in that a single instance handles multiple requests.
var queueName = "example";
var connectionString = "127.0.0.1";
using (var queueContainer = new QueueContainer<RedisQueueInit>(registerService =>
registerService.Register<IReceivePoisonMessage, MoveMessageToFileSystem>(LifeStyles.Singleton)))
{
using (var queue = queueContainer.CreateConsumer(queueName, connectionString))
{
queue.Start<SimpleMessage>(HandleMessages);
Console.WriteLine("Processing messages - press any key to stop");
Console.ReadKey((true));
}
}
For any issues please use the GitHub issues