Skip to content

Commit

Permalink
Add DispatchConsumersAsyncEnabled property on IConnection (#1611)
Browse files Browse the repository at this point in the history
* Name new property `DispatchConsumersAsyncEnabled`.
* Add a check on `BasicConsume` for when a regular dispatcher is used, and an async consumer passed.
* Test the new `DispatchConsumersAsyncEnabled` property.
  • Loading branch information
luizcarlosfaria authored and lukebakken committed Jun 27, 2024
1 parent 56b3ebf commit 5ccfd7f
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 2 deletions.
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.ReadOnlyBasicProperties! input) -> void
RabbitMQ.Client.BasicProperties.BasicProperties(RabbitMQ.Client.ReadOnlyBasicProperties! input) -> void
RabbitMQ.Client.IConnection.DispatchConsumersAsyncEnabled.get -> bool
6 changes: 6 additions & 0 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public interface IConnection : INetworkConnection, IDisposable
/// </summary>
IEnumerable<ShutdownReportEntry> ShutdownReport { get; }

/// <summary>
/// Returns <c>true</c> if the connection is set to use asynchronous consumer dispatchers.
/// </summary>
public bool DispatchConsumersAsyncEnabled { get; }

/// <summary>
/// Application-specific connection name, will be displayed in the management UI
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
Expand Down Expand Up @@ -236,5 +241,6 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer

public IProtocol Protocol => Endpoint.Protocol;

public bool DispatchConsumersAsyncEnabled => _config.DispatchConsumersAsync;

public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(CancellationToken cancellationToken)
{
ISession session = InnerConnection.CreateSession();
Expand Down
10 changes: 9 additions & 1 deletion projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -975,12 +975,20 @@ public async Task<string> BasicConsumeAsync(string queue, bool autoAck, string c
{
if (ConsumerDispatcher is AsyncConsumerDispatcher)
{
if (!(consumer is IAsyncBasicConsumer))
if (false == (consumer is IAsyncBasicConsumer))
{
throw new InvalidOperationException("When using an AsyncConsumerDispatcher, the consumer must implement IAsyncBasicConsumer");
}
}

if (ConsumerDispatcher is ConsumerDispatcher)
{
if (consumer is IAsyncBasicConsumer)
{
throw new InvalidOperationException("When using an ConsumerDispatcher, the consumer must not implement IAsyncBasicConsumer");
}
}

// NOTE:
// Maybe don't dispose this instance because the CancellationToken must remain
// valid for processing the response.
Expand Down
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
public int LocalPort => _frameHandler.LocalPort;
public int RemotePort => _frameHandler.RemotePort;

public bool DispatchConsumersAsyncEnabled => _config.DispatchConsumersAsync;

public IDictionary<string, object?>? ServerProperties { get; private set; }

public IEnumerable<ShutdownReportEntry> ShutdownReport => _shutdownReport;
Expand Down
16 changes: 16 additions & 0 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public TestAsyncConsumer(ITestOutputHelper output)
[Fact]
public async Task TestBasicRoundtripConcurrent()
{
Assert.True(_conn.DispatchConsumersAsyncEnabled);

AddCallbackExceptionHandlers();
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);

Expand Down Expand Up @@ -145,6 +147,8 @@ public async Task TestBasicRoundtripConcurrent()
[Fact]
public async Task TestBasicRoundtripConcurrentManyMessages()
{
Assert.True(_conn.DispatchConsumersAsyncEnabled);

AddCallbackExceptionHandlers();
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);

Expand Down Expand Up @@ -320,6 +324,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
[Fact]
public async Task TestBasicRejectAsync()
{
Assert.True(_conn.DispatchConsumersAsyncEnabled);

string queueName = GenerateQueueName();

var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -485,6 +491,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
[Fact]
public async Task TestBasicNackAsync()
{
Assert.True(_conn.DispatchConsumersAsyncEnabled);

var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

_conn.ConnectionShutdown += (o, ea) =>
Expand Down Expand Up @@ -558,6 +566,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
[Fact]
public async Task NonAsyncConsumerShouldThrowInvalidOperationException()
{
Assert.True(_conn.DispatchConsumersAsyncEnabled);

bool sawException = false;
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false);
await _channel.BasicPublishAsync(string.Empty, q.QueueName, GetRandomBody(1024));
Expand All @@ -576,6 +586,8 @@ public async Task NonAsyncConsumerShouldThrowInvalidOperationException()
[Fact]
public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
{
Assert.True(_conn.DispatchConsumersAsyncEnabled);

AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0);
var tasks = new List<Task>();
for (int i = 0; i < 256; i++)
Expand All @@ -596,6 +608,8 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
[Fact]
public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
{
Assert.True(_conn.DispatchConsumersAsyncEnabled);

string exchangeName = GenerateExchangeName();
string queue1Name = GenerateQueueName();
string queue2Name = GenerateQueueName();
Expand Down Expand Up @@ -663,6 +677,8 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
[Fact]
public async Task TestCloseWithinEventHandler_GH1567()
{
Assert.True(_conn.DispatchConsumersAsyncEnabled);

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

QueueDeclareOk q = await _channel.QueueDeclareAsync();
Expand Down
26 changes: 26 additions & 0 deletions projects/Test/Integration/TestConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,31 @@ public TestConsumer(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task AsyncConsumerShouldThrowInvalidOperationException()
{
Assert.False(_conn.DispatchConsumersAsyncEnabled);

bool sawException = false;
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false);
await _channel.BasicPublishAsync(string.Empty, q.QueueName, GetRandomBody(1024));
var consumer = new AsyncEventingBasicConsumer(_channel);
try
{
string consumerTag = await _channel.BasicConsumeAsync(q.QueueName, false, string.Empty, false, false, null, consumer);
}
catch (InvalidOperationException)
{
sawException = true;
}
Assert.True(sawException, "did not see expected InvalidOperationException");
}

[Fact]
public async Task TestBasicRoundtrip()
{
Assert.False(_conn.DispatchConsumersAsyncEnabled);

TimeSpan waitSpan = TimeSpan.FromSeconds(2);
QueueDeclareOk q = await _channel.QueueDeclareAsync();
await _channel.BasicPublishAsync("", q.QueueName, _body);
Expand All @@ -77,6 +99,8 @@ public async Task TestBasicRoundtrip()
[Fact]
public async Task TestBasicRoundtripNoWait()
{
Assert.False(_conn.DispatchConsumersAsyncEnabled);

QueueDeclareOk q = await _channel.QueueDeclareAsync();
await _channel.BasicPublishAsync("", q.QueueName, _body);
var consumer = new EventingBasicConsumer(_channel);
Expand All @@ -101,6 +125,8 @@ public async Task TestBasicRoundtripNoWait()
[Fact]
public async Task ConcurrentEventingTestForReceived()
{
Assert.False(_conn.DispatchConsumersAsyncEnabled);

const int NumberOfThreads = 4;
const int NumberOfRegistrations = 5000;

Expand Down

0 comments on commit 5ccfd7f

Please sign in to comment.