Skip to content

Commit

Permalink
Added tests to verify dead letter / message abandonment functionality…
Browse files Browse the repository at this point in the history
… in various supported message queueing service implementations.
  • Loading branch information
sweetlandj committed Jun 22, 2016
1 parent 327e95a commit b5e9510
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 4 deletions.
82 changes: 79 additions & 3 deletions Source/Platibus.UnitTests/FilesystemMessageQueueingServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,80 @@ public async Task Given_Auto_Acknowledge_Queue_When_Not_Acknowledged_Then_Messag
Assert.That(queuedMessages, Is.Empty);
}

[Test]
public async Task Given_Listener_Throws_MaxAttempts_Exceeded_Then_Message_Should_Be_Dead()
{
var message = new Message(new MessageHeaders
{
{HeaderName.ContentType, "text/plain"},
{HeaderName.MessageId, Guid.NewGuid().ToString()}
}, "Hello, world!");

var listenerCalledEvent = new ManualResetEvent(false);
var tempDir = GetTempDirectory();
var queueName = new QueueName(Guid.NewGuid().ToString());
var queuePath = Path.Combine(tempDir.FullName, queueName);
var queueDir = new DirectoryInfo(queuePath);
if (!queueDir.Exists)
{
queueDir.Create();
}

var mockListener = new Mock<IQueueListener>();
mockListener.Setup(
x =>
x.MessageReceived(It.IsAny<Message>(), It.IsAny<IQueuedMessageContext>(),
It.IsAny<CancellationToken>()))
.Callback<Message, IQueuedMessageContext, CancellationToken>((msg, ctx, ct) =>
{
listenerCalledEvent.Set();
throw new Exception();
});

using (var fsQueueingService = new FilesystemMessageQueueingService(tempDir))
using (var cts = new CancellationTokenSource())
{
var ct = cts.Token;
fsQueueingService.Init();

await fsQueueingService.CreateQueue(queueName, mockListener.Object, new QueueOptions
{
AutoAcknowledge = true,
MaxAttempts = 3,
RetryDelay = TimeSpan.FromMilliseconds(250)
}, ct);

await fsQueueingService.EnqueueMessage(queueName, message, Thread.CurrentPrincipal, ct);
await listenerCalledEvent.WaitOneAsync(TimeSpan.FromSeconds(1));

// The listener is called before the file is deleted, so there is a possible
// race condition here. Wait for a second to allow the delete to take place
// before enumerating the files to see that they were actually deleted.
await Task.Delay(TimeSpan.FromSeconds(1), ct);
}

var messageEqualityComparer = new MessageEqualityComparer();
mockListener.Verify(x =>
x.MessageReceived(It.Is<Message>(m => messageEqualityComparer.Equals(m, message)),
It.IsAny<IQueuedMessageContext>(), It.IsAny<CancellationToken>()), Times.Exactly(3));

var queuedMessages = queueDir.EnumerateFiles()
.Select(f => new MessageFile(f))
.ToList();

Assert.That(queuedMessages, Is.Empty);

var deadLetterPath = Path.Combine(queuePath, "dead");
var deadLetterDir = new DirectoryInfo(deadLetterPath);
Assert.That(deadLetterDir.Exists, Is.True);

var deadMessages = deadLetterDir.EnumerateFiles()
.Select(f => new MessageFile(f))
.ToList();

Assert.That(deadMessages, Is.Not.Empty);
}

[Test]
public async Task Given_Auto_Acknowledge_Queue_When_Listener_Throws_Then_Message_Should_Not_Be_Deleted()
{
Expand Down Expand Up @@ -289,8 +363,10 @@ await fsQueueingService
.CreateQueue(queueName, mockListener.Object, new QueueOptions
{
AutoAcknowledge = true,
MaxAttempts = 2, // So the message doesn't get moved to the DLQ
RetryDelay = TimeSpan.FromSeconds(30)
// Prevent message from being sent to the DLQ
MaxAttempts = 100,
// Short retry delay to message handler is called more than once
RetryDelay = TimeSpan.FromMilliseconds(250)
}, ct);

await fsQueueingService.EnqueueMessage(queueName, message, Thread.CurrentPrincipal, ct);
Expand All @@ -309,7 +385,7 @@ await fsQueueingService
var messageEqualityComparer = new MessageEqualityComparer();
mockListener.Verify(x =>
x.MessageReceived(It.Is<Message>(m => messageEqualityComparer.Equals(m, message)),
It.IsAny<IQueuedMessageContext>(), It.IsAny<CancellationToken>()), Times.Once());
It.IsAny<IQueuedMessageContext>(), It.IsAny<CancellationToken>()), Times.AtLeast(2));

var queuedMessages = queueDir.EnumerateFiles()
.Select(f => new MessageFile(f))
Expand Down
76 changes: 76 additions & 0 deletions Source/Platibus.UnitTests/RabbitMQMessageQueueingServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,70 @@ await rmqQueueingService
}
}

