From b2edeeb526eaeee1496f9e3e0cfda6cad75e22bf Mon Sep 17 00:00:00 2001 From: Jesse Sweetland Date: Thu, 2 Feb 2017 09:23:12 -0600 Subject: [PATCH] Implemented SQL and SQLite messaging journaling services --- Source/Platibus.SQLite/Platibus.SQLite.csproj | 1 + Source/Platibus.SQLite/SQLiteDialect.cs | 33 ++ .../SQLiteMessageJournalingService.cs | 145 ++++++++ .../Platibus.SQLite/SQLiteServicesProvider.cs | 39 +- Source/Platibus.SampleWebApp/Web.config | 19 +- .../Filesystem/FilesystemCollectionFixture.cs | 38 ++ .../FilesystemMessageJournalInspector.cs | 51 +++ ...FilesystemMessageJournalingServiceTests.cs | 54 +++ .../LocalDB/LocalDBCollectionFixture.cs | 78 ++++ .../LocalDBMessageJournalingServiceTests.cs | 59 +++ .../LocalDB/SQLMessageJournalInspector.cs | 19 + .../MessageJournalingServiceTests.cs | 105 ++++++ .../Platibus.UnitTests.csproj | 10 + .../SQLite/SQLiteCollectionFixture.cs | 38 ++ .../SQLite/SQLiteMessageJournalInspector.cs | 21 ++ .../SQLiteMessageJournalingServiceTests.cs | 55 +++ .../SQLiteMessageQueueingServiceTests.cs | 3 +- Source/Platibus/Http/HttpTransportService.cs | 11 +- Source/Platibus/Platibus.csproj | 2 + Source/Platibus/SQL/CommonSQLDialect.cs | 66 +++- Source/Platibus/SQL/IDbConnectionProvider.cs | 1 - Source/Platibus/SQL/ISQLDialect.cs | 23 ++ Source/Platibus/SQL/MSSQLDialect.cs | 36 +- Source/Platibus/SQL/SQLJournaledMessage.cs | 46 +++ .../SQL/SQLMessageJournalingService.cs | 345 ++++++++++++++++++ Source/Platibus/SQL/SQLServicesProvider.cs | 34 +- 26 files changed, 1282 insertions(+), 50 deletions(-) create mode 100644 Source/Platibus.SQLite/SQLiteMessageJournalingService.cs create mode 100644 Source/Platibus.UnitTests/Filesystem/FilesystemCollectionFixture.cs create mode 100644 Source/Platibus.UnitTests/Filesystem/FilesystemMessageJournalInspector.cs create mode 100644 Source/Platibus.UnitTests/Filesystem/FilesystemMessageJournalingServiceTests.cs create mode 100644 Source/Platibus.UnitTests/LocalDB/LocalDBCollectionFixture.cs create mode 100644 Source/Platibus.UnitTests/LocalDB/LocalDBMessageJournalingServiceTests.cs create mode 100644 Source/Platibus.UnitTests/LocalDB/SQLMessageJournalInspector.cs create mode 100644 Source/Platibus.UnitTests/MessageJournalingServiceTests.cs create mode 100644 Source/Platibus.UnitTests/SQLite/SQLiteCollectionFixture.cs create mode 100644 Source/Platibus.UnitTests/SQLite/SQLiteMessageJournalInspector.cs create mode 100644 Source/Platibus.UnitTests/SQLite/SQLiteMessageJournalingServiceTests.cs create mode 100644 Source/Platibus/SQL/SQLJournaledMessage.cs create mode 100644 Source/Platibus/SQL/SQLMessageJournalingService.cs diff --git a/Source/Platibus.SQLite/Platibus.SQLite.csproj b/Source/Platibus.SQLite/Platibus.SQLite.csproj index 5ff3190c..6afe3140 100644 --- a/Source/Platibus.SQLite/Platibus.SQLite.csproj +++ b/Source/Platibus.SQLite/Platibus.SQLite.csproj @@ -71,6 +71,7 @@ + diff --git a/Source/Platibus.SQLite/SQLiteDialect.cs b/Source/Platibus.SQLite/SQLiteDialect.cs index b83ecb04..b150c152 100644 --- a/Source/Platibus.SQLite/SQLiteDialect.cs +++ b/Source/Platibus.SQLite/SQLiteDialect.cs @@ -62,6 +62,39 @@ CREATE INDEX IF NOT EXISTS [PB_QueuedMessages_IX_QueueName] ON [PB_QueuedMessages]([QueueName]);"; } } + /// + /// The SQLite commands used to create the objects (tables, indexes, + /// stored procedures, views, etc.) needed to store queued messages in the + /// SQL database + /// + public override string CreateMessageJournalingServiceObjectsCommand + { + get { return @" +CREATE TABLE IF NOT EXISTS [PB_MessageJournal] +( + [Id] INTEGER PRIMARY KEY AUTOINCREMENT, + [MessageId] TEXT NOT NULL, + [Category] TEXT NOT NULL, + [MessageName] TEXT NULL, + [Origination] TEXT NULL, + [Destination] TEXT NULL, + [ReplyTo] TEXT NULL, + [Expires] TEXT NULL, + [ContentType] TEXT NULL, + [Headers] TEXT, + [MessageContent] TEXT, + [Attempts] INT NOT NULL DEFAULT 0, + [Acknowledged] TEXT NULL, + [Abandoned] TEXT NULL +); + +CREATE INDEX IF NOT EXISTS [PB_MessageJournal_IX_MessageId] + ON [PB_MessageJournal]([MessageId]); + +CREATE INDEX IF NOT EXISTS [PB_MessageJournal_IX_Category] + ON [PB_MessageJournal]([Category]);"; } + } + /// /// The SQLite commands used to create the objects (tables, indexes, /// stored procedures, views, etc.) needed to store subscription tracking data diff --git a/Source/Platibus.SQLite/SQLiteMessageJournalingService.cs b/Source/Platibus.SQLite/SQLiteMessageJournalingService.cs new file mode 100644 index 00000000..1bf4e4a5 --- /dev/null +++ b/Source/Platibus.SQLite/SQLiteMessageJournalingService.cs @@ -0,0 +1,145 @@ +using System; +using System.Collections.Generic; +using System.Configuration; +using System.Data; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using Platibus.SQL; + +namespace Platibus.SQLite +{ + /// + /// An implementation that stores queued + /// messages in a SQLite database + /// + public class SQLiteMessageJournalingService : SQLMessageJournalingService, IDisposable + { + private readonly CancellationTokenSource _cancellationTokenSource; + private readonly ActionBlock _operationQueue; + + private bool _disposed; + + /// + /// Initializes a new + /// + /// The directory in which the SQLite database files will + /// be created + /// + /// If a base directory is not specified then the base directory will default to a + /// directory named platibus\queues beneath the current app domain base + /// directory. If the base directory does not exist it will be created. + /// + public SQLiteMessageJournalingService(DirectoryInfo baseDirectory) + : base(InitDb(baseDirectory), new SQLiteDialect()) + { + _cancellationTokenSource = new CancellationTokenSource(); + _operationQueue = new ActionBlock( + op => op.Execute(), + new ExecutionDataflowBlockOptions + { + CancellationToken = _cancellationTokenSource.Token, + MaxDegreeOfParallelism = 1 + }); + } + + [SuppressMessage("Microsoft.Security", "CA2100:Review SQL queries for security vulnerabilities")] + private static IDbConnectionProvider InitDb(DirectoryInfo directory) + { + if (directory == null) + { + var appdomainDirectory = AppDomain.CurrentDomain.BaseDirectory; + directory = new DirectoryInfo(Path.Combine(appdomainDirectory, "platibus", "journal")); + } + + var dbPath = Path.Combine(directory.FullName, "journal.db"); + var connectionStringSettings = new ConnectionStringSettings + { + Name = dbPath, + ConnectionString = "Data Source=" + dbPath + "; Version=3", + ProviderName = "System.Data.SQLite" + }; + + var connectionProvider = new SingletonConnectionProvider(connectionStringSettings); + var connection = connectionProvider.GetConnection(); + try + { + using (var command = connection.CreateCommand()) + { + command.CommandType = CommandType.Text; + command.CommandText = new SQLiteDialect().CreateMessageJournalingServiceObjectsCommand; + command.ExecuteNonQuery(); + } + } + finally + { + connectionProvider.ReleaseConnection(connection); + } + return connectionProvider; + } + + /// + protected override Task InsertJournaledMessage(Message message, string category) + { + CheckDisposed(); + var op = new SQLiteOperation(() => base.InsertJournaledMessage(message, category)); + _operationQueue.Post(op); + return op.Task; + } + + /// + protected override Task> SelectJournaledMessages() + { + CheckDisposed(); + var op = new SQLiteOperation>(() => base.SelectJournaledMessages()); + _operationQueue.Post(op); + return op.Task; + } + + /// + /// Throws if this object has been disposed + /// + /// Thrown if this object has been disposed + protected virtual void CheckDisposed() + { + if (_disposed) throw new ObjectDisposedException(GetType().FullName); + } + + /// + /// Finalizer to ensure that all resources are released + /// + ~SQLiteMessageJournalingService() + { + Dispose(false); + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + /// 2 + public void Dispose() + { + if (_disposed) return; + Dispose(true); + _disposed = true; + GC.SuppressFinalize(this); + } + + /// + /// Called by the method or by the finalizer to free held resources + /// + /// Indicates whether this method is called from the + /// method (true) or from the finalizer (false) + [SuppressMessage("Microsoft.Usage", "CA2213:DisposableFieldsShouldBeDisposed", MessageId = "_cancellationTokenSource")] + protected virtual void Dispose(bool disposing) + { + _cancellationTokenSource.Cancel(); + if (disposing) + { + _cancellationTokenSource.TryDispose(); + } + } + } +} diff --git a/Source/Platibus.SQLite/SQLiteServicesProvider.cs b/Source/Platibus.SQLite/SQLiteServicesProvider.cs index e89bb42b..a504ef1b 100644 --- a/Source/Platibus.SQLite/SQLiteServicesProvider.cs +++ b/Source/Platibus.SQLite/SQLiteServicesProvider.cs @@ -54,16 +54,9 @@ namespace Platibus.SQLite /// A provider for SQLite-based message queueing and subscription tracking services /// [Provider("SQLite")] - public class SQLiteServicesProvider : IMessageQueueingServiceProvider, ISubscriptionTrackingServiceProvider + public class SQLiteServicesProvider : IMessageQueueingServiceProvider, IMessageJournalingServiceProvider, ISubscriptionTrackingServiceProvider { - /// - /// Creates an initializes a - /// based on the provideds - /// - /// The journaling configuration - /// element - /// Returns a task whose result is an initialized - /// + /// public Task CreateMessageQueueingService(QueueingElement configuration) { var path = configuration.GetString("path"); @@ -73,14 +66,17 @@ public Task CreateMessageQueueingService(QueueingElemen return Task.FromResult(sqliteMessageQueueingService); } - /// - /// Creates an initializes a - /// based on the provideds . - /// - /// The journaling configuration - /// element. - /// Returns a task whose result is an initialized - /// . + /// + public Task CreateMessageJournalingService(JournalingElement configuration) + { + var path = configuration.GetString("path"); + var sqliteBaseDir = new DirectoryInfo(GetRootedPath(path)); + var sqliteMessageQueueingService = new SQLiteMessageJournalingService(sqliteBaseDir); + sqliteMessageQueueingService.Init(); + return Task.FromResult(sqliteMessageQueueingService); + } + + /// public async Task CreateSubscriptionTrackingService( SubscriptionTrackingElement configuration) { @@ -99,12 +95,9 @@ private static string GetRootedPath(string path) return defaultRootedPath; } - if (Path.IsPathRooted(path)) - { - return path; - } - - return Path.Combine(defaultRootedPath, path); + return Path.IsPathRooted(path) + ? path + : Path.Combine(defaultRootedPath, path); } } } \ No newline at end of file diff --git a/Source/Platibus.SampleWebApp/Web.config b/Source/Platibus.SampleWebApp/Web.config index ff88e607..66203db5 100644 --- a/Source/Platibus.SampleWebApp/Web.config +++ b/Source/Platibus.SampleWebApp/Web.config @@ -53,11 +53,17 @@ + + + @@ -78,11 +84,16 @@ + + + diff --git a/Source/Platibus.UnitTests/Filesystem/FilesystemCollectionFixture.cs b/Source/Platibus.UnitTests/Filesystem/FilesystemCollectionFixture.cs new file mode 100644 index 00000000..37df9ec2 --- /dev/null +++ b/Source/Platibus.UnitTests/Filesystem/FilesystemCollectionFixture.cs @@ -0,0 +1,38 @@ +using System; +using System.IO; +using NUnit.Framework; + +namespace Platibus.UnitTests.Filesystem +{ + [SetUpFixture] + public class FilesystemCollectionFixture + { + public static FilesystemCollectionFixture Instance; + + [SetUp] + public void SetUp() + { + Instance = new FilesystemCollectionFixture(); + } + + private readonly DirectoryInfo _baseDirectory; + + public DirectoryInfo BaseDirectory { get { return _baseDirectory; } } + + public FilesystemCollectionFixture() + { + _baseDirectory = GetTempDirectory(); + } + + protected DirectoryInfo GetTempDirectory() + { + var tempPath = Path.Combine(Path.GetTempPath(), "Platibus.UnitTests", DateTime.Now.ToString("yyyyMMddHHmmss")); + var tempDir = new DirectoryInfo(tempPath); + if (!tempDir.Exists) + { + tempDir.Create(); + } + return tempDir; + } + } +} diff --git a/Source/Platibus.UnitTests/Filesystem/FilesystemMessageJournalInspector.cs b/Source/Platibus.UnitTests/Filesystem/FilesystemMessageJournalInspector.cs new file mode 100644 index 00000000..0bac04a8 --- /dev/null +++ b/Source/Platibus.UnitTests/Filesystem/FilesystemMessageJournalInspector.cs @@ -0,0 +1,51 @@ +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Platibus.Filesystem; + +namespace Platibus.UnitTests.Filesystem +{ + internal class FilesystemMessageJournalInspector + { + private readonly DirectoryInfo _baseDirectory; + + public FilesystemMessageJournalInspector(DirectoryInfo baseDirectory) + { + _baseDirectory = baseDirectory; + } + + public async Task> EnumerateSentMessages() + { + var sentMessagePath = Path.Combine(_baseDirectory.FullName, "sent"); + var sentMessageDirectory = new DirectoryInfo(sentMessagePath); + var journaledMessageFileTasks = sentMessageDirectory + .GetFiles("*.pmsg", SearchOption.AllDirectories) + .Select(file => new MessageFile(file)) + .Select(messageFile => messageFile.ReadMessage()); + return await Task.WhenAll(journaledMessageFileTasks); + } + + public async Task> EnumerateReceivedMessages() + { + var receivedMessagePath = Path.Combine(_baseDirectory.FullName, "received"); + var receivedMessageDirectory = new DirectoryInfo(receivedMessagePath); + var journaledMessageFileTasks = receivedMessageDirectory + .GetFiles("*.pmsg", SearchOption.AllDirectories) + .Select(file => new MessageFile(file)) + .Select(messageFile => messageFile.ReadMessage()); + return await Task.WhenAll(journaledMessageFileTasks); + } + + public async Task> EnumeratePublishedMessages() + { + var publishedMessagePath = Path.Combine(_baseDirectory.FullName, "published"); + var publishedMessageDirectory = new DirectoryInfo(publishedMessagePath); + var journaledMessageFileTasks = publishedMessageDirectory + .GetFiles("*.pmsg", SearchOption.AllDirectories) + .Select(file => new MessageFile(file)) + .Select(messageFile => messageFile.ReadMessage()); + return await Task.WhenAll(journaledMessageFileTasks); + } + } +} \ No newline at end of file diff --git a/Source/Platibus.UnitTests/Filesystem/FilesystemMessageJournalingServiceTests.cs b/Source/Platibus.UnitTests/Filesystem/FilesystemMessageJournalingServiceTests.cs new file mode 100644 index 00000000..0f3cd454 --- /dev/null +++ b/Source/Platibus.UnitTests/Filesystem/FilesystemMessageJournalingServiceTests.cs @@ -0,0 +1,54 @@ +using System.Linq; +using System.Threading.Tasks; +using NUnit.Framework; +using Platibus.Filesystem; + +namespace Platibus.UnitTests.Filesystem +{ + public class FilesystemMessageJournalingServiceTests : MessageJournalingServiceTests + { + private readonly FilesystemMessageJournalInspector _inspector; + + public FilesystemMessageJournalingServiceTests() + : this(FilesystemCollectionFixture.Instance) + { + } + + public FilesystemMessageJournalingServiceTests(FilesystemCollectionFixture fixture) + : base(InitMessageJournalingService(fixture)) + { + _inspector = new FilesystemMessageJournalInspector(fixture.BaseDirectory); + } + + private static FilesystemMessageJournalingService InitMessageJournalingService(FilesystemCollectionFixture fixture) + { + var messageJournalingService = new FilesystemMessageJournalingService(fixture.BaseDirectory); + messageJournalingService.Init(); + return messageJournalingService; + } + + protected override async Task AssertSentMessageIsWrittenToJournal() + { + var journaledMessages = await _inspector.EnumerateSentMessages(); + var messageIsJournaled = journaledMessages + .Any(m => m.Headers.MessageId == Message.Headers.MessageId); + Assert.That(messageIsJournaled, Is.True); + } + + protected override async Task AssertReceivedMessageIsWrittenToJournal() + { + var journaledMessages = await _inspector.EnumerateReceivedMessages(); + var messageIsJournaled = journaledMessages + .Any(m => m.Headers.MessageId == Message.Headers.MessageId); + Assert.That(messageIsJournaled, Is.True); + } + + protected override async Task AssertPublishedMessageIsWrittenToJournal() + { + var journaledMessages = await _inspector.EnumeratePublishedMessages(); + var messageIsJournaled = journaledMessages + .Any(m => m.Headers.MessageId == Message.Headers.MessageId); + Assert.That(messageIsJournaled, Is.True); + } + } +} diff --git a/Source/Platibus.UnitTests/LocalDB/LocalDBCollectionFixture.cs b/Source/Platibus.UnitTests/LocalDB/LocalDBCollectionFixture.cs new file mode 100644 index 00000000..555f6ce8 --- /dev/null +++ b/Source/Platibus.UnitTests/LocalDB/LocalDBCollectionFixture.cs @@ -0,0 +1,78 @@ +using System.Configuration; +using NUnit.Framework; +using Platibus.SQL; + +namespace Platibus.UnitTests.LocalDB +{ + [SetUpFixture] + public class LocalDBCollectionFixture + { + public static LocalDBCollectionFixture Instance; + + [SetUp] + public void SetUp() + { + Instance = new LocalDBCollectionFixture(); + Instance.DeleteJournaledMessages(); + Instance.DeleteQueuedMessages(); + } + + private readonly IDbConnectionProvider _connectionProvider; + private readonly ISQLDialect _dialect; + + public IDbConnectionProvider ConnectionProvider { get { return _connectionProvider; } } + public ISQLDialect Dialect { get { return _dialect; } } + + public LocalDBCollectionFixture() + { + var connectionStringSettings = ConfigurationManager.ConnectionStrings["PlatibusUnitTests.LocalDB"]; + _connectionProvider = new DefaultConnectionProvider(connectionStringSettings); + _dialect = new MSSQLDialect(); + } + + public void DeleteQueuedMessages() + { + using (var connection = _connectionProvider.GetConnection()) + using (var command = connection.CreateCommand()) + { + command.CommandText = @" + IF (OBJECT_ID('[PB_QueuedMessages]')) IS NOT NULL + BEGIN + DELETE FROM [PB_QueuedMessages] + END"; + + command.ExecuteNonQuery(); + } + } + + public void DeleteJournaledMessages() + { + using (var connection = _connectionProvider.GetConnection()) + using (var command = connection.CreateCommand()) + { + command.CommandText = @" + IF (OBJECT_ID('[PB_MessageJournal]')) IS NOT NULL + BEGIN + DELETE FROM [PB_MessageJournal] + END"; + + command.ExecuteNonQuery(); + } + } + + public void DeleteSubscriptions() + { + using (var connection = _connectionProvider.GetConnection()) + using (var command = connection.CreateCommand()) + { + command.CommandText = @" + IF (OBJECT_ID('[PB_Subscriptions]')) IS NOT NULL + BEGIN + DELETE FROM [PB_Subscriptions] + END"; + + command.ExecuteNonQuery(); + } + } + } +} diff --git a/Source/Platibus.UnitTests/LocalDB/LocalDBMessageJournalingServiceTests.cs b/Source/Platibus.UnitTests/LocalDB/LocalDBMessageJournalingServiceTests.cs new file mode 100644 index 00000000..eb277a04 --- /dev/null +++ b/Source/Platibus.UnitTests/LocalDB/LocalDBMessageJournalingServiceTests.cs @@ -0,0 +1,59 @@ +using System.Linq; +using System.Threading.Tasks; +using NUnit.Framework; +using Platibus.SQL; + +namespace Platibus.UnitTests.LocalDB +{ + public class LocalDBMessageJournalingServiceTests : MessageJournalingServiceTests + { + private readonly SQLMessageJournalInspector _inspector; + + public LocalDBMessageJournalingServiceTests() + : this(LocalDBCollectionFixture.Instance) + { + } + + public LocalDBMessageJournalingServiceTests(LocalDBCollectionFixture fixture) + : this(InitMessageJournalingService(fixture)) + { + } + + private LocalDBMessageJournalingServiceTests(SQLMessageJournalingService messageJournalingService) + : base(messageJournalingService) + { + _inspector = new SQLMessageJournalInspector(messageJournalingService); + } + + private static SQLMessageJournalingService InitMessageJournalingService(LocalDBCollectionFixture fixture) + { + var messageJournalingService = new SQLMessageJournalingService(fixture.ConnectionProvider, fixture.Dialect); + messageJournalingService.Init(); + return messageJournalingService; + } + + protected override async Task AssertSentMessageIsWrittenToJournal() + { + var journaledMessages = await _inspector.EnumerateMessages(); + var messageIsJournaled = journaledMessages + .Any(m => m.Message.Headers.MessageId == Message.Headers.MessageId && m.Category == "Sent"); + Assert.That(messageIsJournaled, Is.True); + } + + protected override async Task AssertReceivedMessageIsWrittenToJournal() + { + var journaledMessages = await _inspector.EnumerateMessages(); + var messageIsJournaled = journaledMessages + .Any(m => m.Message.Headers.MessageId == Message.Headers.MessageId && m.Category == "Received"); + Assert.That(messageIsJournaled, Is.True); + } + + protected override async Task AssertPublishedMessageIsWrittenToJournal() + { + var journaledMessages = await _inspector.EnumerateMessages(); + var messageIsJournaled = journaledMessages + .Any(m => m.Message.Headers.MessageId == Message.Headers.MessageId && m.Category == "Published"); + Assert.That(messageIsJournaled, Is.True); + } + } +} diff --git a/Source/Platibus.UnitTests/LocalDB/SQLMessageJournalInspector.cs b/Source/Platibus.UnitTests/LocalDB/SQLMessageJournalInspector.cs new file mode 100644 index 00000000..6d68f0da --- /dev/null +++ b/Source/Platibus.UnitTests/LocalDB/SQLMessageJournalInspector.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using Platibus.SQL; + +namespace Platibus.UnitTests.LocalDB +{ + internal class SQLMessageJournalInspector : SQLMessageJournalingService + { + public SQLMessageJournalInspector(SQLMessageJournalingService messageJournalingService) + : base(messageJournalingService.ConnectionProvider, messageJournalingService.Dialect) + { + } + + public Task> EnumerateMessages() + { + return SelectJournaledMessages(); + } + } +} \ No newline at end of file diff --git a/Source/Platibus.UnitTests/MessageJournalingServiceTests.cs b/Source/Platibus.UnitTests/MessageJournalingServiceTests.cs new file mode 100644 index 00000000..c522c330 --- /dev/null +++ b/Source/Platibus.UnitTests/MessageJournalingServiceTests.cs @@ -0,0 +1,105 @@ +using System; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace Platibus.UnitTests +{ + public abstract class MessageJournalingServiceTests + { + protected IMessageJournalingService MessageJournalingService; + protected Message Message; + + protected MessageJournalingServiceTests(IMessageJournalingService messageJournalingService) + { + MessageJournalingService = messageJournalingService; + } + + [Test] + public async Task SentMessagesShouldBeWrittenToJournal() + { + GivenSentMessage(); + await WhenJournalingSentMessage(); + await AssertSentMessageIsWrittenToJournal(); + } + + [Test] + public async Task ReceivedMessagesShouldBeWrittenToJournal() + { + GivenReceivedMessage(); + await WhenJournalingReceivedMessage(); + await AssertReceivedMessageIsWrittenToJournal(); + } + + [Test] + public async Task PublishedMessagesShouldBeWrittenToJournal() + { + GivenPublishedMessage(); + await WhenJournalingPublishedMessage(); + await AssertPublishedMessageIsWrittenToJournal(); + } + + protected Message GivenSentMessage() + { + var messageHeaders = new MessageHeaders + { + MessageId = MessageId.Generate(), + Sent = DateTime.UtcNow, + Origination = new Uri("urn:localhost/platibus0"), + Destination = new Uri("urn:localhost/platibus1"), + MessageName = "TestMessage", + ContentType = "text/plain" + }; + return Message = new Message(messageHeaders, "Hello, world!"); + } + + protected Task WhenJournalingSentMessage() + { + return MessageJournalingService.MessageSent(Message); + } + + protected abstract Task AssertSentMessageIsWrittenToJournal(); + + protected Message GivenReceivedMessage() + { + var messageHeaders = new MessageHeaders + { + MessageId = MessageId.Generate(), + Sent = DateTime.UtcNow.AddSeconds(-1), + Received = DateTime.UtcNow, + Origination = new Uri("urn:localhost/platibus0"), + Destination = new Uri("urn:localhost/platibus1"), + MessageName = "TestMessage", + ContentType = "text/plain" + }; + return Message = new Message(messageHeaders, "Hello, world!"); + } + + protected Task WhenJournalingReceivedMessage() + { + return MessageJournalingService.MessageReceived(Message); + } + + protected abstract Task AssertReceivedMessageIsWrittenToJournal(); + + protected Message GivenPublishedMessage() + { + var messageHeaders = new MessageHeaders + { + MessageId = MessageId.Generate(), + Published = DateTime.UtcNow, + Origination = new Uri("urn:localhost/platibus0"), + MessageName = "TestMessage", + ContentType = "text/plain", + Topic = "TestTopic" + }; + return Message = new Message(messageHeaders, "Hello, world!"); + } + + protected Task WhenJournalingPublishedMessage() + { + return MessageJournalingService.MessagePublished(Message); + } + + protected abstract Task AssertPublishedMessageIsWrittenToJournal(); + } +} diff --git a/Source/Platibus.UnitTests/Platibus.UnitTests.csproj b/Source/Platibus.UnitTests/Platibus.UnitTests.csproj index 742956e9..7d7d48b4 100644 --- a/Source/Platibus.UnitTests/Platibus.UnitTests.csproj +++ b/Source/Platibus.UnitTests/Platibus.UnitTests.csproj @@ -120,9 +120,19 @@ + + + + + + + + + + diff --git a/Source/Platibus.UnitTests/SQLite/SQLiteCollectionFixture.cs b/Source/Platibus.UnitTests/SQLite/SQLiteCollectionFixture.cs new file mode 100644 index 00000000..0d798993 --- /dev/null +++ b/Source/Platibus.UnitTests/SQLite/SQLiteCollectionFixture.cs @@ -0,0 +1,38 @@ +using System; +using System.IO; +using NUnit.Framework; + +namespace Platibus.UnitTests.SQLite +{ + [SetUpFixture] + public class SQLiteCollectionFixture + { + public static SQLiteCollectionFixture Instance; + + [SetUp] + public void SetUp() + { + Instance = new SQLiteCollectionFixture(); + } + + private readonly DirectoryInfo _baseDirectory; + + public DirectoryInfo BaseDirectory { get { return _baseDirectory; } } + + public SQLiteCollectionFixture() + { + _baseDirectory = GetTempDirectory(); + } + + protected DirectoryInfo GetTempDirectory() + { + var tempPath = Path.Combine(Path.GetTempPath(), "Platibus.UnitTests", DateTime.Now.ToString("yyyyMMddHHmmss")); + var tempDir = new DirectoryInfo(tempPath); + if (!tempDir.Exists) + { + tempDir.Create(); + } + return tempDir; + } + } +} diff --git a/Source/Platibus.UnitTests/SQLite/SQLiteMessageJournalInspector.cs b/Source/Platibus.UnitTests/SQLite/SQLiteMessageJournalInspector.cs new file mode 100644 index 00000000..257cbb5e --- /dev/null +++ b/Source/Platibus.UnitTests/SQLite/SQLiteMessageJournalInspector.cs @@ -0,0 +1,21 @@ +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using Platibus.SQL; +using Platibus.SQLite; + +namespace Platibus.UnitTests.SQLite +{ + internal class SQLiteMessageJournalInspector : SQLiteMessageJournalingService + { + public SQLiteMessageJournalInspector(DirectoryInfo baseDirectory) + : base(baseDirectory) + { + } + + public Task> EnumerateMessages() + { + return SelectJournaledMessages(); + } + } +} \ No newline at end of file diff --git a/Source/Platibus.UnitTests/SQLite/SQLiteMessageJournalingServiceTests.cs b/Source/Platibus.UnitTests/SQLite/SQLiteMessageJournalingServiceTests.cs new file mode 100644 index 00000000..541f1d8a --- /dev/null +++ b/Source/Platibus.UnitTests/SQLite/SQLiteMessageJournalingServiceTests.cs @@ -0,0 +1,55 @@ +using System.Linq; +using System.Threading.Tasks; +using NUnit.Framework; +using Platibus.SQL; +using Platibus.SQLite; + +namespace Platibus.UnitTests.SQLite +{ + public class SQLiteMessageJournalingServiceTests : MessageJournalingServiceTests + { + private readonly SQLiteMessageJournalInspector _inspector; + + public SQLiteMessageJournalingServiceTests() + : this(SQLiteCollectionFixture.Instance) + { + } + + public SQLiteMessageJournalingServiceTests(SQLiteCollectionFixture fixture) + : base(InitMessageJournalingService(fixture)) + { + _inspector = new SQLiteMessageJournalInspector(fixture.BaseDirectory); + } + + private static SQLMessageJournalingService InitMessageJournalingService(SQLiteCollectionFixture fixture) + { + var messageJournalingService = new SQLiteMessageJournalingService(fixture.BaseDirectory); + messageJournalingService.Init(); + return messageJournalingService; + } + + protected override async Task AssertSentMessageIsWrittenToJournal() + { + var journaledMessages = await _inspector.EnumerateMessages(); + var messageIsJournaled = journaledMessages + .Any(m => m.Message.Headers.MessageId == Message.Headers.MessageId && m.Category == "Sent"); + Assert.That(messageIsJournaled, Is.True); + } + + protected override async Task AssertReceivedMessageIsWrittenToJournal() + { + var journaledMessages = await _inspector.EnumerateMessages(); + var messageIsJournaled = journaledMessages + .Any(m => m.Message.Headers.MessageId == Message.Headers.MessageId && m.Category == "Received"); + Assert.That(messageIsJournaled, Is.True); + } + + protected override async Task AssertPublishedMessageIsWrittenToJournal() + { + var journaledMessages = await _inspector.EnumerateMessages(); + var messageIsJournaled = journaledMessages + .Any(m => m.Message.Headers.MessageId == Message.Headers.MessageId && m.Category == "Published"); + Assert.That(messageIsJournaled, Is.True); + } + } +} diff --git a/Source/Platibus.UnitTests/SQLiteMessageQueueingServiceTests.cs b/Source/Platibus.UnitTests/SQLiteMessageQueueingServiceTests.cs index 48c7b3c4..d7c2c956 100644 --- a/Source/Platibus.UnitTests/SQLiteMessageQueueingServiceTests.cs +++ b/Source/Platibus.UnitTests/SQLiteMessageQueueingServiceTests.cs @@ -401,8 +401,9 @@ public async Task Given_10_Existing_Messages_10_New_Messages_Then_Listener_Shoul var allmessages = existingMessages.Union(newMessages); foreach (var message in allmessages) { + var myMessage = message; mockListener.Verify(x => - x.MessageReceived(It.Is(m => messageEqualityComparer.Equals(m, message)), + x.MessageReceived(It.Is(m => messageEqualityComparer.Equals(m, myMessage)), It.IsAny(), It.IsAny()), Times.Once()); } } diff --git a/Source/Platibus/Http/HttpTransportService.cs b/Source/Platibus/Http/HttpTransportService.cs index 7ec489ef..f65e325f 100644 --- a/Source/Platibus/Http/HttpTransportService.cs +++ b/Source/Platibus/Http/HttpTransportService.cs @@ -175,7 +175,6 @@ public async Task PublishMessage(Message message, TopicName topicName, Cancellat foreach (var subscriber in subscribers) { - IEndpointCredentials subscriberCredentials = null; IEndpoint subscriberEndpoint; if (_endpoints.TryGetEndpointByAddress(subscriber, out subscriberEndpoint)) @@ -209,6 +208,11 @@ public async Task PublishMessage(Message message, TopicName topicName, Cancellat HttpClient httpClient = null; try { + if (_messageJournalingService != null) + { + await _messageJournalingService.MessageSent(message, cancellationToken); + } + var endpointBaseUri = message.Headers.Destination.WithTrailingSlash(); if (_bypassTransportLocalDestination && endpointBaseUri == _baseUri) { @@ -234,11 +238,6 @@ public async Task PublishMessage(Message message, TopicName topicName, Cancellat postUri); HandleHttpErrorResponse(httpResponseMessage); - - if (_messageJournalingService != null) - { - await _messageJournalingService.MessageSent(message, cancellationToken); - } } catch (TransportException) { diff --git a/Source/Platibus/Platibus.csproj b/Source/Platibus/Platibus.csproj index d0f27fa4..a9b0f8c6 100644 --- a/Source/Platibus/Platibus.csproj +++ b/Source/Platibus/Platibus.csproj @@ -112,6 +112,8 @@ + + diff --git a/Source/Platibus/SQL/CommonSQLDialect.cs b/Source/Platibus/SQL/CommonSQLDialect.cs index 814f7f05..dfcbbfb0 100644 --- a/Source/Platibus/SQL/CommonSQLDialect.cs +++ b/Source/Platibus/SQL/CommonSQLDialect.cs @@ -33,9 +33,6 @@ namespace Platibus.SQL /// public abstract class CommonSQLDialect : ISQLDialect { - private string _startDateParameterName; - private string _endDateParameterName; - /// /// The dialect-specific command used to create the objects (tables, indexes, /// stored procedures, views, etc.) needed to store queued messages in the @@ -43,6 +40,13 @@ public abstract class CommonSQLDialect : ISQLDialect /// public abstract string CreateMessageQueueingServiceObjectsCommand { get; } + /// + /// The dialect-specific command used to create the objects (tables, indexes, + /// stored procedures, views, etc.) needed to store journaled messages in the + /// SQL database + /// + public abstract string CreateMessageJournalingServiceObjectsCommand { get; } + /// /// The dialect-specific command used to create the objects (tables, indexes, /// stored procedures, views, etc.) needed to store subscription tracking data @@ -87,6 +91,36 @@ FROM [PB_QueuedMessages] AND [QueueName]=@QueueName)"; } } + /// + /// The dialect-specific command used to insert a queued message + /// + public virtual string InsertJournaledMessageCommand + { + get { return @" +INSERT INTO [PB_MessageJournal] ( + [MessageId], + [Category], + [MessageName], + [Origination], + [Destination], + [ReplyTo], + [Expires], + [ContentType], + [Headers], + [MessageContent]) +VALUES ( + @MessageId, + @Category, + @MessageName, + @Origination, + @Destination, + @ReplyTo, + @Expires, + @ContentType, + @Headers, + @MessageContent)"; } + } + /// /// The dialect-specific command used to select the list of queued messages /// in a particular queue @@ -115,6 +149,27 @@ AND [Acknowledged] IS NULL AND [Abandoned] IS NULL"; } } + /// + /// The dialect-specific command used to select the list of journaled messages + /// in a particular queue + /// + public virtual string SelectJournaledMessagesCommand + { + get { return @" +SELECT + [MessageId], + [Category], + [MessageName], + [Origination], + [Destination], + [ReplyTo], + [Expires], + [ContentType], + [Headers], + [MessageContent] +FROM [PB_MessageJournal]"; } + } + /// /// The dialect-specific command used to select the list of queued messages /// in a particular queue @@ -369,5 +424,10 @@ public string CurrentDateParameterName /// The name of the parameter used to specify the end date in queries based on date ranges /// public string EndDateParameterName { get { return "@EndDate"; } } + + /// + /// The name of the parameter used to specify a category + /// + public string CategoryParameterName { get { return "@Category"; } } } } \ No newline at end of file diff --git a/Source/Platibus/SQL/IDbConnectionProvider.cs b/Source/Platibus/SQL/IDbConnectionProvider.cs index 3f3152b0..16ded004 100644 --- a/Source/Platibus/SQL/IDbConnectionProvider.cs +++ b/Source/Platibus/SQL/IDbConnectionProvider.cs @@ -20,7 +20,6 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -using System; using System.Data.Common; namespace Platibus.SQL diff --git a/Source/Platibus/SQL/ISQLDialect.cs b/Source/Platibus/SQL/ISQLDialect.cs index c59a8a65..0d5a47ac 100644 --- a/Source/Platibus/SQL/ISQLDialect.cs +++ b/Source/Platibus/SQL/ISQLDialect.cs @@ -206,6 +206,29 @@ public interface ISQLDialect /// The name of the parameter used to specify the end date in queries based on date ranges /// string EndDateParameterName { get; } + + /// + /// The dialect-specific command used to create the objects (tables, indexes, + /// stored procedures, views, etc.) needed to store journaled messages in the + /// SQL database + /// + string CreateMessageJournalingServiceObjectsCommand { get; } + + /// + /// The dialect-specific command used to insert a queued message + /// + string InsertJournaledMessageCommand { get; } + + /// + /// The name of the parameter used to specify a category + /// + string CategoryParameterName { get; } + + /// + /// The dialect-specific command used to select the list of journaled messages + /// in a particular queue + /// + string SelectJournaledMessagesCommand { get; } } diff --git a/Source/Platibus/SQL/MSSQLDialect.cs b/Source/Platibus/SQL/MSSQLDialect.cs index 896d20ef..330b4ced 100644 --- a/Source/Platibus/SQL/MSSQLDialect.cs +++ b/Source/Platibus/SQL/MSSQLDialect.cs @@ -58,11 +58,45 @@ CONSTRAINT [PB_QueuedMessages_PK] PRIMARY KEY NONCLUSTERED ([MessageId], [QueueName]) ) - CREATE CLUSTERED INDEX [PB_QueuedMessages_IX_QueueName] + CREATE INDEX [PB_QueuedMessages_IX_QueueName] ON [PB_QueuedMessages]([QueueName]) END"; } } + /// + public override string CreateMessageJournalingServiceObjectsCommand + { + get { return @" +IF OBJECT_ID('[PB_MessageJournal]') IS NULL +BEGIN + CREATE TABLE [PB_MessageJournal] + ( + [Id] INT IDENTITY(1,1), + [MessageId] UNIQUEIDENTIFIER NOT NULL, + [Category] VARCHAR(20) NOT NULL, + [MessageName] VARCHAR(500) NULL, + [Origination] VARCHAR(500) NULL, + [Destination] VARCHAR(500) NULL, + [ReplyTo] VARCHAR(500) NULL, + [Expires] DATETIME NULL, + [ContentType] VARCHAR(100) NULL, + [Headers] VARCHAR(MAX), + [MessageContent] TEXT, + [Acknowledged] DATETIME NULL, + [Abandoned] DATETIME NULL, + [Attempts] INT NOT NULL DEFAULT 0, + + CONSTRAINT [PB_MessageJournal_PK] PRIMARY KEY CLUSTERED ([Id]) + ) + + CREATE INDEX [PB_MessageJournal_IX_MessageId] + ON [PB_MessageJournal]([MessageId]) + + CREATE INDEX [PB_MessageJournal_IX_Category] + ON [PB_MessageJournal]([Category]) +END"; } + } + /// /// The MS SQL commands used to create the objects (tables, indexes, /// stored procedures, views, etc.) needed to store subscription tracking data diff --git a/Source/Platibus/SQL/SQLJournaledMessage.cs b/Source/Platibus/SQL/SQLJournaledMessage.cs new file mode 100644 index 00000000..abefc6f6 --- /dev/null +++ b/Source/Platibus/SQL/SQLJournaledMessage.cs @@ -0,0 +1,46 @@ +using System; + +namespace Platibus.SQL +{ + /// + /// An immutable representation of a journaled message used by the + /// + /// + public class SQLJournaledMessage + { + private readonly Message _message; + private readonly string _category; + + /// + /// The queued message + /// + public Message Message + { + get { return _message; } + } + + /// + /// The journal category, e.g. "Sent", "Received", or "Published" + /// + public string Category + { + get { return _category; } + } + + /// + /// Initializes a new with the specified + /// and + /// + /// The message + /// The journal category (e.g. "Sent", "Received", or "Published") + /// Thrown if + /// is null + public SQLJournaledMessage(Message message, string category) + { + if (message == null) throw new ArgumentNullException("message"); + if (string.IsNullOrWhiteSpace(category)) throw new ArgumentNullException("category"); + _message = message; + _category = category; + } + } +} \ No newline at end of file diff --git a/Source/Platibus/SQL/SQLMessageJournalingService.cs b/Source/Platibus/SQL/SQLMessageJournalingService.cs new file mode 100644 index 00000000..feb54915 --- /dev/null +++ b/Source/Platibus/SQL/SQLMessageJournalingService.cs @@ -0,0 +1,345 @@ +using System; +using System.Collections.Generic; +using System.Configuration; +using System.Data; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Transactions; + +namespace Platibus.SQL +{ + /// + /// A implementation that uses a SQL database + /// to store journaled messages + /// + public class SQLMessageJournalingService : IMessageJournalingService + { + private readonly IDbConnectionProvider _connectionProvider; + private readonly ISQLDialect _dialect; + + /// + /// The connection provider used to obtain connections to the SQL database + /// + public IDbConnectionProvider ConnectionProvider + { + get { return _connectionProvider; } + } + + /// + /// The SQL dialect + /// + public ISQLDialect Dialect + { + get { return _dialect; } + } + + /// + /// Initializes the message queueing service by creating the necessary objects in the + /// SQL database + /// + [SuppressMessage("Microsoft.Security", "CA2100:Review SQL queries for security vulnerabilities")] + public void Init() + { + var connection = _connectionProvider.GetConnection(); + try + { + using (var command = connection.CreateCommand()) + { + command.CommandType = CommandType.Text; + command.CommandText = _dialect.CreateMessageJournalingServiceObjectsCommand; + command.ExecuteNonQuery(); + } + } + finally + { + _connectionProvider.ReleaseConnection(connection); + } + } + + /// + /// Initializes a new with the specified connection + /// string settings and dialect + /// + /// The connection string settings to use to connect to + /// the SQL database + /// (Optional) The SQL dialect to use + /// Thrown if + /// is null + /// + /// If a SQL dialect is not specified, then one will be selected based on the supplied + /// connection string settings + /// + /// + /// + public SQLMessageJournalingService(ConnectionStringSettings connectionStringSettings, ISQLDialect dialect = null) + { + if (connectionStringSettings == null) throw new ArgumentNullException("connectionStringSettings"); + _connectionProvider = new DefaultConnectionProvider(connectionStringSettings); + _dialect = dialect ?? connectionStringSettings.GetSQLDialect(); + } + + /// + /// Initializes a new with the specified connection + /// provider and dialect + /// + /// The connection provider to use to connect to + /// the SQL database + /// The SQL dialect to use + /// Thrown if + /// or is null + public SQLMessageJournalingService(IDbConnectionProvider connectionProvider, ISQLDialect dialect) + { + if (connectionProvider == null) throw new ArgumentNullException("connectionProvider"); + if (dialect == null) throw new ArgumentNullException("dialect"); + _connectionProvider = connectionProvider; + _dialect = dialect; + } + + /// + public Task MessageReceived(Message message, CancellationToken cancellationToken = new CancellationToken()) + { + return InsertJournaledMessage(message, "Received"); + } + + /// + public Task MessageSent(Message message, CancellationToken cancellationToken = new CancellationToken()) + { + return InsertJournaledMessage(message, "Sent"); + } + + /// + public Task MessagePublished(Message message, CancellationToken cancellationToken = new CancellationToken()) + { + return InsertJournaledMessage(message, "Published"); + } + + /// + /// Selects all journaled messages from the SQL database + /// + /// Returns a task that completes when all records have been selected and whose + /// result is the enumerable sequence of the selected records + [SuppressMessage("Microsoft.Security", "CA2100:Review SQL queries for security vulnerabilities")] + protected virtual Task> SelectJournaledMessages() + { + var queuedMessages = new List(); + var connection = _connectionProvider.GetConnection(); + try + { + using (var scope = new TransactionScope(TransactionScopeOption.Required)) + { + using (var command = connection.CreateCommand()) + { + command.CommandType = CommandType.Text; + command.CommandText = _dialect.SelectJournaledMessagesCommand; + + using (var reader = command.ExecuteReader()) + { + while (reader.Read()) + { + var messageContent = reader.GetString("MessageContent"); + var category = reader.GetString("Category"); + var headers = DeserializeHeaders(reader.GetString("Headers")); + var message = new Message(headers, messageContent); + var queuedMessage = new SQLJournaledMessage(message, category); + queuedMessages.Add(queuedMessage); + } + } + } + scope.Complete(); + } + } + finally + { + _connectionProvider.ReleaseConnection(connection); + } + + // SQL calls are not async to avoid the need for TransactionAsyncFlowOption + // and dependency on .NET 4.5.1 and later + return Task.FromResult(queuedMessages.AsEnumerable()); + } + + /// + /// Inserts a journaled message into the SQL database + /// + /// The message to enqueue + /// The journaled message category, e.g. "Sent", "Received", or "Published" + /// Returns a task that completes when the message has been inserted into the SQL + /// database and whose result is a copy of the inserted record + [SuppressMessage("Microsoft.Security", "CA2100:Review SQL queries for security vulnerabilities")] + protected virtual Task InsertJournaledMessage(Message message, string category) + { + if (message == null) throw new ArgumentNullException("message"); + if (string.IsNullOrWhiteSpace(category)) throw new ArgumentNullException("category"); + + SQLJournaledMessage journaledMessage; + var connection = _connectionProvider.GetConnection(); + try + { + using (var scope = new TransactionScope(TransactionScopeOption.Required)) + { + using (var command = connection.CreateCommand()) + { + command.CommandType = CommandType.Text; + command.CommandText = _dialect.InsertJournaledMessageCommand; + + var headers = message.Headers; + + command.SetParameter(_dialect.MessageIdParameterName, (Guid)headers.MessageId); + command.SetParameter(_dialect.CategoryParameterName, category); + command.SetParameter(_dialect.MessageNameParameterName, (string)headers.MessageName); + command.SetParameter(_dialect.OriginationParameterName, + headers.Origination == null ? null : headers.Origination.ToString()); + command.SetParameter(_dialect.DestinationParameterName, + headers.Destination == null ? null : headers.Destination.ToString()); + command.SetParameter(_dialect.ReplyToParameterName, + headers.ReplyTo == null ? null : headers.ReplyTo.ToString()); + command.SetParameter(_dialect.ExpiresParameterName, headers.Expires); + command.SetParameter(_dialect.ContentTypeParameterName, headers.ContentType); + command.SetParameter(_dialect.HeadersParameterName, SerializeHeaders(headers)); + command.SetParameter(_dialect.MessageContentParameterName, message.Content); + + command.ExecuteNonQuery(); + } + scope.Complete(); + } + journaledMessage = new SQLJournaledMessage(message, category); + } + finally + { + _connectionProvider.ReleaseConnection(connection); + } + + // SQL calls are not async to avoid the need for TransactionAsyncFlowOption + // and dependency on .NET 4.5.1 and later + return Task.FromResult(journaledMessage); + } + + /// + /// A helper method to serialize message headers so that they can be inserted into a + /// single column in the SQL database + /// + /// The message headers to serialize + /// Returns the serialized message headers + protected virtual string SerializeHeaders(IMessageHeaders headers) + { + if (headers == null) return null; + + using (var writer = new StringWriter()) + { + foreach (var header in headers) + { + var headerName = header.Key; + var headerValue = header.Value; + writer.Write("{0}: ", headerName); + using (var headerValueReader = new StringReader(headerValue)) + { + var multilineContinuation = false; + string line; + while ((line = headerValueReader.ReadLine()) != null) + { + if (multilineContinuation) + { + // Prefix continuation with whitespace so that subsequent + // lines are not confused with different headers. + line = " " + line; + } + writer.WriteLine(line); + multilineContinuation = true; + } + } + } + return writer.ToString(); + } + } + + /// + /// Helper method to deserialize message headers read from a record in the SQL database + /// + /// The serialized header string + /// Returns the deserialized headers + /// + /// This method performs the inverse of + /// + protected virtual IMessageHeaders DeserializeHeaders(string headerString) + { + var headers = new MessageHeaders(); + if (string.IsNullOrWhiteSpace(headerString)) return headers; + + var currentHeaderName = (HeaderName)null; + var currentHeaderValue = new StringWriter(); + var finishedReadingHeaders = false; + var lineNumber = 0; + + using (var reader = new StringReader(headerString)) + { + string currentLine; + while (!finishedReadingHeaders && (currentLine = reader.ReadLine()) != null) + { + lineNumber++; + if (string.IsNullOrWhiteSpace(currentLine)) + { + if (currentHeaderName != null) + { + headers[currentHeaderName] = currentHeaderValue.ToString(); + currentHeaderName = null; + currentHeaderValue = new StringWriter(); + } + + finishedReadingHeaders = true; + continue; + } + + if (currentLine.StartsWith(" ") && currentHeaderName != null) + { + // Continuation of previous header value + currentHeaderValue.WriteLine(); + currentHeaderValue.Write(currentLine.Trim()); + continue; + } + + // New header. Finish up with the header we were just working on. + if (currentHeaderName != null) + { + headers[currentHeaderName] = currentHeaderValue.ToString(); + currentHeaderValue = new StringWriter(); + } + + if (currentLine.StartsWith("#")) + { + // Special line. Ignore. + continue; + } + + var separatorPos = currentLine.IndexOf(':'); + if (separatorPos < 0) + { + throw new FormatException(string.Format("Invalid header on line {0}: Character ':' expected", + lineNumber)); + } + + if (separatorPos == 0) + { + throw new FormatException( + string.Format( + "Invalid header on line {0}: Character ':' found at position 0 (missing header name)", + lineNumber)); + } + + currentHeaderName = currentLine.Substring(0, separatorPos); + currentHeaderValue.Write(currentLine.Substring(separatorPos + 1).Trim()); + } + + // Make sure we set the last header we were working on, if there is one + if (currentHeaderName != null) + { + headers[currentHeaderName] = currentHeaderValue.ToString(); + } + } + + return headers; + } + } +} diff --git a/Source/Platibus/SQL/SQLServicesProvider.cs b/Source/Platibus/SQL/SQLServicesProvider.cs index 3d6a5b00..eca9c79a 100644 --- a/Source/Platibus/SQL/SQLServicesProvider.cs +++ b/Source/Platibus/SQL/SQLServicesProvider.cs @@ -31,13 +31,9 @@ namespace Platibus.SQL /// A provider for SQL-based message queueing and subscription tracking services /// [Provider("SQL")] - public class SQLServicesProvider : IMessageQueueingServiceProvider, ISubscriptionTrackingServiceProvider + public class SQLServicesProvider : IMessageQueueingServiceProvider, IMessageJournalingServiceProvider, ISubscriptionTrackingServiceProvider { - /// - /// Returns an SQL-based message queueing service - /// - /// The queueing configuration element - /// Returns an SQL-based message queueing service + /// public Task CreateMessageQueueingService(QueueingElement configuration) { var connectionName = configuration.GetString("connectionName"); @@ -57,11 +53,27 @@ public Task CreateMessageQueueingService(QueueingElemen return Task.FromResult(sqlMessageQueueingService); } - /// - /// Returns an SQL-based subscription tracking service - /// - /// The subscription tracking configuration element - /// Returns an SQL-based subscription tracking service + /// + public Task CreateMessageJournalingService(JournalingElement configuration) + { + var connectionName = configuration.GetString("connectionName"); + if (string.IsNullOrWhiteSpace(connectionName)) + { + throw new ConfigurationErrorsException( + "Attribute 'connectionName' is required for SQL message journaling service"); + } + + var connectionStringSettings = ConfigurationManager.ConnectionStrings[connectionName]; + if (connectionStringSettings == null) + { + throw new ConfigurationErrorsException("Connection string settings \"" + connectionName + "\" not found"); + } + var sqlMessageJournalingService = new SQLMessageJournalingService(connectionStringSettings); + sqlMessageJournalingService.Init(); + return Task.FromResult(sqlMessageJournalingService); + } + + /// public async Task CreateSubscriptionTrackingService( SubscriptionTrackingElement configuration) {