From ec61e457b0c5873e2c683b80a82004b92b82c7d1 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 21 Oct 2024 15:48:39 -0700 Subject: [PATCH] Modify `TestPublisherConfirmationThrottling` to use toxiproxy. --- projects/Test/Integration/TestBasicPublish.cs | 68 ------------ projects/Test/Integration/TestToxiproxy.cs | 104 ++++++++++++++++++ 2 files changed, 104 insertions(+), 68 deletions(-) diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 2ef059e74..107460fd8 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -322,73 +322,5 @@ public async Task TestPropertiesRoundtrip_Headers() Assert.Equal(sendBody, consumeBody); Assert.Equal("World", response); } - - [Fact] - public async Task TestPublisherConfirmationThrottling() - { - const int MaxOutstandingConfirms = 4; - - var channelOpts = new CreateChannelOptions - { - PublisherConfirmationsEnabled = true, - PublisherConfirmationTrackingEnabled = true, - OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms) - }; - - var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var messagesPublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - Task publishTask = Task.Run(async () => - { - ConnectionFactory cf = CreateConnectionFactory(); - - await using (IConnection conn = await cf.CreateConnectionAsync()) - { - await using (IChannel ch = await conn.CreateChannelAsync(channelOpts)) - { - QueueDeclareOk q = await ch.QueueDeclareAsync(); - - channelCreatedTcs.SetResult(true); - - int publishCount = 0; - /* - * Note: if batchSize equals MaxOutstandingConfirms, - * a delay is added per-publish and this test takes much longer - * to run. TODO figure out how the heck to test that - */ - int batchSize = MaxOutstandingConfirms / 2; - try - { - while (publishCount < 128) - { - var publishBatch = new List(); - for (int i = 0; i < batchSize; i++) - { - publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody())); - } - - foreach (ValueTask pt in publishBatch) - { - await pt; - publishCount++; - } - - publishBatch.Clear(); - publishBatch = null; - } - - messagesPublishedTcs.SetResult(true); - } - catch (Exception ex) - { - messagesPublishedTcs.SetException(ex); - } - } - } - }); - - await channelCreatedTcs.Task.WaitAsync(WaitSpan); - await messagesPublishedTcs.Task.WaitAsync(WaitSpan); - await publishTask.WaitAsync(WaitSpan); - } } } diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index dc445147a..d6567ee1b 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -30,7 +30,9 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Net; +using System.Threading; using System.Threading.Tasks; using Integration; using RabbitMQ.Client; @@ -284,6 +286,108 @@ public async Task TestTcpReset_GH1464() await recoveryTask; } + [SkippableFact] + [Trait("Category", "Toxiproxy")] + public async Task TestPublisherConfirmationThrottling() + { + Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + + const int TotalMessageCount = 64; + const int MaxOutstandingConfirms = 8; + const int BatchSize = MaxOutstandingConfirms * 2; + + using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); + await pm.InitializeAsync(); + + ConnectionFactory cf = CreateConnectionFactory(); + cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort); + cf.RequestedHeartbeat = TimeSpan.FromSeconds(5); + cf.AutomaticRecoveryEnabled = true; + + var channelOpts = new CreateChannelOptions + { + PublisherConfirmationsEnabled = true, + PublisherConfirmationTrackingEnabled = true, + OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms) + }; + + var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var messagesPublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + long publishCount = 0; + Task publishTask = Task.Run(async () => + { + await using (IConnection conn = await cf.CreateConnectionAsync()) + { + await using (IChannel ch = await conn.CreateChannelAsync(channelOpts)) + { + QueueDeclareOk q = await ch.QueueDeclareAsync(); + + channelCreatedTcs.SetResult(true); + + try + { + var publishBatch = new List(); + while (publishCount < TotalMessageCount) + { + for (int i = 0; i < BatchSize; i++) + { + publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody())); + } + + foreach (ValueTask pt in publishBatch) + { + await pt; + Interlocked.Increment(ref publishCount); + } + + publishBatch.Clear(); + } + + messagesPublishedTcs.SetResult(true); + } + catch (Exception ex) + { + messagesPublishedTcs.SetException(ex); + } + } + } + }); + + await channelCreatedTcs.Task; + + const string toxicName = "rmq-localhost-bandwidth"; + var bandwidthToxic = new BandwidthToxic(); + bandwidthToxic.Name = toxicName; + bandwidthToxic.Attributes.Rate = 0; + bandwidthToxic.Toxicity = 1.0; + bandwidthToxic.Stream = ToxicDirection.DownStream; + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Task addToxicTask = pm.AddToxicAsync(bandwidthToxic); + + while (true) + { + long publishCount0 = Interlocked.Read(ref publishCount); + await Task.Delay(TimeSpan.FromSeconds(5)); + long publishCount1 = Interlocked.Read(ref publishCount); + + if (publishCount0 == publishCount1) + { + // Publishing has "settled" due to being blocked + break; + } + } + + await addToxicTask.WaitAsync(WaitSpan); + await pm.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan); + + await messagesPublishedTcs.Task.WaitAsync(WaitSpan); + await publishTask.WaitAsync(WaitSpan); + + Assert.Equal(TotalMessageCount, publishCount); + } + private bool AreToxiproxyTestsEnabled { get