Skip to content

Commit

Permalink
Improved HTTP transport logging. Added tests to verify behavior with …
Browse files Browse the repository at this point in the history
…multiple subscribers.
  • Loading branch information
sweetlandj committed Mar 17, 2016
1 parent c495ed8 commit 2419b54
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 180 deletions.
66 changes: 66 additions & 0 deletions Source/Platibus.IntegrationTests/HttpPubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,20 @@
using System;
using System.Threading.Tasks;
using NUnit.Framework;
using Platibus.Http;

namespace Platibus.IntegrationTests
{
internal class HttpPubSubTests
{
private static readonly Random RNG = new Random();

[SetUp]
public void SetUp()
{
TestPublicationHandler.Reset();
}

[Test]
public async Task Given_Subscriber_When_Message_Published_Then_Subscriber_Should_Receive_It()
{
Expand All @@ -49,5 +56,64 @@ await With.HttpHostedBusInstances(async (platibus0, platibus1) =>
Assert.That(publicationReceived, Is.True);
});
}

[Test]
public async Task Given_2Subscribers_When_Message_Published_Then_Both_Subscribers_Should_Receive_It()
{
await With.HttpHostedBusInstances(async (platibus0, platibus1) =>
{
// Start second subscriber
using (await HttpServer.Start("platibus.http2"))
{
// Wait for two publications to be received (one on each subscriber)
TestPublicationHandler.MessageReceivedEvent.AddCount(1);

var publication = new TestPublication
{
GuidData = Guid.NewGuid(),
IntData = RNG.Next(0, int.MaxValue),
StringData = "Hello, world!",
DateData = DateTime.UtcNow
};

await platibus0.Publish(publication, "Topic0");

var publicationReceived = await TestPublicationHandler.WaitHandle.WaitOneAsync(TimeSpan.FromSeconds(3));
Assert.That(publicationReceived, Is.True);
}
});
}

[Test]
public async Task Given_2Subscribers_1_Not_Available_When_Message_Published_Then_The_Available_Subscribers_Should_Receive_It()
{
await With.HttpHostedBusInstances(async (platibus0, platibus1) =>
{
// Start second subscriber to create the subscription, then immediately stop it
using (await HttpServer.Start("platibus.http2"))
{
}

var publication = new TestPublication
{
GuidData = Guid.NewGuid(),
IntData = RNG.Next(0, int.MaxValue),
StringData = "Hello, world!",
DateData = DateTime.UtcNow
};

try
{
await platibus0.Publish(publication, "Topic0");
}
catch (Exception)
{
// We expect an aggregate exception with a 404 error for the second subscriber
}

var publicationReceived = await TestPublicationHandler.WaitHandle.WaitOneAsync(TimeSpan.FromSeconds(3));
Assert.That(publicationReceived, Is.True);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ class HttpSubscriptionRenewalTests
{
private static readonly Random RNG = new Random();

[SetUp]
public void SetUp()
{
TestPublicationHandler.Reset();
}

[Test]
public async Task Given_10Second_TTL_When_Publishing_Message_After_15Seconds_The_Publication_Should_Be_Received()
{
Expand Down
6 changes: 6 additions & 0 deletions Source/Platibus.IntegrationTests/LoopbackPubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ internal class LoopbackPubSubTests
{
private static readonly Random RNG = new Random();

[SetUp]
public void SetUp()
{
TestPublicationHandler.Reset();
}

[Test]
public async Task Given_Automatic_Subscription_When_Message_Published_Then_Subscriber_Should_Receive_It()
{
Expand Down
6 changes: 6 additions & 0 deletions Source/Platibus.IntegrationTests/LoopbackSendAndReplyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ internal class LoopbackSendAndReplyTests
{
private static readonly Random RNG = new Random();

[SetUp]
public void SetUp()
{
TestPublicationHandler.Reset();
}

[Test]
public async Task Given_Noncritical_Message_Not_Authorized_When_Sending_Then_UnauthorizedAccessException()
{
Expand Down
6 changes: 6 additions & 0 deletions Source/Platibus.IntegrationTests/RabbitMQPubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ internal class RabbitMQPubSubTests
{
private static readonly Random RNG = new Random();

[SetUp]
public void SetUp()
{
TestPublicationHandler.Reset();
}

[Explicit]
[Test]
public async Task Given_Subscriber_When_Message_Published_Then_Subscriber_Should_Receive_It()
Expand Down
27 changes: 24 additions & 3 deletions Source/Platibus.IntegrationTests/TestPublicationHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,20 @@ namespace Platibus.IntegrationTests
{
public class TestPublicationHandler : IMessageHandler
{
private static readonly AutoResetEvent MessageReceivedEvent = new AutoResetEvent(false);
private static readonly object SyncRoot = new object();
private volatile static CountdownEvent _messageReceivedEvent = new CountdownEvent(1);

public static CountdownEvent MessageReceivedEvent { get { return _messageReceivedEvent; } }

public static WaitHandle WaitHandle
{
get { return MessageReceivedEvent; }
get
{
lock (SyncRoot)
{
return _messageReceivedEvent.WaitHandle;
}
}
}

public string Name
Expand All @@ -41,9 +50,21 @@ public string Name

public Task HandleMessage(object content, IMessageContext messageContext, CancellationToken cancellationToken)
{
MessageReceivedEvent.Set();
messageContext.Acknowledge();
lock (SyncRoot)
{
_messageReceivedEvent.Signal();
}
return Task.FromResult(true);
}

public static void Reset()
{
lock (SyncRoot)
{
if (_messageReceivedEvent != null) _messageReceivedEvent.Dispose();
_messageReceivedEvent = new CountdownEvent(1);
}
}
}
}
Loading

0 comments on commit 2419b54

Please sign in to comment.