Skip to content

Commit

Permalink
#33 Implemented custom IEndpointCollection implementation for Loopbac…
Browse files Browse the repository at this point in the history
…kConfigration to support sending to arbitrary named endpoints
  • Loading branch information
sweetlandj committed May 2, 2018
1 parent 89394d8 commit f98f9c3
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 221 deletions.
68 changes: 68 additions & 0 deletions Source/Platibus.UnitTests/LoopbackTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Platibus.Config;
using Platibus.InMemory;
using Xunit;

namespace Platibus.UnitTests
{
public class LoopbackTests
{
protected LoopbackConfiguration Configuration = new LoopbackConfiguration();
protected object MessageContent = Guid.NewGuid().ToString();
protected SendOptions SendOptions = new SendOptions {ContentType = "text/plain"};
protected CancellationTokenSource CancellationSource = new CancellationTokenSource();

public LoopbackTests()
{
Configuration.MessageQueueingService = new InMemoryMessageQueueingService();
}

[Theory]
[InlineData("loopback")] // Standard loopback endpoint
[InlineData("undefined")] // Undefined/unknown endpoint
public async Task MessageIsHandledWhenSendingToNamedEndpoint(string endpointName)
{
var handlerTask = GivenHandler();
await WhenSendingToNamedEndpoint(endpointName);
var handledContent = await handlerTask;
Assert.Equal(MessageContent, handledContent);
}

[Theory]
[InlineData("urn:localhost/loopback")] // Standard loopback endpoint address
[InlineData("http://localhost/undefined")] // Undefined/unknown endpoint address
public async Task MessageIsHandledWhenSendingToEndpointAddress(string address)
{
var handlerTask = GivenHandler();
await WhenSendingToEndpointAddress(new Uri(address));
var handledContent = await handlerTask;
Assert.Equal(MessageContent, handledContent);
}

protected Task<object> GivenHandler()
{
var handlerCompletionSource = new TaskCompletionSource<object>();
CancellationSource.Token.Register(() => handlerCompletionSource.TrySetCanceled());
Configuration.AddHandlingRule<object>(".*", (content, ctx) =>
{
handlerCompletionSource.TrySetResult(content);
ctx.Acknowledge();
});
return handlerCompletionSource.Task;
}

protected async Task WhenSendingToNamedEndpoint(EndpointName endpointName)
{
var host = await LoopbackHost.Start(Configuration, CancellationSource.Token);
await host.Bus.Send(MessageContent, endpointName, SendOptions, CancellationSource.Token);
}

protected async Task WhenSendingToEndpointAddress(Uri endpointAddress)
{
var host = await LoopbackHost.Start(Configuration, CancellationSource.Token);
await host.Bus.Send(MessageContent, endpointAddress, SendOptions, CancellationSource.Token);
}
}
}
2 changes: 1 addition & 1 deletion Source/Platibus/Bus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Bus(IPlatibusConfiguration configuration, Uri baseUri, ITransportService
_messageNamingService = configuration.MessageNamingService;
_serializationService = configuration.SerializationService;

_endpoints = new ReadOnlyEndpointCollection(configuration.Endpoints);
_endpoints = configuration.Endpoints ?? EndpointCollection.Empty;
_topics = configuration.Topics.ToList();
_sendRules = configuration.SendRules.ToList();
_handlingRules = configuration.HandlingRules.ToList();
Expand Down
86 changes: 22 additions & 64 deletions Source/Platibus/Config/LoopbackConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,90 +25,48 @@

namespace Platibus.Config
{
/// <inheritdoc cref="PlatibusConfiguration"/>
/// <inheritdoc cref="ILoopbackConfiguration"/>
/// <summary>
/// A loopback configuration
/// </summary>
public class LoopbackConfiguration : PlatibusConfiguration, ILoopbackConfiguration
{
private readonly EndpointName _loopbackEndpoint;
private static readonly EndpointName LoopbackEndpoint = "loopback";
private static readonly Uri LoopbackUri = new Uri("urn:localhost/loopback");

/// <summary>
/// The base URI of the loopback bus instance
/// </summary>
public Uri BaseUri { get; }
/// <inheritdoc />
public Uri BaseUri => LoopbackUri;

/// <summary>
/// The message queueing service to use
/// </summary>
/// <inheritdoc />
public IMessageQueueingService MessageQueueingService { get; set; }

/// <inheritdoc />
/// <summary>
/// Initializes a new <see cref="T:Platibus.Config.LoopbackConfiguration" />
/// </summary>
public LoopbackConfiguration() : this(null)
{
}

/// <inheritdoc />
/// <summary>
/// Initializes a new <see cref="LoopbackConfiguration"/> with a preconfigured
/// <paramref name="diagnosticService"/>
/// Initializes a new <see cref="T:Platibus.Config.LoopbackConfiguration" /> with a preconfigured
/// <paramref name="diagnosticService" />
/// </summary>
/// <param name="diagnosticService">(Optional) The service through which diagnostic events
/// are reported and processed</param>
public LoopbackConfiguration(IDiagnosticService diagnosticService = null) : base(diagnosticService)
public LoopbackConfiguration(IDiagnosticService diagnosticService) : base(diagnosticService, new LoopbackEndpoints(LoopbackEndpoint, LoopbackUri))
{
_loopbackEndpoint = new EndpointName("looback");
BaseUri = new Uri("urn:localhost/loopback");
base.AddEndpoint(_loopbackEndpoint, new Endpoint(BaseUri));
var allMessages = new MessageNamePatternSpecification(".*");
base.AddSendRule(new SendRule(allMessages, _loopbackEndpoint));
base.AddSendRule(new SendRule(allMessages, LoopbackEndpoint));
}

/// <summary>
/// Adds a topic to the configuration
/// </summary>
/// <param name="topic">The name of the topic</param>
/// <remarks>
/// Topics must be explicitly added in order to publish messages to them
/// </remarks>

/// <inheritdoc />
public override void AddTopic(TopicName topic)
{
base.AddTopic(topic);
base.AddSubscription(new Subscription(_loopbackEndpoint, topic));
}

/// <summary>
/// Adds a named endpoint to the configuration
/// </summary>
/// <param name="name">The name of the endpoint</param>
/// <param name="endpoint">The endpoint</param>
/// <remarks>
/// Not supported in loopback configurations
/// </remarks>
/// <exception cref="NotSupportedException">Always thrown</exception>
public override void AddEndpoint(EndpointName name, IEndpoint endpoint)
{
throw new NotSupportedException();
}

/// <summary>
/// Adds a rule governing to which endpoints messages will be sent
/// </summary>
/// <param name="sendRule">The send rule</param>
/// <remarks>
/// Not supported in loopback configurations
/// </remarks>
/// <exception cref="NotSupportedException">Always thrown</exception>
public override void AddSendRule(ISendRule sendRule)
{
throw new NotSupportedException();
}

/// <summary>
/// Adds a subscription to a local or remote topic
/// </summary>
/// <param name="subscription">The subscription</param>
/// <remarks>
/// Not supported in loopback configurations
/// </remarks>
/// <exception cref="NotSupportedException">Always thrown</exception>
public override void AddSubscription(ISubscription subscription)
{
throw new NotSupportedException();
AddSubscription(new Subscription(LoopbackEndpoint, topic));
}
}
}
35 changes: 31 additions & 4 deletions Source/Platibus/Config/PlatibusConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@

namespace Platibus.Config
{
/// <inheritdoc />
/// <summary>
/// Concrete mutable implementation of <see cref="IPlatibusConfiguration" /> used for
/// Concrete mutable implementation of <see cref="T:Platibus.Config.IPlatibusConfiguration" /> used for
/// programmatically configuring the bus.
/// </summary>
/// <remarks>
Expand All @@ -39,7 +40,7 @@ namespace Platibus.Config
/// </remarks>
public class PlatibusConfiguration : IPlatibusConfiguration
{
private readonly EndpointCollection _endpoints = new EndpointCollection();
private readonly EndpointCollection _endpoints;
private readonly IList<IHandlingRule> _handlingRules = new List<IHandlingRule>();
private readonly IList<ISendRule> _sendRules = new List<ISendRule>();
private readonly IList<ISubscription> _subscriptions = new List<ISubscription>();
Expand Down Expand Up @@ -81,23 +82,49 @@ public class PlatibusConfiguration : IPlatibusConfiguration
/// <inheritdoc />
public SendOptions DefaultSendOptions { get; set; }

/// <inheritdoc />
/// <summary>
/// Initializes a new <see cref="T:Platibus.Config.PlatibusConfiguration" />
/// </summary>
public PlatibusConfiguration() : this(null, null)
{
}

/// <inheritdoc />
/// <summary>
/// Initializes a new <see cref="T:Platibus.Config.PlatibusConfiguration" /> with a preconfigured
/// <paramref name="diagnosticService" />
/// </summary>
/// <param name="diagnosticService">(Optional) The service through which diagnostic events
/// are reported and processed</param>
/// <remarks>
/// If a custom <paramref name="diagnosticService" /> is not specified, then the default
/// singleton instance <see cref="F:Platibus.Diagnostics.DiagnosticService.DefaultInstance" /> will
/// be used.
/// </remarks>
public PlatibusConfiguration(IDiagnosticService diagnosticService) : this(diagnosticService, null)
{
}

/// <summary>
/// Initializes a new <see cref="PlatibusConfiguration"/> with a preconfigured
/// <paramref name="diagnosticService"/>
/// <paramref name="diagnosticService"/> and an initial set of <paramref name="endpoints"/>
/// </summary>
/// <param name="diagnosticService">(Optional) The service through which diagnostic events
/// are reported and processed</param>
/// <param name="endpoints">(Optional) An initial set of default endpoints</param>
/// <remarks>
/// If a custom <paramref name="diagnosticService"/> is not specified, then the default
/// singleton instance <see cref="Diagnostics.DiagnosticService.DefaultInstance"/> will
/// be used.
/// </remarks>
public PlatibusConfiguration(IDiagnosticService diagnosticService = null)
public PlatibusConfiguration(IDiagnosticService diagnosticService, EndpointCollection endpoints)
{
MessageNamingService = new DefaultMessageNamingService();
SerializationService = new DefaultSerializationService();
DefaultContentType = "application/json";
DiagnosticService = diagnosticService ?? Diagnostics.DiagnosticService.DefaultInstance;
_endpoints = endpoints ?? new EndpointCollection();
}

/// <summary>
Expand Down
38 changes: 11 additions & 27 deletions Source/Platibus/EndpointCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@

namespace Platibus
{
/// <inheritdoc />
/// <summary>
/// A mutable <see cref="IEndpointCollection"/> implementation
/// A mutable <see cref="T:Platibus.IEndpointCollection" /> implementation
/// </summary>
public class EndpointCollection : IEndpointCollection
{
public static readonly IEndpointCollection Empty = new EndpointCollection();

private readonly IDictionary<EndpointName, IEndpoint> _endpoints = new Dictionary<EndpointName, IEndpoint>();

/// <summary>
Expand All @@ -44,21 +47,15 @@ public class EndpointCollection : IEndpointCollection
/// or <paramref name="endpoint"/> are <c>null</c></exception>
/// <exception cref="EndpointAlreadyExistsException">Thrown if there is already an
/// endpoint with the specified <paramref name="endpointName"/></exception>
public void Add(EndpointName endpointName, IEndpoint endpoint)
public virtual void Add(EndpointName endpointName, IEndpoint endpoint)
{
if (endpointName == null) throw new ArgumentNullException(nameof(endpointName));
if (_endpoints.ContainsKey(endpointName)) throw new EndpointAlreadyExistsException(endpointName);
_endpoints[endpointName] = endpoint ?? throw new ArgumentNullException(nameof(endpoint));
}

/// <summary>
/// Returns the endpoint with the specified name
/// </summary>
/// <param name="endpointName">The name of the endpoint</param>
/// <returns>Returns the endpoint</returns>
/// <exception cref="EndpointNotFoundException">Thrown if there
/// is no endpoint with the specified name</exception>
public IEndpoint this[EndpointName endpointName]
/// <inheritdoc />
public virtual IEndpoint this[EndpointName endpointName]
{
get
{
Expand All @@ -70,23 +67,16 @@ public IEndpoint this[EndpointName endpointName]
}
}

/// <summary>
/// Tries to retrieve the endpoint with the specified address
/// </summary>
/// <param name="address">The address of the endpoint</param>
/// <param name="endpoint">An output parameter that will be initialied
/// with the endpoint if the endpoint is found</param>
/// <returns>Returns <c>true</c> if the endpoint is found; <c>false</c>
/// otherwise</returns>
public bool TryGetEndpointByAddress(Uri address, out IEndpoint endpoint)
/// <inheritdoc />
public virtual bool TryGetEndpointByAddress(Uri address, out IEndpoint endpoint)
{
var comparer = new EndpointAddressEqualityComparer();
endpoint = _endpoints.Values.FirstOrDefault(e => comparer.Equals(e.Address, address));
return endpoint != null;
}

/// <inheritdoc />
public bool Contains(EndpointName endpointName)
public virtual bool Contains(EndpointName endpointName)
{
return _endpoints.ContainsKey(endpointName);
}
Expand All @@ -96,13 +86,7 @@ IEnumerator IEnumerable.GetEnumerator()
return GetEnumerator();
}

/// <summary>
/// Returns an enumerator that iterates through the collection.
/// </summary>
/// <returns>
/// A <see cref="T:System.Collections.Generic.IEnumerator`1"/> that can be used to iterate through the collection.
/// </returns>
/// <filterpriority>1</filterpriority>
/// <inheritdoc />
public IEnumerator<KeyValuePair<EndpointName, IEndpoint>> GetEnumerator()
{
return _endpoints.GetEnumerator();
Expand Down
4 changes: 1 addition & 3 deletions Source/Platibus/Http/HttpTransportService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ public HttpTransportService(HttpTransportServiceOptions options)
if (options == null) throw new ArgumentNullException(nameof(options));

_baseUri = options.BaseUri.WithTrailingSlash();
_endpoints = options.Endpoints == null
? ReadOnlyEndpointCollection.Empty
: new ReadOnlyEndpointCollection(options.Endpoints);
_endpoints = options.Endpoints = options.Endpoints ?? EndpointCollection.Empty;

_messageQueueingService = options.MessageQueueingService;
_messageJournal = options.MessageJournal;
Expand Down
33 changes: 33 additions & 0 deletions Source/Platibus/LoopbackEndpoints.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;

namespace Platibus
{
internal class LoopbackEndpoints : EndpointCollection
{
private readonly Uri _baseUri;

public LoopbackEndpoints(EndpointName name, Uri baseUri)
{
_baseUri = baseUri ?? throw new ArgumentNullException(nameof(baseUri));
base.Add(name, new Endpoint(baseUri));
}

public override void Add(EndpointName endpointName, IEndpoint endpoint)
{
}

public override IEndpoint this[EndpointName endpointName] => new Endpoint(_baseUri);

public override bool TryGetEndpointByAddress(Uri address, out IEndpoint endpoint)
{
endpoint = new Endpoint(_baseUri);
return true;
}

public override bool Contains(EndpointName endpointName)
{
return true;
}

}
}
Loading

0 comments on commit f98f9c3

Please sign in to comment.