Skip to content

Commit

Permalink
7.0.0-rc.14 code changes. (#1475)
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken authored Oct 25, 2024
1 parent fb0d467 commit 1fef49e
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.2.2" GeneratePathProperty="true" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.13" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.14" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,12 @@ Task CreateQueue(string queueName, bool quorum)

Task BindQueue(string queueName, string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueBindAsync(queueName, exchangeName, string.Empty, cancellationToken: cancellationToken));

Task AddMessages(string queueName, int numMessages, Action<IBasicProperties> modifications = null) =>
ExecuteBrokerCommand(async (channel, cancellationToken) =>
Task AddMessages(string queueName, int numMessages, Action<IBasicProperties> modifications = null)
{
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true,
outstandingPublisherConfirmationsRateLimiter: null);
return ExecuteBrokerCommand(async (channel, cancellationToken) =>
{
for (var i = 0; i < numMessages; i++)
{
Expand All @@ -372,7 +376,8 @@ Task AddMessages(string queueName, int numMessages, Action<IBasicProperties> mod
await channel.BasicPublishAsync(string.Empty, queueName, true, properties, ReadOnlyMemory<byte>.Empty, cancellationToken);
}
}, createChannelOptions: new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
}, createChannelOptions: createChannelOptions);
}

async Task<uint> MessageCount(string queueName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public DelaysMigrateCommand(BrokerConnection brokerConnection, IRoutingTopology
public async Task Run(CancellationToken cancellationToken = default)
{
await using var connection = await brokerConnection.Create(cancellationToken);
await using var channel = await connection.CreateChannelAsync(new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true
}, cancellationToken: cancellationToken);
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true,
outstandingPublisherConfirmationsRateLimiter: null);
await using var channel = await connection.CreateChannelAsync(createChannelOptions,
cancellationToken: cancellationToken);

for (int currentDelayLevel = DelayInfrastructure.MaxLevel; currentDelayLevel >= 0 && !cancellationToken.IsCancellationRequested; currentDelayLevel--)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ public async Task Run(CancellationToken cancellationToken = default)

async Task<MigrationStage> MoveMessagesToHoldingQueue(IConnection connection, CancellationToken cancellationToken)
{
await using var channel = await connection.CreateChannelAsync(new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true
}, cancellationToken: cancellationToken);
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true,
outstandingPublisherConfirmationsRateLimiter: null);
await using var channel = await connection.CreateChannelAsync(createChannelOptions,
cancellationToken: cancellationToken);

console.WriteLine($"Migrating messages from '{queueName}' to '{holdingQueueName}'");

Expand Down Expand Up @@ -141,11 +141,11 @@ async Task<MigrationStage> CreateMainQueueAsQuorum(IConnection connection, Cance

async Task<MigrationStage> RestoreMessages(IConnection connection, CancellationToken cancellationToken)
{
await using var channel = await connection.CreateChannelAsync(new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
}, cancellationToken: cancellationToken);
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true,
outstandingPublisherConfirmationsRateLimiter: null);
await using var channel = await connection.CreateChannelAsync(createChannelOptions,
cancellationToken: cancellationToken);

await channel.QueueBindAsync(queueName, queueName, string.Empty, cancellationToken: cancellationToken);
console.WriteLine($"Re-bound '{queueName}' to exchange '{queueName}'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="NServiceBus" Version="9.2.2" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.13" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.14" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<PackageReference Include="NServiceBus" Version="9.2.2" />
<PackageReference Include="Particular.Approvals" Version="1.0.0" />
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.13" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.14" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus.TransportTests.Sources" Version="9.2.2" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.13" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.14" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routi

public bool IsClosed => channel.IsClosed;

public async Task Initialize(CancellationToken cancellationToken = default) =>
channel = await connection.CreateChannelAsync(new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
// The client never had rate limiting enabled before so we want to first explore the impact of enabling it
OutstandingPublisherConfirmationsRateLimiter = null,
}, cancellationToken: cancellationToken).ConfigureAwait(false);
public async Task Initialize(CancellationToken cancellationToken = default)
{
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true,
outstandingPublisherConfirmationsRateLimiter: null);
channel = await connection.CreateChannelAsync(createChannelOptions,
cancellationToken: cancellationToken).ConfigureAwait(false);
}

public async ValueTask SendMessage(string address, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="[2.4.1, 3.0.0)" />
<PackageReference Include="NServiceBus" Version="[9.1.0, 10.0.0)" />
<PackageReference Include="RabbitMQ.Client" Version="[7.0.0-rc.13, 8.0.0)" />
<PackageReference Include="RabbitMQ.Client" Version="[7.0.0-rc.14, 8.0.0)" />
</ItemGroup>

<ItemGroup>
Expand Down
9 changes: 5 additions & 4 deletions src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ async Task ConnectToBroker(CancellationToken cancellationToken)
prefetchCount = maxConcurrency;
}

var channel = await connection.CreateChannelAsync(new CreateChannelOptions
{
ConsumerDispatchConcurrency = (ushort)maxConcurrency,
}, cancellationToken).ConfigureAwait(false);
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: false,
publisherConfirmationTrackingEnabled: false,
consumerDispatchConcurrency: (ushort)maxConcurrency);
var channel = await connection.CreateChannelAsync(createChannelOptions,
cancellationToken).ConfigureAwait(false);
channel.ChannelShutdownAsync += Channel_ModelShutdown;
await channel.BasicQosAsync(0, (ushort)Math.Min(prefetchCount, ushort.MaxValue), false, cancellationToken).ConfigureAwait(false);

Expand Down

0 comments on commit 1fef49e

Please sign in to comment.