Skip to content

Commit

Permalink
Implemented SQL and SQLite messaging journaling services
Browse files Browse the repository at this point in the history
  • Loading branch information
sweetlandj committed Feb 2, 2017
1 parent c19d77c commit b2edeeb
Show file tree
Hide file tree
Showing 26 changed files with 1,282 additions and 50 deletions.
1 change: 1 addition & 0 deletions Source/Platibus.SQLite/Platibus.SQLite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
<Compile Include="SQLiteDialect.cs" />
<Compile Include="SQLiteDialectProvider.cs" />
<Compile Include="SQLiteLoggingCategories.cs" />
<Compile Include="SQLiteMessageJournalingService.cs" />
<Compile Include="SQLiteMessageQueue.cs" />
<Compile Include="SQLiteMessageQueueingService.cs" />
<Compile Include="SQLiteOperation.cs" />
Expand Down
33 changes: 33 additions & 0 deletions Source/Platibus.SQLite/SQLiteDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,39 @@ CREATE INDEX IF NOT EXISTS [PB_QueuedMessages_IX_QueueName]
ON [PB_QueuedMessages]([QueueName]);"; }
}

/// <summary>
/// The SQLite commands used to create the objects (tables, indexes,
/// stored procedures, views, etc.) needed to store queued messages in the
/// SQL database
/// </summary>
public override string CreateMessageJournalingServiceObjectsCommand
{
get { return @"
CREATE TABLE IF NOT EXISTS [PB_MessageJournal]
(
[Id] INTEGER PRIMARY KEY AUTOINCREMENT,
[MessageId] TEXT NOT NULL,
[Category] TEXT NOT NULL,
[MessageName] TEXT NULL,
[Origination] TEXT NULL,
[Destination] TEXT NULL,
[ReplyTo] TEXT NULL,
[Expires] TEXT NULL,
[ContentType] TEXT NULL,
[Headers] TEXT,
[MessageContent] TEXT,
[Attempts] INT NOT NULL DEFAULT 0,
[Acknowledged] TEXT NULL,
[Abandoned] TEXT NULL
);
CREATE INDEX IF NOT EXISTS [PB_MessageJournal_IX_MessageId]
ON [PB_MessageJournal]([MessageId]);
CREATE INDEX IF NOT EXISTS [PB_MessageJournal_IX_Category]
ON [PB_MessageJournal]([Category]);"; }
}

/// <summary>
/// The SQLite commands used to create the objects (tables, indexes,
/// stored procedures, views, etc.) needed to store subscription tracking data
Expand Down
145 changes: 145 additions & 0 deletions Source/Platibus.SQLite/SQLiteMessageJournalingService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Platibus.SQL;

namespace Platibus.SQLite
{
/// <summary>
/// An <see cref="IMessageQueueingService"/> implementation that stores queued
/// messages in a SQLite database
/// </summary>
public class SQLiteMessageJournalingService : SQLMessageJournalingService, IDisposable
{
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly ActionBlock<ISQLiteOperation> _operationQueue;

private bool _disposed;

/// <summary>
/// Initializes a new <see cref="SQLiteMessageQueueingService"/>
/// </summary>
/// <param name="baseDirectory">The directory in which the SQLite database files will
/// be created</param>
/// <remarks>
/// If a base directory is not specified then the base directory will default to a
/// directory named <c>platibus\queues</c> beneath the current app domain base
/// directory. If the base directory does not exist it will be created.
/// </remarks>
public SQLiteMessageJournalingService(DirectoryInfo baseDirectory)
: base(InitDb(baseDirectory), new SQLiteDialect())
{
_cancellationTokenSource = new CancellationTokenSource();
_operationQueue = new ActionBlock<ISQLiteOperation>(
op => op.Execute(),
new ExecutionDataflowBlockOptions
{
CancellationToken = _cancellationTokenSource.Token,
MaxDegreeOfParallelism = 1
});
}

[SuppressMessage("Microsoft.Security", "CA2100:Review SQL queries for security vulnerabilities")]
private static IDbConnectionProvider InitDb(DirectoryInfo directory)
{
if (directory == null)
{
var appdomainDirectory = AppDomain.CurrentDomain.BaseDirectory;
directory = new DirectoryInfo(Path.Combine(appdomainDirectory, "platibus", "journal"));
}

var dbPath = Path.Combine(directory.FullName, "journal.db");
var connectionStringSettings = new ConnectionStringSettings
{
Name = dbPath,
ConnectionString = "Data Source=" + dbPath + "; Version=3",
ProviderName = "System.Data.SQLite"
};

var connectionProvider = new SingletonConnectionProvider(connectionStringSettings);
var connection = connectionProvider.GetConnection();
try
{
using (var command = connection.CreateCommand())
{
command.CommandType = CommandType.Text;
command.CommandText = new SQLiteDialect().CreateMessageJournalingServiceObjectsCommand;
command.ExecuteNonQuery();
}
}
finally
{
connectionProvider.ReleaseConnection(connection);
}
return connectionProvider;
}

/// <inheritdoc />
protected override Task<SQLJournaledMessage> InsertJournaledMessage(Message message, string category)
{
CheckDisposed();
var op = new SQLiteOperation<SQLJournaledMessage>(() => base.InsertJournaledMessage(message, category));
_operationQueue.Post(op);
return op.Task;
}

/// <inheritdoc />
protected override Task<IEnumerable<SQLJournaledMessage>> SelectJournaledMessages()
{
CheckDisposed();
var op = new SQLiteOperation<IEnumerable<SQLJournaledMessage>>(() => base.SelectJournaledMessages());
_operationQueue.Post(op);
return op.Task;
}

/// <summary>
/// Throws <see cref="ObjectDisposedException"/> if this object has been disposed
/// </summary>
/// <exception cref="ObjectDisposedException">Thrown if this object has been disposed</exception>
protected virtual void CheckDisposed()
{
if (_disposed) throw new ObjectDisposedException(GetType().FullName);
}

/// <summary>
/// Finalizer to ensure that all resources are released
/// </summary>
~SQLiteMessageJournalingService()
{
Dispose(false);
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
if (_disposed) return;
Dispose(true);
_disposed = true;
GC.SuppressFinalize(this);
}

/// <summary>
/// Called by the <see cref="Dispose()"/> method or by the finalizer to free held resources
/// </summary>
/// <param name="disposing">Indicates whether this method is called from the
/// <see cref="Dispose()"/> method (<c>true</c>) or from the finalizer (<c>false</c>)</param>
[SuppressMessage("Microsoft.Usage", "CA2213:DisposableFieldsShouldBeDisposed", MessageId = "_cancellationTokenSource")]
protected virtual void Dispose(bool disposing)
{
_cancellationTokenSource.Cancel();
if (disposing)
{
_cancellationTokenSource.TryDispose();
}
}
}
}
39 changes: 16 additions & 23 deletions Source/Platibus.SQLite/SQLiteServicesProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,9 @@ namespace Platibus.SQLite
/// A provider for SQLite-based message queueing and subscription tracking services
/// </summary>
[Provider("SQLite")]
public class SQLiteServicesProvider : IMessageQueueingServiceProvider, ISubscriptionTrackingServiceProvider
public class SQLiteServicesProvider : IMessageQueueingServiceProvider, IMessageJournalingServiceProvider, ISubscriptionTrackingServiceProvider
{
/// <summary>
/// Creates an initializes a <see cref="IMessageQueueingService"/>
/// based on the provideds <paramref name="configuration"/>
/// </summary>
/// <param name="configuration">The journaling configuration
/// element</param>
/// <returns>Returns a task whose result is an initialized
/// <see cref="IMessageQueueingService"/></returns>
/// <inheritdoc />
public Task<IMessageQueueingService> CreateMessageQueueingService(QueueingElement configuration)
{
var path = configuration.GetString("path");
Expand All @@ -73,14 +66,17 @@ public Task<IMessageQueueingService> CreateMessageQueueingService(QueueingElemen
return Task.FromResult<IMessageQueueingService>(sqliteMessageQueueingService);
}

/// <summary>
/// Creates an initializes a <see cref="ISubscriptionTrackingService"/>
/// based on the provideds <paramref name="configuration"/>.
/// </summary>
/// <param name="configuration">The journaling configuration
/// element.</param>
/// <returns>Returns a task whose result is an initialized
/// <see cref="ISubscriptionTrackingService"/>.</returns>
/// <inheritdoc />
public Task<IMessageJournalingService> CreateMessageJournalingService(JournalingElement configuration)
{
var path = configuration.GetString("path");
var sqliteBaseDir = new DirectoryInfo(GetRootedPath(path));
var sqliteMessageQueueingService = new SQLiteMessageJournalingService(sqliteBaseDir);
sqliteMessageQueueingService.Init();
return Task.FromResult<IMessageJournalingService>(sqliteMessageQueueingService);
}

/// <inheritdoc />
public async Task<ISubscriptionTrackingService> CreateSubscriptionTrackingService(
SubscriptionTrackingElement configuration)
{
Expand All @@ -99,12 +95,9 @@ private static string GetRootedPath(string path)
return defaultRootedPath;
}

if (Path.IsPathRooted(path))
{
return path;
}

return Path.Combine(defaultRootedPath, path);
return Path.IsPathRooted(path)
? path
: Path.Combine(defaultRootedPath, path);
}
}
}
19 changes: 15 additions & 4 deletions Source/Platibus.SampleWebApp/Web.config
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,17 @@
</nlog>

