Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump RabbitMQ.Client from 7.0.0-rc.11 to 7.0.0-rc.14 in /src #1472

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.2.2" GeneratePathProperty="true" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.11" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.14" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,10 @@

# Justification: Test project
dotnet_diagnostic.CA2007.severity = none
dotnet_diagnostic.PS0018.severity = none

# may be enabled in future
dotnet_diagnostic.PS0018.severity = none # A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
dotnet_diagnostic.PS0004.severity = none # Make the CancellationToken parameter required

# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
dotnet_diagnostic.NSB0002.severity = suggestion
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public async Task Should_handle_failure_after_unbind()

await PrepareTestEndpoint(endpointName);

await ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueUnbindAsync(endpointName, endpointName, string.Empty, cancellationToken: cancellationToken), CancellationToken.None);
await ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueUnbindAsync(endpointName, endpointName, string.Empty, cancellationToken: cancellationToken));

await ExecuteMigration(endpointName);

Expand Down Expand Up @@ -340,12 +340,10 @@ Task TryDeleteQueue(string queueName) => ExecuteBrokerCommand(async (channel, ca
catch (Exception ex) when (!ex.IsCausedBy(cancellationToken))
{
}
},
CancellationToken.None);
});

Task CreateQueue(string queueName, bool quorum)
{
return ExecuteBrokerCommand(async (channel, cancellationToken) =>
Task CreateQueue(string queueName, bool quorum) =>
ExecuteBrokerCommand(async (channel, cancellationToken) =>
{
var queueArguments = new Dictionary<string, object>();

Expand All @@ -355,38 +353,30 @@ Task CreateQueue(string queueName, bool quorum)
}

await channel.QueueDeclareAsync(queueName, true, false, false, queueArguments, cancellationToken: cancellationToken);
},
CancellationToken.None);
}
});

Task CreateExchange(string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout, true, cancellationToken: cancellationToken), CancellationToken.None);
Task CreateExchange(string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout, true, cancellationToken: cancellationToken));

Task BindQueue(string queueName, string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueBindAsync(queueName, exchangeName, string.Empty, cancellationToken: cancellationToken), CancellationToken.None);
Task BindQueue(string queueName, string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueBindAsync(queueName, exchangeName, string.Empty, cancellationToken: cancellationToken));

Task AddMessages(string queueName, int numMessages, Action<IBasicProperties> modifications = null)
{
return ExecuteBrokerCommand(async (channel, cancellationToken) =>
Task AddMessages(string queueName, int numMessages, Action<IBasicProperties> modifications = null) =>
ExecuteBrokerCommand(async (channel, cancellationToken) =>
{
await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken);

for (var i = 0; i < numMessages; i++)
{
var properties = new BasicProperties();

modifications?.Invoke(properties);

await channel.BasicPublishAsync(string.Empty, queueName, true, properties, ReadOnlyMemory<byte>.Empty, cancellationToken);
await channel.WaitForConfirmsOrDieAsync(cancellationToken);
}
},
CancellationToken.None);
}
}, new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null));

async Task<uint> MessageCount(string queueName)
{
uint messageCount = 0;

await ExecuteBrokerCommand(async (channel, cancellationToken) => messageCount = await channel.MessageCountAsync(queueName, cancellationToken), CancellationToken.None);
await ExecuteBrokerCommand(async (channel, cancellationToken) => messageCount = await channel.MessageCountAsync(queueName, cancellationToken));

return messageCount;
}
Expand All @@ -406,15 +396,14 @@ await ExecuteBrokerCommand(async (channel, cancellationToken) =>
{
queueExists = false;
}
},
CancellationToken.None);
});

return queueExists;
}

