Skip to content

Commit

Permalink
First implementation of adding ActivitySource to common operations to…
Browse files Browse the repository at this point in the history
… enable OpenTelemetry scenarios.

Linking existing context for publish if one exists

dotnet format

Adding standard tags without allocating.

Updating code after comments

Moving ActivitySource tests to Integration
  • Loading branch information
stebet authored and lukebakken committed Jan 25, 2024
1 parent 52add14 commit d34160b
Show file tree
Hide file tree
Showing 17 changed files with 827 additions and 31 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.orig
*.log
_site/

Expand Down Expand Up @@ -28,6 +29,11 @@ test.sh
test-output.log
InternalTrace*
nunit-agent*
#################
## JetBrains Rider
#################
.idea/

#################
## Visual Studio
#################
Expand Down
1 change: 1 addition & 0 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{34486CC0-D61E-46BA-9E5E-6E8EFA7C34B5}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
.gitignore = .gitignore
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client", "projects\RabbitMQ.Client\RabbitMQ.Client.csproj", "{8C554257-5ECC-45DB-873D-560BFBB74EC8}"
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler<RabbitMQ.Client.Events
RabbitMQ.Client.IChannel.BasicGetAsync(string queue, bool autoAck) -> System.Threading.Tasks.ValueTask<RabbitMQ.Client.BasicGetResult>
RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
RabbitMQ.Client.IChannel.ChannelNumber.get -> int
Expand Down Expand Up @@ -658,6 +658,7 @@ RabbitMQ.Client.PublicationAddress
RabbitMQ.Client.PublicationAddress.PublicationAddress(string exchangeType, string exchangeName, string routingKey) -> void
RabbitMQ.Client.QueueDeclareOk
RabbitMQ.Client.QueueDeclareOk.QueueDeclareOk(string queueName, uint messageCount, uint consumerCount) -> void
RabbitMQ.Client.RabbitMQActivitySource
RabbitMQ.Client.ReadOnlyBasicProperties
RabbitMQ.Client.ReadOnlyBasicProperties.AppId.get -> string
RabbitMQ.Client.ReadOnlyBasicProperties.ClusterId.get -> string
Expand Down Expand Up @@ -851,6 +852,8 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Cli
static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress
static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool
static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string
static RabbitMQ.Client.RabbitMQActivitySource.UseRoutingKeyAsOperationName.get -> bool
static RabbitMQ.Client.RabbitMQActivitySource.UseRoutingKeyAsOperationName.set -> void
static RabbitMQ.Client.TcpClientAdapter.GetMatchingHost(System.Collections.Generic.IReadOnlyCollection<System.Net.IPAddress> addresses, System.Net.Sockets.AddressFamily addressFamily) -> System.Net.IPAddress
static RabbitMQ.Client.TimerBasedCredentialRefresherEventSource.Log.get -> RabbitMQ.Client.TimerBasedCredentialRefresherEventSource
static readonly RabbitMQ.Client.CachedString.Empty -> RabbitMQ.Client.CachedString
Expand Down Expand Up @@ -881,6 +884,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.Dispose(bool disposing) -> void
virtual RabbitMQ.Client.TcpClientAdapter.GetStream() -> System.Net.Sockets.NetworkStream
virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.get -> System.TimeSpan
virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~const RabbitMQ.Client.RabbitMQActivitySource.PublisherSourceName = "RabbitMQ.Client.Publisher" -> string
~const RabbitMQ.Client.RabbitMQActivitySource.SubscriberSourceName = "RabbitMQ.Client.Subscriber" -> string
~override RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
Expand Down
3 changes: 3 additions & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,7 @@
<PackageReference Include="System.IO.Pipelines" Version="8.0.0" />
</ItemGroup>

<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="7.0.2" />
</ItemGroup>
</Project>
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public interface IChannel : IDisposable
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
Expand All @@ -203,7 +203,7 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

#nullable disable
Expand Down
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ public static Task<string> BasicConsumeAsync(this IChannel channel, string queue
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory<byte> body)
where T : IReadOnlyBasicProperties, IAmqpHeader
{
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, in basicProperties, body);
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
}

public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

#nullable disable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

Expand Down Expand Up @@ -78,8 +79,9 @@ await base.HandleBasicConsumeOk(consumerTag)
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
// No need to call base, it's empty.
return _receivedWrapper.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
return BasicDeliverWrapper(deliverEventArgs);
}

///<summary>Fires the Shutdown event.</summary>
Expand All @@ -93,5 +95,13 @@ await _shutdownWrapper.InvokeAsync(this, reason)
.ConfigureAwait(false);
}
}

private async Task BasicDeliverWrapper(BasicDeliverEventArgs eventArgs)
{
using (Activity activity = RabbitMQActivitySource.Deliver(eventArgs))
{
await _receivedWrapper.InvokeAsync(this, eventArgs).ConfigureAwait(false);
}
}
}
}
11 changes: 7 additions & 4 deletions projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
//---------------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Events
Expand Down Expand Up @@ -88,10 +89,12 @@ public override void HandleBasicConsumeOk(string consumerTag)
public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
Received?.Invoke(
this,
new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
using (Activity activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default)
{
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
Received?.Invoke(this, eventArgs);
}
}

///<summary>Fires the Shutdown event.</summary>
Expand Down
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout)
_tcsConfiguredTaskAwaitable = _tcs.Task.ConfigureAwait(false);
}

internal DateTime StartTime { get; } = DateTime.UtcNow;

public ConfiguredTaskAwaitable<T>.ConfiguredTaskAwaiter GetAwaiter()
{
return _tcsConfiguredTaskAwaitable.GetAwaiter();
Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,13 @@ public ValueTask<BasicGetResult> BasicGetAsync(string queue, bool autoAck)
public ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue)
=> InnerChannel.BasicNackAsync(deliveryTag, multiple, requeue);

public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);
=> InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory);

public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);
=> InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory);

public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global)
{
Expand Down
Loading

0 comments on commit d34160b

Please sign in to comment.