<!--
This configuration is used by the HTTP module or HTTP handler but not the
OWIN middleware. The baseUri must agree with the bidnings in IIS.
This configuration is used by the PlatibusHttpModule module and the
PlatibusHttpHandler. It will be ignored by the PlatibusOwinMiddleware
is used.
The baseUri must agree with the bindings in IIS.
-->
<platibus.iis baseUri="http://localhost:52180/platibus/" replyTimeout="00:00:30" bypassTransportLocalDestination="true">
<journaling provider="Filesystem" path="platibus\journal" />
<!-- <journaling provider="SQLite" path="platibus\journal" /> -->
<!-- <journaling provider="SQL" connectionName="Platibus" /> -->

<queueing provider="Filesystem" path="platibus\queues" />
<!-- <queueing provider="SQLite" path="platibus\queues\sqlite" /> -->
<!-- <queueing provider="SQL" connectionName="Platibus" /> -->
Expand All @@ -78,11 +84,16 @@
</platibus.iis>

<!--
This configuration is used by the OWIN middleware but not the HTTP module or
HTTP handler. The baseUri must agree with the bidnings in IIS.
This configuration is used by the PlatibusOwinMiddleware. It will be
ignored by the PlatibusHttpModule and PlatibusHttpHandler.
The baseUri must agree with the bindings in IIS or the HttpListener host.
-->
<platibus.owin baseUri="http://localhost:52180/platibus/" replyTimeout="00:00:30" bypassTransportLocalDestination="true">
<journaling provider="Filesystem" path="platibus\journal" />
<!--<journaling provider="SQLite" path="platibus\journal" />-->
<!--<journaling provider="SQL" connectionName="Platibus" />-->

