Skip to content

Commit

Permalink
Wip upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Aug 28, 2024
1 parent eb816a4 commit 02123a9
Show file tree
Hide file tree
Showing 24 changed files with 224 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.1" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.2.2" GeneratePathProperty="true" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.0.2" GeneratePathProperty="true" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.8" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Task Run(CancellationToken cancellationToken = default)
return Task.CompletedTask;
}

void MigrateQueue(IModel channel, int delayLevel, CancellationToken cancellationToken)
void MigrateQueue(IChannel channel, int delayLevel, CancellationToken cancellationToken)
{
var currentDelayQueue = $"nsb.delay-level-{delayLevel:00}";
var messageCount = channel.MessageCount(currentDelayQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public QueueMigrateCommand(string queueName, BrokerConnection brokerConnection,
migrationState = new MigrationState();
}

public Task Run(CancellationToken cancellationToken = default)
public async Task Run(CancellationToken cancellationToken = default)
{
console.WriteLine($"Starting migration of '{queueName}'");

Expand All @@ -54,7 +54,7 @@ public Task Run(CancellationToken cancellationToken = default)
switch (migrationState.CurrentStage)
{
case MigrationStage.Starting:
migrationState.CurrentStage = MoveMessagesToHoldingQueue(connection, cancellationToken);
migrationState.CurrentStage = await MoveMessagesToHoldingQueue(connection, cancellationToken).ConfigureAwait(false);
break;
case MigrationStage.MessagesMovedToHoldingQueue:
migrationState.CurrentStage = DeleteMainQueue(connection);
Expand All @@ -73,18 +73,16 @@ public Task Run(CancellationToken cancellationToken = default)
break;
}
}

return Task.CompletedTask;
}

MigrationStage MoveMessagesToHoldingQueue(IConnection connection, CancellationToken cancellationToken)
async Task<MigrationStage> MoveMessagesToHoldingQueue(IConnection connection, CancellationToken cancellationToken)
{
using var channel = connection.CreateModel();
using var channel = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false);

console.WriteLine($"Migrating messages from '{queueName}' to '{holdingQueueName}'");

// does the holding queue need to be quorum?
channel.QueueDeclare(holdingQueueName, true, false, false, quorumQueueArguments);
await channel.QueueDeclareAsync(holdingQueueName, true, false, false, quorumQueueArguments).ConfigureAwait(false);
console.WriteLine($"Created queue '{holdingQueueName}'");

// bind the holding queue to the exchange of the queue under migration
Expand Down Expand Up @@ -140,25 +138,25 @@ MigrationStage CreateMainQueueAsQuorum(IConnection connection)
return MigrationStage.QuorumQueueCreated;
}

MigrationStage RestoreMessages(IConnection connection, CancellationToken cancellationToken)
async Task<MigrationStage> RestoreMessages(IConnection connection, CancellationToken cancellationToken)
{
using var channel = connection.CreateModel();
using var channel = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false);

channel.QueueBind(queueName, queueName, string.Empty);
await channel.QueueBindAsync(queueName, queueName, string.Empty, cancellationToken: cancellationToken).ConfigureAwait(false);
console.WriteLine($"Re-bound '{queueName}' to exchange '{queueName}'");

channel.QueueUnbind(holdingQueueName, queueName, string.Empty);
await channel.QueueUnbindAsync(holdingQueueName, queueName, string.Empty, cancellationToken: cancellationToken).ConfigureAwait(false);
console.WriteLine($"Unbound '{holdingQueueName}' from exchange '{queueName}'");

var messageIds = new Dictionary<string, string>();

// move all messages in the holding queue back to the main queue
channel.ConfirmSelect();
await channel.ConfirmSelectAsync(cancellationToken).ConfigureAwait(false);

var numMessageMovedBackToMain = ProcessMessages(
var numMessageMovedBackToMain = await ProcessMessages(
channel,
holdingQueueName,
message =>
async (message, token) =>
{
string? messageIdString = null;
Expand All @@ -175,55 +173,55 @@ MigrationStage RestoreMessages(IConnection connection, CancellationToken cancell
}
}
channel.BasicPublish(string.Empty, queueName, message.BasicProperties, message.Body);
channel.WaitForConfirmsOrDie();
await channel.BasicPublishAsync(string.Empty, queueName, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: token).ConfigureAwait(false);
await channel.WaitForConfirmsOrDieAsync(token).ConfigureAwait(false);
if (messageIdString != null)
{
messageIds.Add(messageIdString, string.Empty);
}
},
cancellationToken);
cancellationToken).ConfigureAwait(false);

console.WriteLine($"Moved {numMessageMovedBackToMain} messages from '{holdingQueueName}' to '{queueName}'");

return MigrationStage.MessagesMovedToQuorumQueue;
}

