Skip to content

Commit

Permalink
Clean up and reformat
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Chang <[email protected]>
  • Loading branch information
mocsharp committed Sep 12, 2023
1 parent 50f2f18 commit 42bd190
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 26 deletions.
1 change: 1 addition & 0 deletions src/Messaging/Common/ExportRequestType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Monai.Deploy.Messaging.Common
{
public enum ExportRequestType
Expand Down
6 changes: 6 additions & 0 deletions src/Messaging/Events/WorkflowRequestEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,34 @@ public override bool Equals(object? obj)
DataService.Equals(dataOrigin.DataService);
}
}

public enum DataService
{
/// <summary>
/// Unknown data service
/// </summary>
Unknown,

/// <summary>
/// Data received via DIMSE services
/// </summary>
DIMSE,

/// <summary>
/// Data received via DICOMWeb services
/// </summary>
DicomWeb,

/// <summary>
/// Data received via FHIR services
/// </summary>
FHIR,

/// <summary>
/// Data received via HL7 services
/// </summary>
HL7,

/// <summary>
/// Data received via ACR API call
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/Messaging/Tests/IServiceCollectionExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,10 @@ internal class GoodSubscriberService : IMessageBrokerSubscriberService
public string Name => throw new NotImplementedException();

#pragma warning disable CS0067 // The event 'GoodSubscriberService.OnConnectionError' is never used

// event used by users of this library
public event ConnectionErrorHandler? OnConnectionError;

#pragma warning restore CS0067 // The event 'GoodSubscriberService.OnConnectionError' is never used

public void Acknowledge(MessageBase message) => throw new NotImplementedException();
Expand Down
1 change: 0 additions & 1 deletion src/Messaging/Tests/TaskCallbackEventTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

namespace Monai.Deploy.Messaging.Tests
{

public class TaskCallbackEventTest
{
[Fact(DisplayName = "Validation throws on error")]
Expand Down
4 changes: 0 additions & 4 deletions src/Messaging/Tests/WorkflowRequestMessageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,24 @@ public void ConvertsJsonMessageToMessage()
DataService = DataService.DicomWeb,
Source = Guid.NewGuid().ToString(),
Destination = Guid.NewGuid().ToString(),

});
input.DataOrigins.Add(new DataOrigin
{
DataService = DataService.FHIR,
Source = Guid.NewGuid().ToString(),
Destination = Guid.NewGuid().ToString(),

});
input.DataOrigins.Add(new DataOrigin
{
DataService = DataService.DIMSE,
Source = Guid.NewGuid().ToString(),
Destination = Guid.NewGuid().ToString(),

});
input.DataOrigins.Add(new DataOrigin
{
DataService = DataService.HL7,
Source = Guid.NewGuid().ToString(),
Destination = Guid.NewGuid().ToString(),

});

var files = new List<BlockStorageInfo>()
Expand Down
20 changes: 10 additions & 10 deletions src/Plugins/RabbitMQ/Logger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static partial class Logger
{
internal static readonly string LoggingScopeMessageApplication = "Message ID={0}. Application ID={1}.";

[LoggerMessage(EventId = 10000, Level = LogLevel.Information, Message = "Publishing message to {endpoint}/{virtualHost}. Exchange={exchange}, Routing Key={topic}.")]
[LoggerMessage(EventId = 10000, Level = LogLevel.Information, Message = "Publishing message to {endpoint}/{virtualHost}. Exchange: {exchange}, Topic: {topic}.")]
public static partial void PublshingRabbitMQ(this ILogger logger, string endpoint, string virtualHost, string exchange, string topic);

[LoggerMessage(EventId = 10001, Level = LogLevel.Debug, Message = "{ServiceName} connecting to {endpoint}/{virtualHost}.")]
Expand All @@ -32,32 +32,32 @@ public static partial class Logger
[LoggerMessage(EventId = 10002, Level = LogLevel.Information, Message = "Message received from queue {queue} for {topic}.")]
public static partial void MessageReceivedFromQueue(this ILogger logger, string queue, string topic);

[LoggerMessage(EventId = 10003, Level = LogLevel.Information, Message = "Listening for messages from {endpoint}/{virtualHost}. Exchange={exchange}, Queue={queue}, Routing Key={topic}.")]
[LoggerMessage(EventId = 10003, Level = LogLevel.Information, Message = "Listening for messages from {endpoint}/{virtualHost}. Exchange: {exchange}, Queue: {queue}, Topic: {topic}.")]
public static partial void SubscribeToRabbitMQQueue(this ILogger logger, string endpoint, string virtualHost, string exchange, string queue, string topic);

[LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgement for message {messageId}.")]
[LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgment. MessageId: {messageId}.")]
public static partial void SendingAcknowledgement(this ILogger logger, string messageId);

[LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}. Event Duration {durationMilliseconds}")]
[LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Acknowledgment sent. Message ID: {messageId}. Event Duration: {durationMilliseconds}")]
public static partial void AcknowledgementSent(this ILogger logger, string messageId, double durationMilliseconds);

[LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message {messageId} and requeuing.")]
[LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message. Message ID: {messageId} and requeuing.")]
public static partial void SendingNAcknowledgement(this ILogger logger, string messageId);

[LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent for message {messageId}, requeue={requeue}.")]
[LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent. Message ID: {messageId}. Requeue: @{requeue}.")]
public static partial void NAcknowledgementSent(this ILogger logger, string messageId, bool requeue);

[LoggerMessage(EventId = 10008, Level = LogLevel.Information, Message = "Closing connections.")]
public static partial void ClosingConnections(this ILogger logger);

[LoggerMessage(EventId = 10009, Level = LogLevel.Error, Message = "Invalid or corrupted message received: Queue={queueName}, Topic={topic}, Message ID={messageId}.")]
[LoggerMessage(EventId = 10009, Level = LogLevel.Error, Message = "Invalid or corrupted message received: Queue name: {queueName}. Topic: {topic}. Message ID: {messageId}.")]
public static partial void InvalidMessage(this ILogger logger, string queueName, string topic, string messageId, Exception ex);

[LoggerMessage(EventId = 10010, Level = LogLevel.Error, Message = "Exception not handled by the subscriber's callback function: Queue={queueName}, Topic={topic}, Message ID={messageId}.")]
[LoggerMessage(EventId = 10010, Level = LogLevel.Error, Message = "Exception not handled by the subscriber's callback function: Queue name: {queueName}. Topic: {topic}. Message ID: {messageId}.")]
public static partial void ErrorNotHandledByCallback(this ILogger logger, string queueName, string topic, string messageId, Exception ex);

[LoggerMessage(EventId = 10011, Level = LogLevel.Error, Message = "Exception thrown: Message ID={messageId}.")]
public static partial void Exception(this ILogger logger, string messageId, Exception ex);
[LoggerMessage(EventId = 10011, Level = LogLevel.Error, Message = "Error requeuing. Message ID: {messageId}.")]
public static partial void ErrorRequeue(this ILogger logger, string messageId, Exception ex);

[LoggerMessage(EventId = 10012, Level = LogLevel.Error, Message = "Health check failure.")]
public static partial void HealthCheckError(this ILogger logger, Exception ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ public Task Publish(string topic, Message message)

using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
{
["MessageId"] = message.MessageId,
["ApplicationId"] = message.ApplicationId,
["CorrelationId"] = message.CorrelationId
["@messageId"] = message.MessageId,
["@applicationId"] = message.ApplicationId,
["@correlationId"] = message.CorrelationId
});

_logger.PublshingRabbitMQ(_endpoint, _virtualHost, _exchange, topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ private EventingBasicConsumer CreateConsumer(Func<MessageReceivedEventArgs, Task
{
using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
{
["MessageId"] = eventArgs.BasicProperties.MessageId,
["ApplicationId"] = eventArgs.BasicProperties.AppId,
["CorrelationId"] = eventArgs.BasicProperties.CorrelationId,
["RecievedTime"] = DateTime.UtcNow
["@messageId"] = eventArgs.BasicProperties.MessageId,
["@applicationId"] = eventArgs.BasicProperties.AppId,
["@correlationId"] = eventArgs.BasicProperties.CorrelationId,
["@recievedTime"] = DateTime.UtcNow
});
_logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, eventArgs.RoutingKey);
Expand Down Expand Up @@ -274,7 +274,7 @@ public void Acknowledge(MessageBase message)

using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
{
["EventDuration"] = eventDuration
["@eventDuration"] = eventDuration
});
_logger.AcknowledgementSent(message.MessageId, eventDuration);
}
Expand All @@ -291,7 +291,7 @@ public async Task RequeueWithDelay(MessageBase message)
}
catch (Exception e)
{
_logger.Exception($"Requeue delay failed.", e);
_logger.ErrorRequeue($"Requeue delay failed.", e);
Reject(message, true);
}
}
Expand Down Expand Up @@ -379,6 +379,7 @@ private static MessageReceivedEventArgs CreateMessage(string topic, BasicDeliver
deliveryTag: eventArgs.DeliveryTag.ToString(CultureInfo.InvariantCulture)),
CancellationToken.None);
}

private (bool exists, bool accessable) QueueExists(string queueName)
{
var testChannel = _rabbitMqConnectionFactory.MakeTempChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber);
Expand Down
1 change: 0 additions & 1 deletion src/Plugins/RabbitMQ/Tests/Integration/ReliabilityTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ public void GivenMessages_WhenPublished_SubscribeShallReceiveAndAckMessages()
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
}


Parallel.For(0, MessageCount, new ParallelOptions { MaxDegreeOfParallelism = MaxDegreeOfParallelism }, i =>
{
var guid = Guid.NewGuid().ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ await Task.Run(() =>
s_messageReceived = args.Message;
service.Acknowledge(args.Message);
}).ConfigureAwait(false);
});
};
Assert.Throws<OperationInterruptedException>(asyncAct);
Expand Down

0 comments on commit 42bd190

Please sign in to comment.