Skip to content

Commit

Permalink
Modify TestPublisherConfirmationThrottling to use toxiproxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Oct 21, 2024
1 parent e02734d commit ec61e45
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 68 deletions.
68 changes: 0 additions & 68 deletions projects/Test/Integration/TestBasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -322,73 +322,5 @@ public async Task TestPropertiesRoundtrip_Headers()
Assert.Equal(sendBody, consumeBody);
Assert.Equal("World", response);
}

[Fact]
public async Task TestPublisherConfirmationThrottling()
{
const int MaxOutstandingConfirms = 4;

var channelOpts = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
};

var channelCreatedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var messagesPublishedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Task publishTask = Task.Run(async () =>
{
ConnectionFactory cf = CreateConnectionFactory();
await using (IConnection conn = await cf.CreateConnectionAsync())
{
await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
{
QueueDeclareOk q = await ch.QueueDeclareAsync();
channelCreatedTcs.SetResult(true);
int publishCount = 0;
/*
* Note: if batchSize equals MaxOutstandingConfirms,
* a delay is added per-publish and this test takes much longer
* to run. TODO figure out how the heck to test that
*/
int batchSize = MaxOutstandingConfirms / 2;
try
{
while (publishCount < 128)
{
var publishBatch = new List<ValueTask>();
for (int i = 0; i < batchSize; i++)
{
publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
}
foreach (ValueTask pt in publishBatch)
{
await pt;
publishCount++;
}
publishBatch.Clear();
publishBatch = null;
}
messagesPublishedTcs.SetResult(true);
}
catch (Exception ex)
{
messagesPublishedTcs.SetException(ex);
}
}
}
});

await channelCreatedTcs.Task.WaitAsync(WaitSpan);
await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
await publishTask.WaitAsync(WaitSpan);
}
}
}
104 changes: 104 additions & 0 deletions projects/Test/Integration/TestToxiproxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Integration;
using RabbitMQ.Client;
Expand Down Expand Up @@ -284,6 +286,108 @@ public async Task TestTcpReset_GH1464()
await recoveryTask;
}

[SkippableFact]
[Trait("Category", "Toxiproxy")]
public async Task TestPublisherConfirmationThrottling()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");

const int TotalMessageCount = 64;
const int MaxOutstandingConfirms = 8;
const int BatchSize = MaxOutstandingConfirms * 2;

using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
await pm.InitializeAsync();

ConnectionFactory cf = CreateConnectionFactory();
cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort);
cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
cf.AutomaticRecoveryEnabled = true;

var channelOpts = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
};

var channelCreatedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var messagesPublishedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
long publishCount = 0;
Task publishTask = Task.Run(async () =>
{
await using (IConnection conn = await cf.CreateConnectionAsync())
{
await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
{
QueueDeclareOk q = await ch.QueueDeclareAsync();
channelCreatedTcs.SetResult(true);
try
{
var publishBatch = new List<ValueTask>();
while (publishCount < TotalMessageCount)
{
for (int i = 0; i < BatchSize; i++)
{
publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
}
foreach (ValueTask pt in publishBatch)
{
await pt;
Interlocked.Increment(ref publishCount);
}
publishBatch.Clear();
}
messagesPublishedTcs.SetResult(true);
}
catch (Exception ex)
{
messagesPublishedTcs.SetException(ex);
}
}
}
});

await channelCreatedTcs.Task;

const string toxicName = "rmq-localhost-bandwidth";
var bandwidthToxic = new BandwidthToxic();
bandwidthToxic.Name = toxicName;
bandwidthToxic.Attributes.Rate = 0;
bandwidthToxic.Toxicity = 1.0;
bandwidthToxic.Stream = ToxicDirection.DownStream;

await Task.Delay(TimeSpan.FromSeconds(1));

Task<BandwidthToxic> addToxicTask = pm.AddToxicAsync(bandwidthToxic);

while (true)
{
long publishCount0 = Interlocked.Read(ref publishCount);
await Task.Delay(TimeSpan.FromSeconds(5));
long publishCount1 = Interlocked.Read(ref publishCount);

if (publishCount0 == publishCount1)
{
// Publishing has "settled" due to being blocked
break;
}
}

await addToxicTask.WaitAsync(WaitSpan);
await pm.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan);

await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
await publishTask.WaitAsync(WaitSpan);

Assert.Equal(TotalMessageCount, publishCount);
}

private bool AreToxiproxyTestsEnabled
{
get
Expand Down

0 comments on commit ec61e45

Please sign in to comment.