Skip to content

Commit

Permalink
Gold-plating async tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Nov 17, 2023
1 parent 1fca1ba commit 923d31c
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 42 deletions.
13 changes: 13 additions & 0 deletions projects/Test/AsyncIntegration/AsyncIntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Xunit;
Expand All @@ -53,6 +54,18 @@ protected override void SetUp()
// InitializeAsync
}

protected static Task AssertRanToCompletion(params Task[] tasks)
{
return AssertRanToCompletion(tasks);
}

protected static async Task AssertRanToCompletion(IEnumerable<Task> tasks)
{
Task whenAllTask = Task.WhenAll(tasks);
await whenAllTask;
Assert.Equal(TaskStatus.RanToCompletion, whenAllTask.Status);
}

public virtual async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
Expand Down
9 changes: 5 additions & 4 deletions projects/Test/AsyncIntegration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class TestAsyncConsumer : AsyncIntegrationFixture
{
private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown");

public TestAsyncConsumer(ITestOutputHelper output) : base(output, true, 2)
public TestAsyncConsumer(ITestOutputHelper output)
: base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 2)
{
}

Expand Down Expand Up @@ -113,7 +114,7 @@ public async Task TestBasicRoundtripConcurrent()
await _channel.BasicConsumeAsync(q.QueueName, true, string.Empty, false, false, null, consumer);

// ensure we get a delivery
await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task);
await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task);

bool result1 = await publish1SyncSource.Task;
Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
Expand Down Expand Up @@ -216,7 +217,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
await _channel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer);
// ensure we get a delivery
await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task);
await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task);
bool result1 = await publish1SyncSource.Task;
Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
Expand All @@ -226,7 +227,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
}
});

await Task.WhenAll(publishTask, consumeTask);
await AssertRanToCompletion(publishTask, consumeTask);
}

[Fact]
Expand Down
25 changes: 13 additions & 12 deletions projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,53 +43,54 @@ public class TestAsyncConsumerExceptions : AsyncIntegrationFixture
{
private static readonly Exception TestException = new Exception("oops");

public TestAsyncConsumerExceptions(ITestOutputHelper output) : base(output, true, 1)
public TestAsyncConsumerExceptions(ITestOutputHelper output)
: base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 1)
{
}

[Fact]
public async Task TestCancelNotificationExceptionHandling()
public Task TestCancelNotificationExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnCancel(_channel);
await TestExceptionHandlingWith(consumer, async (ch, q, c, ct) =>
return TestExceptionHandlingWith(consumer, async (ch, q, c, ct) =>
{
await ch.QueueDeleteAsync(q, false, false);
});
}

[Fact]
public async Task TestConsumerCancelOkExceptionHandling()
public Task TestConsumerCancelOkExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnCancelOk(_channel);
await TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.BasicCancelAsync(ct));
return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.BasicCancelAsync(ct));
}

[Fact]
public async Task TestConsumerConsumeOkExceptionHandling()
public Task TestConsumerConsumeOkExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnConsumeOk(_channel);
await TestExceptionHandlingWith(consumer, async (ch, q, c, ct) => await Task.Yield());
return TestExceptionHandlingWith(consumer, async (ch, q, c, ct) => await Task.Yield());
}

[Fact]
public async Task TestConsumerShutdownExceptionHandling()
public Task TestConsumerShutdownExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnShutdown(_channel);
await TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.CloseAsync());
return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.CloseAsync());
}

[Fact]
public async Task TestDeliveryExceptionHandling()
public Task TestDeliveryExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnDelivery(_channel);
await TestExceptionHandlingWith(consumer, (ch, q, c, ct) =>
return TestExceptionHandlingWith(consumer, (ch, q, c, ct) =>
ch.BasicPublishAsync("", q, _encoding.GetBytes("msg")));
}

protected async Task TestExceptionHandlingWith(IBasicConsumer consumer,
Func<IChannel, string, IBasicConsumer, string, ValueTask> action)
{
var waitSpan = TimeSpan.FromSeconds(2);
var waitSpan = TimeSpan.FromSeconds(5);
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var cts = new CancellationTokenSource(waitSpan);
cts.Token.Register(() => tcs.TrySetResult(false));
Expand Down
2 changes: 2 additions & 0 deletions projects/Test/AsyncIntegration/TestBasicGetAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public async Task TestBasicGet()

QueueDeclareOk queueResultPassive = await _channel.QueueDeclareAsync(queueName, true, true, true, true, null);
Assert.Equal((uint)0, queueResultPassive.MessageCount);

Assert.Null(await _channel.BasicGetAsync(queueName, true));
}
}
}
11 changes: 4 additions & 7 deletions projects/Test/AsyncIntegration/TestBasicPublishAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
// Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Xunit;
Expand All @@ -53,22 +52,20 @@ public async Task TestQueuePurgeAsync()
await _channel.ConfirmSelectAsync();

QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
string queueName = q.QueueName;

var publishTask = Task.Run(async () =>
{
byte[] body = GetRandomBody(512);
for (int i = 0; i < messageCount; i++)
{
byte[] body = _encoding.GetBytes(Guid.NewGuid().ToString());
await _channel.BasicPublishAsync(string.Empty, queueName, body);
await _channel.BasicPublishAsync(string.Empty, q, body);
}
await _channel.WaitForConfirmsOrDieAsync();
publishSyncSource.SetResult(true);
});

await _channel.WaitForConfirmsOrDieAsync();
Assert.True(await publishSyncSource.Task);

Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(queueName));
Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(q));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ private async Task TestConcurrentChannelOperationsAsync(Func<IConnection, Task>
tasks.Add(action(_conn));
}
}
Task t = Task.WhenAll(tasks);
await t;
Assert.Equal(TaskStatus.RanToCompletion, t.Status);
await AssertRanToCompletion(tasks);

// incorrect frame interleaving in these tests will result
// in an unrecoverable connection-level exception, thus
Expand Down
4 changes: 3 additions & 1 deletion projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ namespace Test.AsyncIntegration
{
public class TestConfirmSelectAsync : AsyncIntegrationFixture
{
readonly byte[] _message = GetRandomBody(64);

public TestConfirmSelectAsync(ITestOutputHelper output) : base(output)
{
}
Expand All @@ -63,7 +65,7 @@ public async Task TestConfirmSelectIdempotency()

private ValueTask Publish()
{
return _channel.BasicPublishAsync("", "amq.fanout", _encoding.GetBytes("message"));
return _channel.BasicPublishAsync("", "amq.fanout", _message);
}
}
}
19 changes: 13 additions & 6 deletions projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public TestExchangeDeclareAsync(ITestOutputHelper output) : base(output)
public async Task TestConcurrentExchangeDeclareAndBindAsync()
{
var exchangeNames = new ConcurrentBag<string>();
var ts = new List<Task>();
var tasks = new List<Task>();
NotSupportedException nse = null;
for (int i = 0; i < 256; i++)
{
Expand All @@ -70,12 +70,12 @@ async Task f()
}
}
var t = Task.Run(f);
ts.Add(t);
tasks.Add(t);
}

await Task.WhenAll(ts);
await AssertRanToCompletion(tasks);
Assert.Null(nse);
ts.Clear();
tasks.Clear();

foreach (string exchangeName in exchangeNames)
{
Expand All @@ -93,11 +93,18 @@ async Task f()
}
}
var t = Task.Run(f);
ts.Add(t);
tasks.Add(t);
}

await Task.WhenAll(ts);
await AssertRanToCompletion(tasks);
Assert.Null(nse);

async Task AssertRanToCompletion(IEnumerable<Task> tasks)
{
Task whenAllTask = Task.WhenAll(tasks);
await whenAllTask;
Assert.Equal(TaskStatus.RanToCompletion, whenAllTask.Status);
}
}
}
}
18 changes: 9 additions & 9 deletions projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public async void TestConcurrentQueueDeclareAndBindAsync()
});
};

var ts = new List<Task>();
var qs = new ConcurrentBag<string>();
var tasks = new List<Task>();
var queues = new ConcurrentBag<string>();

NotSupportedException nse = null;
for (int i = 0; i < 256; i++)
Expand All @@ -100,23 +100,23 @@ async Task f()
QueueDeclareOk r = await _channel.QueueDeclareAsync(queue: string.Empty, passive: false, false, false, false, null);
string queueName = r.QueueName;
await _channel.QueueBindAsync(queue: queueName, exchange: "amq.fanout", routingKey: queueName, null);
qs.Add(queueName);
queues.Add(queueName);
}
catch (NotSupportedException e)
{
nse = e;
}
}
var t = Task.Run(f);
ts.Add(t);
tasks.Add(t);
}

await Task.WhenAll(ts);
await AssertRanToCompletion(tasks);
Assert.Null(nse);
ts.Clear();
tasks.Clear();

nse = null;
foreach (string q in qs)
foreach (string q in queues)
{
async Task f()
{
Expand All @@ -139,10 +139,10 @@ async Task f()
}
}
var t = Task.Run(f);
ts.Add(t);
tasks.Add(t);
}

await Task.WhenAll(ts);
await AssertRanToCompletion(tasks);
Assert.Null(nse);
Assert.False(sawShutdown);
}
Expand Down

0 comments on commit 923d31c

Please sign in to comment.