Skip to content

Commit

Permalink
Added timestamp column to SQL message journal table to capture and fa…
Browse files Browse the repository at this point in the history
…cilitate roll-off based on sent, received, or published date. Extended message journal configuration to enable prevention of sent, received, and/or published messages from being written to the journal.

#25 #26
  • Loading branch information
sweetlandj committed Feb 8, 2017
1 parent 8a868bd commit 79c7e30
Show file tree
Hide file tree
Showing 15 changed files with 382 additions and 152 deletions.
4 changes: 4 additions & 0 deletions Source/Platibus.SQLite/SQLiteDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ CREATE TABLE IF NOT EXISTS [PB_MessageJournal]
(
[Id] INTEGER PRIMARY KEY AUTOINCREMENT,
[MessageId] TEXT NOT NULL,
[Timestamp] INTEGER,
[Category] TEXT NOT NULL,
[MessageName] TEXT NULL,
[Origination] TEXT NULL,
Expand All @@ -91,6 +92,9 @@ [Abandoned] TEXT NULL
CREATE INDEX IF NOT EXISTS [PB_MessageJournal_IX_MessageId]
ON [PB_MessageJournal]([MessageId]);
CREATE INDEX IF NOT EXISTS [PB_MessageJournal_IX_Timestamp]
ON [PB_MessageJournal]([Timestamp]);
CREATE INDEX IF NOT EXISTS [PB_MessageJournal_IX_Category]
ON [PB_MessageJournal]([Category]);"; }
}
Expand Down
4 changes: 2 additions & 2 deletions Source/Platibus.SQLite/SQLiteMessageJournalingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ private static IDbConnectionProvider InitDb(DirectoryInfo directory)
}

/// <inheritdoc />
protected override Task<SQLJournaledMessage> InsertJournaledMessage(Message message, string category)
protected override Task<SQLJournaledMessage> InsertJournaledMessage(Message message, string category, DateTimeOffset timestamp = default(DateTimeOffset))
{
CheckDisposed();
var op = new SQLiteOperation<SQLJournaledMessage>(() => base.InsertJournaledMessage(message, category));
var op = new SQLiteOperation<SQLJournaledMessage>(() => base.InsertJournaledMessage(message, category, timestamp));
_operationQueue.Post(op);
return op.Task;
}
Expand Down
12 changes: 6 additions & 6 deletions Source/Platibus.SampleWebApp/Web.config
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
The baseUri must agree with the bindings in IIS.
-->
<platibus.iis baseUri="http://localhost:52180/platibus/" replyTimeout="00:00:30" bypassTransportLocalDestination="true">
<journaling provider="Filesystem" path="platibus\journal" />
<!-- <journaling provider="SQLite" path="platibus\journal" /> -->
<!-- <journaling provider="SQL" connectionName="Platibus" /> -->
<journaling provider="Filesystem" path="platibus\journal" sent="true" received="true" published="true" />
<!-- <journaling provider="SQLite" path="platibus\journal" sent="true" received="true" published="true" /> -->
<!-- <journaling provider="SQL" connectionName="Platibus" sent="true" received="true" published="true" /> -->

<queueing provider="Filesystem" path="platibus\queues" />
<!-- <queueing provider="SQLite" path="platibus\queues\sqlite" /> -->
Expand Down Expand Up @@ -90,9 +90,9 @@
The baseUri must agree with the bindings in IIS or the HttpListener host.
-->
<platibus.owin baseUri="http://localhost:52180/platibus/" replyTimeout="00:00:30" bypassTransportLocalDestination="true">
<journaling provider="Filesystem" path="platibus\journal" />
<!--<journaling provider="SQLite" path="platibus\journal" />-->
<!--<journaling provider="SQL" connectionName="Platibus" />-->
<journaling provider="Filesystem" path="platibus\journal" sent="true" received="true" published="true"/>
<!--<journaling provider="SQLite" path="platibus\journal" sent="true" received="true" published="true" />-->
<!--<journaling provider="SQL" connectionName="Platibus" sent="true" received="true" published="true" />-->