[Test]
public async Task Given_Listener_Throws_MaxAttempts_Exceeded_Then_Message_Should_Be_Dead()
{
var message = new Message(new MessageHeaders
{
{HeaderName.ContentType, "text/plain"},
{HeaderName.MessageId, Guid.NewGuid().ToString()}
}, "Hello, world!");

var listenerCalledEvent = new ManualResetEvent(false);
var queueName = new QueueName(Guid.NewGuid().ToString());
var deadQueueName = new QueueName(Guid.NewGuid().ToString());

var mockListener = new Mock<IQueueListener>();
mockListener.Setup(x =>
x.MessageReceived(It.IsAny<Message>(), It.IsAny<IQueuedMessageContext>(),
It.IsAny<CancellationToken>()))
.Callback<Message, IQueuedMessageContext, CancellationToken>(
(msg, ctx, ct) =>
{
listenerCalledEvent.Set();
throw new Exception("Test exception");
})
.Returns(Task.FromResult(true));

var cts = new CancellationTokenSource();
var rmqQueueingService = new RabbitMQMessageQueueingService(RabbitMQUri);
try
{
var ct = cts.Token;
await rmqQueueingService
.CreateQueue(queueName, mockListener.Object, new QueueOptions
{
MaxAttempts = 3,
RetryDelay = TimeSpan.FromMilliseconds(100)
}, ct);

CreateQueue(deadQueueName, queueName.GetDeadLetterExchangeName());

await rmqQueueingService.EnqueueMessage(queueName, message, Thread.CurrentPrincipal, ct);
await listenerCalledEvent.WaitOneAsync(TimeSpan.FromSeconds(3));

// The listener is called before the message is published to the retry queue,
// so there is a possible race condition here. Wait for a second to allow the
// publish to take place before checking the retry queue depth.
await Task.Delay(TimeSpan.FromSeconds(1), ct);

var messageEqualityComparer = new MessageEqualityComparer();
mockListener.Verify(x =>
x.MessageReceived(It.Is<Message>(m => messageEqualityComparer.Equals(m, message)),
It.IsAny<IQueuedMessageContext>(), It.IsAny<CancellationToken>()), Times.Exactly(3));

Assert.That(GetQueueDepth(queueName), Is.EqualTo(0));
Assert.That(GetQueueDepth(deadQueueName), Is.EqualTo(1));
}
finally
{
rmqQueueingService.TryDispose();
cts.TryDispose();
DeleteQueue(queueName);
DeleteQueue(deadQueueName);
}
}

[Test]
public async Task Given_Auto_Acknowledge_Queue_When_Not_Acknowledged_Then_Message_Should_Be_Deleted()
{
Expand Down Expand Up @@ -336,6 +400,18 @@ private static void DeleteQueue(QueueName queueName)
}
}

private static void CreateQueue(QueueName queueName, string exchangeName)
{
var connectionFactory = new ConnectionFactory { Uri = RabbitMQUri.ToString() };
using (var connection = connectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
var queue = RabbitMQHelper.ReplaceInvalidQueueNameCharacters(queueName);
channel.QueueDeclare(queue, true, false, false, null);
channel.QueueBind(queue, exchangeName, "", null);
}
}

[Test]
public async Task Given_Existing_Message_When_Creating_Queue_Then_Listener_Should_Fire()
{
Expand Down
8 changes: 7 additions & 1 deletion Source/Platibus.UnitTests/SQLMessageQueueInspector.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Security.Principal;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -25,6 +26,11 @@ public Task<IEnumerable<SQLQueuedMessage>> EnumerateMessages()
return SelectQueuedMessages();
}

public Task<IEnumerable<SQLQueuedMessage>> EnumerateAbandonedMessages(DateTime startDate, DateTime endDate)
{
return SelectAbandonedMessages(startDate, endDate);
}

private class NoopQueueListener : IQueueListener
{
public Task MessageReceived(Message message, IQueuedMessageContext context,
Expand Down
67 changes: 67 additions & 0 deletions Source/Platibus.UnitTests/SQLMessageQueueingServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,73 @@ await sqlQueueingService
}
}

