Skip to content

Commit

Permalink
* Move ThrottlingRateLimiter to RabbitMQ.Client namespace.
Browse files Browse the repository at this point in the history
* Add the outline of a test for throttling publishes based on outstanding confirms.
  • Loading branch information
lukebakken committed Oct 21, 2024
1 parent 9d6b0c5 commit 32315b6
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 12 deletions.
16 changes: 8 additions & 8 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
const RabbitMQ.Client.Constants.PublishSequenceNumberHeader = "x-dotnet-pub-seq-no" -> string!
const RabbitMQ.Client.Impl.ThrottlingRateLimiter.DefaultThrottlingPercentage = 50 -> int
override RabbitMQ.Client.Impl.ThrottlingRateLimiter.AcquireAsyncCore(int permitCount, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease!>
override RabbitMQ.Client.Impl.ThrottlingRateLimiter.AttemptAcquireCore(int permitCount) -> System.Threading.RateLimiting.RateLimitLease!
override RabbitMQ.Client.Impl.ThrottlingRateLimiter.Dispose(bool disposing) -> void
override RabbitMQ.Client.Impl.ThrottlingRateLimiter.GetStatistics() -> System.Threading.RateLimiting.RateLimiterStatistics?
override RabbitMQ.Client.Impl.ThrottlingRateLimiter.IdleDuration.get -> System.TimeSpan?
const RabbitMQ.Client.ThrottlingRateLimiter.DefaultThrottlingPercentage = 50 -> int
override RabbitMQ.Client.ThrottlingRateLimiter.AcquireAsyncCore(int permitCount, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease!>
override RabbitMQ.Client.ThrottlingRateLimiter.AttemptAcquireCore(int permitCount) -> System.Threading.RateLimiting.RateLimitLease!
override RabbitMQ.Client.ThrottlingRateLimiter.Dispose(bool disposing) -> void
override RabbitMQ.Client.ThrottlingRateLimiter.GetStatistics() -> System.Threading.RateLimiting.RateLimiterStatistics?
override RabbitMQ.Client.ThrottlingRateLimiter.IdleDuration.get -> System.TimeSpan?
RabbitMQ.Client.CreateChannelOptions
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
Expand All @@ -26,8 +26,8 @@ RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() ->
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IConnection.CreateChannelAsync(RabbitMQ.Client.CreateChannelOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
RabbitMQ.Client.Impl.ThrottlingRateLimiter
RabbitMQ.Client.Impl.ThrottlingRateLimiter.ThrottlingRateLimiter(int maxConcurrentCalls, int? throttlingPercentage = 50) -> void
RabbitMQ.Client.ThrottlingRateLimiter
RabbitMQ.Client.ThrottlingRateLimiter.ThrottlingRateLimiter(int maxConcurrentCalls, int? throttlingPercentage = 50) -> void
static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions!
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
using System.Threading.RateLimiting;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
namespace RabbitMQ.Client
{
public class ThrottlingRateLimiter : RateLimiter
{
Expand Down
68 changes: 68 additions & 0 deletions projects/Test/Integration/TestBasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -322,5 +322,73 @@ public async Task TestPropertiesRoundtrip_Headers()
Assert.Equal(sendBody, consumeBody);
Assert.Equal("World", response);
}

[Fact]
public async Task TestPublisherConfirmationThrottling()
{
const int MaxOutstandingConfirms = 4;

var channelOpts = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
};

var channelCreatedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var messagesPublishedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Task publishTask = Task.Run(async () =>
{
ConnectionFactory cf = CreateConnectionFactory();
await using (IConnection conn = await cf.CreateConnectionAsync())
{
await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
{
QueueDeclareOk q = await ch.QueueDeclareAsync();
channelCreatedTcs.SetResult(true);
int publishCount = 0;
/*
* Note: if batchSize equals MaxOutstandingConfirms,
* a delay is added per-publish and this test takes much longer
* to run. TODO figure out how the heck to test that
*/
int batchSize = MaxOutstandingConfirms / 2;
try
{
while (publishCount < 128)
{
var publishBatch = new List<ValueTask>();
for (int i = 0; i < batchSize; i++)
{
publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
}
foreach (ValueTask pt in publishBatch)
{
await pt;
publishCount++;
}
publishBatch.Clear();
publishBatch = null;
}
messagesPublishedTcs.SetResult(true);
}
catch (Exception ex)
{
messagesPublishedTcs.SetException(ex);
}
}
}
});

await channelCreatedTcs.Task.WaitAsync(WaitSpan);
await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
await publishTask.WaitAsync(WaitSpan);
}
}
}
6 changes: 3 additions & 3 deletions projects/Test/Integration/TestToxiproxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ public async Task TestCloseConnection()
}
/*
* Note: using TrySetResult because this callback will be called when the
* test exits, and connectionShutdownTcs will have already been set
*/
* Note: using TrySetResult because this callback will be called when the
* test exits, and connectionShutdownTcs will have already been set
*/
connectionShutdownTcs.TrySetResult(true);
return Task.CompletedTask;
};
Expand Down

0 comments on commit 32315b6

Please sign in to comment.