MigrationStage CleanUpHoldingQueue(IConnection connection)
async Task<MigrationStage> CleanUpHoldingQueue(IConnection connection, CancellationToken cancellationToken)
{
using var channel = connection.CreateModel();
using var channel = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false);

if (channel.MessageCount(holdingQueueName) != 0)
if (await channel.MessageCountAsync(holdingQueueName, cancellationToken).ConfigureAwait(false) != 0)
{
throw new Exception($"'{holdingQueueName}' is not empty and was not deleted. Run the command again to retry message processing.");
}

channel.QueueDelete(holdingQueueName);
await channel.QueueDeleteAsync(holdingQueueName, cancellationToken: cancellationToken).ConfigureAwait(false);
console.WriteLine($"Removed '{holdingQueueName}'");

return MigrationStage.CleanUpCompleted;
}

uint ProcessMessages(IModel channel, string sourceQueue, Action<BasicGetResult> onMoveMessage, CancellationToken cancellationToken)
async Task<uint> ProcessMessages(IChannel channel, string sourceQueue, Func<BasicGetResult, CancellationToken, Task> onMoveMessage, CancellationToken cancellationToken)
{
var messageCount = channel.MessageCount(sourceQueue);
var messageCount = await channel.MessageCountAsync(sourceQueue, cancellationToken).ConfigureAwait(false);

for (var i = 0; i < messageCount; i++)
{
cancellationToken.ThrowIfCancellationRequested();

var message = channel.BasicGet(sourceQueue, false);
var message = await channel.BasicGetAsync(sourceQueue, false, cancellationToken).ConfigureAwait(false);

if (message == null)
{
// Queue is empty
break;
}

onMoveMessage(message);
await onMoveMessage(message, cancellationToken).ConfigureAwait(false);

channel.BasicAck(message.DeliveryTag, false);
await channel.BasicAckAsync(message.DeliveryTag, false, cancellationToken).ConfigureAwait(false);
}

return messageCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="NServiceBus" Version="9.1.1" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.8" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<PackageReference Include="NServiceBus" Version="9.2.2" />
<PackageReference Include="Particular.Approvals" Version="1.0.0" />
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.8" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.1" />
<PackageReference Include="NServiceBus.TransportTests.Sources" Version="9.2.2" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.8" />
</ItemGroup>

<ItemGroup>
Expand Down
13 changes: 7 additions & 6 deletions src/NServiceBus.Transport.RabbitMQ/Administration/QueuePurger.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
namespace NServiceBus.Transport.RabbitMQ
{
using System.Threading;
using System.Threading.Tasks;

class QueuePurger
{
readonly ConnectionFactory connectionFactory;
Expand All @@ -9,13 +12,11 @@ public QueuePurger(ConnectionFactory connectionFactory)
this.connectionFactory = connectionFactory;
}

public void Purge(string queue)
public async Task Purge(string queue, CancellationToken cancellationToken = default)
{
using (var connection = connectionFactory.CreateAdministrationConnection())
using (var channel = connection.CreateModel())
{
channel.QueuePurge(queue);
}
using var connection = await connectionFactory.CreateAdministrationConnection(cancellationToken).ConfigureAwait(false);
using var channel = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false);
await channel.QueuePurgeAsync(queue, cancellationToken).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,22 @@ public SubscriptionManager(ConnectionFactory connectionFactory, IRoutingTopology
this.localQueue = localQueue;
}

public Task SubscribeAll(MessageMetadata[] eventTypes, ContextBag context, CancellationToken cancellationToken = default)
public async Task SubscribeAll(MessageMetadata[] eventTypes, ContextBag context, CancellationToken cancellationToken = default)
{
using (var connection = connectionFactory.CreateAdministrationConnection())
using (var channel = connection.CreateModel())
using var connection = await connectionFactory.CreateAdministrationConnection(cancellationToken).ConfigureAwait(false);
using var channel = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false);
// TODO: Parallelize?
foreach (var eventType in eventTypes)
{
foreach (var eventType in eventTypes)
{
routingTopology.SetupSubscription(channel, eventType, localQueue);
}
await routingTopology.SetupSubscription(channel, eventType, localQueue, cancellationToken).ConfigureAwait(false);
}
return Task.CompletedTask;
}

public Task Unsubscribe(MessageMetadata eventType, ContextBag context, CancellationToken cancellationToken = default)
public async Task Unsubscribe(MessageMetadata eventType, ContextBag context, CancellationToken cancellationToken = default)
{
using (var connection = connectionFactory.CreateAdministrationConnection())
using (var channel = connection.CreateModel())
{
routingTopology.TeardownSubscription(channel, eventType, localQueue);
}

return Task.CompletedTask;
using var connection = await connectionFactory.CreateAdministrationConnection(cancellationToken).ConfigureAwait(false);
using var channel = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false);
await routingTopology.TeardownSubscription(channel, eventType, localQueue, cancellationToken).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay,
channels = new ConcurrentQueue<ConfirmsAwareChannel>();
}

