From 1fef49eded87a3d7124e5b566a2bc4e3431491b3 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 25 Oct 2024 12:46:06 -0700 Subject: [PATCH] 7.0.0-rc.14 code changes. (#1475) --- ....Transport.RabbitMQ.AcceptanceTests.csproj | 2 +- .../MigrateQueue/QueueMigrateToQuorumTests.cs | 11 +++++++--- .../Commands/Delays/DelaysMigrateCommand.cs | 10 +++++----- .../Commands/Queue/QueueMigrateCommand.cs | 20 +++++++++---------- ...eBus.Transport.RabbitMQ.CommandLine.csproj | 2 +- ...ServiceBus.Transport.RabbitMQ.Tests.csproj | 2 +- ...s.Transport.RabbitMQ.TransportTests.csproj | 2 +- .../Connection/ConfirmsAwareChannel.cs | 16 +++++++-------- .../NServiceBus.Transport.RabbitMQ.csproj | 2 +- .../Receiving/MessagePump.cs | 9 +++++---- 10 files changed, 41 insertions(+), 35 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj index ab954ecf5..0732b81e4 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj @@ -21,7 +21,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs index dd08f3f99..de89cde74 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs @@ -361,8 +361,12 @@ Task CreateQueue(string queueName, bool quorum) Task BindQueue(string queueName, string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueBindAsync(queueName, exchangeName, string.Empty, cancellationToken: cancellationToken)); - Task AddMessages(string queueName, int numMessages, Action modifications = null) => - ExecuteBrokerCommand(async (channel, cancellationToken) => + Task AddMessages(string queueName, int numMessages, Action modifications = null) + { + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true, + outstandingPublisherConfirmationsRateLimiter: null); + return ExecuteBrokerCommand(async (channel, cancellationToken) => { for (var i = 0; i < numMessages; i++) { @@ -372,7 +376,8 @@ Task AddMessages(string queueName, int numMessages, Action mod await channel.BasicPublishAsync(string.Empty, queueName, true, properties, ReadOnlyMemory.Empty, cancellationToken); } - }, createChannelOptions: new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); + }, createChannelOptions: createChannelOptions); + } async Task MessageCount(string queueName) { diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs index 0ff9ae2fd..e2277f791 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs @@ -49,11 +49,11 @@ public DelaysMigrateCommand(BrokerConnection brokerConnection, IRoutingTopology public async Task Run(CancellationToken cancellationToken = default) { await using var connection = await brokerConnection.Create(cancellationToken); - await using var channel = await connection.CreateChannelAsync(new CreateChannelOptions - { - PublisherConfirmationsEnabled = true, - PublisherConfirmationTrackingEnabled = true - }, cancellationToken: cancellationToken); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true, + outstandingPublisherConfirmationsRateLimiter: null); + await using var channel = await connection.CreateChannelAsync(createChannelOptions, + cancellationToken: cancellationToken); for (int currentDelayLevel = DelayInfrastructure.MaxLevel; currentDelayLevel >= 0 && !cancellationToken.IsCancellationRequested; currentDelayLevel--) { diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs index e9215365b..d5355575a 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs @@ -77,11 +77,11 @@ public async Task Run(CancellationToken cancellationToken = default) async Task MoveMessagesToHoldingQueue(IConnection connection, CancellationToken cancellationToken) { - await using var channel = await connection.CreateChannelAsync(new CreateChannelOptions - { - PublisherConfirmationsEnabled = true, - PublisherConfirmationTrackingEnabled = true - }, cancellationToken: cancellationToken); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true, + outstandingPublisherConfirmationsRateLimiter: null); + await using var channel = await connection.CreateChannelAsync(createChannelOptions, + cancellationToken: cancellationToken); console.WriteLine($"Migrating messages from '{queueName}' to '{holdingQueueName}'"); @@ -141,11 +141,11 @@ async Task CreateMainQueueAsQuorum(IConnection connection, Cance async Task RestoreMessages(IConnection connection, CancellationToken cancellationToken) { - await using var channel = await connection.CreateChannelAsync(new CreateChannelOptions - { - PublisherConfirmationsEnabled = true, - PublisherConfirmationTrackingEnabled = true, - }, cancellationToken: cancellationToken); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true, + outstandingPublisherConfirmationsRateLimiter: null); + await using var channel = await connection.CreateChannelAsync(createChannelOptions, + cancellationToken: cancellationToken); await channel.QueueBindAsync(queueName, queueName, string.Empty, cancellationToken: cancellationToken); console.WriteLine($"Re-bound '{queueName}' to exchange '{queueName}'"); diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj index 3c7b5a248..322a2c176 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj index c25ce6067..4b06edcec 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj @@ -23,7 +23,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj b/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj index edf1c608e..c8701d774 100644 --- a/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj @@ -19,7 +19,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs index c469a0957..3a3cec087 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs @@ -12,14 +12,14 @@ sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routi public bool IsClosed => channel.IsClosed; - public async Task Initialize(CancellationToken cancellationToken = default) => - channel = await connection.CreateChannelAsync(new CreateChannelOptions - { - PublisherConfirmationsEnabled = true, - PublisherConfirmationTrackingEnabled = true, - // The client never had rate limiting enabled before so we want to first explore the impact of enabling it - OutstandingPublisherConfirmationsRateLimiter = null, - }, cancellationToken: cancellationToken).ConfigureAwait(false); + public async Task Initialize(CancellationToken cancellationToken = default) + { + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true, + outstandingPublisherConfirmationsRateLimiter: null); + channel = await connection.CreateChannelAsync(createChannelOptions, + cancellationToken: cancellationToken).ConfigureAwait(false); + } public async ValueTask SendMessage(string address, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default) { diff --git a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj index 71a5d95c0..e2ef50a8f 100644 --- a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj +++ b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs index 0dcce7164..2ec75ff75 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs @@ -130,10 +130,11 @@ async Task ConnectToBroker(CancellationToken cancellationToken) prefetchCount = maxConcurrency; } - var channel = await connection.CreateChannelAsync(new CreateChannelOptions - { - ConsumerDispatchConcurrency = (ushort)maxConcurrency, - }, cancellationToken).ConfigureAwait(false); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: false, + publisherConfirmationTrackingEnabled: false, + consumerDispatchConcurrency: (ushort)maxConcurrency); + var channel = await connection.CreateChannelAsync(createChannelOptions, + cancellationToken).ConfigureAwait(false); channel.ChannelShutdownAsync += Channel_ModelShutdown; await channel.BasicQosAsync(0, (ushort)Math.Min(prefetchCount, ushort.MaxValue), false, cancellationToken).ConfigureAwait(false);