Skip to content

Commit

Permalink
feat: support shared sessions in mocked MQTT broker
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Mar 4, 2022
1 parent f8f816b commit b6894f6
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix>-beta.6</BaseVersionSuffix>
<BaseVersionSuffix></BaseVersionSuffix>
<BaseVersion>3.6.0$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
Expand Down
3 changes: 2 additions & 1 deletion docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ uid: releases

# Releases

## [3.6.0-beta.6](https://github.com/BEagle1984/silverback/releases/tag/v3.6.0-beta.6)
## [3.6.0](https://github.com/BEagle1984/silverback/releases/tag/v3.6.0)

### What's new

Expand All @@ -14,6 +14,7 @@ uid: releases
* Add overload for `Publish` method in the error policies that forwards the exception as well as the envelope
* Throw `TimeoutException` from <xref:Silverback.Testing.KafkaTestingHelper> and <xref:Silverback.Testing.MqttTestingHelper>
* Improve MQTT connection related logs (info for successful reconnect and add broker name to log messages)
* Support shared sessions in mocked MQTT broker

### Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ internal sealed class ClientSession : IDisposable, IClientSession
{
private readonly IMqttApplicationMessageReceivedHandler _messageHandler;

private readonly SharedSubscriptionsManager _sharedSubscriptionsManager;

private readonly Channel<MqttApplicationMessage> _channel =
Channel.CreateUnbounded<MqttApplicationMessage>();

Expand All @@ -33,10 +35,14 @@ internal sealed class ClientSession : IDisposable, IClientSession

public ClientSession(
IMqttClientOptions clientOptions,
IMqttApplicationMessageReceivedHandler messageHandler)
IMqttApplicationMessageReceivedHandler messageHandler,
SharedSubscriptionsManager sharedSubscriptionsManager)
{
ClientOptions = Check.NotNull(clientOptions, nameof(clientOptions));
_messageHandler = Check.NotNull(messageHandler, nameof(messageHandler));
_sharedSubscriptionsManager = Check.NotNull(
sharedSubscriptionsManager,
nameof(sharedSubscriptionsManager));
}

public IMqttClientOptions ClientOptions { get; }
Expand Down Expand Up @@ -81,7 +87,7 @@ public void Subscribe(IReadOnlyCollection<string> topics)
if (_subscriptions.Any(subscription => subscription.Topic == topic))
continue;