<queueing provider="Filesystem" path="platibus\queues" />
<!-- <queueing provider="SQLite" path="platibus\queues\sqlite" /> -->
Expand Down
6 changes: 3 additions & 3 deletions Source/Platibus.UnitTests/MessageJournalingServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected Message GivenSentMessage()
return Message = new Message(messageHeaders, "Hello, world!");
}

protected Task WhenJournalingSentMessage()
protected virtual Task WhenJournalingSentMessage()
{
return MessageJournalingService.MessageSent(Message);
}
Expand All @@ -74,7 +74,7 @@ protected Message GivenReceivedMessage()
return Message = new Message(messageHeaders, "Hello, world!");
}

protected Task WhenJournalingReceivedMessage()
protected virtual Task WhenJournalingReceivedMessage()
{
return MessageJournalingService.MessageReceived(Message);
}
Expand All @@ -95,7 +95,7 @@ protected Message GivenPublishedMessage()
return Message = new Message(messageHeaders, "Hello, world!");
}

protected Task WhenJournalingPublishedMessage()
protected virtual Task WhenJournalingPublishedMessage()
{
return MessageJournalingService.MessagePublished(Message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
using System.Threading;
using System.Threading.Tasks;
using Moq;
using NUnit.Framework;

namespace Platibus.UnitTests.Mocks
{
public class FilteredMessageJournalingServiceTests : MessageJournalingServiceTests
{
private readonly Mock<IMessageJournalingService> _mockMessageJournalingService;

protected bool JournalSentMessages = true;
protected bool JournalReceivedMessages = true;
protected bool JournalPublishedMessages = true;

public FilteredMessageJournalingServiceTests()
: this(MockCollectionFixture.Instance)
{
}

public FilteredMessageJournalingServiceTests(MockCollectionFixture fixture)
: base(fixture.MockMessageJournalingService.Object)
{
_mockMessageJournalingService = fixture.MockMessageJournalingService;
}

[Test]
public async Task SentMessagesNotJournaledWhenSentMessagesFilteredOut()
{
GivenSentMessage();
GivenSentMessagesOmittedFromJournal();
await WhenJournalingSentMessage();
AssertSentMessageIsNotWrittenToJournal();
}

[Test]
public async Task ReceivedMessagesNotJournaledWhenReceivedMessagesFilteredOut()
{
GivenReceivedMessage();
GivenReceivedMessagesOmittedFromJournal();
await WhenJournalingReceivedMessage();
AssertReceivedMessageIsNotWrittenToJournal();
}

[Test]
public async Task PublishedMessagesNotJournaledWhenPublishedMessagesFilteredOut()
{
GivenPublishedMessage();
GivenPublishedMessagesOmittedFromJournal();
await WhenJournalingPublishedMessage();
AssertPublishedMessageIsNotWrittenToJournal();
}

protected void GivenSentMessagesOmittedFromJournal()
{
JournalSentMessages = false;
}

protected void GivenReceivedMessagesOmittedFromJournal()
{
JournalReceivedMessages = false;
}

protected void GivenPublishedMessagesOmittedFromJournal()
{
JournalPublishedMessages = false;
}

protected FilteredMessageJournalingService CreateFilteredMessageJournalingService()
{
return new FilteredMessageJournalingService(
MessageJournalingService,
JournalSentMessages,
JournalReceivedMessages,
JournalPublishedMessages);
}

protected override Task WhenJournalingSentMessage()
{
return CreateFilteredMessageJournalingService().MessageSent(Message);
}

protected override Task WhenJournalingReceivedMessage()
{
return CreateFilteredMessageJournalingService().MessageReceived(Message);
}

protected override Task WhenJournalingPublishedMessage()
{
return CreateFilteredMessageJournalingService().MessagePublished(Message);
}

protected override Task AssertSentMessageIsWrittenToJournal()
{
_mockMessageJournalingService.Verify(x => x.MessageSent(Message, It.IsAny<CancellationToken>()), Times.Once());
return Task.FromResult(true);
}

protected override Task AssertReceivedMessageIsWrittenToJournal()
{
_mockMessageJournalingService.Verify(x => x.MessageReceived(Message, It.IsAny<CancellationToken>()), Times.Once());
return Task.FromResult(true);
}

protected override Task AssertPublishedMessageIsWrittenToJournal()
{
_mockMessageJournalingService.Verify(x => x.MessagePublished(Message, It.IsAny<CancellationToken>()), Times.Once());
return Task.FromResult(true);
}

protected void AssertSentMessageIsNotWrittenToJournal()
{
_mockMessageJournalingService.Verify(x => x.MessageSent(Message, It.IsAny<CancellationToken>()), Times.Never);
}

protected void AssertReceivedMessageIsNotWrittenToJournal()
{
_mockMessageJournalingService.Verify(x => x.MessageReceived(Message, It.IsAny<CancellationToken>()), Times.Never);
}

protected void AssertPublishedMessageIsNotWrittenToJournal()
{
_mockMessageJournalingService.Verify(x => x.MessagePublished(Message, It.IsAny<CancellationToken>()), Times.Never);
}
}
}
24 changes: 24 additions & 0 deletions Source/Platibus.UnitTests/Mocks/MockCollectionFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Moq;
using NUnit.Framework;

namespace Platibus.UnitTests.Mocks
{
[SetUpFixture]
public class MockCollectionFixture
{
public static MockCollectionFixture Instance;

[SetUp]
public void SetUp()
{
Instance = new MockCollectionFixture();
}

public readonly Mock<IMessageJournalingService> MockMessageJournalingService;

public MockCollectionFixture()
{
MockMessageJournalingService = new Mock<IMessageJournalingService>();
}
}
}
2 changes: 2 additions & 0 deletions Source/Platibus.UnitTests/Platibus.UnitTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@
<Compile Include="Filesystem\FilesystemCollectionFixture.cs" />
<Compile Include="Filesystem\FilesystemMessageJournalInspector.cs" />
<Compile Include="Filesystem\FilesystemMessageJournalingServiceTests.cs" />
<Compile Include="Mocks\FilteredMessageJournalingServiceTests.cs" />
<Compile Include="LocalDB\LocalDBCollectionFixture.cs" />
<Compile Include="LocalDB\LocalDBMessageJournalingServiceTests.cs" />
<Compile Include="MessageJournalingServiceTests.cs" />
<Compile Include="Mocks\MockCollectionFixture.cs" />
<Compile Include="PlatibusConfigurationExtensionTests.cs" />
<Compile Include="RabbitMQMessageQueueingServiceTests.cs" />
<Compile Include="SQLite\SQLiteMessageJournalInspector.cs" />
Expand Down
33 changes: 33 additions & 0 deletions Source/Platibus/Config/JournalingElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class JournalingElement : ExtensibleConfigurationElement
{
private const string EnabledPropertyName = "enabled";
private const string ProviderPropertyName = "provider";
private const string SentPropertyName = "sent";
private const string ReceivedPropertyName = "received";
private const string PublishedPropertyName = "published";

/// <summary>
/// Indicates whether message journaling is enabled
Expand All @@ -54,5 +57,35 @@ public string Provider
get { return (string) base[ProviderPropertyName]; }
set { base[ProviderPropertyName] = value; }
}

/// <summary>
/// Whether to journal sent messages
/// </summary>
[ConfigurationProperty(SentPropertyName, DefaultValue = true)]
public bool JournalSentMessages
{
get { return (bool)base[SentPropertyName]; }
set { base[SentPropertyName] = value; }
}

/// <summary>
/// Whether to journal received messages
/// </summary>
[ConfigurationProperty(ReceivedPropertyName, DefaultValue = true)]
public bool JournalReceivedMessages
{
get { return (bool)base[ReceivedPropertyName]; }
set { base[ReceivedPropertyName] = value; }
}

/// <summary>
/// Whether to journal published messages
/// </summary>
[ConfigurationProperty(PublishedPropertyName, DefaultValue = true)]
public bool JournalPublishedMessages
{
get { return (bool)base[PublishedPropertyName]; }
set { base[PublishedPropertyName] = value; }
}
}
}
11 changes: 9 additions & 2 deletions Source/Platibus/Config/PlatibusConfigurationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public static Task<IMessageQueueingService> InitMessageQueueingService(QueueingE
/// <returns>Returns a task whose result is an initialized message journaling service</returns>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="config"/> is
/// <c>null</c></exception>
public static Task<IMessageJournalingService> InitMessageJournalingService(JournalingElement config)
public static async Task<IMessageJournalingService> InitMessageJournalingService(JournalingElement config)
{
if (config == null) throw new ArgumentNullException("config");

Expand All @@ -282,7 +282,14 @@ public static Task<IMessageJournalingService> InitMessageJournalingService(Journ
var provider = ProviderHelper.GetProvider<IMessageJournalingServiceProvider>(providerName);

Log.Debug("Initializing message journaling service...");
return provider.CreateMessageJournalingService(config);
var messageJournalingService = await provider.CreateMessageJournalingService(config);
var filteredMessageJournalingService = new FilteredMessageJournalingService(
messageJournalingService,
config.JournalSentMessages,
config.JournalReceivedMessages,
config.JournalPublishedMessages);

return filteredMessageJournalingService;
}

/// <summary>
Expand Down
66 changes: 66 additions & 0 deletions Source/Platibus/FilteredMessageJournalingService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Platibus.Config;

namespace Platibus
{
/// <summary>
/// An <see cref="IMessageJournalingService"/> decorator that allows or inhibits
/// message journaling operations based on user configuration.
/// </summary>
/// <see cref="JournalingElement.JournalSentMessages"/>
/// <see cref="JournalingElement.JournalReceivedMessages"/>
/// <see cref="JournalingElement.JournalPublishedMessages"/>
public class FilteredMessageJournalingService : IMessageJournalingService
{
private readonly IMessageJournalingService _messageJournalingService;
private readonly bool _journalSentMessages;
private readonly bool _journalReceivedMessages;
private readonly bool _journalPublishedMessages;

/// <summary>
/// Initializes a new <see cref="FilteredMessageJournalingService"/> instance
/// </summary>
/// <param name="messageJournalingService">The decorated message journaling service</param>
/// <param name="journalSentMessages">Whether to allow sent messages to be journaled</param>
/// <param name="journalReceivedMessages">Whether to allow received messages to be journaled</param>
/// <param name="journalPublishedMessages">Whether to allow published messages to be journaled</param>
public FilteredMessageJournalingService(IMessageJournalingService messageJournalingService, bool journalSentMessages, bool journalReceivedMessages, bool journalPublishedMessages)
{
if (messageJournalingService == null) throw new ArgumentNullException("messageJournalingService");

_messageJournalingService = messageJournalingService;
_journalSentMessages = journalSentMessages;
_journalReceivedMessages = journalReceivedMessages;
_journalPublishedMessages = journalPublishedMessages;
}

/// <inheritdoc />
public async Task MessageSent(Message message, CancellationToken cancellationToken = new CancellationToken())
{
if (_journalSentMessages)
{
await _messageJournalingService.MessageSent(message, cancellationToken);
}
}

/// <inheritdoc />
public async Task MessageReceived(Message message, CancellationToken cancellationToken = new CancellationToken())
{
if (_journalReceivedMessages)
{
await _messageJournalingService.MessageReceived(message, cancellationToken);
}
}

/// <inheritdoc />
public async Task MessagePublished(Message message, CancellationToken cancellationToken = new CancellationToken())
{
if (_journalPublishedMessages)
{
await _messageJournalingService.MessagePublished(message, cancellationToken);
}
}
}
}
1 change: 1 addition & 0 deletions Source/Platibus/Platibus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
<Compile Include="DataContractMessageNamingService.cs" />
<Compile Include="EndpointAddressEqualityComparer.cs" />
<Compile Include="EndpointCollection.cs" />
<Compile Include="FilteredMessageJournalingService.cs" />
<Compile Include="Http\AuthenticationSchemeElement.cs" />
<Compile Include="Http\AuthenticationSchemesElementCollection.cs" />
<Compile Include="Http\BasicAuthorizationService.cs" />
Expand Down
Loading

0 comments on commit 79c7e30

Please sign in to comment.