public void CreateConnection() => connection = CreateConnectionWithShutdownListener();
public async Task<IConnection> CreateConnection(CancellationToken cancellationToken = default) => connection = await CreateConnectionWithShutdownListener(cancellationToken).ConfigureAwait(false);

protected virtual IConnection CreatePublishConnection() => connectionFactory.CreatePublishConnection();
protected virtual Task<IConnection> CreatePublishConnection(CancellationToken cancellationToken = default) => connectionFactory.CreatePublishConnection(cancellationToken);

IConnection CreateConnectionWithShutdownListener()
async Task<IConnection> CreateConnectionWithShutdownListener(CancellationToken cancellationToken)
{
var newConnection = CreatePublishConnection();
var newConnection = await CreatePublishConnection(cancellationToken).ConfigureAwait(false);
newConnection.ConnectionShutdown += Connection_ConnectionShutdown;
return newConnection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,25 @@ sealed class ConfirmsAwareChannel : IDisposable
{
public ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology)
{
channel = connection.CreateModel();
// TODO This should move into some for of initialization method
channel = connection.CreateChannelAsync().GetAwaiter().GetResult();
channel.BasicAcks += Channel_BasicAcks;
channel.BasicNacks += Channel_BasicNacks;
channel.BasicReturn += Channel_BasicReturn;
channel.ModelShutdown += Channel_ModelShutdown;
channel.ChannelShutdown += Channel_ModelShutdown;

channel.ConfirmSelect();
channel.ConfirmSelectAsync().GetAwaiter().GetResult();

this.routingTopology = routingTopology;

messages = new ConcurrentDictionary<ulong, TaskCompletionSource<bool>>();
}

public IBasicProperties CreateBasicProperties() => channel.CreateBasicProperties();

public bool IsOpen => channel.IsOpen;

public bool IsClosed => channel.IsClosed;

public Task SendMessage(string address, OutgoingMessage message, IBasicProperties properties, CancellationToken cancellationToken = default)
public async Task SendMessage(string address, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default)
{
var task = GetConfirmationTask(cancellationToken);
properties.SetConfirmationId(channel.NextPublishSeqNo);
Expand All @@ -41,38 +40,38 @@ public Task SendMessage(string address, OutgoingMessage message, IBasicPropertie
{
var routingKey = DelayInfrastructure.CalculateRoutingKey((int)delayValue, address, out var startingDelayLevel);

routingTopology.BindToDelayInfrastructure(channel, address, DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(address));
channel.BasicPublish(DelayInfrastructure.LevelName(startingDelayLevel), routingKey, true, properties, message.Body);
await routingTopology.BindToDelayInfrastructure(channel, address, DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(address), cancellationToken).ConfigureAwait(false);
await channel.BasicPublishAsync(DelayInfrastructure.LevelName(startingDelayLevel), routingKey, properties, message.Body, true, cancellationToken).ConfigureAwait(false);
}
else
{
routingTopology.Send(channel, address, message, properties);
await routingTopology.Send(channel, address, message, properties, cancellationToken).ConfigureAwait(false);
}

return task;
await task.ConfigureAwait(false);
}

public Task PublishMessage(Type type, OutgoingMessage message, IBasicProperties properties, CancellationToken cancellationToken = default)
public async Task PublishMessage(Type type, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default)
{
var task = GetConfirmationTask(cancellationToken);
properties.SetConfirmationId(channel.NextPublishSeqNo);

routingTopology.Publish(channel, type, message, properties);
await routingTopology.Publish(channel, type, message, properties, cancellationToken).ConfigureAwait(false);

return task;
await task.ConfigureAwait(false);
}

public Task RawSendInCaseOfFailure(string address, ReadOnlyMemory<byte> body, IBasicProperties properties, CancellationToken cancellationToken = default)
public async Task RawSendInCaseOfFailure(string address, ReadOnlyMemory<byte> body, BasicProperties properties, CancellationToken cancellationToken = default)
{
var task = GetConfirmationTask(cancellationToken);

properties.Headers ??= new Dictionary<string, object>();

properties.SetConfirmationId(channel.NextPublishSeqNo);

routingTopology.RawSendInCaseOfFailure(channel, address, body, properties);
await routingTopology.RawSendInCaseOfFailure(channel, address, body, properties, cancellationToken).ConfigureAwait(false);

return task;
await task.ConfigureAwait(false);
}

Task GetConfirmationTask(CancellationToken cancellationToken)
Expand Down Expand Up @@ -171,7 +170,7 @@ public void Dispose()
channel?.Dispose();
}

IModel channel;
IChannel channel;

readonly IRoutingTopology routingTopology;
readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> messages;
Expand Down
Loading

0 comments on commit 02123a9

Please sign in to comment.