async Task ExecuteBrokerCommand(Func<IChannel, CancellationToken, Task> command, CancellationToken cancellationToken)
async Task ExecuteBrokerCommand(Func<IChannel, CancellationToken, Task> command, CreateChannelOptions createChannelOptions = default, CancellationToken cancellationToken = default)
{
using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
await using var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken: cancellationToken);
await command(channel, cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using System.Threading.Tasks;
using global::RabbitMQ.Client;

class DelaysMigrateCommand
class DelaysMigrateCommand(BrokerConnection brokerConnection, IRoutingTopology routingTopology, IConsole console)
{
const string poisonMessageQueue = "delays-migrate-poison-messages";
const string timeSentHeader = "NServiceBus.TimeSent";
Expand Down Expand Up @@ -39,18 +39,12 @@ public static Command CreateCommand()
return command;
}

public DelaysMigrateCommand(BrokerConnection brokerConnection, IRoutingTopology routingTopology, IConsole console)
async Task Run(CancellationToken cancellationToken)
{
this.brokerConnection = brokerConnection;
this.routingTopology = routingTopology;
this.console = console;
}

public async Task Run(CancellationToken cancellationToken = default)
{
using var connection = await brokerConnection.Create(cancellationToken);
using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken);
await using var connection = await brokerConnection.Create(cancellationToken);
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null);
await using var channel = await connection.CreateChannelAsync(createChannelOptions,
cancellationToken: cancellationToken);

for (int currentDelayLevel = DelayInfrastructure.MaxLevel; currentDelayLevel >= 0 && !cancellationToken.IsCancellationRequested; currentDelayLevel--)
{
Expand Down Expand Up @@ -92,7 +86,6 @@ async Task MigrateQueue(IChannel channel, int delayLevel, CancellationToken canc
}

await channel.BasicPublishAsync(string.Empty, poisonMessageQueue, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: cancellationToken);
await channel.WaitForConfirmsOrDieAsync(cancellationToken);
await channel.BasicAckAsync(message.DeliveryTag, false, cancellationToken);

continue;
Expand Down Expand Up @@ -135,7 +128,6 @@ async Task MigrateQueue(IChannel channel, int delayLevel, CancellationToken canc
}

await channel.BasicPublishAsync(publishExchange, newRoutingKey, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: cancellationToken);
await channel.WaitForConfirmsOrDieAsync(cancellationToken);
await channel.BasicAckAsync(message.DeliveryTag, false, cancellationToken);
processedMessages++;
}
Expand Down Expand Up @@ -172,18 +164,10 @@ static DateTimeOffset GetTimeSent(BasicGetResult message)
return DateTimeOffset.ParseExact(timeSentString, dateTimeOffsetWireFormat, CultureInfo.InvariantCulture);
}

static bool MessageIsInvalid(BasicGetResult? message)
{
return message == null
|| message.BasicProperties == null
|| message.BasicProperties.Headers == null
|| !message.BasicProperties.Headers.ContainsKey(DelayInfrastructure.DelayHeader)
|| !message.BasicProperties.Headers.ContainsKey(timeSentHeader);
}

readonly BrokerConnection brokerConnection;
readonly IRoutingTopology routingTopology;
readonly IConsole console;
static bool MessageIsInvalid(BasicGetResult? message) =>
message?.BasicProperties.Headers == null
|| !message.BasicProperties.Headers.ContainsKey(DelayInfrastructure.DelayHeader)
|| !message.BasicProperties.Headers.ContainsKey(timeSentHeader);

bool poisonQueueCreated = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using global::RabbitMQ.Client;
using global::RabbitMQ.Client.Exceptions;

