diff --git a/src/Messaging/API/IMessageBrokerSubscriberService.cs b/src/Messaging/API/IMessageBrokerSubscriberService.cs
index 49c4613..e43de1e 100755
--- a/src/Messaging/API/IMessageBrokerSubscriberService.cs
+++ b/src/Messaging/API/IMessageBrokerSubscriberService.cs
@@ -31,30 +31,6 @@ public interface IMessageBrokerSubscriberService : IDisposable
///
string Name { get; }
- ///
- /// Subscribe to a message topic & queue and executes messageReceivedCallback for every message that is received.
- /// Either provide a topic, a queue or both.
- /// A queue is generated if the name of the queue is not provided.
- ///
- /// Topic/routing key to bind to
- /// Name of the queue to consume
- /// Action to be performed when message is received
- /// Number of unacknowledged messages to receive at once. Defaults to 0.
- [Obsolete("This method is obsolete, use SubscribeAsync instead")]
- void Subscribe(string topic, string queue, Action messageReceivedCallback, ushort prefetchCount = 0);
-
- ///
- /// Subscribe to a message topic & queue and executes messageReceivedCallback for every message that is received.
- /// Either provide a topic, a queue or both.
- /// A queue is generated if the name of the queue is not provided.
- ///
- /// Topics/routing keys to bind to
- /// Name of the queue to consume
- /// Action to be performed when message is received
- /// Number of unacknowledged messages to receive at once. Defaults to 0.
- [Obsolete("This method is obsolete, use SubscribeAsync instead")]
- void Subscribe(string[] topics, string queue, Action messageReceivedCallback, ushort prefetchCount = 0);
-
///
/// Subscribe to a message topic & queue and executes messageReceivedCallback asynchronously for every message that is received.
/// Either provide a topic, a queue or both.
diff --git a/src/Messaging/Common/ServiceException.cs b/src/Messaging/Common/ServiceException.cs
new file mode 100644
index 0000000..6bdaf30
--- /dev/null
+++ b/src/Messaging/Common/ServiceException.cs
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2023 MONAI Consortium
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Runtime.Serialization;
+
+namespace Monai.Deploy.Messaging.Common
+{
+ public class ServiceException : Exception
+ {
+ public ServiceException()
+ {
+ }
+
+ public ServiceException(string? message) : base(message)
+ {
+ }
+
+ public ServiceException(string? message, Exception? innerException) : base(message, innerException)
+ {
+ }
+
+ protected ServiceException(SerializationInfo info, StreamingContext context) : base(info, context)
+ {
+ }
+ }
+}
diff --git a/src/Messaging/Events/ExportCompleteEvent.cs b/src/Messaging/Events/ExportCompleteEvent.cs
index 5b96704..432b8b5 100755
--- a/src/Messaging/Events/ExportCompleteEvent.cs
+++ b/src/Messaging/Events/ExportCompleteEvent.cs
@@ -76,9 +76,9 @@ public ExportCompleteEvent()
public ExportCompleteEvent(ExportRequestEvent exportRequest, ExportStatus exportStatus, Dictionary fileStatuses)
{
- Guard.Against.Null(exportRequest);
- Guard.Against.Null(exportStatus);
- Guard.Against.Null(fileStatuses);
+ Guard.Against.Null(exportRequest, nameof(exportRequest));
+ Guard.Against.Null(exportStatus, nameof(exportStatus));
+ Guard.Against.Null(fileStatuses, nameof(fileStatuses));
WorkflowInstanceId = exportRequest.WorkflowInstanceId;
ExportTaskId = exportRequest.ExportTaskId;
diff --git a/src/Messaging/Monai.Deploy.Messaging.csproj b/src/Messaging/Monai.Deploy.Messaging.csproj
index edade87..70325b7 100644
--- a/src/Messaging/Monai.Deploy.Messaging.csproj
+++ b/src/Messaging/Monai.Deploy.Messaging.csproj
@@ -74,9 +74,9 @@
-
+
-
+
diff --git a/src/Messaging/Tests/Monai.Deploy.Messaging.Tests.csproj b/src/Messaging/Tests/Monai.Deploy.Messaging.Tests.csproj
index 35ddd1f..c1e398d 100644
--- a/src/Messaging/Tests/Monai.Deploy.Messaging.Tests.csproj
+++ b/src/Messaging/Tests/Monai.Deploy.Messaging.Tests.csproj
@@ -32,14 +32,14 @@
-
+
-
-
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs b/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs
index 5fcf592..9fa63cd 100755
--- a/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs
+++ b/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs
@@ -48,10 +48,10 @@ public RabbitMQConnectionFactory(ILogger logger)
public IModel CreateChannel(ChannelType type, string hostName, string username, string password, string virtualHost, string useSSL, string portNumber)
{
- Guard.Against.NullOrWhiteSpace(hostName);
- Guard.Against.NullOrWhiteSpace(username);
- Guard.Against.NullOrWhiteSpace(password);
- Guard.Against.NullOrWhiteSpace(virtualHost);
+ Guard.Against.NullOrWhiteSpace(hostName, nameof(hostName));
+ Guard.Against.NullOrWhiteSpace(username, nameof(username));
+ Guard.Against.NullOrWhiteSpace(password, nameof(password));
+ Guard.Against.NullOrWhiteSpace(virtualHost, nameof(virtualHost));
var key = $"{type}{hostName}{username}{HashPassword(password)}{virtualHost}";
@@ -154,7 +154,7 @@ private IConnection CreateConnectionOnly(string hostName, string username, strin
private static object HashPassword(string password)
{
- Guard.Against.NullOrWhiteSpace(password);
+ Guard.Against.NullOrWhiteSpace(password, nameof(password));
var sha256 = SHA256.Create();
var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(password));
return string.Join("", hash.Select(x => x.ToString("x2", CultureInfo.InvariantCulture)));
diff --git a/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj b/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj
index 85fa9c6..5451616 100644
--- a/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj
+++ b/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj
@@ -63,8 +63,8 @@
-
-
+
+
diff --git a/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs b/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs
index 5059ec9..cf49876 100755
--- a/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs
+++ b/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs
@@ -49,7 +49,7 @@ public RabbitMQMessagePublisherService(IOptions logger,
IRabbitMQConnectionFactory rabbitMqConnectionFactory)
{
- Guard.Against.Null(options);
+ Guard.Against.Null(options, nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_rabbitMqConnectionFactory = rabbitMqConnectionFactory ?? throw new ArgumentNullException(nameof(rabbitMqConnectionFactory));
@@ -83,7 +83,7 @@ public RabbitMQMessagePublisherService(IOptions configuration)
{
- Guard.Against.Null(configuration);
+ Guard.Against.Null(configuration, nameof(configuration));
foreach (var key in ConfigurationKeys.PublisherRequiredKeys)
{
@@ -96,8 +96,8 @@ internal static void ValidateConfiguration(Dictionary configurat
public Task Publish(string topic, Message message)
{
- Guard.Against.NullOrWhiteSpace(topic);
- Guard.Against.Null(message);
+ Guard.Against.NullOrWhiteSpace(topic, nameof(topic));
+ Guard.Against.Null(message, nameof(message));
using var loggingScope = _logger.BeginScope(new LoggingDataDictionary
{
diff --git a/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs b/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs
index e96603d..cc70e32 100755
--- a/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs
+++ b/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs
@@ -58,7 +58,7 @@ public RabbitMQMessageSubscriberService(IOptions logger,
IRabbitMQConnectionFactory rabbitMqConnectionFactory)
{
- Guard.Against.Null(options);
+ Guard.Against.Null(options, nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_rabbitMqConnectionFactory = rabbitMqConnectionFactory ?? throw new ArgumentNullException(nameof(rabbitMqConnectionFactory));
@@ -109,7 +109,7 @@ private void CreateChannel()
.Execute(() =>
{
_logger.ConnectingToRabbitMQ(Name, _endpoint, _virtualHost);
- _channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber);
+ _channel = _rabbitMqConnectionFactory.CreateChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber) ?? throw new ServiceException("Failed to create a new channel to RabbitMQ");
_channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false);
_channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Topic, durable: true, autoDelete: false);
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
@@ -137,7 +137,7 @@ private void Channel_ModelShutdown(object? sender, ShutdownEventArgs e)
internal static void ValidateConfiguration(Dictionary configuration)
{
- Guard.Against.Null(configuration);
+ Guard.Against.Null(configuration, nameof(configuration));
foreach (var key in ConfigurationKeys.SubscriberRequiredKeys)
{
@@ -162,23 +162,14 @@ internal static void ValidateConfiguration(Dictionary configurat
throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} has int values of less than 1");
}
}
- [Obsolete("This method is obsolete, use SubscribeAsync instead")]
- public void Subscribe(string topic, string queue, Action messageReceivedCallback, ushort prefetchCount = 0)
- => Subscribe(new string[] { topic }, queue, messageReceivedCallback, prefetchCount);
-
- [Obsolete("This method is obsolete, use SubscribeAsync instead")]
- public void Subscribe(string[] topics, string queue, Action messageReceivedCallback, ushort prefetchCount = 0)
- {
- SubscribeAsync(topics, queue, new Func((args) => { messageReceivedCallback.Invoke(args); return Task.FromResult(0); }));
- }
public void SubscribeAsync(string topic, string queue, Func messageReceivedCallback, ushort prefetchCount = 0)
=> SubscribeAsync(new string[] { topic }, queue, messageReceivedCallback, prefetchCount);
public void SubscribeAsync(string[] topics, string queue, Func messageReceivedCallback, ushort prefetchCount = 0)
{
- Guard.Against.Null(topics);
- Guard.Against.Null(messageReceivedCallback);
+ Guard.Against.Null(topics, nameof(topics));
+ Guard.Against.Null(messageReceivedCallback, nameof(messageReceivedCallback));
var queueDeclareResult = DeclareQueues(topics, queue, prefetchCount);
var consumer = CreateConsumer(messageReceivedCallback, queueDeclareResult);
@@ -212,7 +203,7 @@ private EventingBasicConsumer CreateConsumer(Func
-
+
-
+
-
-
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/src/Plugins/RabbitMQ/Tests/Unit/RabbitMqMessageSubscriberServiceTest.cs b/src/Plugins/RabbitMQ/Tests/Unit/RabbitMqMessageSubscriberServiceTest.cs
index 72a369c..7e3412a 100755
--- a/src/Plugins/RabbitMQ/Tests/Unit/RabbitMqMessageSubscriberServiceTest.cs
+++ b/src/Plugins/RabbitMQ/Tests/Unit/RabbitMqMessageSubscriberServiceTest.cs
@@ -133,7 +133,7 @@ public void SubscribesToATopic()
var service = new RabbitMQMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object);
- service.Subscribe("topic", "queue", (args) =>
+ service.SubscribeAsync("topic", "queue", async (args) =>
{
Assert.Equal(message.ApplicationId, args.Message.ApplicationId);
Assert.Equal(message.ContentType, args.Message.ContentType);
@@ -143,6 +143,7 @@ public void SubscribesToATopic()
Assert.Equal("topic", args.Message.MessageDescription);
Assert.Equal(message.MessageId, args.Message.MessageId);
Assert.Equal(message.Body, args.Message.Body);
+ await Task.CompletedTask.ConfigureAwait(false);
});
service.SubscribeAsync("topic", "queue", async (args) =>
@@ -240,7 +241,7 @@ public void SubscribesToATopicAndDeadLetterQueueIsDown()
var service = new RabbitMQMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object);
- service.Subscribe("topic", "queue", (args) =>
+ service.SubscribeAsync("topic", "queue", async (args) =>
{
Assert.Equal(message.ApplicationId, args.Message.ApplicationId);
Assert.Equal(message.ContentType, args.Message.ContentType);
@@ -250,6 +251,7 @@ public void SubscribesToATopicAndDeadLetterQueueIsDown()
Assert.Equal("topic", args.Message.MessageDescription);
Assert.Equal(message.MessageId, args.Message.MessageId);
Assert.Equal(message.Body, args.Message.Body);
+ await Task.CompletedTask;
});
service.SubscribeAsync("topic", "queue", async (args) =>
@@ -349,7 +351,7 @@ public void SubscribesToATopicAndDeadLetterQueueSubscriptionFailsWithGenericExce
var act = () =>
{
- service.Subscribe("topic", "queue", (args) =>
+ service.SubscribeAsync("topic", "queue", async (args) =>
{
Assert.Equal(message.ApplicationId, args.Message.ApplicationId);
Assert.Equal(message.ContentType, args.Message.ContentType);
@@ -359,7 +361,7 @@ public void SubscribesToATopicAndDeadLetterQueueSubscriptionFailsWithGenericExce
Assert.Equal("topic", args.Message.MessageDescription);
Assert.Equal(message.MessageId, args.Message.MessageId);
Assert.Equal(message.Body, args.Message.Body);
-
+ await Task.CompletedTask;
});
};
Assert.Throws(act);