_subscriptions.Add(new Subscription(ClientOptions, topic));
_subscriptions.Add(new Subscription(ClientOptions, topic, _sharedSubscriptionsManager));
}
}
}
Expand All @@ -98,7 +104,7 @@ public async ValueTask PushAsync(MqttApplicationMessage message, IMqttClientOpti
{
lock (_subscriptions)
{
if (_subscriptions.All(subscription => !subscription.IsMatch(message.Topic, clientOptions)))
if (_subscriptions.All(subscription => !subscription.IsMatch(message, clientOptions)))
return;
}

Expand Down Expand Up @@ -140,18 +146,60 @@ private async Task ReadChannelAsync(CancellationToken cancellationToken)

private sealed class Subscription
{
public Subscription(IMqttClientOptions clientOptions, string topic)
private readonly SharedSubscriptionsManager _sharedSubscriptionsManager;

public Subscription(
IMqttClientOptions clientOptions,
string topic,
SharedSubscriptionsManager sharedSubscriptionsManager)
{
_sharedSubscriptionsManager = sharedSubscriptionsManager;

if (IsSharedSubscription(topic, out string? group, out string? actualTopic))
{
sharedSubscriptionsManager.Add(group);
topic = actualTopic;
Group = group;
}

Topic = topic;

Regex = GetSubscriptionRegex(topic, clientOptions);
}

public string Topic { get; }

public string? Group { get; }

public Regex Regex { get; }

public bool IsMatch(string topic, IMqttClientOptions clientOptions) =>
Regex.IsMatch(GetFullTopicName(topic, clientOptions));
public bool IsMatch(MqttApplicationMessage message, IMqttClientOptions clientOptions)
{
if (!Regex.IsMatch(GetFullTopicName(message.Topic, clientOptions)))
return false;

return Group == null || _sharedSubscriptionsManager.IsFirstMatch(Group, message);
}

private static bool IsSharedSubscription(
string topic,
[NotNullWhen(true)] out string? group,
[NotNullWhen(true)] out string? actualTopic)
{
const string sharedSubscriptionPrefix = "$share/";
if (topic.StartsWith(sharedSubscriptionPrefix, StringComparison.Ordinal))
{
group = topic.Substring(
sharedSubscriptionPrefix.Length,
topic.IndexOf('/', sharedSubscriptionPrefix.Length));
actualTopic = topic.Substring(topic.IndexOf('/', sharedSubscriptionPrefix.Length) + 1);
return true;
}

group = null;
actualTopic = null;
return false;
}

private static Regex GetSubscriptionRegex(string topic, IMqttClientOptions clientOptions)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ internal sealed class InMemoryMqttBroker : IInMemoryMqttBroker, IDisposable

private readonly Dictionary<string, List<MqttApplicationMessage>> _messagesByTopic = new();

private readonly SharedSubscriptionsManager _sharedSubscriptionsManager = new();

[SuppressMessage(
"ReSharper",
"InconsistentlySynchronizedField",
Expand All @@ -42,8 +44,8 @@ public void Connect(IMqttClientOptions clientOptions, IMqttApplicationMessageRec
{
Disconnect(clientOptions.ClientId);

if (!_sessions.TryGetValue(clientOptions.ClientId, out var session))
session = new ClientSession(clientOptions, handler);
if (!_sessions.TryGetValue(clientOptions.ClientId, out ClientSession? session))
session = new ClientSession(clientOptions, handler, _sharedSubscriptionsManager);

_sessions.Add(clientOptions.ClientId, session);
session.Connect();
Expand Down Expand Up @@ -100,7 +102,8 @@ public Task PublishAsync(

StoreMessage(message);

return _sessions.Values.ForEachAsync(session => session.PushAsync(message, clientOptions).AsTask());
return _sessions.Values.ForEachAsync(
session => session.PushAsync(message, clientOptions).AsTask());
}

[SuppressMessage(
Expand All @@ -112,9 +115,9 @@ public async Task WaitUntilAllMessagesAreConsumedAsync(CancellationToken cancell
while (!cancellationToken.IsCancellationRequested)
{
if (_sessions.Values.All(
session => session.PendingMessagesCount == 0 ||
session.IsConsumerDisconnected ||
!session.IsConnected))
session => session.PendingMessagesCount == 0 ||
session.IsConsumerDisconnected ||
!session.IsConnected))
return;

await Task.Delay(10, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

using System.Collections.Generic;
using MQTTnet;

namespace Silverback.Messaging.Broker.Mqtt.Mocks
{
internal class SharedSubscriptionsManager
{
private readonly Dictionary<string, int> _groups = new();

private readonly Dictionary<string, Dictionary<object, int>> _counters = new();

public void Add(string group)
{
lock (_groups)
{
if (_groups.ContainsKey(group))
{
_groups[group]++;
}
else
{
_groups[group] = 0;
_counters[group] = new Dictionary<object, int>();
}
}
}

public bool IsFirstMatch(string group, MqttApplicationMessage message)
{
lock (_counters)
{
Dictionary<object, int> counter = _counters[group];

if (counter.ContainsKey(message))
{
counter[message]++;
if (counter[message] >= _groups[group])
counter.Remove(message);

return false;
}

counter[message] = 1;
return true;
}
}
}
}
154 changes: 154 additions & 0 deletions tests/Silverback.Integration.Tests.E2E/Mqtt/SubscriptionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

using System;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Formatter;
using Silverback.Messaging;
using Silverback.Messaging.Messages;
using Silverback.Messaging.Publishing;
using Silverback.Tests.Integration.E2E.TestHost;
using Silverback.Tests.Integration.E2E.TestTypes.Messages;
using Xunit;
using Xunit.Abstractions;

namespace Silverback.Tests.Integration.E2E.Mqtt
{
public class SubscriptionTests : MqttTestFixture
{
public SubscriptionTests(ITestOutputHelper testOutputHelper)
: base(testOutputHelper)
{
}

[Fact]
public async Task Subscription_SingleLevelWildcard_MessagesConsumed()
{
Host.ConfigureServices(
services => services
.AddLogging()
.AddSilverback()
.UseModel()
.WithConnectionToMessageBroker(options => options.AddMockedMqtt())
.AddMqttEndpoints(
endpoints => endpoints
.Configure(
config => config
.WithClientId("e2e-test")
.ConnectViaTcp("e2e-mqtt-broker"))
.AddOutbound<TestEventOne>(endpoint => endpoint.ProduceTo("world/news"))
.AddOutbound<TestEventTwo>(
endpoint => endpoint.ProduceTo("world/europe/news"))
.AddOutbound<TestEventThree>(
endpoint => endpoint.ProduceTo("world/europe/switzerland/news"))
.AddInbound(
endpoint => endpoint
.ConsumeFrom("world/+/news")))
.AddIntegrationSpyAndSubscriber())
.Run();

var publisher = Host.ScopedServiceProvider.GetRequiredService<IEventPublisher>();

await publisher.PublishAsync(new TestEventOne());
await publisher.PublishAsync(new TestEventTwo());
await publisher.PublishAsync(new TestEventThree());

await Helper.WaitUntilAllMessagesAreConsumedAsync();

Helper.Spy.OutboundEnvelopes.Should().HaveCount(3);
Helper.Spy.InboundEnvelopes.Should().HaveCount(1);
Helper.Spy.InboundEnvelopes[0].Message.Should().BeOfType<TestEventTwo>();
}

[Fact]
public async Task Subscription_MultiLevelWildcard_MessagesConsumed()
{
Host.ConfigureServices(
services => services
.AddLogging()
.AddSilverback()
.UseModel()
.WithConnectionToMessageBroker(options => options.AddMockedMqtt())
.AddMqttEndpoints(
endpoints => endpoints
.Configure(
config => config
.WithClientId("e2e-test")
.ConnectViaTcp("e2e-mqtt-broker"))
.AddOutbound<TestEventOne>(endpoint => endpoint.ProduceTo("world/news"))
.AddOutbound<TestEventTwo>(
endpoint => endpoint.ProduceTo("world/europe/news"))
.AddOutbound<TestEventThree>(
endpoint => endpoint.ProduceTo("world/europe/switzerland/news"))
.AddInbound(
endpoint => endpoint
.ConsumeFrom("world/#/news")))
.AddIntegrationSpyAndSubscriber())
.Run();

var publisher = Host.ScopedServiceProvider.GetRequiredService<IEventPublisher>();

await publisher.PublishAsync(new TestEventOne());
await publisher.PublishAsync(new TestEventTwo());
await publisher.PublishAsync(new TestEventThree());

await Helper.WaitUntilAllMessagesAreConsumedAsync();

Helper.Spy.OutboundEnvelopes.Should().HaveCount(3);
Helper.Spy.InboundEnvelopes.Should().HaveCount(2);
Helper.Spy.InboundEnvelopes[0].Message.Should().BeOfType<TestEventTwo>();
Helper.Spy.InboundEnvelopes[1].Message.Should().BeOfType<TestEventThree>();
}

[Fact]
public async Task Subscription_Shared_MessagesConsumedOnce()
{
Host.ConfigureServices(
services => services
.AddLogging()
.AddSilverback()
.UseModel()
.WithConnectionToMessageBroker(options => options.AddMockedMqtt())
.AddMqttEndpoints(
endpoints => endpoints
.Configure(
config => config
.WithClientId("e2e-test")
.ConnectViaTcp("e2e-mqtt-broker"))
.AddOutbound<IIntegrationEvent>(
endpoint => endpoint.ProduceTo(DefaultTopicName))
.AddInbound(
endpoint => endpoint
.Configure(config => config.WithClientId("consumer-1"))
.ConsumeFrom("$share/group/" + DefaultTopicName))
.AddInbound(
endpoint => endpoint
.Configure(config => config.WithClientId("consumer-2"))
.ConsumeFrom("$share/group/" + DefaultTopicName)))
.AddIntegrationSpyAndSubscriber())
.Run();

var publisher = Host.ScopedServiceProvider.GetRequiredService<IEventPublisher>();

for (int i = 1; i <= 15; i++)
{
await publisher.PublishAsync(
new TestEventOne
{
Content = $"{i}"
});
}

await Helper.WaitUntilAllMessagesAreConsumedAsync();

Helper.Spy.OutboundEnvelopes.Should().HaveCount(15);
Helper.Spy.InboundEnvelopes.Should().HaveCount(15);
Helper.Spy.InboundEnvelopes
.Select(envelope => ((TestEventOne)envelope.Message!).Content)
.Should().BeEquivalentTo(Enumerable.Range(1, 15).Select(i => $"{i}"));
}
}
}

0 comments on commit b6894f6

Please sign in to comment.