diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index e88a99435b..6b20297998 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -1,5 +1,4 @@ using System; -using System.Diagnostics; using System.Threading.Tasks; using RabbitMQ.Client.Impl; @@ -80,8 +79,9 @@ public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, b IReadOnlyBasicProperties properties, ReadOnlyMemory body) { var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + // No need to call base, it's empty. - return BasicDeliverWrapper(deliverEventArgs); + return _receivedWrapper.InvokeAsync(this, deliverEventArgs); } ///Fires the Shutdown event. @@ -95,13 +95,5 @@ await _shutdownWrapper.InvokeAsync(this, reason) .ConfigureAwait(false); } } - - private async Task BasicDeliverWrapper(BasicDeliverEventArgs eventArgs) - { - using (Activity? activity = RabbitMQActivitySource.Deliver(eventArgs)) - { - await _receivedWrapper.InvokeAsync(this, eventArgs).ConfigureAwait(false); - } - } } } diff --git a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs index 3d2009a84c..5fe1307e2d 100644 --- a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs @@ -30,7 +30,6 @@ //--------------------------------------------------------------------------- using System; -using System.Diagnostics; using System.Threading.Tasks; namespace RabbitMQ.Client.Events @@ -89,13 +88,10 @@ public override void HandleBasicConsumeOk(string consumerTag) public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) { - BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - using (Activity? activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default) - { - await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body) - .ConfigureAwait(false); - Received?.Invoke(this, eventArgs); - } + var eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body) + .ConfigureAwait(false); + Received?.Invoke(this, eventArgs); } ///Fires the Shutdown event. diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs index 7140a54b77..8bd9384f7d 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Impl; @@ -24,23 +25,35 @@ protected override async Task ProcessChannelAsync() { try { - Task task = work.WorkType switch + switch (work.WorkType) { - WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver( - work.ConsumerTag!, work.DeliveryTag, work.Redelivered, - work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory), - - WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag!), - - WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag!), - - WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag!), - - WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason!), - - _ => Task.CompletedTask - }; - await task.ConfigureAwait(false); + case WorkType.Deliver: + using (Activity? activity = RabbitMQActivitySource.Deliver(work.RoutingKey!, work.Exchange!, + work.DeliveryTag, work.BasicProperties!, work.Body.Size)) + { + await work.AsyncConsumer.HandleBasicDeliver( + work.ConsumerTag!, work.DeliveryTag, work.Redelivered, + work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory) + .ConfigureAwait(false); + } + break; + case WorkType.Cancel: + await work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag!) + .ConfigureAwait(false); + break; + case WorkType.CancelOk: + await work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag!) + .ConfigureAwait(false); + break; + case WorkType.ConsumeOk: + await work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag!) + .ConfigureAwait(false); + break; + case WorkType.Shutdown: + await work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason!) + .ConfigureAwait(false); + break; + } } catch (Exception e) { diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs index 3023fd0cc1..6247a59b43 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Impl; @@ -29,10 +30,14 @@ protected override async Task ProcessChannelAsync() switch (work.WorkType) { case WorkType.Deliver: - await consumer.HandleBasicDeliverAsync( - consumerTag, work.DeliveryTag, work.Redelivered, - work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory) - .ConfigureAwait(false); + using (Activity? activity = RabbitMQActivitySource.Deliver(work.RoutingKey!, work.Exchange!, + work.DeliveryTag, work.BasicProperties!, work.Body.Size)) + { + await consumer.HandleBasicDeliverAsync( + consumerTag, work.DeliveryTag, work.Redelivered, + work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory) + .ConfigureAwait(false); + } break; case WorkType.Cancel: consumer.HandleBasicCancel(consumerTag); diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs index e2ed18e142..66a3830932 100644 --- a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs @@ -5,7 +5,6 @@ using System.Net.Sockets; using System.Reflection; using System.Text; -using RabbitMQ.Client.Events; using RabbitMQ.Client.Impl; namespace RabbitMQ.Client @@ -46,7 +45,6 @@ public static class RabbitMQActivitySource public static bool UseRoutingKeyAsOperationName { get; set; } = true; internal static bool PublisherHasListeners => s_publisherSource.HasListeners(); - internal static bool SubscriberHasListeners => s_subscriberSource.HasListeners(); internal static readonly IEnumerable> CreationTags = new[] { @@ -120,7 +118,8 @@ public static class RabbitMQActivitySource return activity; } - internal static Activity? Deliver(BasicDeliverEventArgs deliverEventArgs) + internal static Activity? Deliver(string routingKey, string exchange, ulong deliveryTag, + IReadOnlyBasicProperties basicProperties, int bodySize) { if (!s_subscriberSource.HasListeners()) { @@ -129,13 +128,12 @@ public static class RabbitMQActivitySource // Extract the PropagationContext of the upstream parent from the message headers. Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity( - UseRoutingKeyAsOperationName ? $"{deliverEventArgs.RoutingKey} deliver" : "deliver", - ActivityKind.Consumer, ContextExtractor(deliverEventArgs.BasicProperties)); + UseRoutingKeyAsOperationName ? $"{routingKey} deliver" : "deliver", + ActivityKind.Consumer, ContextExtractor(basicProperties)); if (activity != null && activity.IsAllDataRequested) { - PopulateMessagingTags("deliver", deliverEventArgs.RoutingKey, deliverEventArgs.Exchange, - deliverEventArgs.DeliveryTag, deliverEventArgs.BasicProperties, deliverEventArgs.Body.Length, - activity); + PopulateMessagingTags("deliver", routingKey, exchange, + deliveryTag, basicProperties, bodySize, activity); } return activity;