diff --git a/Source/Platibus.SQLite/SQLiteDialect.cs b/Source/Platibus.SQLite/SQLiteDialect.cs index b150c152..5e6f3456 100644 --- a/Source/Platibus.SQLite/SQLiteDialect.cs +++ b/Source/Platibus.SQLite/SQLiteDialect.cs @@ -74,6 +74,7 @@ CREATE TABLE IF NOT EXISTS [PB_MessageJournal] ( [Id] INTEGER PRIMARY KEY AUTOINCREMENT, [MessageId] TEXT NOT NULL, + [Timestamp] INTEGER, [Category] TEXT NOT NULL, [MessageName] TEXT NULL, [Origination] TEXT NULL, @@ -91,6 +92,9 @@ [Abandoned] TEXT NULL CREATE INDEX IF NOT EXISTS [PB_MessageJournal_IX_MessageId] ON [PB_MessageJournal]([MessageId]); +CREATE INDEX IF NOT EXISTS [PB_MessageJournal_IX_Timestamp] + ON [PB_MessageJournal]([Timestamp]); + CREATE INDEX IF NOT EXISTS [PB_MessageJournal_IX_Category] ON [PB_MessageJournal]([Category]);"; } } diff --git a/Source/Platibus.SQLite/SQLiteMessageJournalingService.cs b/Source/Platibus.SQLite/SQLiteMessageJournalingService.cs index 1bf4e4a5..f8a349bb 100644 --- a/Source/Platibus.SQLite/SQLiteMessageJournalingService.cs +++ b/Source/Platibus.SQLite/SQLiteMessageJournalingService.cs @@ -81,10 +81,10 @@ private static IDbConnectionProvider InitDb(DirectoryInfo directory) } /// - protected override Task InsertJournaledMessage(Message message, string category) + protected override Task InsertJournaledMessage(Message message, string category, DateTimeOffset timestamp = default(DateTimeOffset)) { CheckDisposed(); - var op = new SQLiteOperation(() => base.InsertJournaledMessage(message, category)); + var op = new SQLiteOperation(() => base.InsertJournaledMessage(message, category, timestamp)); _operationQueue.Post(op); return op.Task; } diff --git a/Source/Platibus.SampleWebApp/Web.config b/Source/Platibus.SampleWebApp/Web.config index 66203db5..de4e8cbf 100644 --- a/Source/Platibus.SampleWebApp/Web.config +++ b/Source/Platibus.SampleWebApp/Web.config @@ -60,9 +60,9 @@ The baseUri must agree with the bindings in IIS. --> - - - + + + @@ -90,9 +90,9 @@ The baseUri must agree with the bindings in IIS or the HttpListener host. --> - - - + + + diff --git a/Source/Platibus.UnitTests/MessageJournalingServiceTests.cs b/Source/Platibus.UnitTests/MessageJournalingServiceTests.cs index c522c330..8cf2050a 100644 --- a/Source/Platibus.UnitTests/MessageJournalingServiceTests.cs +++ b/Source/Platibus.UnitTests/MessageJournalingServiceTests.cs @@ -52,7 +52,7 @@ protected Message GivenSentMessage() return Message = new Message(messageHeaders, "Hello, world!"); } - protected Task WhenJournalingSentMessage() + protected virtual Task WhenJournalingSentMessage() { return MessageJournalingService.MessageSent(Message); } @@ -74,7 +74,7 @@ protected Message GivenReceivedMessage() return Message = new Message(messageHeaders, "Hello, world!"); } - protected Task WhenJournalingReceivedMessage() + protected virtual Task WhenJournalingReceivedMessage() { return MessageJournalingService.MessageReceived(Message); } @@ -95,7 +95,7 @@ protected Message GivenPublishedMessage() return Message = new Message(messageHeaders, "Hello, world!"); } - protected Task WhenJournalingPublishedMessage() + protected virtual Task WhenJournalingPublishedMessage() { return MessageJournalingService.MessagePublished(Message); } diff --git a/Source/Platibus.UnitTests/Mocks/FilteredMessageJournalingServiceTests.cs b/Source/Platibus.UnitTests/Mocks/FilteredMessageJournalingServiceTests.cs new file mode 100644 index 00000000..8a4cd5d8 --- /dev/null +++ b/Source/Platibus.UnitTests/Mocks/FilteredMessageJournalingServiceTests.cs @@ -0,0 +1,126 @@ +using System.Threading; +using System.Threading.Tasks; +using Moq; +using NUnit.Framework; + +namespace Platibus.UnitTests.Mocks +{ + public class FilteredMessageJournalingServiceTests : MessageJournalingServiceTests + { + private readonly Mock _mockMessageJournalingService; + + protected bool JournalSentMessages = true; + protected bool JournalReceivedMessages = true; + protected bool JournalPublishedMessages = true; + + public FilteredMessageJournalingServiceTests() + : this(MockCollectionFixture.Instance) + { + } + + public FilteredMessageJournalingServiceTests(MockCollectionFixture fixture) + : base(fixture.MockMessageJournalingService.Object) + { + _mockMessageJournalingService = fixture.MockMessageJournalingService; + } + + [Test] + public async Task SentMessagesNotJournaledWhenSentMessagesFilteredOut() + { + GivenSentMessage(); + GivenSentMessagesOmittedFromJournal(); + await WhenJournalingSentMessage(); + AssertSentMessageIsNotWrittenToJournal(); + } + + [Test] + public async Task ReceivedMessagesNotJournaledWhenReceivedMessagesFilteredOut() + { + GivenReceivedMessage(); + GivenReceivedMessagesOmittedFromJournal(); + await WhenJournalingReceivedMessage(); + AssertReceivedMessageIsNotWrittenToJournal(); + } + + [Test] + public async Task PublishedMessagesNotJournaledWhenPublishedMessagesFilteredOut() + { + GivenPublishedMessage(); + GivenPublishedMessagesOmittedFromJournal(); + await WhenJournalingPublishedMessage(); + AssertPublishedMessageIsNotWrittenToJournal(); + } + + protected void GivenSentMessagesOmittedFromJournal() + { + JournalSentMessages = false; + } + + protected void GivenReceivedMessagesOmittedFromJournal() + { + JournalReceivedMessages = false; + } + + protected void GivenPublishedMessagesOmittedFromJournal() + { + JournalPublishedMessages = false; + } + + protected FilteredMessageJournalingService CreateFilteredMessageJournalingService() + { + return new FilteredMessageJournalingService( + MessageJournalingService, + JournalSentMessages, + JournalReceivedMessages, + JournalPublishedMessages); + } + + protected override Task WhenJournalingSentMessage() + { + return CreateFilteredMessageJournalingService().MessageSent(Message); + } + + protected override Task WhenJournalingReceivedMessage() + { + return CreateFilteredMessageJournalingService().MessageReceived(Message); + } + + protected override Task WhenJournalingPublishedMessage() + { + return CreateFilteredMessageJournalingService().MessagePublished(Message); + } + + protected override Task AssertSentMessageIsWrittenToJournal() + { + _mockMessageJournalingService.Verify(x => x.MessageSent(Message, It.IsAny()), Times.Once()); + return Task.FromResult(true); + } + + protected override Task AssertReceivedMessageIsWrittenToJournal() + { + _mockMessageJournalingService.Verify(x => x.MessageReceived(Message, It.IsAny()), Times.Once()); + return Task.FromResult(true); + } + + protected override Task AssertPublishedMessageIsWrittenToJournal() + { + _mockMessageJournalingService.Verify(x => x.MessagePublished(Message, It.IsAny()), Times.Once()); + return Task.FromResult(true); + } + + protected void AssertSentMessageIsNotWrittenToJournal() + { + _mockMessageJournalingService.Verify(x => x.MessageSent(Message, It.IsAny()), Times.Never); + } + + protected void AssertReceivedMessageIsNotWrittenToJournal() + { + _mockMessageJournalingService.Verify(x => x.MessageReceived(Message, It.IsAny()), Times.Never); + } + + protected void AssertPublishedMessageIsNotWrittenToJournal() + { + _mockMessageJournalingService.Verify(x => x.MessagePublished(Message, It.IsAny()), Times.Never); + } + } +} diff --git a/Source/Platibus.UnitTests/Mocks/MockCollectionFixture.cs b/Source/Platibus.UnitTests/Mocks/MockCollectionFixture.cs new file mode 100644 index 00000000..2dd9562c --- /dev/null +++ b/Source/Platibus.UnitTests/Mocks/MockCollectionFixture.cs @@ -0,0 +1,24 @@ +using Moq; +using NUnit.Framework; + +namespace Platibus.UnitTests.Mocks +{ + [SetUpFixture] + public class MockCollectionFixture + { + public static MockCollectionFixture Instance; + + [SetUp] + public void SetUp() + { + Instance = new MockCollectionFixture(); + } + + public readonly Mock MockMessageJournalingService; + + public MockCollectionFixture() + { + MockMessageJournalingService = new Mock(); + } + } +} diff --git a/Source/Platibus.UnitTests/Platibus.UnitTests.csproj b/Source/Platibus.UnitTests/Platibus.UnitTests.csproj index 7d7d48b4..c0ae26e0 100644 --- a/Source/Platibus.UnitTests/Platibus.UnitTests.csproj +++ b/Source/Platibus.UnitTests/Platibus.UnitTests.csproj @@ -123,9 +123,11 @@ + + diff --git a/Source/Platibus/Config/JournalingElement.cs b/Source/Platibus/Config/JournalingElement.cs index 99f66f35..58cb8bc4 100644 --- a/Source/Platibus/Config/JournalingElement.cs +++ b/Source/Platibus/Config/JournalingElement.cs @@ -32,6 +32,9 @@ public class JournalingElement : ExtensibleConfigurationElement { private const string EnabledPropertyName = "enabled"; private const string ProviderPropertyName = "provider"; + private const string SentPropertyName = "sent"; + private const string ReceivedPropertyName = "received"; + private const string PublishedPropertyName = "published"; /// /// Indicates whether message journaling is enabled @@ -54,5 +57,35 @@ public string Provider get { return (string) base[ProviderPropertyName]; } set { base[ProviderPropertyName] = value; } } + + /// + /// Whether to journal sent messages + /// + [ConfigurationProperty(SentPropertyName, DefaultValue = true)] + public bool JournalSentMessages + { + get { return (bool)base[SentPropertyName]; } + set { base[SentPropertyName] = value; } + } + + /// + /// Whether to journal received messages + /// + [ConfigurationProperty(ReceivedPropertyName, DefaultValue = true)] + public bool JournalReceivedMessages + { + get { return (bool)base[ReceivedPropertyName]; } + set { base[ReceivedPropertyName] = value; } + } + + /// + /// Whether to journal published messages + /// + [ConfigurationProperty(PublishedPropertyName, DefaultValue = true)] + public bool JournalPublishedMessages + { + get { return (bool)base[PublishedPropertyName]; } + set { base[PublishedPropertyName] = value; } + } } } \ No newline at end of file diff --git a/Source/Platibus/Config/PlatibusConfigurationManager.cs b/Source/Platibus/Config/PlatibusConfigurationManager.cs index f391e226..b3febbbf 100644 --- a/Source/Platibus/Config/PlatibusConfigurationManager.cs +++ b/Source/Platibus/Config/PlatibusConfigurationManager.cs @@ -268,7 +268,7 @@ public static Task InitMessageQueueingService(QueueingE /// Returns a task whose result is an initialized message journaling service /// Thrown if is /// null - public static Task InitMessageJournalingService(JournalingElement config) + public static async Task InitMessageJournalingService(JournalingElement config) { if (config == null) throw new ArgumentNullException("config"); @@ -282,7 +282,14 @@ public static Task InitMessageJournalingService(Journ var provider = ProviderHelper.GetProvider(providerName); Log.Debug("Initializing message journaling service..."); - return provider.CreateMessageJournalingService(config); + var messageJournalingService = await provider.CreateMessageJournalingService(config); + var filteredMessageJournalingService = new FilteredMessageJournalingService( + messageJournalingService, + config.JournalSentMessages, + config.JournalReceivedMessages, + config.JournalPublishedMessages); + + return filteredMessageJournalingService; } /// diff --git a/Source/Platibus/FilteredMessageJournalingService.cs b/Source/Platibus/FilteredMessageJournalingService.cs new file mode 100644 index 00000000..aaff3bfe --- /dev/null +++ b/Source/Platibus/FilteredMessageJournalingService.cs @@ -0,0 +1,66 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Platibus.Config; + +namespace Platibus +{ + /// + /// An decorator that allows or inhibits + /// message journaling operations based on user configuration. + /// + /// + /// + /// + public class FilteredMessageJournalingService : IMessageJournalingService + { + private readonly IMessageJournalingService _messageJournalingService; + private readonly bool _journalSentMessages; + private readonly bool _journalReceivedMessages; + private readonly bool _journalPublishedMessages; + + /// + /// Initializes a new instance + /// + /// The decorated message journaling service + /// Whether to allow sent messages to be journaled + /// Whether to allow received messages to be journaled + /// Whether to allow published messages to be journaled + public FilteredMessageJournalingService(IMessageJournalingService messageJournalingService, bool journalSentMessages, bool journalReceivedMessages, bool journalPublishedMessages) + { + if (messageJournalingService == null) throw new ArgumentNullException("messageJournalingService"); + + _messageJournalingService = messageJournalingService; + _journalSentMessages = journalSentMessages; + _journalReceivedMessages = journalReceivedMessages; + _journalPublishedMessages = journalPublishedMessages; + } + + /// + public async Task MessageSent(Message message, CancellationToken cancellationToken = new CancellationToken()) + { + if (_journalSentMessages) + { + await _messageJournalingService.MessageSent(message, cancellationToken); + } + } + + /// + public async Task MessageReceived(Message message, CancellationToken cancellationToken = new CancellationToken()) + { + if (_journalReceivedMessages) + { + await _messageJournalingService.MessageReceived(message, cancellationToken); + } + } + + /// + public async Task MessagePublished(Message message, CancellationToken cancellationToken = new CancellationToken()) + { + if (_journalPublishedMessages) + { + await _messageJournalingService.MessagePublished(message, cancellationToken); + } + } + } +} diff --git a/Source/Platibus/Platibus.csproj b/Source/Platibus/Platibus.csproj index 7a33e783..e1d462f3 100644 --- a/Source/Platibus/Platibus.csproj +++ b/Source/Platibus/Platibus.csproj @@ -90,6 +90,7 @@ + diff --git a/Source/Platibus/SQL/CommonSQLDialect.cs b/Source/Platibus/SQL/CommonSQLDialect.cs index dfcbbfb0..db42f205 100644 --- a/Source/Platibus/SQL/CommonSQLDialect.cs +++ b/Source/Platibus/SQL/CommonSQLDialect.cs @@ -33,30 +33,16 @@ namespace Platibus.SQL /// public abstract class CommonSQLDialect : ISQLDialect { - /// - /// The dialect-specific command used to create the objects (tables, indexes, - /// stored procedures, views, etc.) needed to store queued messages in the - /// SQL database - /// + /// public abstract string CreateMessageQueueingServiceObjectsCommand { get; } - /// - /// The dialect-specific command used to create the objects (tables, indexes, - /// stored procedures, views, etc.) needed to store journaled messages in the - /// SQL database - /// + /// public abstract string CreateMessageJournalingServiceObjectsCommand { get; } - /// - /// The dialect-specific command used to create the objects (tables, indexes, - /// stored procedures, views, etc.) needed to store subscription tracking data - /// in the SQL database - /// + /// public abstract string CreateSubscriptionTrackingServiceObjectsCommand { get; } - /// - /// The dialect-specific command used to insert a queued message - /// + /// public virtual string InsertQueuedMessageCommand { get { return @" @@ -91,14 +77,13 @@ FROM [PB_QueuedMessages] AND [QueueName]=@QueueName)"; } } - /// - /// The dialect-specific command used to insert a queued message - /// + /// public virtual string InsertJournaledMessageCommand { get { return @" INSERT INTO [PB_MessageJournal] ( - [MessageId], + [MessageId], + [Timestamp], [Category], [MessageName], [Origination], @@ -110,6 +95,7 @@ INSERT INTO [PB_MessageJournal] ( [MessageContent]) VALUES ( @MessageId, + @Timestamp, @Category, @MessageName, @Origination, @@ -121,10 +107,7 @@ INSERT INTO [PB_MessageJournal] ( @MessageContent)"; } } - /// - /// The dialect-specific command used to select the list of queued messages - /// in a particular queue - /// + /// public virtual string SelectQueuedMessagesCommand { get { return @" @@ -149,10 +132,7 @@ AND [Acknowledged] IS NULL AND [Abandoned] IS NULL"; } } - /// - /// The dialect-specific command used to select the list of journaled messages - /// in a particular queue - /// + /// public virtual string SelectJournaledMessagesCommand { get { return @" @@ -170,10 +150,7 @@ public virtual string SelectJournaledMessagesCommand FROM [PB_MessageJournal]"; } } - /// - /// The dialect-specific command used to select the list of queued messages - /// in a particular queue - /// + /// public virtual string SelectAbandonedMessagesCommand { get { return @" @@ -199,9 +176,7 @@ AND [Acknowledged] IS NULL AND [Abandoned] < @EndDate"; } } - /// - /// The dialect-specific command used to update the state of a queued message - /// + /// public virtual string UpdateQueuedMessageCommand { get { return @" @@ -213,9 +188,7 @@ UPDATE [PB_QueuedMessages] SET AND [QueueName]=@QueueName"; } } - /// - /// The dialect-specific command used to insert new subscriptions - /// + /// public string InsertSubscriptionCommand { get { return @" @@ -228,9 +201,7 @@ FROM [PB_Subscriptions] AND [Subscriber]=@Subscriber)"; } } - /// - /// The dialect-specific command used to update existing subscriptions - /// + /// public string UpdateSubscriptionCommand { get { return @" @@ -239,9 +210,7 @@ public string UpdateSubscriptionCommand AND [Subscriber]=@Subscriber"; } } - /// - /// The dialect-specific command used to select subscriptions - /// + /// public string SelectSubscriptionsCommand { get { return @" @@ -251,9 +220,7 @@ WHERE [Expires] IS NULL OR [Expires] > @CurrentDate"; } } - /// - /// The dialect-specific command used to delete a subscription - /// + /// public string DeleteSubscriptionCommand { get { return @" @@ -262,172 +229,130 @@ DELETE FROM [PB_Subscriptions] AND [Subscriber]=@Subscriber"; } } - /// - /// The dialect-specific name for the parameter used to specify the name of - /// a queue when inserting messages - /// + /// public virtual string QueueNameParameterName { get { return "@QueueName"; } } - /// - /// The dialect-specific name for the parameter used to specify the message ID value - /// when inserting, updating, or deleting queued messages - /// + /// public virtual string MessageIdParameterName { get { return "@MessageId"; } } - /// - /// The dialect-specific name for the parameter used to specify the message name value - /// when inserting queued messages - /// + /// public virtual string MessageNameParameterName { get { return "@MessageName"; } } - /// - /// The dialect-specific name for the parameter used to specify the originating URI value - /// when inserting queued messages - /// + /// public virtual string OriginationParameterName { get { return "@Origination"; } } - /// - /// The dialect-specific name for the parameter used to specify the destination URI value - /// when inserting queued messages - /// + /// public virtual string DestinationParameterName { get { return "@Destination"; } } - /// - /// The dialect-specific name for the parameter used to specify the reply-to URI value - /// when inserting queued messages; inserting subscriptions; or updating subscriptions - /// + /// public virtual string ReplyToParameterName { get { return "@ReplyTo"; } } - /// - /// The dialect-specific name for the parameter used to specify the message expiration - /// date when inserting queued messages - /// + /// public virtual string ExpiresParameterName { get { return "@Expires"; } } - /// - /// The dialect-specific name for the parameter used to specify the content type value - /// when inserting queued messages - /// + /// public virtual string ContentTypeParameterName { get { return "@ContentType"; } } - /// - /// The dialect-specific name for the parameter used to specify the entire serialized headers - /// collection when inserting queued messages - /// + /// public virtual string HeadersParameterName { get { return "@Headers"; } } - /// - /// The dialect-specific name for the parameter used to specify the sender principal - /// when inserting queued messages - /// + /// public virtual string SenderPrincipalParameterName { get { return "@SenderPrincipal"; } } - /// - /// The dialect-specific name for the parameter used to specify the message content when - /// inserting queued messages - /// + /// public virtual string MessageContentParameterName { get { return "@MessageContent"; } } - /// - /// The dialect-specific name for the parameter used to specify the number of attempts when - /// updating queued messages - /// + /// public virtual string AttemptsParameterName { get { return "@Attempts"; } } - /// - /// The dialect-specific name for the parameter used to specify the date and time the message - /// was acknowledged when updating queued messages - /// + /// public virtual string AcknowledgedParameterName { get { return "@Acknowledged"; } } - /// - /// The dialect-specific name for the parameter used to specify the date and time the message - /// was abandoned when updating queued messages - /// + /// public virtual string AbandonedParameterName { get { return "@Abandoned"; } } - /// - /// The dialect-specific name for the parameter used to specify the name of the topic being - /// subscribed to when inserting subscriptions - /// + /// public string TopicNameParameterName { get { return "@TopicName"; } } - /// - /// The dialect-specific name for the parameter used to specify the URI of the subscriber - /// when inserting subscriptions - /// + /// public string SubscriberParameterName { get { return "@Subscriber"; } } - /// - /// The dialect-specific name for the parameter used to specify the current date and time - /// on the server when selecting active subscriptions - /// + /// public string CurrentDateParameterName { get { return "@CurrentDate"; } } - /// - /// The name of the parameter used to specify the start date in queries based on date ranges - /// - public string StartDateParameterName { get { return "@StartDate"; } } + /// + public string StartDateParameterName + { + get { return "@StartDate"; } + } + + /// + public string EndDateParameterName + { + get { return "@EndDate"; } + } - /// - /// The name of the parameter used to specify the end date in queries based on date ranges - /// - public string EndDateParameterName { get { return "@EndDate"; } } + /// + public string CategoryParameterName + { + get { return "@Category"; } + } - /// - /// The name of the parameter used to specify a category - /// - public string CategoryParameterName { get { return "@Category"; } } + /// + public string TimestampParameterName + { + get { return "@Timestamp"; } + } } } \ No newline at end of file diff --git a/Source/Platibus/SQL/ISQLDialect.cs b/Source/Platibus/SQL/ISQLDialect.cs index 0d5a47ac..bda8ce01 100644 --- a/Source/Platibus/SQL/ISQLDialect.cs +++ b/Source/Platibus/SQL/ISQLDialect.cs @@ -229,6 +229,11 @@ public interface ISQLDialect /// in a particular queue /// string SelectJournaledMessagesCommand { get; } + + /// + /// The name of the parameter used to specify a timestamp + /// + string TimestampParameterName { get; } } diff --git a/Source/Platibus/SQL/MSSQLDialect.cs b/Source/Platibus/SQL/MSSQLDialect.cs index 330b4ced..e26c6e62 100644 --- a/Source/Platibus/SQL/MSSQLDialect.cs +++ b/Source/Platibus/SQL/MSSQLDialect.cs @@ -73,6 +73,7 @@ CREATE TABLE [PB_MessageJournal] ( [Id] INT IDENTITY(1,1), [MessageId] UNIQUEIDENTIFIER NOT NULL, + [Timestamp] DATETIMEOFFSET NOT NULL, [Category] VARCHAR(20) NOT NULL, [MessageName] VARCHAR(500) NULL, [Origination] VARCHAR(500) NULL, @@ -82,9 +83,6 @@ [ReplyTo] VARCHAR(500) NULL, [ContentType] VARCHAR(100) NULL, [Headers] VARCHAR(MAX), [MessageContent] TEXT, - [Acknowledged] DATETIME NULL, - [Abandoned] DATETIME NULL, - [Attempts] INT NOT NULL DEFAULT 0, CONSTRAINT [PB_MessageJournal_PK] PRIMARY KEY CLUSTERED ([Id]) ) @@ -92,9 +90,25 @@ CONSTRAINT [PB_MessageJournal_PK] PRIMARY KEY CLUSTERED ([Id]) CREATE INDEX [PB_MessageJournal_IX_MessageId] ON [PB_MessageJournal]([MessageId]) + CREATE INDEX [PB_MessageJournal_IX_Timestamp] + ON [PB_MessageJournal]([Timestamp]) + CREATE INDEX [PB_MessageJournal_IX_Category] ON [PB_MessageJournal]([Category]) -END"; } +END + +IF NOT EXISTS (SELECT * FROM [sys].[columns] + WHERE [object_id] = OBJECT_ID(N'[PB_MessageJournal]') + AND [name] = 'Timestamp') +BEGIN + ALTER TABLE [PB_MessageJournal] + ADD [Timestamp] DATETIMEOFFSET NOT NULL DEFAULT SYSDATETIMEOFFSET() + + CREATE INDEX [PB_MessageJournal_IX_Timestamp] + ON [PB_MessageJournal]([Timestamp]) +END + +"; } } /// diff --git a/Source/Platibus/SQL/SQLMessageJournalingService.cs b/Source/Platibus/SQL/SQLMessageJournalingService.cs index feb54915..5179481e 100644 --- a/Source/Platibus/SQL/SQLMessageJournalingService.cs +++ b/Source/Platibus/SQL/SQLMessageJournalingService.cs @@ -101,19 +101,34 @@ public SQLMessageJournalingService(IDbConnectionProvider connectionProvider, ISQ /// public Task MessageReceived(Message message, CancellationToken cancellationToken = new CancellationToken()) { - return InsertJournaledMessage(message, "Received"); + var received = message.Headers.Received; + if (received == default(DateTime)) + { + received = DateTime.UtcNow; + } + return InsertJournaledMessage(message, "Received", received); } /// public Task MessageSent(Message message, CancellationToken cancellationToken = new CancellationToken()) { - return InsertJournaledMessage(message, "Sent"); + var sent = message.Headers.Sent; + if (sent == default(DateTime)) + { + sent = DateTime.UtcNow; + } + return InsertJournaledMessage(message, "Sent", sent); } /// public Task MessagePublished(Message message, CancellationToken cancellationToken = new CancellationToken()) { - return InsertJournaledMessage(message, "Published"); + var published = message.Headers.Published; + if (published == default(DateTime)) + { + published = DateTime.UtcNow; + } + return InsertJournaledMessage(message, "Published", published); } /// @@ -160,20 +175,27 @@ protected virtual Task> SelectJournaledMessages // and dependency on .NET 4.5.1 and later return Task.FromResult(queuedMessages.AsEnumerable()); } - + /// /// Inserts a journaled message into the SQL database /// /// The message to enqueue /// The journaled message category, e.g. "Sent", "Received", or "Published" + /// (Optional) The date/time that the message was sent, received, or published + /// according to message headers /// Returns a task that completes when the message has been inserted into the SQL /// database and whose result is a copy of the inserted record [SuppressMessage("Microsoft.Security", "CA2100:Review SQL queries for security vulnerabilities")] - protected virtual Task InsertJournaledMessage(Message message, string category) + protected virtual Task InsertJournaledMessage(Message message, string category, DateTimeOffset timestamp = default(DateTimeOffset)) { if (message == null) throw new ArgumentNullException("message"); if (string.IsNullOrWhiteSpace(category)) throw new ArgumentNullException("category"); + if (timestamp == default(DateTimeOffset)) + { + timestamp = DateTimeOffset.UtcNow; + } + SQLJournaledMessage journaledMessage; var connection = _connectionProvider.GetConnection(); try @@ -188,6 +210,7 @@ protected virtual Task InsertJournaledMessage(Message messa var headers = message.Headers; command.SetParameter(_dialect.MessageIdParameterName, (Guid)headers.MessageId); + command.SetParameter(_dialect.TimestampParameterName, timestamp); command.SetParameter(_dialect.CategoryParameterName, category); command.SetParameter(_dialect.MessageNameParameterName, (string)headers.MessageName); command.SetParameter(_dialect.OriginationParameterName, @@ -216,7 +239,7 @@ protected virtual Task InsertJournaledMessage(Message messa // and dependency on .NET 4.5.1 and later return Task.FromResult(journaledMessage); } - + /// /// A helper method to serialize message headers so that they can be inserted into a /// single column in the SQL database