From fd0f5bdedad4ca625d571f938e1381efe9c5bc71 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 6 Sep 2024 08:00:23 -0700 Subject: [PATCH 1/3] Clean up `IChannelExtensions` * Remove extension method that could create an ambiguous situation when parameter names were used --- .../RabbitMQ.Client/PublicAPI.Shipped.txt | 1 - .../client/api/IChannelExtensions.cs | 162 ++++++++++-------- .../TestRecoveringConsumerEventHandlers.cs | 3 +- .../Test/Integration/TestAsyncConsumer.cs | 2 +- 4 files changed, 92 insertions(+), 76 deletions(-) diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 656279bb0..5cfab29f5 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -856,7 +856,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void ~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action> ~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void -static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.IAsyncBasicConsumer! consumer, string! queue, bool autoAck = false, string! consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary? arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, System.Collections.Generic.IDictionary? arguments, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index 6bf174563..ca86e257e 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -31,7 +31,6 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.client.impl; @@ -42,47 +41,36 @@ public static class IChannelExtensions { /// Asynchronously start a Basic content-class consumer. public static Task BasicConsumeAsync(this IChannel channel, - IAsyncBasicConsumer consumer, string queue, - bool autoAck = false, - string consumerTag = "", - bool noLocal = false, - bool exclusive = false, - IDictionary? arguments = null, - CancellationToken cancellationToken = default) - { - return channel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer, cancellationToken); - } - - /// Asynchronously start a Basic content-class consumer. - public static Task BasicConsumeAsync(this IChannel channel, string queue, bool autoAck, IAsyncBasicConsumer consumer, - CancellationToken cancellationToken = default) - { - return channel.BasicConsumeAsync(queue, autoAck, string.Empty, false, false, null, consumer, cancellationToken); - } + CancellationToken cancellationToken = default) => + channel.BasicConsumeAsync(queue: queue, autoAck: autoAck, consumerTag: string.Empty, + noLocal: false, exclusive: false, arguments: null, consumer: consumer, + cancellationToken); /// Asynchronously start a Basic content-class consumer. - public static Task BasicConsumeAsync(this IChannel channel, string queue, + public static Task BasicConsumeAsync(this IChannel channel, + string queue, bool autoAck, string consumerTag, IAsyncBasicConsumer consumer, - CancellationToken cancellationToken = default) - { - return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, null, consumer, cancellationToken); - } + CancellationToken cancellationToken = default) => + channel.BasicConsumeAsync(queue: queue, autoAck: autoAck, consumerTag: consumerTag, + noLocal: false, exclusive: false, arguments: null, consumer: consumer, + cancellationToken); /// Asynchronously start a Basic content-class consumer. - public static Task BasicConsumeAsync(this IChannel channel, string queue, + public static Task BasicConsumeAsync(this IChannel channel, + string queue, bool autoAck, string consumerTag, IDictionary? arguments, IAsyncBasicConsumer consumer, - CancellationToken cancellationToken = default) - { - return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, arguments, consumer, cancellationToken); - } + CancellationToken cancellationToken = default) => + channel.BasicConsumeAsync(queue: queue, autoAck: autoAck, consumerTag: consumerTag, + noLocal: false, exclusive: false, arguments: arguments, consumer: consumer, + cancellationToken); /// /// (Extension method) Convenience overload of BasicPublish. @@ -90,57 +78,87 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue /// /// The publication occurs with mandatory=false and immediate=false. /// - public static ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, T basicProperties, - ReadOnlyMemory body, CancellationToken cancellationToken = default) - where T : IReadOnlyBasicProperties, IAmqpHeader - { - return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body, false, cancellationToken); - } + public static ValueTask BasicPublishAsync(this IChannel channel, + PublicationAddress addr, + T basicProperties, + ReadOnlyMemory body, + CancellationToken cancellationToken = default) + where T : IReadOnlyBasicProperties, IAmqpHeader => + channel.BasicPublishAsync(exchange: addr.ExchangeName, routingKey: addr.RoutingKey, + basicProperties: basicProperties, body: body, mandatory: false, + cancellationToken); - public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, - ReadOnlyMemory body = default, bool mandatory = false, CancellationToken cancellationToken = default) => - channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory, cancellationToken); + public static ValueTask BasicPublishAsync(this IChannel channel, + string exchange, + string routingKey, + ReadOnlyMemory body = default, + bool mandatory = false, + CancellationToken cancellationToken = default) => + channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey, + basicProperties: EmptyBasicProperty.Empty, body: body, mandatory: mandatory, + cancellationToken); - public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, - CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false, CancellationToken cancellationToken = default) => - channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory, cancellationToken); + public static ValueTask BasicPublishAsync(this IChannel channel, + CachedString exchange, + CachedString routingKey, + ReadOnlyMemory body = default, + bool mandatory = false, + CancellationToken cancellationToken = default) => + channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey, + basicProperties: EmptyBasicProperty.Empty, body: body, mandatory: mandatory, + cancellationToken); /// /// Asynchronously declare a queue. /// - public static Task QueueDeclareAsync(this IChannel channel, string queue = "", bool durable = false, bool exclusive = true, - bool autoDelete = true, IDictionary? arguments = null, bool noWait = false, CancellationToken cancellationToken = default) - { - return channel.QueueDeclareAsync(queue: queue, passive: false, + public static Task QueueDeclareAsync(this IChannel channel, + string queue = "", + bool durable = false, + bool exclusive = true, + bool autoDelete = true, + IDictionary? arguments = null, + bool noWait = false, + CancellationToken cancellationToken = default) => + channel.QueueDeclareAsync(queue: queue, passive: false, durable: durable, exclusive: exclusive, autoDelete: autoDelete, - arguments: arguments, noWait: noWait, cancellationToken: cancellationToken); - } + arguments: arguments, noWait: noWait, + cancellationToken: cancellationToken); /// /// Asynchronously declare an exchange. /// - public static Task ExchangeDeclareAsync(this IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, - IDictionary? arguments = null, bool noWait = false, CancellationToken cancellationToken = default) - { - return channel.ExchangeDeclareAsync(exchange, type, durable, autoDelete, - arguments: arguments, passive: false, noWait: noWait, cancellationToken: cancellationToken); - } + public static Task ExchangeDeclareAsync(this IChannel channel, + string exchange, + string type, + bool durable = false, + bool autoDelete = false, + IDictionary? arguments = null, + bool noWait = false, + CancellationToken cancellationToken = default) => + channel.ExchangeDeclareAsync(exchange: exchange, type: type, durable: durable, + autoDelete: autoDelete, arguments: arguments, passive: false, noWait: noWait, + cancellationToken: cancellationToken); /// /// Asynchronously deletes a queue. /// - public static Task QueueDeleteAsync(this IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false, CancellationToken cancellationToken = default) - { - return channel.QueueDeleteAsync(queue, ifUnused, ifEmpty, false, cancellationToken); - } + public static Task QueueDeleteAsync(this IChannel channel, + string queue, + bool ifUnused = false, + bool ifEmpty = false, + CancellationToken cancellationToken = default) => + channel.QueueDeleteAsync(queue, ifUnused, ifEmpty, false, cancellationToken); /// /// Asynchronously unbinds a queue. /// - public static Task QueueUnbindAsync(this IChannel channel, string queue, string exchange, string routingKey, IDictionary? arguments = null, CancellationToken cancellationToken = default) - { - return channel.QueueUnbindAsync(queue, exchange, routingKey, arguments, cancellationToken); - } + public static Task QueueUnbindAsync(this IChannel channel, + string queue, + string exchange, + string routingKey, + IDictionary? arguments = null, + CancellationToken cancellationToken = default) => + channel.QueueUnbindAsync(queue, exchange, routingKey, arguments, cancellationToken); /// /// Asynchronously abort this session. @@ -153,11 +171,10 @@ public static Task QueueUnbindAsync(this IChannel channel, string queue, string /// In comparison to normal method, will not throw /// or or any other during closing channel. /// - public static Task AbortAsync(this IChannel channel, CancellationToken cancellationToken = default) - { - return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", true, + public static Task AbortAsync(this IChannel channel, + CancellationToken cancellationToken = default) => + channel.CloseAsync(Constants.ReplySuccess, "Goodbye", true, cancellationToken); - } /// Asynchronously close this session. /// @@ -166,11 +183,10 @@ public static Task AbortAsync(this IChannel channel, CancellationToken cancellat /// operation to complete. This method will not return to the /// caller until the shutdown is complete. /// - public static Task CloseAsync(this IChannel channel, CancellationToken cancellationToken = default) - { - return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false, + public static Task CloseAsync(this IChannel channel, + CancellationToken cancellationToken = default) => + channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false, cancellationToken); - } /// /// Asynchronously close this channel. @@ -189,10 +205,10 @@ public static Task CloseAsync(this IChannel channel, CancellationToken cancellat /// A message indicating the reason for closing the channel /// /// - public static Task CloseAsync(this IChannel channel, ushort replyCode, string replyText, - CancellationToken cancellationToken = default) - { - return channel.CloseAsync(replyCode, replyText, false, cancellationToken); - } + public static Task CloseAsync(this IChannel channel, + ushort replyCode, + string replyText, + CancellationToken cancellationToken = default) => + channel.CloseAsync(replyCode, replyText, false, cancellationToken); } } diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs index 889c26b45..0ab41bc12 100644 --- a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs @@ -79,7 +79,8 @@ public async Task TestRecoveringConsumerEventHandler_EventArgumentsArePassedDown RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false); var cons = new AsyncEventingBasicConsumer(_channel); - string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: arguments); + string expectedCTag = await _channel.BasicConsumeAsync(consumer: cons, queue: q, autoAck: false, + arguments: arguments, consumerTag: string.Empty); bool ctagMatches = false; bool consumerArgumentMatches = false; diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 73762e47b..19abe2d21 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -664,7 +664,7 @@ public async Task TestCloseWithinEventHandler_GH1567() tcs.TrySetResult(true); }; - await _channel.BasicConsumeAsync(consumer, queueName, true); + await _channel.BasicConsumeAsync(consumer: consumer, queue: queueName, autoAck: true); var bp = new BasicProperties(); From 5abd91233704d27fb4ffc331ae9bdc5f1e1a55e0 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 6 Sep 2024 09:27:20 -0700 Subject: [PATCH 2/3] * Ensure `BasicPublishAsync` API and extensions use the same parameter order as the 6.x version (cc @danielmarbach) --- .../RabbitMQ.Client/PublicAPI.Shipped.txt | 10 +-- .../RabbitMQ.Client/client/api/IChannel.cs | 12 ++-- .../client/api/IChannelExtensions.cs | 62 ++++++++++++++++--- .../client/impl/AutorecoveringChannel.cs | 20 +++--- .../client/impl/ChannelBase.cs | 8 +-- projects/Test/Applications/GH-1647/Program.cs | 3 +- .../TestExchangeRecovery.cs | 2 +- .../TestRpcAfterRecovery.cs | 3 +- .../Test/Integration/TestAsyncConsumer.cs | 5 +- .../TestAsyncEventingBasicConsumer.cs | 3 +- projects/Test/Integration/TestBasicGet.cs | 2 +- projects/Test/Integration/TestBasicPublish.cs | 6 +- ...ncurrentAccessWithSharedConnectionAsync.cs | 2 +- .../Test/Integration/TestConfirmSelect.cs | 7 ++- .../TestConnectionTopologyRecovery.cs | 4 +- projects/Test/Integration/TestExtensions.cs | 6 +- .../Test/Integration/TestFloodPublishing.cs | 2 +- .../TestPublishSharedChannelAsync.cs | 2 +- projects/Test/OAuth2/TestOAuth2.cs | 2 +- .../TestActivitySource.cs | 13 ++-- .../TestConnectionBlockedChannelLeak.cs | 2 +- .../TestOpenTelemetry.cs | 6 +- 22 files changed, 118 insertions(+), 64 deletions(-) diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 5cfab29f5..b8748390a 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -433,8 +433,6 @@ RabbitMQ.Client.IChannel.BasicAckAsync(ulong deliveryTag, bool multiple, System. RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler -RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler RabbitMQ.Client.IChannel.ChannelNumber.get -> int @@ -859,8 +857,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, System.Collections.Generic.IDictionary? arguments, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel! channel, ushort replyCode, string! replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! @@ -895,4 +891,10 @@ RabbitMQ.Client.ICredentialsProvider.GetCredentialsAsync(System.Threading.Cancel RabbitMQ.Client.ICredentialsProvider.Name.get -> string! RabbitMQ.Client.PlainMechanism.HandleChallengeAsync(byte[]? challenge, RabbitMQ.Client.ConnectionConfig! config, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! readonly RabbitMQ.Client.ConnectionConfig.CredentialsProvider -> RabbitMQ.Client.ICredentialsProvider! +RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 94a21c0a9..cb535f405 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -198,15 +198,15 @@ Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b /// /// The exchange. /// The routing key. + /// If set to true, the message must route to a queue. /// The message properties. /// The message body. - /// If set to true, the message must route to a queue. /// CancellationToken for this operation. /// /// Routing key must be shorter than 255 bytes. /// - ValueTask BasicPublishAsync(string exchange, string routingKey, TProperties basicProperties, - ReadOnlyMemory body = default, bool mandatory = false, + ValueTask BasicPublishAsync(string exchange, string routingKey, + bool mandatory, TProperties basicProperties, ReadOnlyMemory body, CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; @@ -215,15 +215,15 @@ ValueTask BasicPublishAsync(string exchange, string routingKey, TPr /// /// The exchange. /// The routing key. + /// If set to true, the message must route to a queue. /// The message properties. /// The message body. - /// If set to true, the message must route to a queue. /// CancellationToken for this operation. /// /// Routing key must be shorter than 255 bytes. /// - ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, TProperties basicProperties, - ReadOnlyMemory body = default, bool mandatory = false, + ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, + bool mandatory, TProperties basicProperties, ReadOnlyMemory body, CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index ca86e257e..a277ded4d 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -73,10 +73,10 @@ public static Task BasicConsumeAsync(this IChannel channel, cancellationToken); /// - /// (Extension method) Convenience overload of BasicPublish. + /// (Extension method) Convenience overload of /// /// - /// The publication occurs with mandatory=false and immediate=false. + /// The publication occurs with mandatory=false. /// public static ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, @@ -85,28 +85,70 @@ public static ValueTask BasicPublishAsync(this IChannel channel, CancellationToken cancellationToken = default) where T : IReadOnlyBasicProperties, IAmqpHeader => channel.BasicPublishAsync(exchange: addr.ExchangeName, routingKey: addr.RoutingKey, - basicProperties: basicProperties, body: body, mandatory: false, + mandatory: false, basicProperties: basicProperties, body: body, cancellationToken); + /// + /// (Extension method) Convenience overload of + /// + /// + /// The publication occurs with mandatory=false and empty BasicProperties + /// public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, - ReadOnlyMemory body = default, - bool mandatory = false, + ReadOnlyMemory body, CancellationToken cancellationToken = default) => channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey, - basicProperties: EmptyBasicProperty.Empty, body: body, mandatory: mandatory, + mandatory: false, basicProperties: EmptyBasicProperty.Empty, body: body, cancellationToken); + /// + /// (Extension method) Convenience overload of + /// + /// + /// The publication occurs with mandatory=false and empty BasicProperties + /// public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, - ReadOnlyMemory body = default, - bool mandatory = false, + ReadOnlyMemory body, CancellationToken cancellationToken = default) => channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey, - basicProperties: EmptyBasicProperty.Empty, body: body, mandatory: mandatory, - cancellationToken); + mandatory: false, basicProperties: EmptyBasicProperty.Empty, body: body, + cancellationToken); + + /// + /// (Extension method) Convenience overload of + /// + /// + /// The publication occurs with empty BasicProperties + /// + public static ValueTask BasicPublishAsync(this IChannel channel, + string exchange, + string routingKey, + bool mandatory, + ReadOnlyMemory body, + CancellationToken cancellationToken = default) => + channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey, + mandatory: mandatory, basicProperties: EmptyBasicProperty.Empty, body: body, + cancellationToken); + + /// + /// (Extension method) Convenience overload of + /// + /// + /// The publication occurs with empty BasicProperties + /// + public static ValueTask BasicPublishAsync(this IChannel channel, + CachedString exchange, + CachedString routingKey, + bool mandatory, + ReadOnlyMemory body, + CancellationToken cancellationToken = default) => + channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey, + mandatory: mandatory, basicProperties: EmptyBasicProperty.Empty, body: body, + cancellationToken); /// /// Asynchronously declare a queue. diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index a9ab40f0c..b939c8c9c 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -306,17 +306,21 @@ await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false) public ValueTask BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken) => InnerChannel.BasicGetAsync(queue, autoAck, cancellationToken); - public ValueTask BasicPublishAsync(string exchange, string routingKey, TProperties basicProperties, - ReadOnlyMemory body, bool mandatory, - CancellationToken cancellationToken) + public ValueTask BasicPublishAsync(string exchange, string routingKey, + bool mandatory, + TProperties basicProperties, + ReadOnlyMemory body, + CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - => InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory, cancellationToken); + => InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken); - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, TProperties basicProperties, - ReadOnlyMemory body, bool mandatory, - CancellationToken cancellationToken) + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, + bool mandatory, + TProperties basicProperties, + ReadOnlyMemory body, + CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - => InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory, cancellationToken); + => InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken); public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index d860fcd74..e4cb0b508 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -925,8 +925,8 @@ await ModelSendAsync(method, k.CancellationToken) } public async ValueTask BasicPublishAsync(string exchange, string routingKey, - TProperties basicProperties, ReadOnlyMemory body, bool mandatory, - CancellationToken cancellationToken) + bool mandatory, TProperties basicProperties, ReadOnlyMemory body, + CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { if (ConfirmsAreEnabled) @@ -1004,8 +1004,8 @@ await _confirmSemaphore.WaitAsync(cancellationToken) } public async ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, - TProperties basicProperties, ReadOnlyMemory body, bool mandatory, - CancellationToken cancellationToken) + bool mandatory, TProperties basicProperties, ReadOnlyMemory body, + CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { if (ConfirmsAreEnabled) diff --git a/projects/Test/Applications/GH-1647/Program.cs b/projects/Test/Applications/GH-1647/Program.cs index f3cccb3da..2ed2c9f64 100644 --- a/projects/Test/Applications/GH-1647/Program.cs +++ b/projects/Test/Applications/GH-1647/Program.cs @@ -18,7 +18,8 @@ { using var channel = await connection.CreateChannelAsync(); // New channel for each message await Task.Delay(1000); - await channel.BasicPublishAsync(string.Empty, string.Empty, props, msg); + await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty, + mandatory: false, basicProperties: props, body: msg); Console.WriteLine($"Sent message {i}"); } catch (Exception ex) diff --git a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs index cb83177c0..0cdfe8e17 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs @@ -72,7 +72,7 @@ public async Task TestExchangeToExchangeBindingRecovery() { await CloseAndWaitForRecoveryAsync(); Assert.True(_channel.IsOpen); - await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg"), mandatory: true); + await _channel.BasicPublishAsync(ex_source, "", body: _encoding.GetBytes("msg"), mandatory: true); await _channel.WaitForConfirmsOrDieAsync(); await AssertMessageCountAsync(q, 1); } diff --git a/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs index 61708203e..874b39f20 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs @@ -77,7 +77,8 @@ public async Task TestPublishRpcRightAfterReconnect() try { - await _channel.BasicPublishAsync(string.Empty, testQueueName, properties, _messageBody); + await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: testQueueName, + mandatory: false, basicProperties: properties, body: _messageBody); } catch (Exception e) { diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 19abe2d21..8cab7d1ed 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; +using System.Text; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; @@ -620,7 +621,9 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() using (IChannel innerChannel = await _conn.CreateChannelAsync()) { await innerChannel.ConfirmSelectAsync(); - await innerChannel.BasicPublishAsync(exchangeName, queue2Name, mandatory: true); + await innerChannel.BasicPublishAsync(exchangeName, queue2Name, + mandatory: true, + body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650))); await innerChannel.WaitForConfirmsOrDieAsync(); await innerChannel.CloseAsync(); } diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index 0fad67446..762ca6e49 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -100,7 +100,8 @@ public async Task TestAsyncEventingBasicConsumer_GH1038() using IChannel publisherChannel = await _conn.CreateChannelAsync(); byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); var props = new BasicProperties(); - await publisherChannel.BasicPublishAsync(exchangeName, "", props, messageBodyBytes); + await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, + mandatory: false, basicProperties: props, body: messageBodyBytes); await Task.WhenAll(_onReceivedTcs.Task, _onCallbackExceptionTcs.Task); Assert.True(await _onReceivedTcs.Task); diff --git a/projects/Test/Integration/TestBasicGet.cs b/projects/Test/Integration/TestBasicGet.cs index 1935af56d..05a7812d7 100644 --- a/projects/Test/Integration/TestBasicGet.cs +++ b/projects/Test/Integration/TestBasicGet.cs @@ -52,7 +52,7 @@ public async Task TestBasicGetRoundTrip() QueueDeclareOk queueResult = await _channel.QueueDeclareAsync(string.Empty, false, true, true); string queueName = queueResult.QueueName; - await _channel.BasicPublishAsync(string.Empty, queueName, _encoding.GetBytes(msg), true); + await _channel.BasicPublishAsync(string.Empty, queueName, true, _encoding.GetBytes(msg)); BasicGetResult getResult = await _channel.BasicGetAsync(queueName, true); Assert.Equal(msg, _encoding.GetString(getResult.Body.ToArray())); diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 23612a8ee..c4266bc3d 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -76,7 +76,7 @@ public async Task TestBasicRoundtripArray() }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - await _channel.BasicPublishAsync("", q.QueueName, bp, sendBody); + await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody); bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)); await _channel.BasicCancelAsync(tag); @@ -106,7 +106,7 @@ public async Task TestBasicRoundtripCachedString() }; string tag = await _channel.BasicConsumeAsync(queueName.Value, true, consumer); - await _channel.BasicPublishAsync(exchangeName, queueName, sendBody); + await _channel.BasicPublishAsync(exchange: exchangeName, routingKey: queueName, body: sendBody); bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2)); await _channel.BasicCancelAsync(tag); @@ -319,7 +319,7 @@ public async Task TestPropertiesRoundtrip_Headers() }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - await _channel.BasicPublishAsync("", q.QueueName, bp, sendBody); + await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody); bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)); await _channel.BasicCancelAsync(tag); Assert.True(waitResFalse); diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs index b441152f6..37e3450da 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs @@ -130,7 +130,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null); for (ushort j = 0; j < _messageCount; j++) { - await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true); + await ch.BasicPublishAsync("", q.QueueName, mandatory: true, body: body); } Assert.True(await tcs.Task); diff --git a/projects/Test/Integration/TestConfirmSelect.cs b/projects/Test/Integration/TestConfirmSelect.cs index f73e00c3a..cc2493f4b 100644 --- a/projects/Test/Integration/TestConfirmSelect.cs +++ b/projects/Test/Integration/TestConfirmSelect.cs @@ -81,7 +81,8 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) var properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); - await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, properties, body); + await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, + mandatory: false, basicProperties: properties, body: body); await _channel.WaitForConfirmsOrDieAsync(); try @@ -91,7 +92,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) CorrelationId = new string('o', correlationIdLength) }; // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); - await _channel.BasicPublishAsync("sample", string.Empty, properties, body); + await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body); await _channel.WaitForConfirmsOrDieAsync(); } catch @@ -101,7 +102,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); - await _channel.BasicPublishAsync("sample", string.Empty, properties, body); + await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body); await _channel.WaitForConfirmsOrDieAsync(); // _output.WriteLine("I'm done..."); } diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs index 998339894..8fdfb1d8c 100644 --- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -281,10 +281,10 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() await ch.QueueDeclarePassiveAsync(queue1); await ch.QueueDeclarePassiveAsync(queue2); - await ch.BasicPublishAsync(exchange, binding1, _encoding.GetBytes("test message"), mandatory: true); + await ch.BasicPublishAsync(exchange, binding1, true, _encoding.GetBytes("test message")); // await ch.WaitForConfirmsOrDieAsync(); - await ch.BasicPublishAsync(exchange, binding2, _encoding.GetBytes("test message"), mandatory: true); + await ch.BasicPublishAsync(exchange, binding2, true, _encoding.GetBytes("test message")); // await ch.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs1.Task.WaitAsync(TimeSpan.FromSeconds(5)); diff --git a/projects/Test/Integration/TestExtensions.cs b/projects/Test/Integration/TestExtensions.cs index 673eff392..fd55296e6 100644 --- a/projects/Test/Integration/TestExtensions.cs +++ b/projects/Test/Integration/TestExtensions.cs @@ -49,7 +49,7 @@ public async Task TestConfirmBarrier() await _channel.ConfirmSelectAsync(); for (int i = 0; i < 10; i++) { - await _channel.BasicPublishAsync(string.Empty, string.Empty); + await _channel.BasicPublishAsync(string.Empty, string.Empty, Array.Empty()); } Assert.True(await _channel.WaitForConfirmsAsync()); } @@ -72,12 +72,12 @@ public async Task TestExchangeBinding() await _channel.ExchangeBindAsync("dest", "src", string.Empty); await _channel.QueueBindAsync(queue, "dest", string.Empty); - await _channel.BasicPublishAsync("src", string.Empty); + await _channel.BasicPublishAsync("src", string.Empty, Array.Empty()); await _channel.WaitForConfirmsAsync(); Assert.NotNull(await _channel.BasicGetAsync(queue, true)); await _channel.ExchangeUnbindAsync("dest", "src", string.Empty); - await _channel.BasicPublishAsync("src", string.Empty); + await _channel.BasicPublishAsync("src", string.Empty, Array.Empty()); await _channel.WaitForConfirmsAsync(); Assert.Null(await _channel.BasicGetAsync(queue, true)); diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 513373a3c..d9c1698bf 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -194,7 +194,7 @@ public async Task TestMultithreadFloodPublishing() for (int i = 0; i < publishCount && false == stop; i++) { - await publishChannel.BasicPublishAsync(string.Empty, queueName, sendBody, true); + await publishChannel.BasicPublishAsync(string.Empty, queueName, true, sendBody); } await publishChannel.WaitForConfirmsOrDieAsync(); diff --git a/projects/Test/Integration/TestPublishSharedChannelAsync.cs b/projects/Test/Integration/TestPublishSharedChannelAsync.cs index 4723cdb8d..7c7d094ca 100644 --- a/projects/Test/Integration/TestPublishSharedChannelAsync.cs +++ b/projects/Test/Integration/TestPublishSharedChannelAsync.cs @@ -94,7 +94,7 @@ await channel.ExchangeDeclareAsync(ExchangeName.Value, ExchangeType.Topic, passi { for (int j = 0; j < Repeats; j++) { - await channel.BasicPublishAsync(ExchangeName, PublishKey, _body, false); + await channel.BasicPublishAsync(ExchangeName, PublishKey, false, _body); } } } diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs index 176e4da6b..9a25b687d 100644 --- a/projects/Test/OAuth2/TestOAuth2.cs +++ b/projects/Test/OAuth2/TestOAuth2.cs @@ -243,7 +243,7 @@ private async Task PublishAsync(IChannel publishChannel) AppId = "oauth2", }; - await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", basicProperties: properties, body: body); + await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, basicProperties: properties, body: body); _testOutputHelper.WriteLine("Sent message"); await publishChannel.WaitForConfirmsOrDieAsync(); diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index c8434f2c2..d1622647a 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -37,7 +37,6 @@ using System.Threading.Tasks; using RabbitMQ.Client; -using RabbitMQ.Client.client.impl; using RabbitMQ.Client.Events; using Xunit; using Xunit.Abstractions; @@ -103,7 +102,7 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true); + await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); await _channel.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); @@ -144,7 +143,7 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool use string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); CachedString exchange = new CachedString(""); CachedString routingKey = new CachedString(q.QueueName); - await _channel.BasicPublishAsync(exchange, routingKey, sendBody, mandatory: true); + await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody); await _channel.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); @@ -224,7 +223,7 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true); + await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); await _channel.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); @@ -266,7 +265,7 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); CachedString exchange = new CachedString(""); CachedString routingKey = new CachedString(q.QueueName); - await _channel.BasicPublishAsync(exchange, routingKey, sendBody, mandatory: true); + await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody); await _channel.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); @@ -336,7 +335,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera try { await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync("", queue, Encoding.UTF8.GetBytes(msg), mandatory: true); + await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); await _channel.WaitForConfirmsOrDieAsync(); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(1u, ok.MessageCount); @@ -373,7 +372,7 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use CachedString exchange = new CachedString(""); CachedString routingKey = new CachedString(queue); await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync(exchange, routingKey, Encoding.UTF8.GetBytes(msg), mandatory: true); + await _channel.BasicPublishAsync(exchange, routingKey, true, Encoding.UTF8.GetBytes(msg)); await _channel.WaitForConfirmsOrDieAsync(); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(1u, ok.MessageCount); diff --git a/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs b/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs index 2ed94e3c6..e1f50e1a0 100644 --- a/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs +++ b/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs @@ -97,7 +97,7 @@ async Task ExchangeDeclareAndPublish() using (IChannel publishChannel = await _conn.CreateChannelAsync()) { await publishChannel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true); - await publishChannel.BasicPublishAsync(exchangeName, exchangeName, GetRandomBody(), mandatory: true); + await publishChannel.BasicPublishAsync(exchangeName, exchangeName, true, GetRandomBody()); await publishChannel.CloseAsync(); } } diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs index f8088baf0..f7e0742ed 100644 --- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -125,7 +125,7 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true); + await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); await _channel.WaitForConfirmsOrDieAsync(); Baggage.ClearBaggage(); Assert.Null(Baggage.GetBaggage("TestItem")); @@ -183,7 +183,7 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true); + await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); await _channel.WaitForConfirmsOrDieAsync(); Baggage.ClearBaggage(); Assert.Null(Baggage.GetBaggage("TestItem")); @@ -339,7 +339,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera try { await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync("", queue, Encoding.UTF8.GetBytes(msg), mandatory: true); + await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); await _channel.WaitForConfirmsOrDieAsync(); Baggage.ClearBaggage(); Assert.Null(Baggage.GetBaggage("TestItem")); From 298fbe55b17f18080bc48c0c26212892745fc60c Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 6 Sep 2024 10:38:13 -0700 Subject: [PATCH 3/3] * Since `BasicGetAsync` uses an async continuation, it should return `Task<>` --- projects/RabbitMQ.Client/PublicAPI.Shipped.txt | 2 +- projects/RabbitMQ.Client/client/api/IChannel.cs | 2 +- projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs | 2 +- projects/RabbitMQ.Client/client/impl/ChannelBase.cs | 2 +- projects/Test/Integration/TestBasicGet.cs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index b8748390a..36617b407 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -806,7 +806,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.BasicCancelAsync(string consumerTag, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, System.Collections.Generic.IDictionary arguments, RabbitMQ.Client.IAsyncBasicConsumer consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~RabbitMQ.Client.IChannel.BasicGetAsync(string queue, bool autoAck, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +~RabbitMQ.Client.IChannel.BasicGetAsync(string queue, bool autoAck, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.BasicRejectAsync(ulong deliveryTag, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask ~RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.ShutdownEventArgs reason, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index cb535f405..84dff9517 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -190,7 +190,7 @@ Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b /// If set to true, automatically ack the message. /// Cancellation token for this operation. /// - ValueTask BasicGetAsync(string queue, bool autoAck, + Task BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken = default); /// diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index b939c8c9c..c39c8be65 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -303,7 +303,7 @@ await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false) return resultConsumerTag; } - public ValueTask BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken) + public Task BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken) => InnerChannel.BasicGetAsync(queue, autoAck, cancellationToken); public ValueTask BasicPublishAsync(string exchange, string routingKey, diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index e4cb0b508..4991fdcc6 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -885,7 +885,7 @@ await ModelSendAsync(method, k.CancellationToken) } } - public async ValueTask BasicGetAsync(string queue, bool autoAck, + public async Task BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken) { bool enqueued = false; diff --git a/projects/Test/Integration/TestBasicGet.cs b/projects/Test/Integration/TestBasicGet.cs index 05a7812d7..6afcef81d 100644 --- a/projects/Test/Integration/TestBasicGet.cs +++ b/projects/Test/Integration/TestBasicGet.cs @@ -72,7 +72,7 @@ public Task TestBasicGetWithClosedChannel() { return Assert.ThrowsAsync(() => { - return ch.BasicGetAsync(q, true).AsTask(); + return ch.BasicGetAsync(q, true); }); }); });