diff --git a/projects/Test/AsyncIntegration/AsyncIntegrationFixture.cs b/projects/Test/AsyncIntegration/AsyncIntegrationFixture.cs index 5b21e1ea3..a0da8bc5e 100644 --- a/projects/Test/AsyncIntegration/AsyncIntegrationFixture.cs +++ b/projects/Test/AsyncIntegration/AsyncIntegrationFixture.cs @@ -29,6 +29,7 @@ // Copyright (c) 2007-2020 VMware, Inc. All rights reserved. //--------------------------------------------------------------------------- +using System.Collections.Generic; using System.Threading.Tasks; using RabbitMQ.Client; using Xunit; @@ -53,6 +54,18 @@ protected override void SetUp() // InitializeAsync } + protected static Task AssertRanToCompletion(params Task[] tasks) + { + return AssertRanToCompletion(tasks); + } + + protected static async Task AssertRanToCompletion(IEnumerable tasks) + { + Task whenAllTask = Task.WhenAll(tasks); + await whenAllTask; + Assert.Equal(TaskStatus.RanToCompletion, whenAllTask.Status); + } + public virtual async Task InitializeAsync() { _connFactory = CreateConnectionFactory(); diff --git a/projects/Test/AsyncIntegration/TestAsyncConsumer.cs b/projects/Test/AsyncIntegration/TestAsyncConsumer.cs index c9134b6ef..cf7536018 100644 --- a/projects/Test/AsyncIntegration/TestAsyncConsumer.cs +++ b/projects/Test/AsyncIntegration/TestAsyncConsumer.cs @@ -43,7 +43,8 @@ public class TestAsyncConsumer : AsyncIntegrationFixture { private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown"); - public TestAsyncConsumer(ITestOutputHelper output) : base(output, true, 2) + public TestAsyncConsumer(ITestOutputHelper output) + : base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 2) { } @@ -113,7 +114,7 @@ public async Task TestBasicRoundtripConcurrent() await _channel.BasicConsumeAsync(q.QueueName, true, string.Empty, false, false, null, consumer); // ensure we get a delivery - await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task); + await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); bool result1 = await publish1SyncSource.Task; Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); @@ -216,7 +217,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() await _channel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer); // ensure we get a delivery - await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task); + await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); bool result1 = await publish1SyncSource.Task; Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); @@ -226,7 +227,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() } }); - await Task.WhenAll(publishTask, consumeTask); + await AssertRanToCompletion(publishTask, consumeTask); } [Fact] diff --git a/projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs b/projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs index 196898431..d26b85fd6 100644 --- a/projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs +++ b/projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs @@ -43,53 +43,54 @@ public class TestAsyncConsumerExceptions : AsyncIntegrationFixture { private static readonly Exception TestException = new Exception("oops"); - public TestAsyncConsumerExceptions(ITestOutputHelper output) : base(output, true, 1) + public TestAsyncConsumerExceptions(ITestOutputHelper output) + : base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 1) { } [Fact] - public async Task TestCancelNotificationExceptionHandling() + public Task TestCancelNotificationExceptionHandling() { IBasicConsumer consumer = new ConsumerFailingOnCancel(_channel); - await TestExceptionHandlingWith(consumer, async (ch, q, c, ct) => + return TestExceptionHandlingWith(consumer, async (ch, q, c, ct) => { await ch.QueueDeleteAsync(q, false, false); }); } [Fact] - public async Task TestConsumerCancelOkExceptionHandling() + public Task TestConsumerCancelOkExceptionHandling() { IBasicConsumer consumer = new ConsumerFailingOnCancelOk(_channel); - await TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.BasicCancelAsync(ct)); + return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.BasicCancelAsync(ct)); } [Fact] - public async Task TestConsumerConsumeOkExceptionHandling() + public Task TestConsumerConsumeOkExceptionHandling() { IBasicConsumer consumer = new ConsumerFailingOnConsumeOk(_channel); - await TestExceptionHandlingWith(consumer, async (ch, q, c, ct) => await Task.Yield()); + return TestExceptionHandlingWith(consumer, async (ch, q, c, ct) => await Task.Yield()); } [Fact] - public async Task TestConsumerShutdownExceptionHandling() + public Task TestConsumerShutdownExceptionHandling() { IBasicConsumer consumer = new ConsumerFailingOnShutdown(_channel); - await TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.CloseAsync()); + return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.CloseAsync()); } [Fact] - public async Task TestDeliveryExceptionHandling() + public Task TestDeliveryExceptionHandling() { IBasicConsumer consumer = new ConsumerFailingOnDelivery(_channel); - await TestExceptionHandlingWith(consumer, (ch, q, c, ct) => + return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.BasicPublishAsync("", q, _encoding.GetBytes("msg"))); } protected async Task TestExceptionHandlingWith(IBasicConsumer consumer, Func action) { - var waitSpan = TimeSpan.FromSeconds(2); + var waitSpan = TimeSpan.FromSeconds(5); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var cts = new CancellationTokenSource(waitSpan); cts.Token.Register(() => tcs.TrySetResult(false)); diff --git a/projects/Test/AsyncIntegration/TestBasicGetAsync.cs b/projects/Test/AsyncIntegration/TestBasicGetAsync.cs index f53d50fb6..6d43ebd0a 100644 --- a/projects/Test/AsyncIntegration/TestBasicGetAsync.cs +++ b/projects/Test/AsyncIntegration/TestBasicGetAsync.cs @@ -57,6 +57,8 @@ public async Task TestBasicGet() QueueDeclareOk queueResultPassive = await _channel.QueueDeclareAsync(queueName, true, true, true, true, null); Assert.Equal((uint)0, queueResultPassive.MessageCount); + + Assert.Null(await _channel.BasicGetAsync(queueName, true)); } } } diff --git a/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs b/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs index 7bfa6d427..e6a526a13 100644 --- a/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs +++ b/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs @@ -29,7 +29,6 @@ // Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved. //--------------------------------------------------------------------------- -using System; using System.Threading.Tasks; using RabbitMQ.Client; using Xunit; @@ -53,22 +52,20 @@ public async Task TestQueuePurgeAsync() await _channel.ConfirmSelectAsync(); QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); - string queueName = q.QueueName; var publishTask = Task.Run(async () => { + byte[] body = GetRandomBody(512); for (int i = 0; i < messageCount; i++) { - byte[] body = _encoding.GetBytes(Guid.NewGuid().ToString()); - await _channel.BasicPublishAsync(string.Empty, queueName, body); + await _channel.BasicPublishAsync(string.Empty, q, body); } + await _channel.WaitForConfirmsOrDieAsync(); publishSyncSource.SetResult(true); }); - await _channel.WaitForConfirmsOrDieAsync(); Assert.True(await publishSyncSource.Task); - - Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(queueName)); + Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(q)); } } } diff --git a/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs b/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs index dc250bd5c..78e672fa7 100644 --- a/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs +++ b/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs @@ -151,9 +151,7 @@ private async Task TestConcurrentChannelOperationsAsync(Func tasks.Add(action(_conn)); } } - Task t = Task.WhenAll(tasks); - await t; - Assert.Equal(TaskStatus.RanToCompletion, t.Status); + await AssertRanToCompletion(tasks); // incorrect frame interleaving in these tests will result // in an unrecoverable connection-level exception, thus diff --git a/projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs b/projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs index 6e53e0852..e755c3e5e 100644 --- a/projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs +++ b/projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs @@ -38,6 +38,8 @@ namespace Test.AsyncIntegration { public class TestConfirmSelectAsync : AsyncIntegrationFixture { + readonly byte[] _message = GetRandomBody(64); + public TestConfirmSelectAsync(ITestOutputHelper output) : base(output) { } @@ -63,7 +65,7 @@ public async Task TestConfirmSelectIdempotency() private ValueTask Publish() { - return _channel.BasicPublishAsync("", "amq.fanout", _encoding.GetBytes("message")); + return _channel.BasicPublishAsync("", "amq.fanout", _message); } } } diff --git a/projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs b/projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs index 70b8c5918..ed460f498 100644 --- a/projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs +++ b/projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs @@ -50,7 +50,7 @@ public TestExchangeDeclareAsync(ITestOutputHelper output) : base(output) public async Task TestConcurrentExchangeDeclareAndBindAsync() { var exchangeNames = new ConcurrentBag(); - var ts = new List(); + var tasks = new List(); NotSupportedException nse = null; for (int i = 0; i < 256; i++) { @@ -70,12 +70,12 @@ async Task f() } } var t = Task.Run(f); - ts.Add(t); + tasks.Add(t); } - await Task.WhenAll(ts); + await AssertRanToCompletion(tasks); Assert.Null(nse); - ts.Clear(); + tasks.Clear(); foreach (string exchangeName in exchangeNames) { @@ -93,11 +93,18 @@ async Task f() } } var t = Task.Run(f); - ts.Add(t); + tasks.Add(t); } - await Task.WhenAll(ts); + await AssertRanToCompletion(tasks); Assert.Null(nse); + + async Task AssertRanToCompletion(IEnumerable tasks) + { + Task whenAllTask = Task.WhenAll(tasks); + await whenAllTask; + Assert.Equal(TaskStatus.RanToCompletion, whenAllTask.Status); + } } } } diff --git a/projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs b/projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs index 7deb6bd3e..f7ca9367d 100644 --- a/projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs +++ b/projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs @@ -84,8 +84,8 @@ public async void TestConcurrentQueueDeclareAndBindAsync() }); }; - var ts = new List(); - var qs = new ConcurrentBag(); + var tasks = new List(); + var queues = new ConcurrentBag(); NotSupportedException nse = null; for (int i = 0; i < 256; i++) @@ -100,7 +100,7 @@ async Task f() QueueDeclareOk r = await _channel.QueueDeclareAsync(queue: string.Empty, passive: false, false, false, false, null); string queueName = r.QueueName; await _channel.QueueBindAsync(queue: queueName, exchange: "amq.fanout", routingKey: queueName, null); - qs.Add(queueName); + queues.Add(queueName); } catch (NotSupportedException e) { @@ -108,15 +108,15 @@ async Task f() } } var t = Task.Run(f); - ts.Add(t); + tasks.Add(t); } - await Task.WhenAll(ts); + await AssertRanToCompletion(tasks); Assert.Null(nse); - ts.Clear(); + tasks.Clear(); nse = null; - foreach (string q in qs) + foreach (string q in queues) { async Task f() { @@ -139,10 +139,10 @@ async Task f() } } var t = Task.Run(f); - ts.Add(t); + tasks.Add(t); } - await Task.WhenAll(ts); + await AssertRanToCompletion(tasks); Assert.Null(nse); Assert.False(sawShutdown); }