[Test]
public async Task Given_Listener_Throws_Max_Attempts_Exceeded_Then_Message_Should_Be_Abandoned()
{
var message = new Message(new MessageHeaders
{
{HeaderName.ContentType, "text/plain"},
{HeaderName.MessageId, Guid.NewGuid().ToString()}
}, "Hello, world!");

var listenerCalledEvent = new ManualResetEvent(false);
var connectionStringSettings = GetConnectionStringSettings();
var queueName = new QueueName(Guid.NewGuid().ToString());

var mockListener = new Mock<IQueueListener>();
mockListener.Setup(x =>
x.MessageReceived(It.IsAny<Message>(), It.IsAny<IQueuedMessageContext>(),
It.IsAny<CancellationToken>()))
.Callback<Message, IQueuedMessageContext, CancellationToken>((msg, ctx, ct) =>
{
listenerCalledEvent.Set();
throw new Exception();
});

var startDate = DateTime.UtcNow;
using (var cts = new CancellationTokenSource())
using (var sqlQueueingService = new SQLMessageQueueingService(connectionStringSettings))
{
var ct = cts.Token;
sqlQueueingService.Init();

await sqlQueueingService
.CreateQueue(queueName, mockListener.Object, new QueueOptions
{
AutoAcknowledge = true,
MaxAttempts = 3,
RetryDelay = TimeSpan.FromMilliseconds(100)
}, ct);

await sqlQueueingService.EnqueueMessage(queueName, message, Thread.CurrentPrincipal, ct);

var listenerCalled = await listenerCalledEvent
.WaitOneAsync(Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(3));

Assert.That(listenerCalled, Is.True);

// The listener is called before the row is updated, so there is a possible
// race condition here. Wait for a second to allow the update to take place
// before enumerating the rows to see that they were actually not updated.
await Task.Delay(TimeSpan.FromSeconds(1), ct);

var messageEqualityComparer = new MessageEqualityComparer();
mockListener.Verify(x =>
x.MessageReceived(It.Is<Message>(m => messageEqualityComparer.Equals(m, message)),
It.IsAny<IQueuedMessageContext>(), It.IsAny<CancellationToken>()), Times.Exactly(3));

var sqlQueueInspector = new SQLMessageQueueInspector(sqlQueueingService, queueName);
var queuedMessages = (await sqlQueueInspector.EnumerateMessages()).ToList();

Assert.That(queuedMessages, Is.Empty);

var endDate = DateTime.UtcNow;
var abandonedMessages = (await sqlQueueInspector.EnumerateAbandonedMessages(startDate, endDate)).ToList();
Assert.That(abandonedMessages.Count, Is.EqualTo(1));
Assert.That(abandonedMessages[0].Message, Is.EqualTo(message).Using(messageEqualityComparer));
}
}

[Test]
public async Task Given_Existing_Message_When_Creating_Queue_Then_Listener_Should_Fire()
{
Expand Down
1 change: 1 addition & 0 deletions Source/Platibus/InMemory/InMemoryQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ private async Task ProcessQueuedMessage(QueuedMessage queuedMessage, Cancellatio
// TODO: Implement journaling
break;
}

if (attemptsRemaining > 0)
{
await Task.Delay(_retryDelay, cancellationToken);
Expand Down
42 changes: 42 additions & 0 deletions Source/Platibus/SQL/CommonSQLDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
/// </remarks>
public abstract class CommonSQLDialect : ISQLDialect
{
private string _startDateParameterName;
private string _endDateParameterName;

/// <summary>
/// The dialect-specific command used to create the objects (tables, indexes,
/// stored procedures, views, etc.) needed to store queued messages in the
Expand Down Expand Up @@ -90,6 +93,35 @@ AND [Acknowledged] IS NULL
AND [Abandoned] IS NULL"; }
}

/// <summary>
/// The dialect-specific command used to select the list of queued messages
/// in a particular queue
/// </summary>
public virtual string SelectAbandonedMessagesCommand
{
get { return @"
SELECT
[MessageId],
[QueueName],
[MessageName],
[Origination],
[Destination],
[ReplyTo],
[Expires],
[ContentType],
[SenderPrincipal],
[Headers],
[MessageContent],
[Attempts],
[Acknowledged],
[Abandoned]
FROM [PB_QueuedMessages]
WHERE [QueueName]=@QueueName
AND [Acknowledged] IS NULL
AND [Abandoned] >= @StartDate
AND [Abandoned] < @EndDate"; }
}

/// <summary>
/// The dialect-specific command used to update the state of a queued message
/// </summary>
Expand Down Expand Up @@ -305,5 +337,15 @@ public string CurrentDateParameterName
{
get { return "@CurrentDate"; }
}

/// <summary>
/// The name of the parameter used to specify the start date in queries based on date ranges
/// </summary>
public string StartDateParameterName { get { return "@StartDate"; } }

/// <summary>
/// The name of the parameter used to specify the end date in queries based on date ranges
/// </summary>
public string EndDateParameterName { get { return "@EndDate"; } }
}
}
18 changes: 18 additions & 0 deletions Source/Platibus/SQL/ISQLDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public interface ISQLDialect
/// </summary>
string SelectQueuedMessagesCommand { get; }

/// <summary>
/// The dialect-specific command used to select the list of abandoned messages
/// in a particular queue
/// </summary>
string SelectAbandonedMessagesCommand { get; }

/// <summary>
/// The dialect-specific command used to update the state of a queued message
/// </summary>
Expand Down Expand Up @@ -168,5 +174,17 @@ public interface ISQLDialect
/// on the server when selecting active subscriptions
/// </summary>
string CurrentDateParameterName { get; }

/// <summary>
/// The name of the parameter used to specify the start date in queries based on date ranges
/// </summary>
string StartDateParameterName { get; }

/// <summary>
/// The name of the parameter used to specify the end date in queries based on date ranges
/// </summary>
string EndDateParameterName { get; }
}


}
Loading

0 comments on commit b5e9510

Please sign in to comment.