diff --git a/.editorconfig b/.editorconfig
index 0d73107917..96ff657df1 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -167,7 +167,7 @@ dotnet_diagnostic.RS0026.severity = none
dotnet_diagnostic.RS0027.severity = none
dotnet_diagnostic.RS0036.severity = none
dotnet_diagnostic.RS0041.severity = none
-dotnet_diagnostic.RS0051.severity = error
+dotnet_diagnostic.RS0051.severity = none
dotnet_diagnostic.CA2007.severity = error
diff --git a/projects/Directory.Packages.props b/projects/Directory.Packages.props
index 8db8106fd4..649439c40e 100644
--- a/projects/Directory.Packages.props
+++ b/projects/Directory.Packages.props
@@ -7,6 +7,7 @@
+
- 8.0
+ 9.0
+ enable
@@ -64,6 +64,7 @@
See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
-->
+
diff --git a/projects/RabbitMQ.Client/client/RentedMemory.cs b/projects/RabbitMQ.Client/client/RentedMemory.cs
index 37efc44a4c..c78955731c 100644
--- a/projects/RabbitMQ.Client/client/RentedMemory.cs
+++ b/projects/RabbitMQ.Client/client/RentedMemory.cs
@@ -70,7 +70,7 @@ public void Dispose()
if (RentedArray != null)
{
ArrayPool.Shared.Return(RentedArray);
- RentedArray = default;
+ RentedArray = Array.Empty();
Memory = default;
}
}
diff --git a/projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs b/projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs
index ab10b21677..83489964b9 100644
--- a/projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs
+++ b/projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs
@@ -266,7 +266,7 @@ public static AmqpTcpEndpoint[] ParseMultiple(string addresses)
///
/// Compares this instance by value (protocol, hostname, port) against another instance.
///
- public override bool Equals(object obj)
+ public override bool Equals(object? obj)
{
if (!(obj is AmqpTcpEndpoint other))
{
diff --git a/projects/RabbitMQ.Client/client/api/AmqpTimestamp.cs b/projects/RabbitMQ.Client/client/api/AmqpTimestamp.cs
index c908615af1..ecafffcfa7 100644
--- a/projects/RabbitMQ.Client/client/api/AmqpTimestamp.cs
+++ b/projects/RabbitMQ.Client/client/api/AmqpTimestamp.cs
@@ -66,7 +66,7 @@ public AmqpTimestamp(long unixTime) : this()
public bool Equals(AmqpTimestamp other) => UnixTime == other.UnixTime;
- public override bool Equals(object obj) => obj is AmqpTimestamp other && Equals(other);
+ public override bool Equals(object? obj) => obj is AmqpTimestamp other && Equals(other);
public override int GetHashCode() => UnixTime.GetHashCode();
diff --git a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs
index 646e2798c5..b4cc859762 100644
--- a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs
+++ b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs
@@ -48,7 +48,7 @@ public string[] ConsumerTags
/// If our shuts down, this property will contain a description of the reason for the
/// shutdown. Otherwise it will contain null. See .
///
- public ShutdownEventArgs ShutdownReason { get; protected set; }
+ public ShutdownEventArgs? ShutdownReason { get; protected set; }
///
/// Signalled when the consumer gets cancelled.
@@ -64,7 +64,7 @@ public event AsyncEventHandler ConsumerCancelled
/// Retrieve the this consumer is associated with,
/// for use in acknowledging received messages, for instance.
///
- public IChannel Channel { get; set; }
+ public IChannel? Channel { get; set; }
///
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
diff --git a/projects/RabbitMQ.Client/client/api/BasicCredentialsProvider.cs b/projects/RabbitMQ.Client/client/api/BasicCredentialsProvider.cs
index 28c1c9ac63..97faa08d03 100644
--- a/projects/RabbitMQ.Client/client/api/BasicCredentialsProvider.cs
+++ b/projects/RabbitMQ.Client/client/api/BasicCredentialsProvider.cs
@@ -39,7 +39,7 @@ public class BasicCredentialsProvider : ICredentialsProvider
private readonly string _userName;
private readonly string _password;
- public BasicCredentialsProvider(string name, string userName, string password)
+ public BasicCredentialsProvider(string? name, string userName, string password)
{
_name = name ?? string.Empty;
_userName = userName ?? throw new ArgumentNullException(nameof(userName));
diff --git a/projects/RabbitMQ.Client/client/api/BasicProperties.cs b/projects/RabbitMQ.Client/client/api/BasicProperties.cs
index 05872f6673..b39d9a361b 100644
--- a/projects/RabbitMQ.Client/client/api/BasicProperties.cs
+++ b/projects/RabbitMQ.Client/client/api/BasicProperties.cs
@@ -31,12 +31,12 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;
namespace RabbitMQ.Client
{
-#nullable enable
///
/// AMQP specification content header properties for content class "basic".
///
@@ -74,7 +74,7 @@ public PublicationAddress? ReplyToAddress
{
get
{
- PublicationAddress.TryParse(ReplyTo, out PublicationAddress result);
+ PublicationAddress.TryParse(ReplyTo, out PublicationAddress? result);
return result;
}
@@ -118,19 +118,30 @@ public BasicProperties(IReadOnlyBasicProperties input)
public void ClearAppId() => AppId = default;
public void ClearClusterId() => ClusterId = default;
+ [MemberNotNullWhen(true, nameof(ContentType))]
public bool IsContentTypePresent() => ContentType != default;
+ [MemberNotNullWhen(true, nameof(ContentEncoding))]
public bool IsContentEncodingPresent() => ContentEncoding != default;
+ [MemberNotNullWhen(true, nameof(Headers))]
public bool IsHeadersPresent() => Headers != default;
public bool IsDeliveryModePresent() => DeliveryMode != default;
public bool IsPriorityPresent() => Priority != default;
+ [MemberNotNullWhen(true, nameof(CorrelationId))]
public bool IsCorrelationIdPresent() => CorrelationId != default;
+ [MemberNotNullWhen(true, nameof(ReplyTo))]
public bool IsReplyToPresent() => ReplyTo != default;
+ [MemberNotNullWhen(true, nameof(Expiration))]
public bool IsExpirationPresent() => Expiration != default;
+ [MemberNotNullWhen(true, nameof(MessageId))]
public bool IsMessageIdPresent() => MessageId != default;
public bool IsTimestampPresent() => Timestamp != default;
+ [MemberNotNullWhen(true, nameof(Type))]
public bool IsTypePresent() => Type != default;
+ [MemberNotNullWhen(true, nameof(UserId))]
public bool IsUserIdPresent() => UserId != default;
+ [MemberNotNullWhen(true, nameof(AppId))]
public bool IsAppIdPresent() => AppId != default;
+ [MemberNotNullWhen(true, nameof(ClusterId))]
public bool IsClusterIdPresent() => ClusterId != default;
ushort IAmqpHeader.ProtocolClassId => ClassConstants.Basic;
diff --git a/projects/RabbitMQ.Client/client/api/BinaryTableValue.cs b/projects/RabbitMQ.Client/client/api/BinaryTableValue.cs
index d70ed52330..16a2e08c83 100644
--- a/projects/RabbitMQ.Client/client/api/BinaryTableValue.cs
+++ b/projects/RabbitMQ.Client/client/api/BinaryTableValue.cs
@@ -29,6 +29,8 @@
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------
+using System;
+
namespace RabbitMQ.Client
{
/// Wrapper for a byte[]. May appear as values read from
@@ -61,9 +63,9 @@ namespace RabbitMQ.Client
public class BinaryTableValue
{
///
- /// Creates a new instance of the with null for its Bytes property.
+ /// Creates a new instance of the with an empty array for its Bytes property.
///
- public BinaryTableValue() : this(null)
+ public BinaryTableValue() : this(Array.Empty())
{
}
diff --git a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs
index bf3512a8fc..fcc225ea61 100644
--- a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs
+++ b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs
@@ -37,7 +37,6 @@
namespace RabbitMQ.Client
{
-#nullable enable
///
/// The configuration of a connection.
///
@@ -151,7 +150,7 @@ public sealed class ConnectionConfig
internal readonly Func> FrameHandlerFactoryAsync;
internal ConnectionConfig(string virtualHost, string userName, string password,
- ICredentialsProvider credentialsProvider, ICredentialsRefresher credentialsRefresher,
+ ICredentialsProvider? credentialsProvider, ICredentialsRefresher credentialsRefresher,
IEnumerable authMechanisms,
IDictionary clientProperties, string? clientProvidedName,
ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled,
diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
index 06851fd532..5ac3f46981 100644
--- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
+++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
@@ -31,6 +31,7 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net.Security;
using System.Reflection;
@@ -107,7 +108,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
public const uint DefaultFrameMax = 0;
///
- /// Default value for ConnectionFactory
's MaxInboundMessageBodySize
.
+ /// Default value for ConnectionFactory
's MaxInboundMessageBodySize
.
///
public const uint DefaultMaxInboundMessageBodySize = 1_048_576 * 64;
@@ -196,8 +197,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20);
// just here to hold the value that was set through the setter
- private Uri _uri;
- private string _clientProvidedName;
+ private string? _clientProvidedName;
///
/// Amount of time protocol handshake operations are allowed to take before
@@ -278,7 +278,7 @@ public TimeSpan ContinuationTimeout
///
public ConnectionFactory()
{
- ClientProperties = new Dictionary(DefaultClientProperties);
+ ClientProperties = new Dictionary(DefaultClientProperties);
}
///
@@ -299,12 +299,12 @@ public AmqpTcpEndpoint Endpoint
///
/// Dictionary of client properties to be sent to the server.
///
- public IDictionary ClientProperties { get; set; }
+ public IDictionary ClientProperties { get; set; }
- private static readonly Dictionary DefaultClientProperties = new Dictionary(5)
+ private static readonly Dictionary DefaultClientProperties = new Dictionary(5)
{
["product"] = Encoding.UTF8.GetBytes("RabbitMQ"),
- ["version"] = Encoding.UTF8.GetBytes(typeof(ConnectionFactory).Assembly.GetCustomAttribute().InformationalVersion),
+ ["version"] = Encoding.UTF8.GetBytes(typeof(ConnectionFactory).Assembly.GetCustomAttribute()!.InformationalVersion),
["platform"] = Encoding.UTF8.GetBytes(".NET"),
["copyright"] = Encoding.UTF8.GetBytes("Copyright (c) 2007-2023 Broadcom."),
["information"] = Encoding.UTF8.GetBytes("Licensed under the MPL. See https://www.rabbitmq.com/")
@@ -323,7 +323,7 @@ public AmqpTcpEndpoint Endpoint
///
/// CredentialsProvider used to obtain username and password.
///
- public ICredentialsProvider CredentialsProvider { get; set; }
+ public ICredentialsProvider? CredentialsProvider { get; set; }
///
/// Used to refresh credentials.
@@ -361,14 +361,14 @@ public AmqpTcpEndpoint Endpoint
///
public Uri Uri
{
- get { return _uri; }
+ get { return GetUri(); }
set { SetUri(value); }
}
///
/// Default client provided name to be used for connections.
///
- public string ClientProvidedName
+ public string? ClientProvidedName
{
get => _clientProvidedName;
set
@@ -381,7 +381,7 @@ public string ClientProvidedName
/// Given a list of mechanism names supported by the server, select a preferred mechanism,
/// or null if we have none in common.
///
- public IAuthMechanismFactory AuthMechanismFactory(IEnumerable argServerMechanismNames)
+ public IAuthMechanismFactory? AuthMechanismFactory(IEnumerable argServerMechanismNames)
{
string[] serverMechanismNames = argServerMechanismNames.ToArray();
@@ -435,7 +435,7 @@ public Task CreateConnectionAsync(
///
/// When the configured hostname was not reachable.
///
- public Task CreateConnectionAsync(string clientProvidedName,
+ public Task CreateConnectionAsync(string? clientProvidedName,
CancellationToken cancellationToken = default)
{
return CreateConnectionAsync(EndpointResolverFactory(LocalEndpoints()), clientProvidedName, cancellationToken);
@@ -483,7 +483,7 @@ public Task CreateConnectionAsync(IEnumerable hostnames,
///
/// When no hostname was reachable.
///
- public Task CreateConnectionAsync(IEnumerable hostnames, string clientProvidedName,
+ public Task CreateConnectionAsync(IEnumerable hostnames, string? clientProvidedName,
CancellationToken cancellationToken = default)
{
IEnumerable endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, Port, Ssl, MaxInboundMessageBodySize));
@@ -530,7 +530,7 @@ public Task CreateConnectionAsync(IEnumerable endp
///
/// When no hostname was reachable.
///
- public Task CreateConnectionAsync(IEnumerable endpoints, string clientProvidedName,
+ public Task CreateConnectionAsync(IEnumerable endpoints, string? clientProvidedName,
CancellationToken cancellationToken = default)
{
return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName, cancellationToken);
@@ -553,7 +553,7 @@ public Task CreateConnectionAsync(IEnumerable endp
///
/// When no hostname was reachable.
///
- public async Task CreateConnectionAsync(IEndpointResolver endpointResolver, string clientProvidedName,
+ public async Task CreateConnectionAsync(IEndpointResolver endpointResolver, string? clientProvidedName,
CancellationToken cancellationToken = default)
{
ConnectionConfig config = CreateConfig(clientProvidedName);
@@ -561,8 +561,7 @@ public async Task CreateConnectionAsync(IEndpointResolver endpointR
{
if (AutomaticRecoveryEnabled)
{
- var c = new AutorecoveringConnection(config, endpointResolver);
- return await c.OpenAsync(cancellationToken)
+ return await AutorecoveringConnection.CreateAsync(config, endpointResolver, cancellationToken)
.ConfigureAwait(false);
}
else
@@ -591,7 +590,7 @@ public async Task CreateConnectionAsync(IEndpointResolver endpointR
}
}
- private ConnectionConfig CreateConfig(string clientProvidedName)
+ private ConnectionConfig CreateConfig(string? clientProvidedName)
{
return new ConnectionConfig(
VirtualHost,
@@ -622,10 +621,9 @@ internal async Task CreateFrameHandlerAsync(
AmqpTcpEndpoint endpoint, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
- IFrameHandler fh = new SocketFrameHandler(endpoint, SocketFactory, RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
- await fh.ConnectAsync(cancellationToken)
+ SocketFrameHandler frameHandler = await SocketFrameHandler.CreateAsync(endpoint, SocketFactory, RequestedConnectionTimeout, cancellationToken)
.ConfigureAwait(false);
- return ConfigureFrameHandler(fh);
+ return ConfigureFrameHandler(frameHandler);
}
private IFrameHandler ConfigureFrameHandler(IFrameHandler fh)
@@ -647,6 +645,48 @@ private IFrameHandler ConfigureFrameHandler(IFrameHandler fh)
return fh;
}
+ private Uri GetUri()
+ {
+ var builder = new UriBuilder();
+
+ if (Ssl.Enabled)
+ {
+ builder.Scheme = "amqps";
+ }
+ else
+ {
+ builder.Scheme = "amqp";
+ }
+
+ builder.Host = HostName;
+
+ if (Port == AmqpTcpEndpoint.UseDefaultPort)
+ {
+ builder.Port = 5672;
+ }
+ else
+ {
+ builder.Port = Port;
+ }
+
+ if (false == string.IsNullOrEmpty(UserName))
+ {
+ builder.UserName = UserName;
+ }
+
+ if (false == string.IsNullOrEmpty(Password))
+ {
+ builder.Password = Password;
+ }
+
+ if (false == string.IsNullOrEmpty(VirtualHost))
+ {
+ builder.Path = Uri.EscapeDataString(VirtualHost);
+ }
+
+ return builder.Uri;
+ }
+
private void SetUri(Uri uri)
{
Endpoint = new AmqpTcpEndpoint();
@@ -666,11 +706,13 @@ private void SetUri(Uri uri)
{
throw new ArgumentException($"Wrong scheme in AMQP URI: {uri.Scheme}");
}
+
string host = uri.Host;
if (!string.IsNullOrEmpty(host))
{
HostName = host;
}
+
Ssl.ServerName = HostName;
int port = uri.Port;
@@ -700,12 +742,11 @@ that has at least the path segment "/". */
{
throw new ArgumentException($"Multiple segments in path of AMQP URI: {string.Join(", ", uri.Segments)}");
}
+
if (uri.Segments.Length == 2)
{
VirtualHost = UriDecode(uri.Segments[1]);
}
-
- _uri = uri;
}
///
@@ -713,7 +754,7 @@ that has at least the path segment "/". */
///
private static string UriDecode(string uri)
{
- return System.Uri.UnescapeDataString(uri.Replace("+", "%2B"));
+ return Uri.UnescapeDataString(uri.Replace("+", "%2B"));
}
private List LocalEndpoints()
@@ -721,7 +762,8 @@ private List LocalEndpoints()
return new List { Endpoint };
}
- private static string EnsureClientProvidedNameLength(string clientProvidedName)
+ [return: NotNullIfNotNull(nameof(clientProvidedName))]
+ private static string? EnsureClientProvidedNameLength(string? clientProvidedName)
{
if (clientProvidedName != null)
{
diff --git a/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs
index 0ef3eca115..7ad66cd2f1 100644
--- a/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs
+++ b/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs
@@ -90,7 +90,7 @@ public string[] ConsumerTags
/// If our shuts down, this property will contain a description of the reason for the
/// shutdown. Otherwise it will contain null. See .
///
- public ShutdownEventArgs ShutdownReason { get; protected set; }
+ public ShutdownEventArgs? ShutdownReason { get; protected set; }
///
/// Signalled when the consumer gets cancelled.
@@ -106,7 +106,7 @@ public event EventHandler ConsumerCancelled
/// Retrieve the this consumer is associated with,
/// for use in acknowledging received messages, for instance.
///
- public IChannel Channel { get; set; }
+ public IChannel? Channel { get; set; }
///
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
diff --git a/projects/RabbitMQ.Client/client/api/ExternalMechanism.cs b/projects/RabbitMQ.Client/client/api/ExternalMechanism.cs
index 76bb9f14e4..364881f66f 100644
--- a/projects/RabbitMQ.Client/client/api/ExternalMechanism.cs
+++ b/projects/RabbitMQ.Client/client/api/ExternalMechanism.cs
@@ -38,7 +38,7 @@ public class ExternalMechanism : IAuthMechanism
///
/// Handle one round of challenge-response.
///
- public byte[] handleChallenge(byte[] challenge, ConnectionConfig config)
+ public byte[] handleChallenge(byte[]? challenge, ConnectionConfig config)
{
return Array.Empty();
}
diff --git a/projects/RabbitMQ.Client/client/api/IAmqpHeader.cs b/projects/RabbitMQ.Client/client/api/IAmqpHeader.cs
index 5925a2b84b..23192d21f8 100644
--- a/projects/RabbitMQ.Client/client/api/IAmqpHeader.cs
+++ b/projects/RabbitMQ.Client/client/api/IAmqpHeader.cs
@@ -31,7 +31,6 @@
namespace RabbitMQ.Client
{
-#nullable enable
///
/// A AMQP header.
///
diff --git a/projects/RabbitMQ.Client/client/api/IAmqpWriteable.cs b/projects/RabbitMQ.Client/client/api/IAmqpWriteable.cs
index ec317b5c0b..0b02af222e 100644
--- a/projects/RabbitMQ.Client/client/api/IAmqpWriteable.cs
+++ b/projects/RabbitMQ.Client/client/api/IAmqpWriteable.cs
@@ -33,7 +33,6 @@
namespace RabbitMQ.Client
{
-#nullable enable
///
/// A AMQP writeable.
///
diff --git a/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs
index d86950b496..b96fbe2e81 100644
--- a/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs
+++ b/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs
@@ -11,7 +11,7 @@ public interface IAsyncBasicConsumer
/// Retrieve the this consumer is associated with,
/// for use in acknowledging received messages, for instance.
///
- IChannel Channel { get; }
+ IChannel? Channel { get; }
///
/// Signalled when the consumer gets cancelled.
diff --git a/projects/RabbitMQ.Client/client/api/IAuthMechanism.cs b/projects/RabbitMQ.Client/client/api/IAuthMechanism.cs
index 963c6b4358..08c44e4af5 100644
--- a/projects/RabbitMQ.Client/client/api/IAuthMechanism.cs
+++ b/projects/RabbitMQ.Client/client/api/IAuthMechanism.cs
@@ -39,6 +39,6 @@ public interface IAuthMechanism
///
/// Handle one round of challenge-response.
///
- byte[] handleChallenge(byte[] challenge, ConnectionConfig config);
+ byte[] handleChallenge(byte[]? challenge, ConnectionConfig config);
}
}
diff --git a/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs
index 6906e6e12f..82b802e1b7 100644
--- a/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs
+++ b/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs
@@ -54,7 +54,7 @@ public interface IBasicConsumer
/// Retrieve the this consumer is associated with,
/// for use in acknowledging received messages, for instance.
///
- IChannel Channel { get; }
+ IChannel? Channel { get; }
///
/// Signalled when the consumer gets cancelled.
diff --git a/projects/RabbitMQ.Client/client/api/IBasicProperties.cs b/projects/RabbitMQ.Client/client/api/IBasicProperties.cs
index 14610638c6..c5a289501c 100644
--- a/projects/RabbitMQ.Client/client/api/IBasicProperties.cs
+++ b/projects/RabbitMQ.Client/client/api/IBasicProperties.cs
@@ -33,7 +33,6 @@
namespace RabbitMQ.Client
{
-#nullable enable
///
/// The AMQP Basic headers class interface,
/// spanning the union of the functionality offered by versions
diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs
index 27facd5248..dc171db3f8 100644
--- a/projects/RabbitMQ.Client/client/api/IChannel.cs
+++ b/projects/RabbitMQ.Client/client/api/IChannel.cs
@@ -56,7 +56,7 @@ public interface IChannel : IDisposable
/// Returns null if the session is still in a state where it can be used,
/// or the cause of its closure otherwise.
///
- ShutdownEventArgs CloseReason { get; }
+ ShutdownEventArgs? CloseReason { get; }
/// Signalled when an unexpected message is delivered.
///
@@ -82,7 +82,7 @@ public interface IChannel : IDisposable
///
/// Most people will not need to use this.
///
- IBasicConsumer DefaultConsumer { get; set; }
+ IBasicConsumer? DefaultConsumer { get; set; }
///
/// Returns true if the channel is no longer in a state where it can be used.
@@ -106,7 +106,7 @@ public interface IChannel : IDisposable
///
/// https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.queue-name
///
- string CurrentQueue { get; }
+ string? CurrentQueue { get; }
///
/// Signalled when a Basic.Ack command arrives from the broker.
@@ -178,7 +178,7 @@ Task BasicCancelAsync(string consumerTag, bool noWait = false,
/// Cancellation token for this operation.
///
Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive,
- IDictionary arguments, IBasicConsumer consumer,
+ IDictionary? arguments, IBasicConsumer consumer,
CancellationToken cancellationToken = default);
///
@@ -190,11 +190,9 @@ Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
/// If set to true, automatically ack the message.
/// Cancellation token for this operation.
///
- ValueTask BasicGetAsync(string queue, bool autoAck,
+ ValueTask BasicGetAsync(string queue, bool autoAck,
CancellationToken cancellationToken = default);
-#nullable enable
-
///
/// Asynchronously publishes a message.
///
@@ -229,16 +227,14 @@ ValueTask BasicPublishAsync(CachedString exchange, CachedString rou
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
-#nullable disable
-
///
/// Configures QoS parameters of the Basic content-class.
///
/// Size of the prefetch in bytes.
/// The prefetch count.
- /// If set to true, use global prefetch.
+ /// If set to true, use global prefetch.
/// Cancellation token for this operation.
- /// See the Consumer Prefetch documentation.
+ /// See the Consumer Prefetch documentation.
Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global,
CancellationToken cancellationToken = default);
@@ -286,7 +282,7 @@ Task CloseAsync(ShutdownEventArgs reason, bool abort,
/// The exchange is declared non-internal.
///
Task ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete,
- IDictionary arguments = null, bool passive = false, bool noWait = false,
+ IDictionary? arguments = null, bool passive = false, bool noWait = false,
CancellationToken cancellationToken = default);
///
@@ -325,7 +321,7 @@ Task ExchangeDeleteAsync(string exchange, bool ifUnused = false, bool noWait = f
/// Routing key must be shorter than 255 bytes.
///
Task ExchangeBindAsync(string destination, string source, string routingKey,
- IDictionary arguments = null, bool noWait = false,
+ IDictionary? arguments = null, bool noWait = false,
CancellationToken cancellationToken = default);
///
@@ -341,7 +337,7 @@ Task ExchangeBindAsync(string destination, string source, string routingKey,
/// Routing key must be shorter than 255 bytes.
///
Task ExchangeUnbindAsync(string destination, string source, string routingKey,
- IDictionary arguments = null, bool noWait = false,
+ IDictionary? arguments = null, bool noWait = false,
CancellationToken cancellationToken = default);
///
@@ -356,7 +352,7 @@ Task ExchangeUnbindAsync(string destination, string source, string routingKey,
/// Optional; Set to true to not require a response from the server.
/// CancellationToken for this operation.
Task QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete,
- IDictionary arguments = null, bool passive = false, bool noWait = false,
+ IDictionary? arguments = null, bool passive = false, bool noWait = false,
CancellationToken cancellationToken = default);
/// Asynchronously declare a queue passively.
@@ -403,7 +399,7 @@ Task QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWa
/// Routing key must be shorter than 255 bytes.
///
Task QueueBindAsync(string queue, string exchange, string routingKey,
- IDictionary arguments = null, bool noWait = false,
+ IDictionary? arguments = null, bool noWait = false,
CancellationToken cancellationToken = default);
///
@@ -418,7 +414,7 @@ Task QueueBindAsync(string queue, string exchange, string routingKey,
/// Routing key must be shorter than 255 bytes.
///
Task QueueUnbindAsync(string queue, string exchange, string routingKey,
- IDictionary arguments = null,
+ IDictionary? arguments = null,
CancellationToken cancellationToken = default);
///
diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
index 59d1fecb8f..c35a3f4562 100644
--- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
+++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
@@ -48,7 +48,7 @@ public static Task BasicConsumeAsync(this IChannel channel,
string consumerTag = "",
bool noLocal = false,
bool exclusive = false,
- IDictionary arguments = null)
+ IDictionary? arguments = null)
{
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer);
}
@@ -74,14 +74,12 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue
public static Task BasicConsumeAsync(this IChannel channel, string queue,
bool autoAck,
string consumerTag,
- IDictionary arguments,
+ IDictionary? arguments,
IBasicConsumer consumer)
{
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, arguments, consumer);
}
-#nullable enable
-
///
/// (Extension method) Convenience overload of BasicPublish.
///
@@ -103,13 +101,11 @@ public static ValueTask BasicPublishAsync(this IChannel channel, CachedString ex
CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
-#nullable disable
-
///
/// Asynchronously declare a queue.
///
public static Task QueueDeclareAsync(this IChannel channel, string queue = "", bool durable = false, bool exclusive = true,
- bool autoDelete = true, IDictionary arguments = null, bool noWait = false)
+ bool autoDelete = true, IDictionary? arguments = null, bool noWait = false)
{
return channel.QueueDeclareAsync(queue: queue, passive: false,
durable: durable, exclusive: exclusive, autoDelete: autoDelete,
@@ -120,7 +116,7 @@ public static Task QueueDeclareAsync(this IChannel channel, stri
/// Asynchronously declare an exchange.
///
public static Task ExchangeDeclareAsync(this IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false,
- IDictionary arguments = null, bool noWait = false)
+ IDictionary? arguments = null, bool noWait = false)
{
return channel.ExchangeDeclareAsync(exchange, type, durable, autoDelete,
arguments: arguments, passive: false, noWait: noWait);
@@ -137,7 +133,7 @@ public static Task QueueDeleteAsync(this IChannel channel, string queue, b
///
/// Asynchronously unbinds a queue.
///
- public static Task QueueUnbindAsync(this IChannel channel, string queue, string exchange, string routingKey, IDictionary arguments = null)
+ public static Task QueueUnbindAsync(this IChannel channel, string queue, string exchange, string routingKey, IDictionary? arguments = null)
{
return channel.QueueUnbindAsync(queue, exchange, routingKey, arguments);
}
diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs
index 286b810570..5f3510ee77 100644
--- a/projects/RabbitMQ.Client/client/api/IConnection.cs
+++ b/projects/RabbitMQ.Client/client/api/IConnection.cs
@@ -66,7 +66,7 @@ public interface IConnection : INetworkConnection, IDisposable
///
/// A copy of the client properties that has been sent to the server.
///
- IDictionary ClientProperties { get; }
+ IDictionary ClientProperties { get; }
///
/// Returns null if the connection is still in a state
@@ -86,7 +86,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// vary depending on the particular operation being attempted).
///
///
- ShutdownEventArgs CloseReason { get; }
+ ShutdownEventArgs? CloseReason { get; }
///
/// Retrieve the endpoint this connection is connected to.
@@ -118,7 +118,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// A dictionary of the server properties sent by the server while establishing the connection.
/// This typically includes the product name and version of the server.
///
- IDictionary ServerProperties { get; }
+ IDictionary? ServerProperties { get; }
///
/// Returns the list of objects that contain information
@@ -137,7 +137,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// be used as a connection identifier, e.g. in HTTP API requests.
/// This value is supposed to be human-readable.
///
- string ClientProvidedName { get; }
+ string? ClientProvidedName { get; }
///
/// Signalled when an exception occurs in a callback invoked by the connection.
diff --git a/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs
index eb138ceab6..1059cde1c5 100644
--- a/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs
+++ b/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs
@@ -42,7 +42,7 @@ public interface IConnectionFactory
///
/// Dictionary of client properties to be sent to the server.
///
- IDictionary ClientProperties { get; set; }
+ IDictionary ClientProperties { get; set; }
///
/// Password to use when authenticating to the server.
@@ -78,7 +78,7 @@ public interface IConnectionFactory
/// Credentials provider. It is optional. When set, username and password
/// are obtained thru this provider.
///
- ICredentialsProvider CredentialsProvider { get; set; }
+ ICredentialsProvider? CredentialsProvider { get; set; }
ICredentialsRefresher CredentialsRefresher { get; set; }
@@ -90,13 +90,13 @@ public interface IConnectionFactory
///
/// Default client provided name to be used for connections.
///
- string ClientProvidedName { get; set; }
+ string? ClientProvidedName { get; set; }
///
/// Given a list of mechanism names supported by the server, select a preferred mechanism,
/// or null if we have none in common.
///
- IAuthMechanismFactory AuthMechanismFactory(IEnumerable mechanismNames);
+ IAuthMechanismFactory? AuthMechanismFactory(IEnumerable mechanismNames);
///
/// Asynchronously create a connection to the specified endpoint.
diff --git a/projects/RabbitMQ.Client/client/api/ICredentialsRefresher.cs b/projects/RabbitMQ.Client/client/api/ICredentialsRefresher.cs
index ac1d7f6427..ea16d26b1f 100644
--- a/projects/RabbitMQ.Client/client/api/ICredentialsRefresher.cs
+++ b/projects/RabbitMQ.Client/client/api/ICredentialsRefresher.cs
@@ -94,7 +94,7 @@ public ICredentialsProvider Register(ICredentialsProvider provider, ICredentials
public bool Unregister(ICredentialsProvider provider)
{
- if (_registrations.TryRemove(provider, out System.Timers.Timer timer))
+ if (_registrations.TryRemove(provider, out System.Timers.Timer? timer))
{
try
{
@@ -116,7 +116,7 @@ public bool Unregister(ICredentialsProvider provider)
private System.Timers.Timer scheduleTimer(ICredentialsProvider provider, ICredentialsRefresher.NotifyCredentialRefreshedAsync callback)
{
System.Timers.Timer timer = new System.Timers.Timer();
- timer.Interval = provider.ValidUntil.Value.TotalMilliseconds * (1.0 - (1 / 3.0));
+ timer.Interval = provider.ValidUntil!.Value.TotalMilliseconds * (1.0 - (1 / 3.0));
timer.Elapsed += (o, e) =>
{
TimerBasedCredentialRefresherEventSource.Log.TriggeredTimer(provider.Name);
diff --git a/projects/RabbitMQ.Client/client/api/IEndpointResolverExtensions.cs b/projects/RabbitMQ.Client/client/api/IEndpointResolverExtensions.cs
index b64975bd22..e571d628ad 100644
--- a/projects/RabbitMQ.Client/client/api/IEndpointResolverExtensions.cs
+++ b/projects/RabbitMQ.Client/client/api/IEndpointResolverExtensions.cs
@@ -49,7 +49,7 @@ public static async Task SelectOneAsync(this IEndpointResolver resolver,
try
{
t = await selector(ep, cancellationToken).ConfigureAwait(false);
- if (t.Equals(default(T)) == false)
+ if (t!.Equals(default(T)) == false)
{
return t;
}
@@ -71,12 +71,19 @@ public static async Task SelectOneAsync(this IEndpointResolver resolver,
}
}
- if (EqualityComparer.Default.Equals(t, default(T)) && exceptions.Count > 0)
+ if (EqualityComparer.Default.Equals(t!, default!))
{
- throw new AggregateException(exceptions);
+ if (exceptions.Count > 0)
+ {
+ throw new AggregateException(exceptions);
+ }
+ else
+ {
+ throw new InvalidOperationException(InternalConstants.BugFound);
+ }
}
- return t;
+ return t!;
}
}
}
diff --git a/projects/RabbitMQ.Client/client/api/IRecordedBinding.cs b/projects/RabbitMQ.Client/client/api/IRecordedBinding.cs
index 1ec1ef1179..1f5c57b0e8 100644
--- a/projects/RabbitMQ.Client/client/api/IRecordedBinding.cs
+++ b/projects/RabbitMQ.Client/client/api/IRecordedBinding.cs
@@ -2,7 +2,6 @@
namespace RabbitMQ.Client
{
-#nullable enable
public interface IRecordedBinding
{
string Source { get; }
@@ -11,6 +10,6 @@ public interface IRecordedBinding
string RoutingKey { get; }
- IDictionary? Arguments { get; }
+ IDictionary? Arguments { get; }
}
}
diff --git a/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs b/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs
index 576a1663bb..99321b5239 100644
--- a/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs
+++ b/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs
@@ -2,7 +2,6 @@
namespace RabbitMQ.Client
{
-#nullable enable
public interface IRecordedConsumer
{
string ConsumerTag { get; }
@@ -13,6 +12,6 @@ public interface IRecordedConsumer
bool Exclusive { get; }
- IDictionary? Arguments { get; }
+ IDictionary? Arguments { get; }
}
}
diff --git a/projects/RabbitMQ.Client/client/api/IRecordedExchange.cs b/projects/RabbitMQ.Client/client/api/IRecordedExchange.cs
index bebd5bb83b..9da788e38a 100644
--- a/projects/RabbitMQ.Client/client/api/IRecordedExchange.cs
+++ b/projects/RabbitMQ.Client/client/api/IRecordedExchange.cs
@@ -2,7 +2,6 @@
namespace RabbitMQ.Client
{
-#nullable enable
public interface IRecordedExchange
{
string Name { get; }
@@ -13,6 +12,6 @@ public interface IRecordedExchange
bool AutoDelete { get; }
- IDictionary? Arguments { get; }
+ IDictionary? Arguments { get; }
}
}
diff --git a/projects/RabbitMQ.Client/client/api/IRecordedQueue.cs b/projects/RabbitMQ.Client/client/api/IRecordedQueue.cs
index 9f5ac2474b..d7526512f1 100644
--- a/projects/RabbitMQ.Client/client/api/IRecordedQueue.cs
+++ b/projects/RabbitMQ.Client/client/api/IRecordedQueue.cs
@@ -2,7 +2,6 @@
namespace RabbitMQ.Client
{
-#nullable enable
public interface IRecordedQueue
{
string Name { get; }
@@ -13,7 +12,7 @@ public interface IRecordedQueue
bool AutoDelete { get; }
- IDictionary? Arguments { get; }
+ IDictionary? Arguments { get; }
bool IsServerNamed { get; }
}
diff --git a/projects/RabbitMQ.Client/client/api/InternalConstants.cs b/projects/RabbitMQ.Client/client/api/InternalConstants.cs
index d70900def4..f61e0bb4ae 100644
--- a/projects/RabbitMQ.Client/client/api/InternalConstants.cs
+++ b/projects/RabbitMQ.Client/client/api/InternalConstants.cs
@@ -51,5 +51,7 @@ internal static class InternalConstants
/// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/980
///
internal const int DefaultRabbitMqMaxClientProvideNameLength = 3000;
+
+ internal const string BugFound = "BUG FOUND - please report this exception (with stacktrace) here: https://github.com/rabbitmq/rabbitmq-dotnet-client/issues";
}
}
diff --git a/projects/RabbitMQ.Client/client/api/PlainMechanism.cs b/projects/RabbitMQ.Client/client/api/PlainMechanism.cs
index 7c4b374dff..ca2218f5d4 100644
--- a/projects/RabbitMQ.Client/client/api/PlainMechanism.cs
+++ b/projects/RabbitMQ.Client/client/api/PlainMechanism.cs
@@ -35,7 +35,7 @@ namespace RabbitMQ.Client
{
public class PlainMechanism : IAuthMechanism
{
- public byte[] handleChallenge(byte[] challenge, ConnectionConfig config)
+ public byte[] handleChallenge(byte[]? challenge, ConnectionConfig config)
{
return Encoding.UTF8.GetBytes($"\0{config.CredentialsProvider.UserName}\0{config.CredentialsProvider.Password}");
}
diff --git a/projects/RabbitMQ.Client/client/api/PublicationAddress.cs b/projects/RabbitMQ.Client/client/api/PublicationAddress.cs
index 44cd45d65d..73afff532b 100644
--- a/projects/RabbitMQ.Client/client/api/PublicationAddress.cs
+++ b/projects/RabbitMQ.Client/client/api/PublicationAddress.cs
@@ -29,6 +29,7 @@
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------
+using System.Diagnostics.CodeAnalysis;
using System.Text.RegularExpressions;
namespace RabbitMQ.Client
@@ -95,7 +96,7 @@ public PublicationAddress(string exchangeType, string exchangeName, string routi
/// Parse a out of the given string,
/// using the regex.
///
- public static PublicationAddress Parse(string uriLikeString)
+ public static PublicationAddress? Parse(string uriLikeString)
{
Match match = PSEUDO_URI_PARSER.Match(uriLikeString);
if (match.Success)
@@ -107,7 +108,7 @@ public static PublicationAddress Parse(string uriLikeString)
return null;
}
- public static bool TryParse(string uriLikeString, out PublicationAddress result)
+ public static bool TryParse([NotNullWhen(true)] string? uriLikeString, out PublicationAddress? result)
{
// Callers such as IBasicProperties.ReplyToAddress
// expect null result for invalid input.
@@ -121,7 +122,7 @@ public static bool TryParse(string uriLikeString, out PublicationAddress result)
{
try
{
- PublicationAddress res = Parse(uriLikeString);
+ PublicationAddress? res = Parse(uriLikeString);
result = res;
return true;
}
diff --git a/projects/RabbitMQ.Client/client/api/ReadonlyBasicProperties.cs b/projects/RabbitMQ.Client/client/api/ReadonlyBasicProperties.cs
index 98010a3184..a0651473a5 100644
--- a/projects/RabbitMQ.Client/client/api/ReadonlyBasicProperties.cs
+++ b/projects/RabbitMQ.Client/client/api/ReadonlyBasicProperties.cs
@@ -31,11 +31,11 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
using RabbitMQ.Client.Impl;
namespace RabbitMQ.Client
{
-#nullable enable
///
/// AMQP specification content header properties for content class "basic"
///
@@ -77,7 +77,7 @@ public PublicationAddress? ReplyToAddress
{
get
{
- PublicationAddress.TryParse(ReplyTo, out PublicationAddress result);
+ PublicationAddress.TryParse(ReplyTo, out PublicationAddress? result);
return result;
}
}
@@ -109,19 +109,30 @@ public ReadOnlyBasicProperties(ReadOnlySpan span)
if (bits.IsBitSet(BasicProperties.ClusterIdBit)) { WireFormatting.ReadShortstr(span.Slice(offset), out _clusterId); }
}
+ [MemberNotNullWhen(true, nameof(ContentType))]
public bool IsContentTypePresent() => ContentType != default;
+ [MemberNotNullWhen(true, nameof(ContentEncoding))]
public bool IsContentEncodingPresent() => ContentEncoding != default;
+ [MemberNotNullWhen(true, nameof(Headers))]
public bool IsHeadersPresent() => Headers != default;
public bool IsDeliveryModePresent() => DeliveryMode != default;
public bool IsPriorityPresent() => Priority != default;
+ [MemberNotNullWhen(true, nameof(CorrelationId))]
public bool IsCorrelationIdPresent() => CorrelationId != default;
+ [MemberNotNullWhen(true, nameof(ReplyTo))]
public bool IsReplyToPresent() => ReplyTo != default;
+ [MemberNotNullWhen(true, nameof(Expiration))]
public bool IsExpirationPresent() => Expiration != default;
+ [MemberNotNullWhen(true, nameof(MessageId))]
public bool IsMessageIdPresent() => MessageId != default;
public bool IsTimestampPresent() => Timestamp != default;
+ [MemberNotNullWhen(true, nameof(Type))]
public bool IsTypePresent() => Type != default;
+ [MemberNotNullWhen(true, nameof(UserId))]
public bool IsUserIdPresent() => UserId != default;
+ [MemberNotNullWhen(true, nameof(AppId))]
public bool IsAppIdPresent() => AppId != default;
+ [MemberNotNullWhen(true, nameof(ClusterId))]
public bool IsClusterIdPresent() => ClusterId != default;
}
}
diff --git a/projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs b/projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs
index f55686ecfa..2d940ebb64 100644
--- a/projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs
+++ b/projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs
@@ -41,14 +41,14 @@ namespace RabbitMQ.Client
///
public class ShutdownEventArgs : EventArgs
{
- private readonly Exception _exception;
+ private readonly Exception? _exception;
///
/// Construct a with the given parameters and
/// 0 for and .
///
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
- object cause = null)
+ object? cause = null)
: this(initiator, replyCode, replyText, 0, 0, cause)
{
}
@@ -57,7 +57,7 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
/// Construct a with the given parameters.
///
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
- ushort classId, ushort methodId, object cause = null)
+ ushort classId, ushort methodId, object? cause = null)
{
Initiator = initiator;
ReplyCode = replyCode;
@@ -79,7 +79,7 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
///
/// Exception causing the shutdown, or null if none.
///
- public Exception Exception
+ public Exception? Exception
{
get
{
@@ -90,7 +90,7 @@ public Exception Exception
///
/// Object causing the shutdown, or null if none.
///
- public object Cause { get; }
+ public object? Cause { get; }
///
/// AMQP content-class ID, or 0 if none.
@@ -142,7 +142,7 @@ private string GetMessageCore()
{
return $"AMQP close-reason, initiated by {Initiator}"
+ $", code={ReplyCode}"
- + (ReplyText != null ? $", text='{ReplyText}'" : string.Empty)
+ + $", text='{ReplyText}'"
+ $", classId={ClassId}"
+ $", methodId={MethodId}"
+ (Cause != null ? $", cause={Cause}" : string.Empty);
diff --git a/projects/RabbitMQ.Client/client/api/ShutdownReportEntry.cs b/projects/RabbitMQ.Client/client/api/ShutdownReportEntry.cs
index 09df9c29c0..104286a793 100644
--- a/projects/RabbitMQ.Client/client/api/ShutdownReportEntry.cs
+++ b/projects/RabbitMQ.Client/client/api/ShutdownReportEntry.cs
@@ -39,7 +39,7 @@ namespace RabbitMQ.Client
///
public class ShutdownReportEntry
{
- public ShutdownReportEntry(string description, Exception exception)
+ public ShutdownReportEntry(string description, Exception? exception)
{
Description = description;
Exception = exception;
@@ -53,12 +53,12 @@ public ShutdownReportEntry(string description, Exception exception)
///
/// object that occurred during shutdown, or null if unspecified.
///
- public Exception Exception { get; set; }
+ public Exception? Exception { get; set; }
public override string ToString()
{
string description = $"Message: {Description}";
- return (Exception != null) ? $"{description} Exception: {Exception}" : description;
+ return Exception != null ? $"{description} Exception: {Exception}" : description;
}
}
}
diff --git a/projects/RabbitMQ.Client/client/api/SslOption.cs b/projects/RabbitMQ.Client/client/api/SslOption.cs
index dae675be2b..682c00ba83 100644
--- a/projects/RabbitMQ.Client/client/api/SslOption.cs
+++ b/projects/RabbitMQ.Client/client/api/SslOption.cs
@@ -44,7 +44,7 @@ namespace RabbitMQ.Client
///
public class SslOption
{
- private X509CertificateCollection _certificateCollection;
+ private X509CertificateCollection? _certificateCollection;
///
/// Constructs an specifying both the server canonical name and the client's certificate path.
@@ -76,7 +76,7 @@ public SslOption()
///
/// Retrieve or set the client certificate passphrase.
///
- public string CertPassphrase { get; set; }
+ public string? CertPassphrase { get; set; }
///
/// Retrieve or set the path to client certificate.
@@ -87,20 +87,20 @@ public SslOption()
/// An optional client TLS certificate selection callback. If this is not specified,
/// the first valid certificate found will be used.
///
- public LocalCertificateSelectionCallback CertificateSelectionCallback { get; set; }
+ public LocalCertificateSelectionCallback? CertificateSelectionCallback { get; set; }
///
/// An optional peer verification (TLS certificate validation) callback. If this is not specified,
/// the default callback will be used in conjunction with the property to
/// determine if the peer's (server's) certificate should be considered valid (acceptable).
///
- public RemoteCertificateValidationCallback CertificateValidationCallback { get; set; }
+ public RemoteCertificateValidationCallback? CertificateValidationCallback { get; set; }
///
/// Retrieve or set the X509CertificateCollection containing the client certificate.
/// If no collection is set, the client will attempt to load one from the specified .
///
- public X509CertificateCollection Certs
+ public X509CertificateCollection? Certs
{
get
{
diff --git a/projects/RabbitMQ.Client/client/api/TcpClientAdapter.cs b/projects/RabbitMQ.Client/client/api/TcpClientAdapter.cs
index 8db8fa639b..4a36b4ad8e 100644
--- a/projects/RabbitMQ.Client/client/api/TcpClientAdapter.cs
+++ b/projects/RabbitMQ.Client/client/api/TcpClientAdapter.cs
@@ -13,7 +13,7 @@ namespace RabbitMQ.Client
///
public class TcpClientAdapter : ITcpClient
{
- private Socket _sock;
+ private readonly Socket _sock;
public TcpClientAdapter(Socket socket)
{
@@ -23,7 +23,6 @@ public TcpClientAdapter(Socket socket)
#if NET6_0_OR_GREATER
public virtual Task ConnectAsync(IPAddress ep, int port, CancellationToken cancellationToken = default)
{
- AssertSocket();
return _sock.ConnectAsync(ep, port, cancellationToken).AsTask();
}
#else
@@ -36,7 +35,6 @@ public virtual Task ConnectAsync(IPAddress ep, int port, CancellationToken cance
public virtual void Close()
{
_sock.Dispose();
- _sock = null;
}
public void Dispose()
@@ -54,7 +52,6 @@ protected virtual void Dispose(bool disposing)
public virtual NetworkStream GetStream()
{
- AssertSocket();
return new NetworkStream(_sock);
}
@@ -70,10 +67,6 @@ public virtual bool Connected
{
get
{
- if (_sock is null)
- {
- return false;
- }
return _sock.Connected;
}
}
@@ -82,27 +75,17 @@ public virtual TimeSpan ReceiveTimeout
{
get
{
- AssertSocket();
return TimeSpan.FromMilliseconds(_sock.ReceiveTimeout);
}
set
{
- AssertSocket();
_sock.ReceiveTimeout = (int)value.TotalMilliseconds;
}
}
- private void AssertSocket()
+ public static IPAddress? GetMatchingHost(IReadOnlyCollection addresses, AddressFamily addressFamily)
{
- if (_sock is null)
- {
- throw new InvalidOperationException("Cannot perform operation as socket is null");
- }
- }
-
- public static IPAddress GetMatchingHost(IReadOnlyCollection addresses, AddressFamily addressFamily)
- {
- IPAddress ep = addresses.FirstOrDefault(a => a.AddressFamily == addressFamily);
+ IPAddress? ep = addresses.FirstOrDefault(a => a.AddressFamily == addressFamily);
if (ep is null && addresses.Count == 1 && addressFamily == AddressFamily.Unspecified)
{
return addresses.Single();
diff --git a/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs b/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs
index 4efc3c54e8..7dac1f4c0c 100644
--- a/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs
+++ b/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs
@@ -13,14 +13,14 @@ public class TopologyRecoveryExceptionHandler
private static readonly Func s_defaultBindingExceptionCondition = (b, ex) => true;
private static readonly Func s_defaultConsumerExceptionCondition = (c, ex) => true;
- private Func _exchangeRecoveryExceptionCondition;
- private Func _queueRecoveryExceptionCondition;
- private Func _bindingRecoveryExceptionCondition;
- private Func _consumerRecoveryExceptionCondition;
- private Func _exchangeRecoveryExceptionHandlerAsync;
- private Func _queueRecoveryExceptionHandlerAsync;
- private Func _bindingRecoveryExceptionHandlerAsync;
- private Func _consumerRecoveryExceptionHandlerAsync;
+ private Func? _exchangeRecoveryExceptionCondition;
+ private Func? _queueRecoveryExceptionCondition;
+ private Func? _bindingRecoveryExceptionCondition;
+ private Func? _consumerRecoveryExceptionCondition;
+ private Func? _exchangeRecoveryExceptionHandlerAsync;
+ private Func? _queueRecoveryExceptionHandlerAsync;
+ private Func? _bindingRecoveryExceptionHandlerAsync;
+ private Func? _consumerRecoveryExceptionHandlerAsync;
///
/// Decides which exchange recovery exceptions the custom exception handler is applied to.
@@ -33,7 +33,9 @@ public Func ExchangeRecoveryExceptionConditi
set
{
if (_exchangeRecoveryExceptionCondition != null)
+ {
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");
+ }
_exchangeRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
}
@@ -50,7 +52,9 @@ public Func QueueRecoveryExceptionCondition
set
{
if (_queueRecoveryExceptionCondition != null)
+ {
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionCondition)} after it has been initialized.");
+ }
_queueRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionCondition));
}
@@ -67,9 +71,11 @@ public Func BindingRecoveryExceptionCondition
set
{
if (_bindingRecoveryExceptionCondition != null)
- throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");
+ {
+ throw new InvalidOperationException($"Cannot modify {nameof(BindingRecoveryExceptionCondition)} after it has been initialized.");
+ }
- _bindingRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
+ _bindingRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(BindingRecoveryExceptionCondition));
}
}
@@ -84,7 +90,9 @@ public Func ConsumerRecoveryExceptionConditi
set
{
if (_consumerRecoveryExceptionCondition != null)
+ {
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionCondition)} after it has been initialized.");
+ }
_consumerRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionCondition));
}
@@ -93,7 +101,7 @@ public Func ConsumerRecoveryExceptionConditi
///
/// Retries, or otherwise handles, an exception thrown when attempting to recover an exchange.
///
- public Func ExchangeRecoveryExceptionHandlerAsync
+ public Func? ExchangeRecoveryExceptionHandlerAsync
{
get => _exchangeRecoveryExceptionHandlerAsync;
@@ -111,7 +119,7 @@ public Func ExchangeRecoveryExc
///
/// Retries, or otherwise handles, an exception thrown when attempting to recover a queue.
///
- public Func QueueRecoveryExceptionHandlerAsync
+ public Func? QueueRecoveryExceptionHandlerAsync
{
get => _queueRecoveryExceptionHandlerAsync;
@@ -129,7 +137,7 @@ public Func QueueRecoveryException
///
/// Retries, or otherwise handles, an exception thrown when attempting to recover a binding.
///
- public Func BindingRecoveryExceptionHandlerAsync
+ public Func? BindingRecoveryExceptionHandlerAsync
{
get => _bindingRecoveryExceptionHandlerAsync;
@@ -147,7 +155,7 @@ public Func BindingRecoveryExcep
///
/// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer.
///
- public Func ConsumerRecoveryExceptionHandlerAsync
+ public Func? ConsumerRecoveryExceptionHandlerAsync
{
get => _consumerRecoveryExceptionHandlerAsync;
diff --git a/projects/RabbitMQ.Client/client/api/TopologyRecoveryFilter.cs b/projects/RabbitMQ.Client/client/api/TopologyRecoveryFilter.cs
index d95430f755..969776070e 100644
--- a/projects/RabbitMQ.Client/client/api/TopologyRecoveryFilter.cs
+++ b/projects/RabbitMQ.Client/client/api/TopologyRecoveryFilter.cs
@@ -13,10 +13,10 @@ public class TopologyRecoveryFilter
private static readonly Func s_defaultBindingFilter = binding => true;
private static readonly Func s_defaultConsumerFilter = consumer => true;
- private Func _exchangeFilter;
- private Func _queueFilter;
- private Func _bindingFilter;
- private Func _consumerFilter;
+ private Func? _exchangeFilter;
+ private Func? _queueFilter;
+ private Func? _bindingFilter;
+ private Func? _consumerFilter;
///
/// Decides whether an exchange is recovered or not.
diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs
index 60a720e5be..e88a99435b 100644
--- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs
+++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs
@@ -98,7 +98,7 @@ await _shutdownWrapper.InvokeAsync(this, reason)
private async Task BasicDeliverWrapper(BasicDeliverEventArgs eventArgs)
{
- using (Activity activity = RabbitMQActivitySource.Deliver(eventArgs))
+ using (Activity? activity = RabbitMQActivitySource.Deliver(eventArgs))
{
await _receivedWrapper.InvokeAsync(this, eventArgs).ConfigureAwait(false);
}
diff --git a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs
index cfd906db73..3d2009a84c 100644
--- a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs
+++ b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs
@@ -53,16 +53,16 @@ public EventingBasicConsumer(IChannel channel) : base(channel)
/// Accessing the body at a later point is unsafe as its memory can
/// be already released.
///
- public event EventHandler Received;
+ public event EventHandler? Received;
///Fires when the server confirms successful consumer registration.
- public event EventHandler Registered;
+ public event EventHandler? Registered;
///Fires on channel (channel) shutdown, both client and server initiated.
- public event EventHandler Shutdown;
+ public event EventHandler? Shutdown;
///Fires when the server confirms successful consumer cancellation.
- public event EventHandler Unregistered;
+ public event EventHandler? Unregistered;
///Fires when the server confirms successful consumer cancellation.
public override void HandleBasicCancelOk(string consumerTag)
@@ -90,7 +90,7 @@ public override async Task HandleBasicDeliverAsync(string consumerTag, ulong del
IReadOnlyBasicProperties properties, ReadOnlyMemory body)
{
BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
- using (Activity activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default)
+ using (Activity? activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default)
{
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
.ConfigureAwait(false);
diff --git a/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs b/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs
index 16d11cf5e0..7da24d90cf 100644
--- a/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs
+++ b/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs
@@ -44,7 +44,7 @@ public class RecoveringConsumerEventArgs
///
/// Consumer arguments of the consumer for this event
/// Consumer tag of the consumer for this event
- public RecoveringConsumerEventArgs(string consumerTag, IDictionary consumerArguments)
+ public RecoveringConsumerEventArgs(string consumerTag, IDictionary? consumerArguments)
{
ConsumerTag = consumerTag;
ConsumerArguments = consumerArguments;
@@ -58,6 +58,6 @@ public RecoveringConsumerEventArgs(string consumerTag, IDictionary
/// Access the consumer tag of the consumer this event relates to.
///
- public IDictionary ConsumerArguments { get; }
+ public IDictionary? ConsumerArguments { get; }
}
}
diff --git a/projects/RabbitMQ.Client/client/exceptions/OperationInterruptedException.cs b/projects/RabbitMQ.Client/client/exceptions/OperationInterruptedException.cs
index 83a2528afc..8877b50cfa 100644
--- a/projects/RabbitMQ.Client/client/exceptions/OperationInterruptedException.cs
+++ b/projects/RabbitMQ.Client/client/exceptions/OperationInterruptedException.cs
@@ -46,7 +46,7 @@ public class OperationInterruptedException
{
///Construct an OperationInterruptedException with
///the passed-in explanation, if any.
- public OperationInterruptedException(ShutdownEventArgs reason)
+ public OperationInterruptedException(ShutdownEventArgs? reason)
: base(reason is null ? "The AMQP operation was interrupted" :
$"The AMQP operation was interrupted: {reason}")
{
@@ -55,7 +55,7 @@ public OperationInterruptedException(ShutdownEventArgs reason)
///Construct an OperationInterruptedException with
///the passed-in explanation and prefix, if any.
- public OperationInterruptedException(ShutdownEventArgs reason, string prefix)
+ public OperationInterruptedException(ShutdownEventArgs? reason, string prefix)
: base(reason is null ? $"{prefix}: The AMQP operation was interrupted" :
$"{prefix}: The AMQP operation was interrupted: {reason}")
{
@@ -77,6 +77,6 @@ protected OperationInterruptedException(string message, Exception inner)
///Retrieves the explanation for the shutdown. May
///return null if no explanation is available.
- public ShutdownEventArgs ShutdownReason { get; protected set; }
+ public ShutdownEventArgs? ShutdownReason { get; protected set; }
}
}
diff --git a/projects/RabbitMQ.Client/client/exceptions/WireFormattingException.cs b/projects/RabbitMQ.Client/client/exceptions/WireFormattingException.cs
index 229805624b..9a9b3dfd00 100644
--- a/projects/RabbitMQ.Client/client/exceptions/WireFormattingException.cs
+++ b/projects/RabbitMQ.Client/client/exceptions/WireFormattingException.cs
@@ -46,7 +46,7 @@ public WireFormattingException(string message) : this(message, null)
///Construct a WireFormattingException with the given
///offender
- public WireFormattingException(string message, object offender)
+ public WireFormattingException(string message, object? offender)
: base(message)
{
Offender = offender;
@@ -54,6 +54,6 @@ public WireFormattingException(string message, object offender)
///Object which this exception is complaining about;
///may be null if no particular offender exists
- public object Offender { get; }
+ public object? Offender { get; }
}
}
diff --git a/projects/RabbitMQ.Client/client/framing/BasicConsume.cs b/projects/RabbitMQ.Client/client/framing/BasicConsume.cs
index b44b6ec25e..78da4b86fa 100644
--- a/projects/RabbitMQ.Client/client/framing/BasicConsume.cs
+++ b/projects/RabbitMQ.Client/client/framing/BasicConsume.cs
@@ -47,9 +47,9 @@ namespace RabbitMQ.Client.Framing.Impl
public readonly bool _noAck;
public readonly bool _exclusive;
public readonly bool _nowait;
- public readonly IDictionary _arguments;
+ public readonly IDictionary? _arguments;
- public BasicConsume(string Queue, string ConsumerTag, bool NoLocal, bool NoAck, bool Exclusive, bool Nowait, IDictionary Arguments)
+ public BasicConsume(string Queue, string ConsumerTag, bool NoLocal, bool NoAck, bool Exclusive, bool Nowait, IDictionary? Arguments)
{
_queue = Queue;
_consumerTag = ConsumerTag;
diff --git a/projects/RabbitMQ.Client/client/framing/ConnectionStart.cs b/projects/RabbitMQ.Client/client/framing/ConnectionStart.cs
index 4b11e146df..acfae7f144 100644
--- a/projects/RabbitMQ.Client/client/framing/ConnectionStart.cs
+++ b/projects/RabbitMQ.Client/client/framing/ConnectionStart.cs
@@ -41,7 +41,7 @@ namespace RabbitMQ.Client.Framing.Impl
{
public readonly byte _versionMajor;
public readonly byte _versionMinor;
- public readonly Dictionary _serverProperties;
+ public readonly Dictionary? _serverProperties;
public readonly byte[] _mechanisms;
public readonly byte[] _locales;
diff --git a/projects/RabbitMQ.Client/client/framing/ConnectionStartOk.cs b/projects/RabbitMQ.Client/client/framing/ConnectionStartOk.cs
index 67a95f245e..e2e2bc2cb4 100644
--- a/projects/RabbitMQ.Client/client/framing/ConnectionStartOk.cs
+++ b/projects/RabbitMQ.Client/client/framing/ConnectionStartOk.cs
@@ -39,12 +39,12 @@ namespace RabbitMQ.Client.Framing.Impl
{
internal readonly struct ConnectionStartOk : IOutgoingAmqpMethod
{
- public readonly IDictionary _clientProperties;
+ public readonly IDictionary? _clientProperties;
public readonly string _mechanism;
public readonly byte[] _response;
public readonly string _locale;
- public ConnectionStartOk(IDictionary ClientProperties, string Mechanism, byte[] Response, string Locale)
+ public ConnectionStartOk(IDictionary? ClientProperties, string Mechanism, byte[] Response, string Locale)
{
_clientProperties = ClientProperties;
_mechanism = Mechanism;
diff --git a/projects/RabbitMQ.Client/client/framing/ExchangeBind.cs b/projects/RabbitMQ.Client/client/framing/ExchangeBind.cs
index 908fd6a627..5d0625dccf 100644
--- a/projects/RabbitMQ.Client/client/framing/ExchangeBind.cs
+++ b/projects/RabbitMQ.Client/client/framing/ExchangeBind.cs
@@ -45,9 +45,9 @@ namespace RabbitMQ.Client.Framing.Impl
public readonly string _source;
public readonly string _routingKey;
public readonly bool _nowait;
- public readonly IDictionary _arguments;
+ public readonly IDictionary? _arguments;
- public ExchangeBind(string Destination, string Source, string RoutingKey, bool Nowait, IDictionary Arguments)
+ public ExchangeBind(string Destination, string Source, string RoutingKey, bool Nowait, IDictionary? Arguments)
{
_destination = Destination;
_source = Source;
diff --git a/projects/RabbitMQ.Client/client/framing/ExchangeDeclare.cs b/projects/RabbitMQ.Client/client/framing/ExchangeDeclare.cs
index caea19aad3..d99dbc994a 100644
--- a/projects/RabbitMQ.Client/client/framing/ExchangeDeclare.cs
+++ b/projects/RabbitMQ.Client/client/framing/ExchangeDeclare.cs
@@ -48,9 +48,9 @@ namespace RabbitMQ.Client.Framing.Impl
public readonly bool _autoDelete;
public readonly bool _internal;
public readonly bool _nowait;
- public readonly IDictionary _arguments;
+ public readonly IDictionary? _arguments;
- public ExchangeDeclare(string Exchange, string Type, bool Passive, bool Durable, bool AutoDelete, bool Internal, bool Nowait, IDictionary Arguments)
+ public ExchangeDeclare(string Exchange, string Type, bool Passive, bool Durable, bool AutoDelete, bool Internal, bool Nowait, IDictionary? Arguments)
{
_exchange = Exchange;
_type = Type;
diff --git a/projects/RabbitMQ.Client/client/framing/ExchangeUnbind.cs b/projects/RabbitMQ.Client/client/framing/ExchangeUnbind.cs
index f9af0857fc..2479f33faa 100644
--- a/projects/RabbitMQ.Client/client/framing/ExchangeUnbind.cs
+++ b/projects/RabbitMQ.Client/client/framing/ExchangeUnbind.cs
@@ -45,9 +45,9 @@ namespace RabbitMQ.Client.Framing.Impl
public readonly string _source;
public readonly string _routingKey;
public readonly bool _nowait;
- public readonly IDictionary _arguments;
+ public readonly IDictionary? _arguments;
- public ExchangeUnbind(string Destination, string Source, string RoutingKey, bool Nowait, IDictionary Arguments)
+ public ExchangeUnbind(string Destination, string Source, string RoutingKey, bool Nowait, IDictionary? Arguments)
{
_destination = Destination;
_source = Source;
diff --git a/projects/RabbitMQ.Client/client/framing/QueueBind.cs b/projects/RabbitMQ.Client/client/framing/QueueBind.cs
index 95a7b5a42e..a182f81ef7 100644
--- a/projects/RabbitMQ.Client/client/framing/QueueBind.cs
+++ b/projects/RabbitMQ.Client/client/framing/QueueBind.cs
@@ -45,9 +45,9 @@ namespace RabbitMQ.Client.Framing.Impl
public readonly string _exchange;
public readonly string _routingKey;
public readonly bool _nowait;
- public readonly IDictionary _arguments;
+ public readonly IDictionary? _arguments;
- public QueueBind(string Queue, string Exchange, string RoutingKey, bool Nowait, IDictionary Arguments)
+ public QueueBind(string Queue, string Exchange, string RoutingKey, bool Nowait, IDictionary? Arguments)
{
_queue = Queue;
_exchange = Exchange;
diff --git a/projects/RabbitMQ.Client/client/framing/QueueDeclare.cs b/projects/RabbitMQ.Client/client/framing/QueueDeclare.cs
index a9533f31d4..fc40d330d3 100644
--- a/projects/RabbitMQ.Client/client/framing/QueueDeclare.cs
+++ b/projects/RabbitMQ.Client/client/framing/QueueDeclare.cs
@@ -47,9 +47,9 @@ namespace RabbitMQ.Client.Framing.Impl
public readonly bool _exclusive;
public readonly bool _autoDelete;
public readonly bool _nowait;
- public readonly IDictionary _arguments;
+ public readonly IDictionary? _arguments;
- public QueueDeclare(string Queue, bool Passive, bool Durable, bool Exclusive, bool AutoDelete, bool Nowait, IDictionary Arguments)
+ public QueueDeclare(string Queue, bool Passive, bool Durable, bool Exclusive, bool AutoDelete, bool Nowait, IDictionary? Arguments)
{
_queue = Queue;
_passive = Passive;
diff --git a/projects/RabbitMQ.Client/client/framing/QueueUnbind.cs b/projects/RabbitMQ.Client/client/framing/QueueUnbind.cs
index 94262db1f2..17eb131008 100644
--- a/projects/RabbitMQ.Client/client/framing/QueueUnbind.cs
+++ b/projects/RabbitMQ.Client/client/framing/QueueUnbind.cs
@@ -44,9 +44,9 @@ namespace RabbitMQ.Client.Framing.Impl
public readonly string _queue;
public readonly string _exchange;
public readonly string _routingKey;
- public readonly IDictionary _arguments;
+ public readonly IDictionary? _arguments;
- public QueueUnbind(string Queue, string Exchange, string RoutingKey, IDictionary Arguments)
+ public QueueUnbind(string Queue, string Exchange, string RoutingKey, IDictionary? Arguments)
{
_queue = Queue;
_exchange = Exchange;
diff --git a/projects/RabbitMQ.Client/client/impl/AmqpVersion.cs b/projects/RabbitMQ.Client/client/impl/AmqpVersion.cs
index e5977ba07c..5297c5d362 100644
--- a/projects/RabbitMQ.Client/client/impl/AmqpVersion.cs
+++ b/projects/RabbitMQ.Client/client/impl/AmqpVersion.cs
@@ -82,7 +82,7 @@ public AmqpVersion(int major, int minor)
///
/// Implement value-equality comparison.
///
- public override bool Equals(object other)
+ public override bool Equals(object? other)
{
return other is AmqpVersion version && Equals(version);
}
diff --git a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs
index 49e6c18aad..beecf229a1 100644
--- a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs
+++ b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs
@@ -4,7 +4,6 @@
namespace RabbitMQ.Client.Impl
{
-#nullable enable
internal struct AsyncEventingWrapper
{
private event AsyncEventHandler? _event;
diff --git a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
index 7f1e1a375c..447b8d9829 100644
--- a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
+++ b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
@@ -61,9 +61,9 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken canc
_continuationTimeoutCancellationTokenSource = new CancellationTokenSource(continuationTimeout);
#if NET6_0_OR_GREATER
- _continuationTimeoutCancellationTokenRegistration = _continuationTimeoutCancellationTokenSource.Token.UnsafeRegister((object state) =>
+ _continuationTimeoutCancellationTokenRegistration = _continuationTimeoutCancellationTokenSource.Token.UnsafeRegister((object? state) =>
{
- var tcs = (TaskCompletionSource)state;
+ var tcs = (TaskCompletionSource)state!;
if (tcs.TrySetCanceled())
{
// Cancellation was successful, does this mean we set a TimeoutException
@@ -150,15 +150,17 @@ public override Task HandleCommandAsync(IncomingCommand cmd)
if (cmd.CommandId == ProtocolCommandId.ConnectionSecure)
{
var secure = new ConnectionSecure(cmd.MethodSpan);
- _tcs.TrySetResult(new ConnectionSecureOrTune { m_challenge = secure._challenge });
+ _tcs.TrySetResult(new ConnectionSecureOrTune(secure._challenge, default));
}
else if (cmd.CommandId == ProtocolCommandId.ConnectionTune)
{
var tune = new ConnectionTune(cmd.MethodSpan);
- _tcs.TrySetResult(new ConnectionSecureOrTune
+ _tcs.TrySetResult(new ConnectionSecureOrTune(default, new ConnectionTuneDetails
{
- m_tuneDetails = new ConnectionTuneDetails { m_channelMax = tune._channelMax, m_frameMax = tune._frameMax, m_heartbeatInSeconds = tune._heartbeat }
- });
+ m_channelMax = tune._channelMax,
+ m_frameMax = tune._frameMax,
+ m_heartbeatInSeconds = tune._heartbeat
+ }));
}
else
{
@@ -280,7 +282,7 @@ await _consumerDispatcher.HandleBasicConsumeOkAsync(_consumer, method._consumerT
}
}
- internal class BasicGetAsyncRpcContinuation : AsyncRpcContinuation
+ internal class BasicGetAsyncRpcContinuation : AsyncRpcContinuation
{
private readonly Func _adjustDeliveryTag;
@@ -358,7 +360,7 @@ public override void HandleChannelShutdown(ShutdownEventArgs reason)
// Nothing to do here!
}
- public void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
+ public void OnConnectionShutdown(object? sender, ShutdownEventArgs reason)
{
_tcs.TrySetResult(true);
}
diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
index 050e42c48f..a822da6351 100644
--- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
+++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
@@ -128,9 +128,9 @@ public IEnumerable ConsumerTags
public int ChannelNumber => InnerChannel.ChannelNumber;
- public ShutdownEventArgs CloseReason => InnerChannel.CloseReason;
+ public ShutdownEventArgs? CloseReason => InnerChannel.CloseReason;
- public IBasicConsumer DefaultConsumer
+ public IBasicConsumer? DefaultConsumer
{
get => InnerChannel.DefaultConsumer;
set => InnerChannel.DefaultConsumer = value;
@@ -138,11 +138,11 @@ public IBasicConsumer DefaultConsumer
public bool IsClosed => !IsOpen;
- public bool IsOpen => _innerChannel != null && _innerChannel.IsOpen;
+ public bool IsOpen => !_disposed && _innerChannel.IsOpen;
public ulong NextPublishSeqNo => InnerChannel.NextPublishSeqNo;
- public string CurrentQueue => InnerChannel.CurrentQueue;
+ public string? CurrentQueue => InnerChannel.CurrentQueue;
internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
@@ -250,8 +250,6 @@ public void Dispose()
}
_recordedConsumerTags.Clear();
- _connection = null;
- _innerChannel = null;
_disposed = true;
}
@@ -274,7 +272,7 @@ await _innerChannel.BasicCancelAsync(consumerTag, noWait, cancellationToken)
}
public async Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive,
- IDictionary arguments, IBasicConsumer consumer,
+ IDictionary? arguments, IBasicConsumer consumer,
CancellationToken cancellationToken)
{
string resultConsumerTag = await InnerChannel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal,
@@ -288,7 +286,7 @@ await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false)
return resultConsumerTag;
}
- public ValueTask BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken)
+ public ValueTask BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken)
=> InnerChannel.BasicGetAsync(queue, autoAck, cancellationToken);
public ValueTask BasicPublishAsync(string exchange, string routingKey, TProperties basicProperties,
@@ -328,7 +326,7 @@ await InnerChannel.ConfirmSelectAsync(cancellationToken)
}
public async Task ExchangeBindAsync(string destination, string source, string routingKey,
- IDictionary arguments, bool noWait,
+ IDictionary? arguments, bool noWait,
CancellationToken cancellationToken)
{
await InnerChannel.ExchangeBindAsync(destination, source, routingKey, arguments, noWait, cancellationToken)
@@ -342,7 +340,7 @@ public Task ExchangeDeclarePassiveAsync(string exchange, CancellationToken cance
=> InnerChannel.ExchangeDeclarePassiveAsync(exchange, cancellationToken);
public async Task ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete,
- IDictionary arguments, bool passive, bool noWait,
+ IDictionary? arguments, bool passive, bool noWait,
CancellationToken cancellationToken)
{
await InnerChannel.ExchangeDeclareAsync(exchange, type, durable, autoDelete, arguments, passive, noWait, cancellationToken)
@@ -365,7 +363,7 @@ await _connection.DeleteRecordedExchangeAsync(exchange, recordedEntitiesSemaphor
}
public async Task ExchangeUnbindAsync(string destination, string source, string routingKey,
- IDictionary arguments, bool noWait,
+ IDictionary? arguments, bool noWait,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -379,7 +377,7 @@ await _connection.DeleteAutoDeleteExchangeAsync(source, recordedEntitiesSemaphor
}
public async Task QueueBindAsync(string queue, string exchange, string routingKey,
- IDictionary arguments, bool noWait,
+ IDictionary? arguments, bool noWait,
CancellationToken cancellationToken)
{
await InnerChannel.QueueBindAsync(queue, exchange, routingKey, arguments, noWait, cancellationToken)
@@ -397,7 +395,7 @@ public Task QueueDeclarePassiveAsync(string queue, CancellationT
}
public async Task QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete,
- IDictionary arguments, bool passive, bool noWait,
+ IDictionary? arguments, bool passive, bool noWait,
CancellationToken cancellationToken)
{
QueueDeclareOk result = await InnerChannel.QueueDeclareAsync(queue, durable, exclusive, autoDelete, arguments, passive, noWait, cancellationToken)
@@ -433,7 +431,7 @@ public Task QueuePurgeAsync(string queue, CancellationToken cancellationTo
=> InnerChannel.QueuePurgeAsync(queue, cancellationToken);
public async Task QueueUnbindAsync(string queue, string exchange, string routingKey,
- IDictionary arguments,
+ IDictionary? arguments,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs
index 301fdb8263..db5dffc909 100644
--- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs
+++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs
@@ -38,7 +38,6 @@
namespace RabbitMQ.Client.Framing.Impl
{
-#nullable enable
internal sealed partial class AutorecoveringConnection
{
private readonly SemaphoreSlim _recordedEntitiesSemaphore = new SemaphoreSlim(1, 1);
diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs
index 229023664e..e1d47ba877 100644
--- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs
+++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs
@@ -40,13 +40,12 @@
namespace RabbitMQ.Client.Framing.Impl
{
-#nullable enable
internal sealed partial class AutorecoveringConnection
{
private Task? _recoveryTask;
private readonly CancellationTokenSource _recoveryCancellationTokenSource = new CancellationTokenSource();
- private void HandleConnectionShutdown(object _, ShutdownEventArgs args)
+ private void HandleConnectionShutdown(object? _, ShutdownEventArgs args)
{
if (ShouldTriggerConnectionRecovery(args))
{
diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
index 298e3066c2..0ae34e39af 100644
--- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
+++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
@@ -60,35 +60,32 @@ private Connection InnerConnection
}
}
- internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver endpoints)
+ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver endpoints, Connection innerConnection)
{
_config = config;
_endpoints = endpoints;
- }
-
- internal async ValueTask OpenAsync(CancellationToken cancellationToken)
- {
- IFrameHandler fh = await _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, cancellationToken)
- .ConfigureAwait(false);
- CreateInnerConnection(fh);
- await _innerConnection.OpenAsync(cancellationToken)
- .ConfigureAwait(false);
- return this;
- }
-
- private void CreateInnerConnection(IFrameHandler frameHandler)
- {
- _innerConnection = new Connection(_config, frameHandler);
-
- void onException(Exception exception, string context) =>
- _innerConnection.OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
+ _innerConnection = innerConnection;
+ ConnectionShutdown += HandleConnectionShutdown;
_recoverySucceededWrapper = new EventingWrapper("OnConnectionRecovery", onException);
_connectionRecoveryErrorWrapper = new EventingWrapper("OnConnectionRecoveryError", onException);
_consumerTagChangeAfterRecoveryWrapper = new EventingWrapper("OnConsumerRecovery", onException);
_queueNameChangedAfterRecoveryWrapper = new EventingWrapper("OnQueueRecovery", onException);
- ConnectionShutdown += HandleConnectionShutdown;
+ void onException(Exception exception, string context) =>
+ _innerConnection.OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
+ }
+
+ internal static async ValueTask CreateAsync(ConnectionConfig config, IEndpointResolver endpoints,
+ CancellationToken cancellationToken)
+ {
+ IFrameHandler fh = await endpoints.SelectOneAsync(config.FrameHandlerFactoryAsync, cancellationToken)
+ .ConfigureAwait(false);
+ Connection innerConnection = new(config, fh);
+ AutorecoveringConnection connection = new(config, endpoints, innerConnection);
+ await innerConnection.OpenAsync(cancellationToken)
+ .ConfigureAwait(false);
+ return connection;
}
public event EventHandler RecoverySucceeded
@@ -150,13 +147,13 @@ public event EventHandler RecoveringConsumer
}
private EventingWrapper _consumerAboutToBeRecovered;
- public string ClientProvidedName => _config.ClientProvidedName;
+ public string? ClientProvidedName => _config.ClientProvidedName;
public ushort ChannelMax => InnerConnection.ChannelMax;
- public IDictionary ClientProperties => InnerConnection.ClientProperties;
+ public IDictionary ClientProperties => InnerConnection.ClientProperties;
- public ShutdownEventArgs CloseReason => InnerConnection.CloseReason;
+ public ShutdownEventArgs? CloseReason => InnerConnection.CloseReason;
public AmqpTcpEndpoint Endpoint => InnerConnection.Endpoint;
@@ -164,13 +161,13 @@ public event EventHandler RecoveringConsumer
public TimeSpan Heartbeat => InnerConnection.Heartbeat;
- public bool IsOpen => _innerConnection?.IsOpen ?? false;
+ public bool IsOpen => _innerConnection.IsOpen;
public int LocalPort => InnerConnection.LocalPort;
public int RemotePort => InnerConnection.RemotePort;
- public IDictionary ServerProperties => InnerConnection.ServerProperties;
+ public IDictionary? ServerProperties => InnerConnection.ServerProperties;
public IEnumerable ShutdownReport => InnerConnection.ShutdownReport;
@@ -182,8 +179,7 @@ public async ValueTask CreateNonRecoveringChannelAsync(Can
{
ISession session = InnerConnection.CreateSession();
var result = new RecoveryAwareChannel(_config, session);
- return await result.OpenAsync(cancellationToken)
- .ConfigureAwait(false) as RecoveryAwareChannel;
+ return (RecoveryAwareChannel)await result.OpenAsync(cancellationToken).ConfigureAwait(false);
}
public override string ToString()
@@ -274,7 +270,6 @@ public void Dispose()
finally
{
_channels.Clear();
- _innerConnection = null;
_recordedEntitiesSemaphore.Dispose();
_channelsSemaphore.Dispose();
_recoveryCancellationTokenSource.Dispose();
diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
index 98297b4fe1..317e628814 100644
--- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
@@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
@@ -51,21 +52,21 @@ internal abstract class ChannelBase : IChannel, IRecoverable
{
///Only used to kick-start a connection open
///sequence. See
- internal TaskCompletionSource m_connectionStartCell;
- private Exception m_connectionStartException = null;
+ internal TaskCompletionSource? m_connectionStartCell;
+ private Exception? m_connectionStartException;
// AMQP only allows one RPC operation to be active at a time.
protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);
- private SemaphoreSlim _confirmSemaphore;
+ private SemaphoreSlim? _confirmSemaphore;
private readonly LinkedList _pendingDeliveryTags = new LinkedList();
private bool _onlyAcksReceived = true;
- private ShutdownEventArgs _closeReason;
- public ShutdownEventArgs CloseReason => Volatile.Read(ref _closeReason);
+ private ShutdownEventArgs? _closeReason;
+ public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
internal readonly IConsumerDispatcher ConsumerDispatcher;
@@ -173,7 +174,7 @@ internal void RunRecoveryEventHandlers(object sender)
public int ChannelNumber => ((Session)Session).ChannelNumber;
- public IBasicConsumer DefaultConsumer
+ public IBasicConsumer? DefaultConsumer
{
get => ConsumerDispatcher.DefaultConsumer;
set => ConsumerDispatcher.DefaultConsumer = value;
@@ -181,15 +182,16 @@ public IBasicConsumer DefaultConsumer
public bool IsClosed => !IsOpen;
+ [MemberNotNullWhen(false, nameof(CloseReason))]
public bool IsOpen => CloseReason is null;
public ulong NextPublishSeqNo { get; private set; }
- public string CurrentQueue { get; private set; }
+ public string? CurrentQueue { get; private set; }
public ISession Session { get; private set; }
- public Exception ConnectionStartException => m_connectionStartException;
+ public Exception? ConnectionStartException => m_connectionStartException;
public void MaybeSetConnectionStartException(Exception ex)
{
@@ -324,7 +326,7 @@ await ModelSendAsync(method, k.CancellationToken)
}
internal async ValueTask ConnectionStartOkAsync(
- IDictionary clientProperties,
+ IDictionary clientProperties,
string mechanism, byte[] response, string locale,
CancellationToken cancellationToken)
{
@@ -409,7 +411,7 @@ await ModelSendAsync(method, k.CancellationToken)
internal void FinishClose()
{
- ShutdownEventArgs reason = CloseReason;
+ ShutdownEventArgs? reason = CloseReason;
if (reason != null)
{
Session.Close(reason);
@@ -418,6 +420,7 @@ internal void FinishClose()
m_connectionStartCell?.TrySetResult(null);
}
+ [MemberNotNullWhen(true, nameof(_confirmSemaphore))]
private bool ConfirmsAreEnabled => _confirmSemaphore != null;
private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
@@ -513,7 +516,7 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
*
* Aborted PR: https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1551
*/
- private void OnSessionShutdown(object sender, ShutdownEventArgs reason)
+ private void OnSessionShutdown(object? sender, ShutdownEventArgs reason)
{
ConsumerDispatcher.Quiesce();
SetCloseReason(reason);
@@ -521,6 +524,7 @@ private void OnSessionShutdown(object sender, ShutdownEventArgs reason)
ConsumerDispatcher.Shutdown(reason);
}
+ [MemberNotNull(nameof(_closeReason))]
internal bool SetCloseReason(ShutdownEventArgs reason)
{
if (reason is null)
@@ -533,7 +537,7 @@ internal bool SetCloseReason(ShutdownEventArgs reason)
}
public override string ToString()
- => Session.ToString();
+ => Session.ToString()!;
void IDisposable.Dispose()
{
@@ -614,7 +618,7 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
{
if (multiple)
{
- while (_pendingDeliveryTags.First.Value < deliveryTag)
+ while (_pendingDeliveryTags.First!.Value < deliveryTag)
{
_pendingDeliveryTags.RemoveFirst();
}
@@ -632,7 +636,7 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
_onlyAcksReceived = _onlyAcksReceived && !isNack;
- if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources.Count > 0)
+ if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources!.Count > 0)
{
// Done, mark tasks
foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
@@ -729,7 +733,7 @@ protected async Task HandleChannelCloseAsync(IncomingCommand cmd, Cancella
channelClose._classId,
channelClose._methodId));
- Session.Close(CloseReason, false);
+ Session.Close(_closeReason, false);
var method = new ChannelCloseOk();
await ModelSendAsync(method, cancellationToken)
@@ -754,7 +758,7 @@ protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, Cancel
*/
FinishClose();
- if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation k))
+ if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k))
{
_continuationQueue.Next();
await k.HandleCommandAsync(cmd)
@@ -827,7 +831,7 @@ protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, Cance
await ModelSendAsync(replyMethod, cancellationToken)
.ConfigureAwait(false);
- SetCloseReason(Session.Connection.CloseReason);
+ SetCloseReason(Session.Connection.CloseReason!);
}
catch (IOException)
{
@@ -871,14 +875,8 @@ await Session.Connection.CloseAsync(reason, false,
else
{
var method = new ConnectionStart(cmd.MethodSpan);
- var details = new ConnectionStartDetails
- {
- m_versionMajor = method._versionMajor,
- m_versionMinor = method._versionMinor,
- m_serverProperties = method._serverProperties,
- m_mechanisms = method._mechanisms,
- m_locales = method._locales
- };
+ var details = new ConnectionStartDetails(method._locales, method._mechanisms,
+ method._serverProperties, method._versionMajor, method._versionMinor);
m_connectionStartCell.SetResult(details);
m_connectionStartCell = null;
}
@@ -970,7 +968,7 @@ await ModelSendAsync(method, k.CancellationToken)
}
public async Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive,
- IDictionary arguments, IBasicConsumer consumer,
+ IDictionary? arguments, IBasicConsumer consumer,
CancellationToken cancellationToken)
{
if (ConsumerDispatcher is AsyncConsumerDispatcher)
@@ -1017,7 +1015,7 @@ await ModelSendAsync(method, k.CancellationToken)
}
}
- public async ValueTask BasicGetAsync(string queue, bool autoAck,
+ public async ValueTask BasicGetAsync(string queue, bool autoAck,
CancellationToken cancellationToken)
{
bool enqueued = false;
@@ -1034,9 +1032,9 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
await ModelSendAsync(method, k.CancellationToken)
.ConfigureAwait(false);
- BasicGetResult result = await k;
+ BasicGetResult? result = await k;
- using Activity activity = result != null
+ using Activity? activity = result != null
? RabbitMQActivitySource.Receive(result.RoutingKey,
result.Exchange,
result.DeliveryTag, result.BasicProperties, result.Body.Length)
@@ -1078,15 +1076,23 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
try
{
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
- using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
+ using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
: default;
if (sendActivity != null)
{
- BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
- await ModelSendAsync(in cmd, in props, body, cancellationToken)
- .ConfigureAwait(false);
+ BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
+ if (props is null)
+ {
+ await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
+ .ConfigureAwait(false);
+ }
+ else
+ {
+ await ModelSendAsync(in cmd, in props, body, cancellationToken)
+ .ConfigureAwait(false);
+ }
}
else
{
@@ -1137,15 +1143,23 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
try
{
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
- using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
+ using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
: default;
if (sendActivity != null)
{
- BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
- await ModelSendAsync(in cmd, in props, body, cancellationToken)
- .ConfigureAwait(false);
+ BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
+ if (props is null)
+ {
+ await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
+ .ConfigureAwait(false);
+ }
+ else
+ {
+ await ModelSendAsync(in cmd, in props, body, cancellationToken)
+ .ConfigureAwait(false);
+ }
}
else
{
@@ -1287,7 +1301,7 @@ await ModelSendAsync(method, k.CancellationToken)
}
public async Task ExchangeBindAsync(string destination, string source, string routingKey,
- IDictionary arguments, bool noWait,
+ IDictionary? arguments, bool noWait,
CancellationToken cancellationToken)
{
bool enqueued = false;
@@ -1335,7 +1349,7 @@ public Task ExchangeDeclarePassiveAsync(string exchange, CancellationToken cance
}
public async Task ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete,
- IDictionary arguments, bool passive, bool noWait,
+ IDictionary? arguments, bool passive, bool noWait,
CancellationToken cancellationToken)
{
bool enqueued = false;
@@ -1415,7 +1429,7 @@ await ModelSendAsync(method, k.CancellationToken)
}
public async Task ExchangeUnbindAsync(string destination, string source, string routingKey,
- IDictionary arguments, bool noWait,
+ IDictionary? arguments, bool noWait,
CancellationToken cancellationToken)
{
bool enqueued = false;
@@ -1464,7 +1478,7 @@ public Task QueueDeclarePassiveAsync(string queue,
}
public async Task QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete,
- IDictionary arguments, bool passive, bool noWait,
+ IDictionary? arguments, bool passive, bool noWait,
CancellationToken cancellationToken)
{
if (true == noWait)
@@ -1529,7 +1543,7 @@ await ModelSendAsync(method, k.CancellationToken)
}
public async Task QueueBindAsync(string queue, string exchange, string routingKey,
- IDictionary arguments, bool noWait,
+ IDictionary? arguments, bool noWait,
CancellationToken cancellationToken)
{
bool enqueued = false;
@@ -1653,7 +1667,7 @@ await ModelSendAsync(method, k.CancellationToken)
}
public async Task QueueUnbindAsync(string queue, string exchange, string routingKey,
- IDictionary arguments,
+ IDictionary? arguments,
CancellationToken cancellationToken)
{
bool enqueued = false;
@@ -1770,7 +1784,7 @@ await ModelSendAsync(method, k.CancellationToken)
}
}
- private List> _confirmsTaskCompletionSources;
+ private List>? _confirmsTaskCompletionSources;
public async Task WaitForConfirmsAsync(CancellationToken cancellationToken = default)
{
@@ -1796,7 +1810,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
}
tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- _confirmsTaskCompletionSources.Add(tcs);
+ _confirmsTaskCompletionSources!.Add(tcs);
}
finally
{
@@ -1853,10 +1867,10 @@ private async Task WaitForConfirmsWithTokenAsync(TaskCompletionSource ((TaskCompletionSource)state).TrySetCanceled(), tcs);
+ state => ((TaskCompletionSource)state!).TrySetCanceled(), tcs);
#else
cancellationToken.Register(
- state => ((TaskCompletionSource)state).TrySetCanceled(),
+ state => ((TaskCompletionSource)state!).TrySetCanceled(),
state: tcs, useSynchronizationContext: false);
#endif
try
@@ -1874,7 +1888,7 @@ await tokenRegistration.DisposeAsync()
}
}
- private static BasicProperties PopulateActivityAndPropagateTraceId(TProperties basicProperties,
+ private static BasicProperties? PopulateActivityAndPropagateTraceId(TProperties basicProperties,
Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
// This activity is marked as recorded, so let's propagate the trace and span ids.
@@ -1886,22 +1900,34 @@ private static BasicProperties PopulateActivityAndPropagateTraceId(
}
}
- BasicProperties props = default;
- if (basicProperties is BasicProperties properties)
+ IDictionary? headers = basicProperties.Headers;
+ if (headers is null)
{
- props = properties;
+ return AddHeaders(basicProperties, sendActivity);
}
- else if (basicProperties is EmptyBasicProperty)
- {
- props = new BasicProperties();
- }
-
- IDictionary headers = props.Headers ?? new Dictionary();
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
- props.Headers = headers;
- return props;
+ return null;
+
+ static BasicProperties? AddHeaders(TProperties basicProperties, Activity sendActivity)
+ {
+ var headers = new Dictionary();
+
+ // Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
+ RabbitMQActivitySource.ContextInjector(sendActivity, headers);
+
+ switch (basicProperties)
+ {
+ case BasicProperties writableProperties:
+ writableProperties.Headers = headers;
+ return null;
+ case EmptyBasicProperty:
+ return new BasicProperties { Headers = headers };
+ default:
+ return new BasicProperties(basicProperties) { Headers = headers };
+ }
+ }
}
}
}
diff --git a/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs b/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
index 5dbc762514..47be525c64 100644
--- a/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
+++ b/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
@@ -39,7 +39,6 @@
namespace RabbitMQ.Client.Impl
{
-#nullable enable
internal sealed class CommandAssembler
{
private const int MaxArrayOfBytesSize = 2_147_483_591;
@@ -160,7 +159,7 @@ private void ParseBodyFrame(InboundFrame frame)
throw new MalformedFrameException($"Overlong content body received - {_remainingBodyByteCount} bytes remaining, {payloadLength} bytes received");
}
- if (_currentCommand.Body.RentedArray is null)
+ if (_currentCommand.Body.Memory.IsEmpty)
{
// check for single frame payload for an early exit
if (payloadLength == _remainingBodyByteCount)
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
index 086fd489e3..adb1ee8cc9 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
@@ -40,7 +40,6 @@
namespace RabbitMQ.Client.Framing.Impl
{
-#nullable enable
internal sealed partial class Connection
{
public Task UpdateSecretAsync(string newSecret, string reason,
@@ -73,7 +72,7 @@ internal void HandleConnectionUnblocked()
private async ValueTask StartAndTuneAsync(CancellationToken cancellationToken)
{
- var connectionStartCell = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var connectionStartCell = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
#if NET6_0_OR_GREATER
using CancellationTokenRegistration ctr = cancellationToken.UnsafeRegister((object? state) =>
@@ -99,8 +98,8 @@ private async ValueTask StartAndTuneAsync(CancellationToken cancellationToken)
await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
.ConfigureAwait(false);
- Task csct = connectionStartCell.Task;
- ConnectionStartDetails connectionStart = await csct.ConfigureAwait(false);
+ Task csct = connectionStartCell.Task;
+ ConnectionStartDetails? connectionStart = await csct.ConfigureAwait(false);
if (connectionStart is null || csct.IsCanceled)
{
@@ -150,7 +149,7 @@ await FinishCloseAsync(cancellationToken)
if (res.m_challenge is null)
{
- connectionTune = res.m_tuneDetails;
+ connectionTune = res.m_tuneDetails.GetValueOrDefault();
tuned = true;
}
else
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs
index 600e46c89c..e5588d39cd 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs
@@ -35,7 +35,6 @@
namespace RabbitMQ.Client.Framing.Impl
{
-#nullable enable
internal sealed partial class Connection
{
private TimeSpan _heartbeat;
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
index 9fd75db08f..c3a90f8ec4 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
@@ -38,7 +38,6 @@
namespace RabbitMQ.Client.Framing.Impl
{
-#nullable enable
internal sealed partial class Connection
{
private readonly CancellationTokenSource _mainLoopCts = new CancellationTokenSource();
@@ -208,14 +207,14 @@ private void HandleMainLoopException(ShutdownEventArgs reason)
string message = reason.GetLogMessage();
if (false == SetCloseReason(reason))
{
- LogCloseError($"unexpected main loop exception while closing: {message}", reason.Exception);
+ LogCloseError($"unexpected main loop exception while closing: {message}", reason.Exception!);
return;
}
- _channel0.MaybeSetConnectionStartException(reason.Exception);
+ _channel0.MaybeSetConnectionStartException(reason.Exception!);
OnShutdown(reason);
- LogCloseError($"unexpected connection closure: {message}", reason.Exception);
+ LogCloseError($"unexpected connection closure: {message}", reason.Exception!);
}
private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe,
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs
index ad1aeb859d..e50a85cb52 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.cs
@@ -44,7 +44,6 @@
namespace RabbitMQ.Client.Framing.Impl
{
-#nullable enable
internal sealed partial class Connection : IConnection
{
private bool _disposed;
@@ -226,9 +225,6 @@ internal async ValueTask OpenAsync(CancellationToken cancellationTo
cancellationToken.ThrowIfCancellationRequested();
- await _frameHandler.ConnectAsync(cancellationToken)
- .ConfigureAwait(false);
-
// Note: this must happen *after* the frame handler is started
_mainLoopTask = Task.Run(MainLoop, cancellationToken);
@@ -427,9 +423,8 @@ private async Task FinishCloseAsync(CancellationToken cancellationToken)
_closed = true;
MaybeStopHeartbeatTimers();
- await _frameHandler.CloseAsync(cancellationToken)
- .ConfigureAwait(false);
- _channel0.SetCloseReason(CloseReason);
+ await _frameHandler.CloseAsync(cancellationToken).ConfigureAwait(false);
+ _channel0.SetCloseReason(CloseReason!);
_channel0.FinishClose();
RabbitMqClientEventSource.Log.ConnectionClosed();
}
diff --git a/projects/RabbitMQ.Client/client/impl/ConnectionSecureOrTune.cs b/projects/RabbitMQ.Client/client/impl/ConnectionSecureOrTune.cs
index daf85f6be0..505f430b0a 100644
--- a/projects/RabbitMQ.Client/client/impl/ConnectionSecureOrTune.cs
+++ b/projects/RabbitMQ.Client/client/impl/ConnectionSecureOrTune.cs
@@ -45,9 +45,15 @@ internal struct ConnectionTuneDetails
public ushort m_heartbeatInSeconds;
}
- internal class ConnectionSecureOrTune
+ internal sealed class ConnectionSecureOrTune
{
- public byte[] m_challenge;
- public ConnectionTuneDetails m_tuneDetails;
+ public byte[]? m_challenge;
+ public ConnectionTuneDetails? m_tuneDetails;
+
+ public ConnectionSecureOrTune(byte[]? challenge, ConnectionTuneDetails? tuneDetails)
+ {
+ m_challenge = challenge;
+ m_tuneDetails = tuneDetails;
+ }
}
}
diff --git a/projects/RabbitMQ.Client/client/impl/ConnectionStartDetails.cs b/projects/RabbitMQ.Client/client/impl/ConnectionStartDetails.cs
index 4e604d1415..91cf5b4a36 100644
--- a/projects/RabbitMQ.Client/client/impl/ConnectionStartDetails.cs
+++ b/projects/RabbitMQ.Client/client/impl/ConnectionStartDetails.cs
@@ -33,12 +33,21 @@
namespace RabbitMQ.Client.Impl
{
- internal class ConnectionStartDetails
+ internal sealed class ConnectionStartDetails
{
public byte[] m_locales;
public byte[] m_mechanisms;
- public IDictionary m_serverProperties;
+ public IDictionary? m_serverProperties;
public byte m_versionMajor;
public byte m_versionMinor;
+
+ public ConnectionStartDetails(byte[] locales, byte[] mechanisms, IDictionary? serverProperties, byte versionMajor, byte versionMinor)
+ {
+ m_locales = locales;
+ m_mechanisms = mechanisms;
+ m_serverProperties = serverProperties;
+ m_versionMajor = versionMajor;
+ m_versionMinor = versionMinor;
+ }
}
}
diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs
index b96ef1bbfc..7140a54b77 100644
--- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs
+++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs
@@ -5,7 +5,6 @@
namespace RabbitMQ.Client.ConsumerDispatching
{
-#nullable enable
internal sealed class AsyncConsumerDispatcher : ConsumerDispatcherChannelBase
{
internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency)
@@ -28,16 +27,16 @@ protected override async Task ProcessChannelAsync()
Task task = work.WorkType switch
{
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
- work.ConsumerTag, work.DeliveryTag, work.Redelivered,
- work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory),
+ work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
+ work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory),
- WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag),
+ WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag!),
- WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag),
+ WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag!),
- WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag),
+ WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag!),
- WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason),
+ WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason!),
_ => Task.CompletedTask
};
diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs
index dd7ba9b081..3023fd0cc1 100644
--- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs
+++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs
@@ -5,7 +5,6 @@
namespace RabbitMQ.Client.ConsumerDispatching
{
-#nullable enable
internal sealed class ConsumerDispatcher : ConsumerDispatcherChannelBase
{
internal ConsumerDispatcher(ChannelBase channel, int concurrency)
@@ -26,13 +25,13 @@ protected override async Task ProcessChannelAsync()
try
{
IBasicConsumer consumer = work.Consumer;
- string? consumerTag = work.ConsumerTag;
+ string consumerTag = work.ConsumerTag!;
switch (work.WorkType)
{
case WorkType.Deliver:
await consumer.HandleBasicDeliverAsync(
consumerTag, work.DeliveryTag, work.Redelivered,
- work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory)
+ work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
.ConfigureAwait(false);
break;
case WorkType.Cancel:
@@ -45,7 +44,7 @@ await consumer.HandleBasicDeliverAsync(
consumer.HandleBasicConsumeOk(consumerTag);
break;
case WorkType.Shutdown:
- consumer.HandleChannelShutdown(_channel, work.Reason);
+ consumer.HandleChannelShutdown(_channel, work.Reason!);
break;
}
}
diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs
index 41960fb5e1..c5d455a914 100644
--- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs
@@ -1,12 +1,10 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
-using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace RabbitMQ.Client.ConsumerDispatching
{
-#nullable enable
internal abstract class ConsumerDispatcherBase
{
private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer();
diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs
index 6d94336993..23e7c4b1d3 100644
--- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs
@@ -8,7 +8,6 @@
namespace RabbitMQ.Client.ConsumerDispatching
{
-#nullable enable
internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, IConsumerDispatcher
{
protected readonly ChannelBase _channel;
diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs
index c618df748e..17eb121cd7 100644
--- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs
+++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs
@@ -5,7 +5,6 @@
namespace RabbitMQ.Client.ConsumerDispatching
{
-#nullable enable
internal sealed class FallbackConsumer : IBasicConsumer, IAsyncBasicConsumer
{
public IChannel? Channel { get; } = null;
diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs
index 2b104622e9..cd87b5d5a6 100644
--- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs
+++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs
@@ -35,7 +35,6 @@
namespace RabbitMQ.Client.ConsumerDispatching
{
-#nullable enable
internal interface IConsumerDispatcher : IDisposable
{
IBasicConsumer? DefaultConsumer { get; set; }
diff --git a/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs b/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs
index 65ef3cb141..e037324a47 100644
--- a/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs
+++ b/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs
@@ -5,7 +5,6 @@
namespace RabbitMQ.Client.client.impl
{
-#nullable enable
internal sealed class EmptyBasicProperty : IReadOnlyBasicProperties, IAmqpHeader
{
internal static EmptyBasicProperty Empty => new EmptyBasicProperty();
diff --git a/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs
index f9a1a334cb..b541f7a309 100644
--- a/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs
+++ b/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs
@@ -2,7 +2,6 @@
namespace RabbitMQ.Client.Impl
{
-#nullable enable
internal struct EventingWrapper
{
private event EventHandler? _event;
diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs
index 9ed3e3bded..64835fe27e 100644
--- a/projects/RabbitMQ.Client/client/impl/Frame.cs
+++ b/projects/RabbitMQ.Client/client/impl/Frame.cs
@@ -207,7 +207,6 @@ private static int GetBodyFrameCount(int maxPayloadBytes, int length)
}
}
-#nullable enable
internal sealed class InboundFrame
{
public FrameType Type { get; private set; }
diff --git a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs
index 0f2b07ec48..f65e0ffdee 100644
--- a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs
+++ b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs
@@ -48,8 +48,6 @@ internal interface IFrameHandler
int RemotePort { get; }
- Task ConnectAsync(CancellationToken cancellationToken);
-
///Socket read timeout. System.Threading.Timeout.InfiniteTimeSpan signals "infinity".
TimeSpan ReadTimeout { set; }
diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs
index ce29e81b63..5dd574b25a 100644
--- a/projects/RabbitMQ.Client/client/impl/ISession.cs
+++ b/projects/RabbitMQ.Client/client/impl/ISession.cs
@@ -48,12 +48,12 @@ internal interface ISession
///
/// Gets the close reason.
///
- ShutdownEventArgs CloseReason { get; }
+ ShutdownEventArgs? CloseReason { get; }
///
/// Single recipient - no need for multiple handlers to be informed of arriving commands.
///
- CommandReceivedAction CommandReceived { get; set; }
+ CommandReceivedAction? CommandReceived { get; set; }
///
/// Gets the connection.
diff --git a/projects/RabbitMQ.Client/client/impl/ProtocolBase.cs b/projects/RabbitMQ.Client/client/impl/ProtocolBase.cs
index 9d4890751d..4a67104f93 100644
--- a/projects/RabbitMQ.Client/client/impl/ProtocolBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/ProtocolBase.cs
@@ -67,8 +67,18 @@ public AmqpVersion Version
internal abstract ProtocolCommandId DecodeCommandIdFrom(ReadOnlySpan span);
- public override bool Equals(object obj)
+ public override bool Equals(object? obj)
{
+ if (ReferenceEquals(null, obj))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+
return GetType() == obj.GetType();
}
diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs
index fb65e048f5..e2ed18e142 100644
--- a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs
+++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs
@@ -39,7 +39,7 @@ public static class RabbitMQActivitySource
public const string PublisherSourceName = "RabbitMQ.Client.Publisher";
public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber";
- public static Action> ContextInjector { get; set; } = DefaultContextInjector;
+ public static Action> ContextInjector { get; set; } = DefaultContextInjector;
public static Func ContextExtractor { get; set; } =
DefaultContextExtractor;
@@ -48,47 +48,48 @@ public static class RabbitMQActivitySource
internal static bool PublisherHasListeners => s_publisherSource.HasListeners();
internal static bool SubscriberHasListeners => s_subscriberSource.HasListeners();
- internal static readonly IEnumerable> CreationTags = new[]
+ internal static readonly IEnumerable> CreationTags = new[]
{
- new KeyValuePair(MessagingSystem, "rabbitmq"),
- new KeyValuePair(ProtocolName, "amqp"),
- new KeyValuePair(ProtocolVersion, "0.9.1")
+ new KeyValuePair(MessagingSystem, "rabbitmq"),
+ new KeyValuePair(ProtocolName, "amqp"),
+ new KeyValuePair(ProtocolVersion, "0.9.1")
};
- internal static Activity Send(string routingKey, string exchange, int bodySize,
+ internal static Activity? Send(string routingKey, string exchange, int bodySize,
ActivityContext linkedContext = default)
{
- if (s_publisherSource.HasListeners())
+ if (!s_publisherSource.HasListeners())
{
- Activity activity = linkedContext == default
- ? s_publisherSource.StartRabbitMQActivity(
- UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish",
- ActivityKind.Producer)
- : s_publisherSource.StartLinkedRabbitMQActivity(
- UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish",
- ActivityKind.Producer, linkedContext);
- if (activity?.IsAllDataRequested == true)
- {
- PopulateMessagingTags("publish", routingKey, exchange, 0, bodySize, activity);
- }
+ return null;
+ }
- return activity;
+ Activity? activity = linkedContext == default
+ ? s_publisherSource.StartRabbitMQActivity(
+ UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish",
+ ActivityKind.Producer)
+ : s_publisherSource.StartLinkedRabbitMQActivity(
+ UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish",
+ ActivityKind.Producer, linkedContext);
+ if (activity != null && activity.IsAllDataRequested)
+ {
+ PopulateMessagingTags("publish", routingKey, exchange, 0, bodySize, activity);
}
- return null;
+ return activity;
+
}
- internal static Activity ReceiveEmpty(string queue)
+ internal static Activity? ReceiveEmpty(string queue)
{
if (!s_subscriberSource.HasListeners())
{
return null;
}
- Activity activity = s_subscriberSource.StartRabbitMQActivity(
+ Activity? activity = s_subscriberSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{queue} receive" : "receive",
ActivityKind.Consumer);
- if (activity.IsAllDataRequested)
+ if (activity != null && activity.IsAllDataRequested)
{
activity
.SetTag(MessagingOperation, "receive")
@@ -98,7 +99,7 @@ internal static Activity ReceiveEmpty(string queue)
return activity;
}
- internal static Activity Receive(string routingKey, string exchange, ulong deliveryTag,
+ internal static Activity? Receive(string routingKey, string exchange, ulong deliveryTag,
IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize)
{
if (!s_subscriberSource.HasListeners())
@@ -107,10 +108,10 @@ internal static Activity Receive(string routingKey, string exchange, ulong deliv
}
// Extract the PropagationContext of the upstream parent from the message headers.
- Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity(
+ Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} receive" : "receive", ActivityKind.Consumer,
ContextExtractor(readOnlyBasicProperties));
- if (activity.IsAllDataRequested)
+ if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags("receive", routingKey, exchange, deliveryTag, readOnlyBasicProperties,
bodySize, activity);
@@ -119,7 +120,7 @@ internal static Activity Receive(string routingKey, string exchange, ulong deliv
return activity;
}
- internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs)
+ internal static Activity? Deliver(BasicDeliverEventArgs deliverEventArgs)
{
if (!s_subscriberSource.HasListeners())
{
@@ -127,7 +128,7 @@ internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs)
}
// Extract the PropagationContext of the upstream parent from the message headers.
- Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity(
+ Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{deliverEventArgs.RoutingKey} deliver" : "deliver",
ActivityKind.Consumer, ContextExtractor(deliverEventArgs.BasicProperties));
if (activity != null && activity.IsAllDataRequested)
@@ -138,25 +139,21 @@ internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs)
}
return activity;
-
}
- private static Activity StartRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind,
+ private static Activity? StartRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind,
ActivityContext parentContext = default)
{
- Activity activity = source
- .CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)?.Start();
- return activity;
+ return source.CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)?.Start();
}
- private static Activity StartLinkedRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind,
+ private static Activity? StartLinkedRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind,
ActivityContext linkedContext = default, ActivityContext parentContext = default)
{
- Activity activity = source.CreateActivity(name, kind, parentContext: parentContext,
+ return source.CreateActivity(name, kind, parentContext: parentContext,
links: new[] { new ActivityLink(linkedContext) }, idFormat: ActivityIdFormat.W3C,
tags: CreationTags)
?.Start();
- return activity;
}
private static void PopulateMessagingTags(string operation, string routingKey, string exchange,
@@ -190,7 +187,7 @@ private static void PopulateMessagingTags(string operation, string routingKey, s
}
}
- internal static void PopulateMessageEnvelopeSize(Activity activity, int size)
+ internal static void PopulateMessageEnvelopeSize(Activity? activity, int size)
{
if (activity != null && activity.IsAllDataRequested && PublisherHasListeners)
{
@@ -198,7 +195,7 @@ internal static void PopulateMessageEnvelopeSize(Activity activity, int size)
}
}
- internal static void SetNetworkTags(this Activity activity, IFrameHandler frameHandler)
+ internal static void SetNetworkTags(this Activity? activity, IFrameHandler frameHandler)
{
if (PublisherHasListeners && activity != null && activity.IsAllDataRequested)
{
@@ -247,9 +244,8 @@ internal static void SetNetworkTags(this Activity activity, IFrameHandler frameH
}
}
- private static void DefaultContextInjector(Activity sendActivity, IDictionary props)
+ private static void DefaultContextInjector(Activity sendActivity, IDictionary props)
{
- props ??= new Dictionary();
DistributedContextPropagator.Current.Inject(sendActivity, props, DefaultContextSetter);
}
@@ -276,11 +272,11 @@ private static ActivityContext DefaultContextExtractor(IReadOnlyBasicProperties
return default;
}
- DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter, out string traceParent, out string traceState);
+ DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter, out string? traceParent, out string? traceState);
return ActivityContext.TryParse(traceParent, traceState, out ActivityContext context) ? context : default;
}
- private static void DefaultContextSetter(object carrier, string name, string value)
+ private static void DefaultContextSetter(object? carrier, string name, string value)
{
if (!(carrier is IDictionary carrierDictionary))
{
@@ -291,11 +287,10 @@ private static void DefaultContextSetter(object carrier, string name, string val
carrierDictionary[name] = value;
}
- private static void DefaultContextGetter(object carrier, string name, out string value,
- out IEnumerable values)
+ private static void DefaultContextGetter(object? carrier, string name, out string? value, out IEnumerable? values)
{
if (carrier is IDictionary carrierDict &&
- carrierDict.TryGetValue(name, out object propsVal) && propsVal is byte[] bytes)
+ carrierDict.TryGetValue(name, out object? propsVal) && propsVal is byte[] bytes)
{
value = Encoding.UTF8.GetString(bytes);
values = default;
diff --git a/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs b/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs
index 71cd60f020..1b9ac5717a 100644
--- a/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs
+++ b/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs
@@ -36,21 +36,20 @@
namespace RabbitMQ.Client.Impl
{
-#nullable enable
internal readonly struct RecordedBinding : IEquatable, IRecordedBinding
{
private readonly bool _isQueueBinding;
private readonly string _destination;
private readonly string _source;
private readonly string _routingKey;
- private readonly IDictionary? _arguments;
+ private readonly IDictionary? _arguments;
public string Destination => _destination;
public string Source => _source;
public string RoutingKey => _routingKey;
- public IDictionary? Arguments => _arguments;
+ public IDictionary? Arguments => _arguments;
- public RecordedBinding(bool isQueueBinding, string destination, string source, string routingKey, IDictionary? arguments)
+ public RecordedBinding(bool isQueueBinding, string destination, string source, string routingKey, IDictionary? arguments)
{
_isQueueBinding = isQueueBinding;
_destination = destination;
@@ -64,7 +63,7 @@ public RecordedBinding(bool isQueueBinding, string destination, string source, s
}
else
{
- _arguments = new Dictionary(arguments);
+ _arguments = new Dictionary(arguments);
}
}
diff --git a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs
index 2d7402cb5d..272b47ad16 100644
--- a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs
+++ b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs
@@ -35,7 +35,6 @@
namespace RabbitMQ.Client.Impl
{
-#nullable enable
internal readonly struct RecordedConsumer : IRecordedConsumer
{
private readonly AutorecoveringChannel _channel;
@@ -44,9 +43,9 @@ namespace RabbitMQ.Client.Impl
private readonly bool _autoAck;
private readonly string _consumerTag;
private readonly bool _exclusive;
- private readonly IDictionary? _arguments;
+ private readonly IDictionary? _arguments;
- public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer, string consumerTag, string queue, bool autoAck, bool exclusive, IDictionary? arguments)
+ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer, string consumerTag, string queue, bool autoAck, bool exclusive, IDictionary? arguments)
{
if (channel == null)
{
@@ -68,7 +67,7 @@ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer,
{
if (queue == string.Empty)
{
- _queue = _channel.CurrentQueue;
+ _queue = _channel.CurrentQueue ?? string.Empty;
}
else
{
@@ -91,7 +90,7 @@ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer,
}
else
{
- _arguments = new Dictionary(arguments);
+ _arguments = new Dictionary(arguments);
}
}
@@ -101,7 +100,7 @@ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer,
public bool AutoAck => _autoAck;
public string ConsumerTag => _consumerTag;
public bool Exclusive => _exclusive;
- public IDictionary? Arguments => _arguments;
+ public IDictionary? Arguments => _arguments;
public static RecordedConsumer WithNewConsumerTag(string newTag, in RecordedConsumer old)
{
diff --git a/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs b/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs
index fd492a3b28..a70e178b8f 100644
--- a/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs
+++ b/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs
@@ -35,22 +35,21 @@
namespace RabbitMQ.Client.Impl
{
-#nullable enable
internal readonly struct RecordedExchange : IRecordedExchange
{
private readonly string _name;
private readonly string _type;
private readonly bool _durable;
private readonly bool _autoDelete;
- private readonly IDictionary? _arguments;
+ private readonly IDictionary? _arguments;
public string Name => _name;
public bool AutoDelete => _autoDelete;
public string Type => _type;
public bool Durable => _durable;
- public IDictionary? Arguments => _arguments;
+ public IDictionary? Arguments => _arguments;
- public RecordedExchange(string name, string type, bool durable, bool autoDelete, IDictionary? arguments)
+ public RecordedExchange(string name, string type, bool durable, bool autoDelete, IDictionary? arguments)
{
_name = name;
_type = type;
@@ -63,7 +62,7 @@ public RecordedExchange(string name, string type, bool durable, bool autoDelete,
}
else
{
- _arguments = new Dictionary(arguments);
+ _arguments = new Dictionary(arguments);
}
}
diff --git a/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs b/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs
index 959a12b488..c3b929c679 100644
--- a/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs
+++ b/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs
@@ -35,11 +35,10 @@
namespace RabbitMQ.Client.Impl
{
-#nullable enable
internal readonly struct RecordedQueue : IRecordedQueue
{
private readonly string _name;
- private readonly IDictionary? _arguments;
+ private readonly IDictionary? _arguments;
private readonly bool _durable;
private readonly bool _exclusive;
private readonly bool _autoDelete;
@@ -50,9 +49,9 @@ namespace RabbitMQ.Client.Impl
public bool IsServerNamed => _isServerNamed;
public bool Durable => _durable;
public bool Exclusive => _exclusive;
- public IDictionary? Arguments => _arguments;
+ public IDictionary? Arguments => _arguments;
- public RecordedQueue(string name, bool isServerNamed, bool durable, bool exclusive, bool autoDelete, IDictionary? arguments)
+ public RecordedQueue(string name, bool isServerNamed, bool durable, bool exclusive, bool autoDelete, IDictionary? arguments)
{
_name = name;
_isServerNamed = isServerNamed;
@@ -66,7 +65,7 @@ public RecordedQueue(string name, bool isServerNamed, bool durable, bool exclusi
}
else
{
- _arguments = new Dictionary(arguments);
+ _arguments = new Dictionary(arguments);
}
}
diff --git a/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs b/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs
index 680bdd43db..b59e2416f7 100644
--- a/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs
+++ b/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs
@@ -30,6 +30,7 @@
//---------------------------------------------------------------------------
using System;
+using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
@@ -80,7 +81,7 @@ public void Dispose()
public void Enqueue(IRpcContinuation k)
{
IRpcContinuation result = Interlocked.CompareExchange(ref _outstandingRpc, k, s_tmp);
- if (!(result is EmptyRpcContinuation))
+ if (result is not EmptyRpcContinuation)
{
throw new NotSupportedException($"Pipelining of requests forbidden (attempted: {k.GetType()}, enqueued: {result.GetType()})");
}
@@ -122,7 +123,7 @@ public IRpcContinuation Next()
/// waiting continuations.
///
///
- public bool TryPeek(out T continuation) where T : IRpcContinuation
+ public bool TryPeek([NotNullWhen(true)] out T? continuation) where T : class, IRpcContinuation
{
if (_outstandingRpc is T result)
{
diff --git a/projects/RabbitMQ.Client/client/impl/Session.cs b/projects/RabbitMQ.Client/client/impl/Session.cs
index 29be35c6d9..9a3e28457c 100644
--- a/projects/RabbitMQ.Client/client/impl/Session.cs
+++ b/projects/RabbitMQ.Client/client/impl/Session.cs
@@ -48,13 +48,13 @@ public Session(Connection connection, ushort channelNumber, uint maxBodyLength)
public override Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken)
{
- IncomingCommand cmd = _assembler.HandleFrame(frame);
+ IncomingCommand? cmd = _assembler.HandleFrame(frame);
if (cmd is null)
{
return Task.CompletedTask;
}
- return CommandReceived.Invoke(cmd, cancellationToken);
+ return CommandReceived!.Invoke(cmd, cancellationToken);
}
}
}
diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs
index 50ec2a17f5..b5dea9b14a 100644
--- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs
@@ -31,6 +31,7 @@
using System;
using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -43,8 +44,8 @@ namespace RabbitMQ.Client.Impl
{
internal abstract class SessionBase : ISession
{
- private ShutdownEventArgs _closeReason;
- public ShutdownEventArgs CloseReason => Volatile.Read(ref _closeReason);
+ private ShutdownEventArgs? _closeReason;
+ public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
protected SessionBase(Connection connection, ushort channelNumber)
{
@@ -79,12 +80,13 @@ public event EventHandler SessionShutdown
public ushort ChannelNumber { get; }
- public CommandReceivedAction CommandReceived { get; set; }
+ public CommandReceivedAction? CommandReceived { get; set; }
public Connection Connection { get; }
+ [MemberNotNullWhen(false, nameof(CloseReason))]
public bool IsOpen => CloseReason is null;
- public virtual void OnConnectionShutdown(object conn, ShutdownEventArgs reason)
+ public virtual void OnConnectionShutdown(object? conn, ShutdownEventArgs reason)
{
Close(reason);
}
@@ -114,7 +116,7 @@ public void Close(ShutdownEventArgs reason, bool notify)
if (notify)
{
- OnSessionShutdown(CloseReason);
+ OnSessionShutdown(CloseReason!);
}
}
@@ -124,7 +126,7 @@ public void Notify()
{
// Ensure that we notify only when session is already closed
// If not, throw exception, since this is a serious bug in the library
- ShutdownEventArgs reason = CloseReason;
+ ShutdownEventArgs? reason = CloseReason;
if (reason is null)
{
throw new InvalidOperationException("Internal Error in SessionBase.Notify");
@@ -160,6 +162,6 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head
}
private void ThrowAlreadyClosedException()
- => throw new AlreadyClosedException(CloseReason);
+ => throw new AlreadyClosedException(CloseReason!);
}
}
diff --git a/projects/RabbitMQ.Client/client/impl/SessionManager.cs b/projects/RabbitMQ.Client/client/impl/SessionManager.cs
index 8c60bc1396..87d2111862 100644
--- a/projects/RabbitMQ.Client/client/impl/SessionManager.cs
+++ b/projects/RabbitMQ.Client/client/impl/SessionManager.cs
@@ -83,11 +83,11 @@ public ISession Create()
}
}
- private void HandleSessionShutdown(object sender, ShutdownEventArgs reason)
+ private void HandleSessionShutdown(object? sender, ShutdownEventArgs reason)
{
lock (_sessionMap)
{
- var session = (ISession)sender;
+ var session = (ISession)sender!;
_sessionMap.Remove(session.ChannelNumber);
_ints.Free(session.ChannelNumber);
}
diff --git a/projects/RabbitMQ.Client/client/impl/SocketFactory.cs b/projects/RabbitMQ.Client/client/impl/SocketFactory.cs
new file mode 100644
index 0000000000..3c63f626a3
--- /dev/null
+++ b/projects/RabbitMQ.Client/client/impl/SocketFactory.cs
@@ -0,0 +1,96 @@
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using RabbitMQ.Client.Exceptions;
+
+namespace RabbitMQ.Client.client.impl
+{
+ internal static class SocketFactory
+ {
+ public static async Task OpenAsync(AmqpTcpEndpoint amqpTcpEndpoint, Func socketFactory,
+ TimeSpan connectionTimeout, CancellationToken cancellationToken)
+ {
+ IPAddress[] ipAddresses = await Dns.GetHostAddressesAsync(
+ amqpTcpEndpoint.HostName
+#if NET6_0_OR_GREATER
+ , cancellationToken
+#endif
+ ).ConfigureAwait(false);
+
+ IPAddress? ipv6 = TcpClientAdapter.GetMatchingHost(ipAddresses, AddressFamily.InterNetworkV6);
+ if (ipv6 == default(IPAddress))
+ {
+ if (amqpTcpEndpoint.AddressFamily == AddressFamily.InterNetworkV6)
+ {
+ throw new ConnectFailureException($"Connection failed, host {amqpTcpEndpoint}",
+ new ArgumentException($"No IPv6 address could be resolved for {amqpTcpEndpoint}"));
+ }
+ }
+ else if (ShouldTryIPv6(amqpTcpEndpoint))
+ {
+ try
+ {
+ return await ConnectUsingAddressFamilyAsync(new IPEndPoint(ipv6, amqpTcpEndpoint.Port), socketFactory,
+ AddressFamily.InterNetworkV6, connectionTimeout, cancellationToken).ConfigureAwait(false);
+ }
+ catch (ConnectFailureException)
+ {
+ // We resolved to a ipv6 address and tried it but it still didn't connect, try IPv4
+ }
+ }
+
+ IPAddress? ipv4 = TcpClientAdapter.GetMatchingHost(ipAddresses, AddressFamily.InterNetwork);
+ if (ipv4 == default(IPAddress))
+ {
+ throw new ConnectFailureException($"Connection failed, host {amqpTcpEndpoint}",
+ new ArgumentException($"No ip address could be resolved for {amqpTcpEndpoint}"));
+ }
+
+ return await ConnectUsingAddressFamilyAsync(new IPEndPoint(ipv4, amqpTcpEndpoint.Port), socketFactory,
+ AddressFamily.InterNetwork, connectionTimeout, cancellationToken).ConfigureAwait(false);
+ }
+
+ private static bool ShouldTryIPv6(AmqpTcpEndpoint endpoint)
+ {
+ return Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork;
+ }
+
+ private static async ValueTask ConnectUsingAddressFamilyAsync(IPEndPoint endpoint, Func socketFactory,
+ AddressFamily family, TimeSpan connectionTimeout, CancellationToken cancellationToken)
+ {
+ /*
+ * Create linked cancellation token that includes the connection timeout value
+ * https://learn.microsoft.com/en-us/dotnet/standard/threading/how-to-listen-for-multiple-cancellation-requests
+ */
+ using var timeoutTokenSource = new CancellationTokenSource(connectionTimeout);
+ using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token, cancellationToken);
+
+ ITcpClient socket = socketFactory(family);
+ try
+ {
+ await socket.ConnectAsync(endpoint.Address, endpoint.Port, linkedTokenSource.Token).ConfigureAwait(false);
+ return socket;
+ }
+ catch (Exception e)
+ {
+ socket.Dispose();
+
+ string msg = $"Connection failed, host {endpoint}";
+
+ if (e is ArgumentException or SocketException or NotSupportedException)
+ {
+ throw new ConnectFailureException(msg, e);
+ }
+
+ if (e is OperationCanceledException && timeoutTokenSource.Token.IsCancellationRequested)
+ {
+ throw new ConnectFailureException(msg, new TimeoutException(msg, e));
+ }
+
+ throw;
+ }
+ }
+ }
+}
diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
index f0c4822ca1..ec970fe93c 100644
--- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
+++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
@@ -37,7 +37,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
-using RabbitMQ.Client.Exceptions;
+using RabbitMQ.Client.client.impl;
using RabbitMQ.Client.Logging;
namespace RabbitMQ.Client.Impl
@@ -45,36 +45,26 @@ namespace RabbitMQ.Client.Impl
internal sealed class SocketFrameHandler : IFrameHandler
{
private readonly AmqpTcpEndpoint _amqpTcpEndpoint;
- private readonly Func _socketFactory;
- private readonly TimeSpan _connectionTimeout;
+ private readonly ITcpClient _socket;
+ private readonly Stream _stream;
private readonly ChannelWriter _channelWriter;
private readonly ChannelReader