class QueueMigrateCommand
class QueueMigrateCommand(string queueName, BrokerConnection brokerConnection, IConsole console)
{
public static Command CreateCommand()
{
Expand All @@ -32,20 +32,11 @@ public static Command CreateCommand()
return command;
}

public QueueMigrateCommand(string queueName, BrokerConnection brokerConnection, IConsole console)
{
this.queueName = queueName;
this.brokerConnection = brokerConnection;
this.console = console;
holdingQueueName = $"{queueName}-migration-temp";
migrationState = new MigrationState();
}

public async Task Run(CancellationToken cancellationToken = default)
{
console.WriteLine($"Starting migration of '{queueName}'");

using var connection = await brokerConnection.Create(cancellationToken);
await using var connection = await brokerConnection.Create(cancellationToken);

await migrationState.SetInitialMigrationStage(queueName, holdingQueueName, connection, cancellationToken);

Expand Down Expand Up @@ -77,7 +68,11 @@ public async Task Run(CancellationToken cancellationToken = default)

async Task<MigrationStage> MoveMessagesToHoldingQueue(IConnection connection, CancellationToken cancellationToken)
{
using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true,
outstandingPublisherConfirmationsRateLimiter: null);
await using var channel = await connection.CreateChannelAsync(createChannelOptions,
cancellationToken: cancellationToken);

console.WriteLine($"Migrating messages from '{queueName}' to '{holdingQueueName}'");

Expand All @@ -95,15 +90,12 @@ async Task<MigrationStage> MoveMessagesToHoldingQueue(IConnection connection, Ca
console.WriteLine($"Unbound '{queueName}' from exchange '{queueName}' ");

// move all existing messages to the holding queue
await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken);

var numMessagesMovedToHolding = await ProcessMessages(
channel,
queueName,
async (message, cancellationToken) =>
async (message, token) =>
{
await channel.BasicPublishAsync(string.Empty, holdingQueueName, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: cancellationToken);
await channel.WaitForConfirmsOrDieAsync(cancellationToken);
await channel.BasicPublishAsync(string.Empty, holdingQueueName, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: token);
},
cancellationToken);

Expand All @@ -114,7 +106,7 @@ async Task<MigrationStage> MoveMessagesToHoldingQueue(IConnection connection, Ca

async Task<MigrationStage> DeleteMainQueue(IConnection connection, CancellationToken cancellationToken)
{
using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);

if (await channel.MessageCountAsync(queueName, cancellationToken) > 0)
{
Expand All @@ -130,7 +122,7 @@ async Task<MigrationStage> DeleteMainQueue(IConnection connection, CancellationT

async Task<MigrationStage> CreateMainQueueAsQuorum(IConnection connection, CancellationToken cancellationToken)
{
using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);

await channel.QueueDeclareAsync(queueName, true, false, false, quorumQueueArguments, cancellationToken: cancellationToken);
console.WriteLine($"Recreated '{queueName}' as a quorum queue");
Expand All @@ -140,7 +132,9 @@ async Task<MigrationStage> CreateMainQueueAsQuorum(IConnection connection, Cance

async Task<MigrationStage> RestoreMessages(IConnection connection, CancellationToken cancellationToken)
{
using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null);
await using var channel = await connection.CreateChannelAsync(createChannelOptions,
cancellationToken: cancellationToken);

await channel.QueueBindAsync(queueName, queueName, string.Empty, cancellationToken: cancellationToken);
console.WriteLine($"Re-bound '{queueName}' to exchange '{queueName}'");
Expand All @@ -151,8 +145,6 @@ async Task<MigrationStage> RestoreMessages(IConnection connection, CancellationT
var messageIds = new Dictionary<string, string>();

// move all messages in the holding queue back to the main queue
await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken);

var numMessageMovedBackToMain = await ProcessMessages(
channel,
holdingQueueName,
Expand All @@ -174,7 +166,6 @@ async Task<MigrationStage> RestoreMessages(IConnection connection, CancellationT
}

await channel.BasicPublishAsync(string.Empty, queueName, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: token);
await channel.WaitForConfirmsOrDieAsync(token);

if (messageIdString != null)
{
Expand All @@ -190,7 +181,7 @@ async Task<MigrationStage> RestoreMessages(IConnection connection, CancellationT

async Task<MigrationStage> CleanUpHoldingQueue(IConnection connection, CancellationToken cancellationToken)
{
using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);

if (await channel.MessageCountAsync(holdingQueueName, cancellationToken) != 0)
{
Expand Down Expand Up @@ -227,13 +218,10 @@ async Task<uint> ProcessMessages(IChannel channel, string sourceQueue, Func<Basi
return messageCount;
}

readonly string queueName;
readonly string holdingQueueName;
readonly BrokerConnection brokerConnection;
readonly IConsole console;
readonly MigrationState migrationState;
readonly string holdingQueueName = $"{queueName}-migration-temp";
readonly MigrationState migrationState = new();

static Dictionary<string, object?> quorumQueueArguments = new() { { "x-queue-type", "quorum" } };
static readonly Dictionary<string, object?> quorumQueueArguments = new() { { "x-queue-type", "quorum" } };

enum MigrationStage
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="NServiceBus" Version="9.2.2" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.11" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.14" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>

Expand Down
3 changes: 3 additions & 0 deletions src/NServiceBus.Transport.RabbitMQ.Tests/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

# Justification: Test project
dotnet_diagnostic.CA2007.severity = none

# may be enabled in future
dotnet_diagnostic.PS0004.severity = none # A parameter of type CancellationToken on a private delegate or method should be required
dotnet_diagnostic.PS0018.severity = none # A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext

# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
dotnet_diagnostic.NSB0002.severity = suggestion
Loading