diff --git a/Source/Platibus.UnitTests/FilesystemMessageQueueingServiceTests.cs b/Source/Platibus.UnitTests/FilesystemMessageQueueingServiceTests.cs index d823811b..c6f3f728 100644 --- a/Source/Platibus.UnitTests/FilesystemMessageQueueingServiceTests.cs +++ b/Source/Platibus.UnitTests/FilesystemMessageQueueingServiceTests.cs @@ -249,6 +249,80 @@ public async Task Given_Auto_Acknowledge_Queue_When_Not_Acknowledged_Then_Messag Assert.That(queuedMessages, Is.Empty); } + [Test] + public async Task Given_Listener_Throws_MaxAttempts_Exceeded_Then_Message_Should_Be_Dead() + { + var message = new Message(new MessageHeaders + { + {HeaderName.ContentType, "text/plain"}, + {HeaderName.MessageId, Guid.NewGuid().ToString()} + }, "Hello, world!"); + + var listenerCalledEvent = new ManualResetEvent(false); + var tempDir = GetTempDirectory(); + var queueName = new QueueName(Guid.NewGuid().ToString()); + var queuePath = Path.Combine(tempDir.FullName, queueName); + var queueDir = new DirectoryInfo(queuePath); + if (!queueDir.Exists) + { + queueDir.Create(); + } + + var mockListener = new Mock(); + mockListener.Setup( + x => + x.MessageReceived(It.IsAny(), It.IsAny(), + It.IsAny())) + .Callback((msg, ctx, ct) => + { + listenerCalledEvent.Set(); + throw new Exception(); + }); + + using (var fsQueueingService = new FilesystemMessageQueueingService(tempDir)) + using (var cts = new CancellationTokenSource()) + { + var ct = cts.Token; + fsQueueingService.Init(); + + await fsQueueingService.CreateQueue(queueName, mockListener.Object, new QueueOptions + { + AutoAcknowledge = true, + MaxAttempts = 3, + RetryDelay = TimeSpan.FromMilliseconds(250) + }, ct); + + await fsQueueingService.EnqueueMessage(queueName, message, Thread.CurrentPrincipal, ct); + await listenerCalledEvent.WaitOneAsync(TimeSpan.FromSeconds(1)); + + // The listener is called before the file is deleted, so there is a possible + // race condition here. Wait for a second to allow the delete to take place + // before enumerating the files to see that they were actually deleted. + await Task.Delay(TimeSpan.FromSeconds(1), ct); + } + + var messageEqualityComparer = new MessageEqualityComparer(); + mockListener.Verify(x => + x.MessageReceived(It.Is(m => messageEqualityComparer.Equals(m, message)), + It.IsAny(), It.IsAny()), Times.Exactly(3)); + + var queuedMessages = queueDir.EnumerateFiles() + .Select(f => new MessageFile(f)) + .ToList(); + + Assert.That(queuedMessages, Is.Empty); + + var deadLetterPath = Path.Combine(queuePath, "dead"); + var deadLetterDir = new DirectoryInfo(deadLetterPath); + Assert.That(deadLetterDir.Exists, Is.True); + + var deadMessages = deadLetterDir.EnumerateFiles() + .Select(f => new MessageFile(f)) + .ToList(); + + Assert.That(deadMessages, Is.Not.Empty); + } + [Test] public async Task Given_Auto_Acknowledge_Queue_When_Listener_Throws_Then_Message_Should_Not_Be_Deleted() { @@ -289,8 +363,10 @@ await fsQueueingService .CreateQueue(queueName, mockListener.Object, new QueueOptions { AutoAcknowledge = true, - MaxAttempts = 2, // So the message doesn't get moved to the DLQ - RetryDelay = TimeSpan.FromSeconds(30) + // Prevent message from being sent to the DLQ + MaxAttempts = 100, + // Short retry delay to message handler is called more than once + RetryDelay = TimeSpan.FromMilliseconds(250) }, ct); await fsQueueingService.EnqueueMessage(queueName, message, Thread.CurrentPrincipal, ct); @@ -309,7 +385,7 @@ await fsQueueingService var messageEqualityComparer = new MessageEqualityComparer(); mockListener.Verify(x => x.MessageReceived(It.Is(m => messageEqualityComparer.Equals(m, message)), - It.IsAny(), It.IsAny()), Times.Once()); + It.IsAny(), It.IsAny()), Times.AtLeast(2)); var queuedMessages = queueDir.EnumerateFiles() .Select(f => new MessageFile(f)) diff --git a/Source/Platibus.UnitTests/RabbitMQMessageQueueingServiceTests.cs b/Source/Platibus.UnitTests/RabbitMQMessageQueueingServiceTests.cs index a86852c1..de2ed804 100644 --- a/Source/Platibus.UnitTests/RabbitMQMessageQueueingServiceTests.cs +++ b/Source/Platibus.UnitTests/RabbitMQMessageQueueingServiceTests.cs @@ -231,6 +231,70 @@ await rmqQueueingService } } + [Test] + public async Task Given_Listener_Throws_MaxAttempts_Exceeded_Then_Message_Should_Be_Dead() + { + var message = new Message(new MessageHeaders + { + {HeaderName.ContentType, "text/plain"}, + {HeaderName.MessageId, Guid.NewGuid().ToString()} + }, "Hello, world!"); + + var listenerCalledEvent = new ManualResetEvent(false); + var queueName = new QueueName(Guid.NewGuid().ToString()); + var deadQueueName = new QueueName(Guid.NewGuid().ToString()); + + var mockListener = new Mock(); + mockListener.Setup(x => + x.MessageReceived(It.IsAny(), It.IsAny(), + It.IsAny())) + .Callback( + (msg, ctx, ct) => + { + listenerCalledEvent.Set(); + throw new Exception("Test exception"); + }) + .Returns(Task.FromResult(true)); + + var cts = new CancellationTokenSource(); + var rmqQueueingService = new RabbitMQMessageQueueingService(RabbitMQUri); + try + { + var ct = cts.Token; + await rmqQueueingService + .CreateQueue(queueName, mockListener.Object, new QueueOptions + { + MaxAttempts = 3, + RetryDelay = TimeSpan.FromMilliseconds(100) + }, ct); + + CreateQueue(deadQueueName, queueName.GetDeadLetterExchangeName()); + + await rmqQueueingService.EnqueueMessage(queueName, message, Thread.CurrentPrincipal, ct); + await listenerCalledEvent.WaitOneAsync(TimeSpan.FromSeconds(3)); + + // The listener is called before the message is published to the retry queue, + // so there is a possible race condition here. Wait for a second to allow the + // publish to take place before checking the retry queue depth. + await Task.Delay(TimeSpan.FromSeconds(1), ct); + + var messageEqualityComparer = new MessageEqualityComparer(); + mockListener.Verify(x => + x.MessageReceived(It.Is(m => messageEqualityComparer.Equals(m, message)), + It.IsAny(), It.IsAny()), Times.Exactly(3)); + + Assert.That(GetQueueDepth(queueName), Is.EqualTo(0)); + Assert.That(GetQueueDepth(deadQueueName), Is.EqualTo(1)); + } + finally + { + rmqQueueingService.TryDispose(); + cts.TryDispose(); + DeleteQueue(queueName); + DeleteQueue(deadQueueName); + } + } + [Test] public async Task Given_Auto_Acknowledge_Queue_When_Not_Acknowledged_Then_Message_Should_Be_Deleted() { @@ -336,6 +400,18 @@ private static void DeleteQueue(QueueName queueName) } } + private static void CreateQueue(QueueName queueName, string exchangeName) + { + var connectionFactory = new ConnectionFactory { Uri = RabbitMQUri.ToString() }; + using (var connection = connectionFactory.CreateConnection()) + using (var channel = connection.CreateModel()) + { + var queue = RabbitMQHelper.ReplaceInvalidQueueNameCharacters(queueName); + channel.QueueDeclare(queue, true, false, false, null); + channel.QueueBind(queue, exchangeName, "", null); + } + } + [Test] public async Task Given_Existing_Message_When_Creating_Queue_Then_Listener_Should_Fire() { diff --git a/Source/Platibus.UnitTests/SQLMessageQueueInspector.cs b/Source/Platibus.UnitTests/SQLMessageQueueInspector.cs index f6e5457c..a5dd5e02 100644 --- a/Source/Platibus.UnitTests/SQLMessageQueueInspector.cs +++ b/Source/Platibus.UnitTests/SQLMessageQueueInspector.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Security.Principal; using System.Threading; using System.Threading.Tasks; @@ -25,6 +26,11 @@ public Task> EnumerateMessages() return SelectQueuedMessages(); } + public Task> EnumerateAbandonedMessages(DateTime startDate, DateTime endDate) + { + return SelectAbandonedMessages(startDate, endDate); + } + private class NoopQueueListener : IQueueListener { public Task MessageReceived(Message message, IQueuedMessageContext context, diff --git a/Source/Platibus.UnitTests/SQLMessageQueueingServiceTests.cs b/Source/Platibus.UnitTests/SQLMessageQueueingServiceTests.cs index e7611c2e..a2469741 100644 --- a/Source/Platibus.UnitTests/SQLMessageQueueingServiceTests.cs +++ b/Source/Platibus.UnitTests/SQLMessageQueueingServiceTests.cs @@ -289,6 +289,73 @@ await sqlQueueingService } } + [Test] + public async Task Given_Listener_Throws_Max_Attempts_Exceeded_Then_Message_Should_Be_Abandoned() + { + var message = new Message(new MessageHeaders + { + {HeaderName.ContentType, "text/plain"}, + {HeaderName.MessageId, Guid.NewGuid().ToString()} + }, "Hello, world!"); + + var listenerCalledEvent = new ManualResetEvent(false); + var connectionStringSettings = GetConnectionStringSettings(); + var queueName = new QueueName(Guid.NewGuid().ToString()); + + var mockListener = new Mock(); + mockListener.Setup(x => + x.MessageReceived(It.IsAny(), It.IsAny(), + It.IsAny())) + .Callback((msg, ctx, ct) => + { + listenerCalledEvent.Set(); + throw new Exception(); + }); + + var startDate = DateTime.UtcNow; + using (var cts = new CancellationTokenSource()) + using (var sqlQueueingService = new SQLMessageQueueingService(connectionStringSettings)) + { + var ct = cts.Token; + sqlQueueingService.Init(); + + await sqlQueueingService + .CreateQueue(queueName, mockListener.Object, new QueueOptions + { + AutoAcknowledge = true, + MaxAttempts = 3, + RetryDelay = TimeSpan.FromMilliseconds(100) + }, ct); + + await sqlQueueingService.EnqueueMessage(queueName, message, Thread.CurrentPrincipal, ct); + + var listenerCalled = await listenerCalledEvent + .WaitOneAsync(Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(3)); + + Assert.That(listenerCalled, Is.True); + + // The listener is called before the row is updated, so there is a possible + // race condition here. Wait for a second to allow the update to take place + // before enumerating the rows to see that they were actually not updated. + await Task.Delay(TimeSpan.FromSeconds(1), ct); + + var messageEqualityComparer = new MessageEqualityComparer(); + mockListener.Verify(x => + x.MessageReceived(It.Is(m => messageEqualityComparer.Equals(m, message)), + It.IsAny(), It.IsAny()), Times.Exactly(3)); + + var sqlQueueInspector = new SQLMessageQueueInspector(sqlQueueingService, queueName); + var queuedMessages = (await sqlQueueInspector.EnumerateMessages()).ToList(); + + Assert.That(queuedMessages, Is.Empty); + + var endDate = DateTime.UtcNow; + var abandonedMessages = (await sqlQueueInspector.EnumerateAbandonedMessages(startDate, endDate)).ToList(); + Assert.That(abandonedMessages.Count, Is.EqualTo(1)); + Assert.That(abandonedMessages[0].Message, Is.EqualTo(message).Using(messageEqualityComparer)); + } + } + [Test] public async Task Given_Existing_Message_When_Creating_Queue_Then_Listener_Should_Fire() { diff --git a/Source/Platibus/InMemory/InMemoryQueue.cs b/Source/Platibus/InMemory/InMemoryQueue.cs index fea6e274..9d14e54d 100644 --- a/Source/Platibus/InMemory/InMemoryQueue.cs +++ b/Source/Platibus/InMemory/InMemoryQueue.cs @@ -106,6 +106,7 @@ private async Task ProcessQueuedMessage(QueuedMessage queuedMessage, Cancellatio // TODO: Implement journaling break; } + if (attemptsRemaining > 0) { await Task.Delay(_retryDelay, cancellationToken); diff --git a/Source/Platibus/SQL/CommonSQLDialect.cs b/Source/Platibus/SQL/CommonSQLDialect.cs index 7ee23c32..e81c224a 100644 --- a/Source/Platibus/SQL/CommonSQLDialect.cs +++ b/Source/Platibus/SQL/CommonSQLDialect.cs @@ -11,6 +11,9 @@ /// public abstract class CommonSQLDialect : ISQLDialect { + private string _startDateParameterName; + private string _endDateParameterName; + /// /// The dialect-specific command used to create the objects (tables, indexes, /// stored procedures, views, etc.) needed to store queued messages in the @@ -90,6 +93,35 @@ AND [Acknowledged] IS NULL AND [Abandoned] IS NULL"; } } + /// + /// The dialect-specific command used to select the list of queued messages + /// in a particular queue + /// + public virtual string SelectAbandonedMessagesCommand + { + get { return @" +SELECT + [MessageId], + [QueueName], + [MessageName], + [Origination], + [Destination], + [ReplyTo], + [Expires], + [ContentType], + [SenderPrincipal], + [Headers], + [MessageContent], + [Attempts], + [Acknowledged], + [Abandoned] +FROM [PB_QueuedMessages] +WHERE [QueueName]=@QueueName +AND [Acknowledged] IS NULL +AND [Abandoned] >= @StartDate +AND [Abandoned] < @EndDate"; } + } + /// /// The dialect-specific command used to update the state of a queued message /// @@ -305,5 +337,15 @@ public string CurrentDateParameterName { get { return "@CurrentDate"; } } + + /// + /// The name of the parameter used to specify the start date in queries based on date ranges + /// + public string StartDateParameterName { get { return "@StartDate"; } } + + /// + /// The name of the parameter used to specify the end date in queries based on date ranges + /// + public string EndDateParameterName { get { return "@EndDate"; } } } } \ No newline at end of file diff --git a/Source/Platibus/SQL/ISQLDialect.cs b/Source/Platibus/SQL/ISQLDialect.cs index e9de9f6a..4f76b4c4 100644 --- a/Source/Platibus/SQL/ISQLDialect.cs +++ b/Source/Platibus/SQL/ISQLDialect.cs @@ -42,6 +42,12 @@ public interface ISQLDialect /// string SelectQueuedMessagesCommand { get; } + /// + /// The dialect-specific command used to select the list of abandoned messages + /// in a particular queue + /// + string SelectAbandonedMessagesCommand { get; } + /// /// The dialect-specific command used to update the state of a queued message /// @@ -168,5 +174,17 @@ public interface ISQLDialect /// on the server when selecting active subscriptions /// string CurrentDateParameterName { get; } + + /// + /// The name of the parameter used to specify the start date in queries based on date ranges + /// + string StartDateParameterName { get; } + + /// + /// The name of the parameter used to specify the end date in queries based on date ranges + /// + string EndDateParameterName { get; } } + + } \ No newline at end of file diff --git a/Source/Platibus/SQL/SQLMessageQueue.cs b/Source/Platibus/SQL/SQLMessageQueue.cs index 8f149a87..5233e5e5 100644 --- a/Source/Platibus/SQL/SQLMessageQueue.cs +++ b/Source/Platibus/SQL/SQLMessageQueue.cs @@ -285,6 +285,55 @@ protected virtual Task> SelectQueuedMessages() return Task.FromResult(queuedMessages.AsEnumerable()); } + /// + /// Selects all abandoned messages from the SQL database + /// + /// Returns a task that completes when all records have been selected and whose + /// result is the enumerable sequence of the selected records + [SuppressMessage("Microsoft.Security", "CA2100:Review SQL queries for security vulnerabilities")] + protected virtual Task> SelectAbandonedMessages(DateTime startDate, DateTime endDate) + { + var queuedMessages = new List(); + var connection = _connectionProvider.GetConnection(); + try + { + using (var scope = new TransactionScope(TransactionScopeOption.Required)) + { + using (var command = connection.CreateCommand()) + { + command.CommandType = CommandType.Text; + command.CommandText = _dialect.SelectAbandonedMessagesCommand; + command.SetParameter(_dialect.QueueNameParameterName, (string)_queueName); + command.SetParameter(_dialect.StartDateParameterName, startDate); + command.SetParameter(_dialect.EndDateParameterName, endDate); + + using (var reader = command.ExecuteReader()) + { + while (reader.Read()) + { + var messageContent = reader.GetString("MessageContent"); + var headers = DeserializeHeaders(reader.GetString("Headers")); + var senderPrincipal = DeserializePrincipal(reader.GetString("SenderPrincipal")); + var message = new Message(headers, messageContent); + var attempts = reader.GetInt("Attempts").GetValueOrDefault(0); + var queuedMessage = new SQLQueuedMessage(message, senderPrincipal, attempts); + queuedMessages.Add(queuedMessage); + } + } + } + scope.Complete(); + } + } + finally + { + _connectionProvider.ReleaseConnection(connection); + } + + // SQL calls are not async to avoid the need for TransactionAsyncFlowOption + // and dependency on .NET 4.5.1 and later + return Task.FromResult(queuedMessages.AsEnumerable()); + } + /// /// Updates an existing queued message in the SQL database ///