Skip to content

Commit

Permalink
Merge pull request #290 from Project-MONAI/271-nds
Browse files Browse the repository at this point in the history
allowing prefectCount to be configurable
  • Loading branch information
neildsouth authored Jan 31, 2024
2 parents 054a0e9 + 1da9a3c commit 4e46905
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ jobs:
run: license_finder -r

- name: Check License Header
uses: apache/skywalking-eyes@v0.5.0
uses: apache/skywalking-eyes@v0.4.0

unit-test:
runs-on: ubuntu-latest
Expand Down
6 changes: 3 additions & 3 deletions doc/dependency_decisions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,14 @@
- :who: mocsharp
:why: MIT ( https://licenses.nuget.org/MIT)
:versions:
- 8.2.0
- 8.2.1
:when: 2022-11-09 18:57:32.294717110 Z
- - :approve
- Polly.Core
- :who: mocsharp
:why: MIT ( https://licenses.nuget.org/MIT)
:versions:
- 8.2.0
- 8.2.1
:when: 2022-11-09 18:57:32.294717110 Z
- - :approve
- RabbitMQ.Client
Expand Down Expand Up @@ -803,5 +803,5 @@
- :who: mocsharp
:why: MIT ( https://licenses.nuget.org/MIT)
:versions:
- 2.5.5
- 2.5.6
:when: 2022-08-16 21:40:32.294717110 Z
5 changes: 3 additions & 2 deletions src/Plugins/RabbitMQ/ConfigurationKeys.cs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ internal static class ConfigurationKeys
public static readonly string RequeueDelay = "requeueDelay";
public static readonly string UseSSL = "useSSL";
public static readonly string Port = "port";
public static readonly string[] PublisherRequiredKeys = new[] { EndPoint, Username, Password, VirtualHost, Exchange };
public static readonly string[] SubscriberRequiredKeys = new[] { EndPoint, Username, Password, VirtualHost, Exchange, DeadLetterExchange, DeliveryLimit, RequeueDelay };
public static readonly string[] PublisherRequiredKeys = [EndPoint, Username, Password, VirtualHost, Exchange];
public static readonly string[] SubscriberRequiredKeys = [EndPoint, Username, Password, VirtualHost, Exchange, DeadLetterExchange, DeliveryLimit, RequeueDelay];
public static readonly string PrefetchCount = "prefetchCount";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class RabbitMQMessageSubscriberService : IMessageBrokerSubscriberService
private readonly string _portNumber;
private IModel? _channel;
private bool _disposedValue;
private readonly ushort _prefetchCount = 1;

public event ConnectionErrorHandler? OnConnectionError;

Expand All @@ -72,19 +73,23 @@ public RabbitMQMessageSubscriberService(IOptions<MessageBrokerServiceConfigurati
_deadLetterExchange = configuration.SubscriberSettings[ConfigurationKeys.DeadLetterExchange];
_deliveryLimit = int.Parse(configuration.SubscriberSettings[ConfigurationKeys.DeliveryLimit], NumberFormatInfo.InvariantInfo);
_requeueDelay = int.Parse(configuration.SubscriberSettings[ConfigurationKeys.RequeueDelay], NumberFormatInfo.InvariantInfo);
if (configuration.SubscriberSettings.TryGetValue(ConfigurationKeys.PrefetchCount, out var value))
{
_prefetchCount = ushort.Parse(value ?? "1", NumberFormatInfo.InvariantInfo);
}

if (configuration.SubscriberSettings.ContainsKey(ConfigurationKeys.UseSSL))
if (configuration.SubscriberSettings.TryGetValue(ConfigurationKeys.UseSSL, out var sslValue))
{
_useSSL = configuration.SubscriberSettings[ConfigurationKeys.UseSSL];
_useSSL = sslValue;
}
else
{
_useSSL = string.Empty;
}

if (configuration.SubscriberSettings.ContainsKey(ConfigurationKeys.Port))
if (configuration.SubscriberSettings.TryGetValue(ConfigurationKeys.Port, out var portValue))
{
_portNumber = configuration.SubscriberSettings[ConfigurationKeys.Port];
_portNumber = portValue;
}
else
{
Expand Down Expand Up @@ -112,7 +117,7 @@ private void CreateChannel()
_channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber) ?? throw new ServiceException("Failed to create a new channel to RabbitMQ");
_channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false);
_channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Topic, durable: true, autoDelete: false);
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
_channel.BasicQos(prefetchSize: 0, prefetchCount: _prefetchCount, global: false);
_channel.ModelShutdown += Channel_ModelShutdown;
_logger.ConnectedToRabbitMQ(Name, _endpoint, _virtualHost);
});
Expand Down Expand Up @@ -234,15 +239,15 @@ private QueueDeclareOk DeclareQueues(string[] topics, string queue, ushort prefe

var queueDeclareResult = _channel!.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

var deadLetterExists = QueueExists(deadLetterQueue);
if (deadLetterExists.exists == false)
var (exists, accessable) = QueueExists(deadLetterQueue);
if (exists == false)
{
_channel.QueueDeclare(queue: deadLetterQueue, durable: true, exclusive: false, autoDelete: false);
}

try
{
BindToRoutingKeys(topics, queueDeclareResult.QueueName, deadLetterExists.accessable ? deadLetterQueue : "");
BindToRoutingKeys(topics, queueDeclareResult.QueueName, accessable ? deadLetterQueue : "");
_channel.BasicQos(0, prefetchCount, false);
}
catch (OperationInterruptedException operationInterruptedException)
Expand Down
6 changes: 3 additions & 3 deletions src/Plugins/RabbitMQ/Tests/Integration/ReliabilityTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public ReliabilityTest()
_loggerSubscriber = new Mock<ILogger<RabbitMQMessageSubscriberService>>();
_factory = new RabbitMQConnectionFactory(_logger.Object);

_publishers = new List<IMessageBrokerPublisherService>();
_subscribers = new List<IMessageBrokerSubscriberService>();
_publishers = [];
_subscribers = [];

_options = Options.Create(new MessageBrokerServiceConfiguration());
_options.Value.PublisherSettings[ConfigurationKeys.EndPoint] = RabbitMqConnection.HostName;
Expand All @@ -72,7 +72,7 @@ public ReliabilityTest()
_options.Value.SubscriberSettings[ConfigurationKeys.DeliveryLimit] = RabbitMqConnection.DeliveryLimit;
_options.Value.SubscriberSettings[ConfigurationKeys.RequeueDelay] = RabbitMqConnection.RequeueDelay;

_topics = new List<string>();
_topics = [];
_messages = new ConcurrentDictionary<string, int>();
_random = new Random();
SetupTopics();
Expand Down

0 comments on commit 4e46905

Please sign in to comment.