diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 6f9bcba8f2..98fa9fc672 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -1 +1,2 @@ -RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.ReadOnlyBasicProperties! input) -> void \ No newline at end of file +RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.ReadOnlyBasicProperties! input) -> void +RabbitMQ.Client.IConnection.DispatchConsumersAsyncEnabled.get -> bool \ No newline at end of file diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index a699b9ce27..286b810570 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -126,6 +126,11 @@ public interface IConnection : INetworkConnection, IDisposable /// IEnumerable ShutdownReport { get; } + /// + /// Returns true if the connection is set to use asynchronous consumer dispatchers. + /// + public bool DispatchConsumersAsyncEnabled { get; } + /// /// Application-specific connection name, will be displayed in the management UI /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot @@ -236,5 +241,6 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo /// /// Cancellation token Task CreateChannelAsync(CancellationToken cancellationToken = default); + } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index d09becd74f..298e3066c2 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -176,6 +176,8 @@ public event EventHandler RecoveringConsumer public IProtocol Protocol => Endpoint.Protocol; + public bool DispatchConsumersAsyncEnabled => _config.DispatchConsumersAsync; + public async ValueTask CreateNonRecoveringChannelAsync(CancellationToken cancellationToken) { ISession session = InnerConnection.CreateSession(); diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index f13d9e66b2..db66cce057 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -975,12 +975,20 @@ public async Task BasicConsumeAsync(string queue, bool autoAck, string c { if (ConsumerDispatcher is AsyncConsumerDispatcher) { - if (!(consumer is IAsyncBasicConsumer)) + if (false == (consumer is IAsyncBasicConsumer)) { throw new InvalidOperationException("When using an AsyncConsumerDispatcher, the consumer must implement IAsyncBasicConsumer"); } } + if (ConsumerDispatcher is ConsumerDispatcher) + { + if (consumer is IAsyncBasicConsumer) + { + throw new InvalidOperationException("When using an ConsumerDispatcher, the consumer must not implement IAsyncBasicConsumer"); + } + } + // NOTE: // Maybe don't dispose this instance because the CancellationToken must remain // valid for processing the response. diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 54a84fe062..ad1aeb859d 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -101,6 +101,8 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) public int LocalPort => _frameHandler.LocalPort; public int RemotePort => _frameHandler.RemotePort; + public bool DispatchConsumersAsyncEnabled => _config.DispatchConsumersAsync; + public IDictionary? ServerProperties { get; private set; } public IEnumerable ShutdownReport => _shutdownReport; diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index b871eb7332..ef4cd879a1 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -52,6 +52,8 @@ public TestAsyncConsumer(ITestOutputHelper output) [Fact] public async Task TestBasicRoundtripConcurrent() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + AddCallbackExceptionHandlers(); _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); @@ -145,6 +147,8 @@ public async Task TestBasicRoundtripConcurrent() [Fact] public async Task TestBasicRoundtripConcurrentManyMessages() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + AddCallbackExceptionHandlers(); _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); @@ -320,6 +324,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() [Fact] public async Task TestBasicRejectAsync() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + string queueName = GenerateQueueName(); var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -485,6 +491,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task TestBasicNackAsync() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionShutdown += (o, ea) => @@ -558,6 +566,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task NonAsyncConsumerShouldThrowInvalidOperationException() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + bool sawException = false; QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false); await _channel.BasicPublishAsync(string.Empty, q.QueueName, GetRandomBody(1024)); @@ -576,6 +586,8 @@ public async Task NonAsyncConsumerShouldThrowInvalidOperationException() [Fact] public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); var tasks = new List(); for (int i = 0; i < 256; i++) @@ -596,6 +608,8 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() [Fact] public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + string exchangeName = GenerateExchangeName(); string queue1Name = GenerateQueueName(); string queue2Name = GenerateQueueName(); @@ -663,6 +677,8 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() [Fact] public async Task TestCloseWithinEventHandler_GH1567() { + Assert.True(_conn.DispatchConsumersAsyncEnabled); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); QueueDeclareOk q = await _channel.QueueDeclareAsync(); diff --git a/projects/Test/Integration/TestConsumer.cs b/projects/Test/Integration/TestConsumer.cs index d747a51b3e..3b95c14677 100644 --- a/projects/Test/Integration/TestConsumer.cs +++ b/projects/Test/Integration/TestConsumer.cs @@ -49,9 +49,31 @@ public TestConsumer(ITestOutputHelper output) : base(output) { } + [Fact] + public async Task AsyncConsumerShouldThrowInvalidOperationException() + { + Assert.False(_conn.DispatchConsumersAsyncEnabled); + + bool sawException = false; + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false); + await _channel.BasicPublishAsync(string.Empty, q.QueueName, GetRandomBody(1024)); + var consumer = new AsyncEventingBasicConsumer(_channel); + try + { + string consumerTag = await _channel.BasicConsumeAsync(q.QueueName, false, string.Empty, false, false, null, consumer); + } + catch (InvalidOperationException) + { + sawException = true; + } + Assert.True(sawException, "did not see expected InvalidOperationException"); + } + [Fact] public async Task TestBasicRoundtrip() { + Assert.False(_conn.DispatchConsumersAsyncEnabled); + TimeSpan waitSpan = TimeSpan.FromSeconds(2); QueueDeclareOk q = await _channel.QueueDeclareAsync(); await _channel.BasicPublishAsync("", q.QueueName, _body); @@ -77,6 +99,8 @@ public async Task TestBasicRoundtrip() [Fact] public async Task TestBasicRoundtripNoWait() { + Assert.False(_conn.DispatchConsumersAsyncEnabled); + QueueDeclareOk q = await _channel.QueueDeclareAsync(); await _channel.BasicPublishAsync("", q.QueueName, _body); var consumer = new EventingBasicConsumer(_channel); @@ -101,6 +125,8 @@ public async Task TestBasicRoundtripNoWait() [Fact] public async Task ConcurrentEventingTestForReceived() { + Assert.False(_conn.DispatchConsumersAsyncEnabled); + const int NumberOfThreads = 4; const int NumberOfRegistrations = 5000;