<queueing provider="Filesystem" path="platibus\queues" />
<!-- <queueing provider="SQLite" path="platibus\queues\sqlite" /> -->
<!-- <queueing provider="SQL" connectionName="Platibus" /> -->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.IO;
using NUnit.Framework;

namespace Platibus.UnitTests.Filesystem
{
[SetUpFixture]
public class FilesystemCollectionFixture
{
public static FilesystemCollectionFixture Instance;

[SetUp]
public void SetUp()
{
Instance = new FilesystemCollectionFixture();
}

private readonly DirectoryInfo _baseDirectory;

public DirectoryInfo BaseDirectory { get { return _baseDirectory; } }

public FilesystemCollectionFixture()
{
_baseDirectory = GetTempDirectory();
}

protected DirectoryInfo GetTempDirectory()
{
var tempPath = Path.Combine(Path.GetTempPath(), "Platibus.UnitTests", DateTime.Now.ToString("yyyyMMddHHmmss"));
var tempDir = new DirectoryInfo(tempPath);
if (!tempDir.Exists)
{
tempDir.Create();
}
return tempDir;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Platibus.Filesystem;

namespace Platibus.UnitTests.Filesystem
{
internal class FilesystemMessageJournalInspector
{
private readonly DirectoryInfo _baseDirectory;

public FilesystemMessageJournalInspector(DirectoryInfo baseDirectory)
{
_baseDirectory = baseDirectory;
}

public async Task<IEnumerable<Message>> EnumerateSentMessages()
{
var sentMessagePath = Path.Combine(_baseDirectory.FullName, "sent");
var sentMessageDirectory = new DirectoryInfo(sentMessagePath);
var journaledMessageFileTasks = sentMessageDirectory
.GetFiles("*.pmsg", SearchOption.AllDirectories)
.Select(file => new MessageFile(file))
.Select(messageFile => messageFile.ReadMessage());
return await Task.WhenAll(journaledMessageFileTasks);
}

public async Task<IEnumerable<Message>> EnumerateReceivedMessages()
{
var receivedMessagePath = Path.Combine(_baseDirectory.FullName, "received");
var receivedMessageDirectory = new DirectoryInfo(receivedMessagePath);
var journaledMessageFileTasks = receivedMessageDirectory
.GetFiles("*.pmsg", SearchOption.AllDirectories)
.Select(file => new MessageFile(file))
.Select(messageFile => messageFile.ReadMessage());
return await Task.WhenAll(journaledMessageFileTasks);
}

public async Task<IEnumerable<Message>> EnumeratePublishedMessages()
{
var publishedMessagePath = Path.Combine(_baseDirectory.FullName, "published");
var publishedMessageDirectory = new DirectoryInfo(publishedMessagePath);
var journaledMessageFileTasks = publishedMessageDirectory
.GetFiles("*.pmsg", SearchOption.AllDirectories)
.Select(file => new MessageFile(file))
.Select(messageFile => messageFile.ReadMessage());
return await Task.WhenAll(journaledMessageFileTasks);
}
}
}
Loading

0 comments on commit b2edeeb

Please sign in to comment.