diff --git a/Source/Platibus.IntegrationTests/HttpPubSubTests.cs b/Source/Platibus.IntegrationTests/HttpPubSubTests.cs index 6daa85a8..af2477ab 100644 --- a/Source/Platibus.IntegrationTests/HttpPubSubTests.cs +++ b/Source/Platibus.IntegrationTests/HttpPubSubTests.cs @@ -23,6 +23,7 @@ using System; using System.Threading.Tasks; using NUnit.Framework; +using Platibus.Http; namespace Platibus.IntegrationTests { @@ -30,6 +31,12 @@ internal class HttpPubSubTests { private static readonly Random RNG = new Random(); + [SetUp] + public void SetUp() + { + TestPublicationHandler.Reset(); + } + [Test] public async Task Given_Subscriber_When_Message_Published_Then_Subscriber_Should_Receive_It() { @@ -49,5 +56,64 @@ await With.HttpHostedBusInstances(async (platibus0, platibus1) => Assert.That(publicationReceived, Is.True); }); } + + [Test] + public async Task Given_2Subscribers_When_Message_Published_Then_Both_Subscribers_Should_Receive_It() + { + await With.HttpHostedBusInstances(async (platibus0, platibus1) => + { + // Start second subscriber + using (await HttpServer.Start("platibus.http2")) + { + // Wait for two publications to be received (one on each subscriber) + TestPublicationHandler.MessageReceivedEvent.AddCount(1); + + var publication = new TestPublication + { + GuidData = Guid.NewGuid(), + IntData = RNG.Next(0, int.MaxValue), + StringData = "Hello, world!", + DateData = DateTime.UtcNow + }; + + await platibus0.Publish(publication, "Topic0"); + + var publicationReceived = await TestPublicationHandler.WaitHandle.WaitOneAsync(TimeSpan.FromSeconds(3)); + Assert.That(publicationReceived, Is.True); + } + }); + } + + [Test] + public async Task Given_2Subscribers_1_Not_Available_When_Message_Published_Then_The_Available_Subscribers_Should_Receive_It() + { + await With.HttpHostedBusInstances(async (platibus0, platibus1) => + { + // Start second subscriber to create the subscription, then immediately stop it + using (await HttpServer.Start("platibus.http2")) + { + } + + var publication = new TestPublication + { + GuidData = Guid.NewGuid(), + IntData = RNG.Next(0, int.MaxValue), + StringData = "Hello, world!", + DateData = DateTime.UtcNow + }; + + try + { + await platibus0.Publish(publication, "Topic0"); + } + catch (Exception) + { + // We expect an aggregate exception with a 404 error for the second subscriber + } + + var publicationReceived = await TestPublicationHandler.WaitHandle.WaitOneAsync(TimeSpan.FromSeconds(3)); + Assert.That(publicationReceived, Is.True); + }); + } } } \ No newline at end of file diff --git a/Source/Platibus.IntegrationTests/HttpSubscriptionRenewalTests.cs b/Source/Platibus.IntegrationTests/HttpSubscriptionRenewalTests.cs index e38ae781..536cad2c 100644 --- a/Source/Platibus.IntegrationTests/HttpSubscriptionRenewalTests.cs +++ b/Source/Platibus.IntegrationTests/HttpSubscriptionRenewalTests.cs @@ -9,6 +9,12 @@ class HttpSubscriptionRenewalTests { private static readonly Random RNG = new Random(); + [SetUp] + public void SetUp() + { + TestPublicationHandler.Reset(); + } + [Test] public async Task Given_10Second_TTL_When_Publishing_Message_After_15Seconds_The_Publication_Should_Be_Received() { diff --git a/Source/Platibus.IntegrationTests/LoopbackPubSubTests.cs b/Source/Platibus.IntegrationTests/LoopbackPubSubTests.cs index 32fa45a0..549ef1e3 100644 --- a/Source/Platibus.IntegrationTests/LoopbackPubSubTests.cs +++ b/Source/Platibus.IntegrationTests/LoopbackPubSubTests.cs @@ -8,6 +8,12 @@ internal class LoopbackPubSubTests { private static readonly Random RNG = new Random(); + [SetUp] + public void SetUp() + { + TestPublicationHandler.Reset(); + } + [Test] public async Task Given_Automatic_Subscription_When_Message_Published_Then_Subscriber_Should_Receive_It() { diff --git a/Source/Platibus.IntegrationTests/LoopbackSendAndReplyTests.cs b/Source/Platibus.IntegrationTests/LoopbackSendAndReplyTests.cs index 8799bef3..947501bc 100644 --- a/Source/Platibus.IntegrationTests/LoopbackSendAndReplyTests.cs +++ b/Source/Platibus.IntegrationTests/LoopbackSendAndReplyTests.cs @@ -11,6 +11,12 @@ internal class LoopbackSendAndReplyTests { private static readonly Random RNG = new Random(); + [SetUp] + public void SetUp() + { + TestPublicationHandler.Reset(); + } + [Test] public async Task Given_Noncritical_Message_Not_Authorized_When_Sending_Then_UnauthorizedAccessException() { diff --git a/Source/Platibus.IntegrationTests/RabbitMQPubSubTests.cs b/Source/Platibus.IntegrationTests/RabbitMQPubSubTests.cs index c0d27b9c..3ae36bad 100644 --- a/Source/Platibus.IntegrationTests/RabbitMQPubSubTests.cs +++ b/Source/Platibus.IntegrationTests/RabbitMQPubSubTests.cs @@ -8,6 +8,12 @@ internal class RabbitMQPubSubTests { private static readonly Random RNG = new Random(); + [SetUp] + public void SetUp() + { + TestPublicationHandler.Reset(); + } + [Explicit] [Test] public async Task Given_Subscriber_When_Message_Published_Then_Subscriber_Should_Receive_It() diff --git a/Source/Platibus.IntegrationTests/TestPublicationHandler.cs b/Source/Platibus.IntegrationTests/TestPublicationHandler.cs index fb7125dc..1596483d 100644 --- a/Source/Platibus.IntegrationTests/TestPublicationHandler.cs +++ b/Source/Platibus.IntegrationTests/TestPublicationHandler.cs @@ -27,11 +27,20 @@ namespace Platibus.IntegrationTests { public class TestPublicationHandler : IMessageHandler { - private static readonly AutoResetEvent MessageReceivedEvent = new AutoResetEvent(false); + private static readonly object SyncRoot = new object(); + private volatile static CountdownEvent _messageReceivedEvent = new CountdownEvent(1); + + public static CountdownEvent MessageReceivedEvent { get { return _messageReceivedEvent; } } public static WaitHandle WaitHandle { - get { return MessageReceivedEvent; } + get + { + lock (SyncRoot) + { + return _messageReceivedEvent.WaitHandle; + } + } } public string Name @@ -41,9 +50,21 @@ public string Name public Task HandleMessage(object content, IMessageContext messageContext, CancellationToken cancellationToken) { - MessageReceivedEvent.Set(); messageContext.Acknowledge(); + lock (SyncRoot) + { + _messageReceivedEvent.Signal(); + } return Task.FromResult(true); } + + public static void Reset() + { + lock (SyncRoot) + { + if (_messageReceivedEvent != null) _messageReceivedEvent.Dispose(); + _messageReceivedEvent = new CountdownEvent(1); + } + } } } \ No newline at end of file diff --git a/Source/Platibus.IntegrationTests/app.config b/Source/Platibus.IntegrationTests/app.config index e6c64260..06c85b73 100644 --- a/Source/Platibus.IntegrationTests/app.config +++ b/Source/Platibus.IntegrationTests/app.config @@ -24,179 +24,194 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. --> - - -
- -
-
-
-
-
-
-
-
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + +
+ +
+
+
+
+
+
+
+
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Source/Platibus/Http/HttpTransportService.cs b/Source/Platibus/Http/HttpTransportService.cs index 1fd2719a..5b11055f 100644 --- a/Source/Platibus/Http/HttpTransportService.cs +++ b/Source/Platibus/Http/HttpTransportService.cs @@ -130,6 +130,7 @@ public async Task SendMessage(Message message, IEndpointCredentials credentials if (message.Headers.Importance == MessageImportance.Critical) { + Log.DebugFormat("Enqueueing critical message ID {0} for queued outbound delivery...", message.Headers.MessageId); await _messageQueueingService.EnqueueMessage(_outboundQueueName, message, null, cancellationToken); return; } @@ -167,6 +168,14 @@ public async Task PublishMessage(Message message, TopicName topicName, Cancellat }; var addressedMessage = new Message(perEndpointHeaders, message.Content); + if (addressedMessage.Headers.Importance == MessageImportance.Critical) + { + Log.DebugFormat("Enqueueing critical message ID {0} for queued outbound delivery...", message.Headers.MessageId); + await _messageQueueingService.EnqueueMessage(_outboundQueueName, message, null, cancellationToken); + return; + } + + Log.DebugFormat("Forwarding message ID {0} published to topic {0} to subscriber {2}...", message.Headers.MessageId, topicName, subscriber); transportTasks.Add(TransportMessage(addressedMessage, subscriberCredentials, cancellationToken)); } @@ -204,11 +213,17 @@ private async Task TransportMessage(Message message, IEndpointCredentials creden var endpointBaseUri = message.Headers.Destination.WithTrailingSlash(); var httpClient = GetClient(endpointBaseUri, credentials); - var messageId = message.Headers.MessageId; var urlEncondedMessageId = HttpUtility.UrlEncode(messageId); var relativeUri = string.Format("message/{0}", urlEncondedMessageId); + + var postUri = new Uri(endpointBaseUri, relativeUri); + Log.DebugFormat("POSTing content of message ID {0} to URI {1}...", message.Headers.MessageId, postUri); var httpResponseMessage = await httpClient.PostAsync(relativeUri, httpContent, cancellationToken); + Log.DebugFormat("Received HTTP response code {0} {1} for POST request {2}...", + (int)httpResponseMessage.StatusCode, + httpResponseMessage.StatusCode.ToString("G"), + postUri); HandleHttpErrorResponse(httpResponseMessage); diff --git a/Source/Version.cs b/Source/Version.cs index 0b032001..484ed43a 100644 --- a/Source/Version.cs +++ b/Source/Version.cs @@ -22,4 +22,4 @@ using System.Reflection; -[assembly: AssemblyVersion("2.1.8.0")] \ No newline at end of file +[assembly: AssemblyVersion("2.1.9.0")] \ No newline at end of file