Skip to content

Commit

Permalink
Remove ChannelOptions internal class (#1712)
Browse files Browse the repository at this point in the history
* Remove `ChannelOptions` internal class

Update `CreateChannelOptions` so that it provides the same internal behavior that `ChannelOptions` did.

* * Make `CreateChannelOptions` an immutable class.

* * Fix test failures due to not passing `_consumerDispatchConcurrency` correctly to `CreateChannelOptions`
  • Loading branch information
lukebakken authored Oct 24, 2024
1 parent c642f7b commit 195059c
Show file tree
Hide file tree
Showing 26 changed files with 172 additions and 184 deletions.
6 changes: 1 addition & 5 deletions projects/Applications/GH-1647/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@
Password = "guest"
};

var channelOptions = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true
};
var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);

var props = new BasicProperties();
byte[] msg = Encoding.UTF8.GetBytes("test");
Expand Down
4 changes: 3 additions & 1 deletion projects/Applications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer

publishTasks.Add(Task.Run(async () =>
{
using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);
using IChannel publishChannel = await publishConnection.CreateChannelAsync(createChannelOptions);
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
for (int i = 0; i < ItemsPerBatch; i++)
Expand Down
13 changes: 6 additions & 7 deletions projects/Applications/PublisherConfirms/PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@
const int MESSAGE_COUNT = 50_000;
bool debug = false;

var channelOpts = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
};
var channelOpts = new CreateChannelOptions(
publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true,
outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
);

var props = new BasicProperties
{
Expand Down Expand Up @@ -177,7 +176,7 @@ async Task HandlePublishConfirmsAsynchronously()

await using IConnection connection = await CreateConnectionAsync();

channelOpts.PublisherConfirmationTrackingEnabled = false;
channelOpts = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false);
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);

// declare a server-named queue
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
/// </summary>
public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(5);

private TimeSpan _handshakeContinuationTimeout = TimeSpan.FromSeconds(10);
private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20);
private TimeSpan _handshakeContinuationTimeout = Constants.DefaultHandshakeContinuationTimeout;
private TimeSpan _continuationTimeout = Constants.DefaultContinuationTimeout;

// just here to hold the value that was set through the setter
private string? _clientProvidedName;
Expand Down
12 changes: 12 additions & 0 deletions projects/RabbitMQ.Client/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;

namespace RabbitMQ.Client
{
public static class Constants
Expand Down Expand Up @@ -97,5 +99,15 @@ public static class Constants
/// <c>basic.return</c> is sent via the broker.
/// </summary>
public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";

/// <summary>
/// The default timeout for initial AMQP handshake
/// </summary>
public static readonly TimeSpan DefaultHandshakeContinuationTimeout = TimeSpan.FromSeconds(10);

/// <summary>
/// The default timeout for RPC methods
/// </summary>
public static readonly TimeSpan DefaultContinuationTimeout = TimeSpan.FromSeconds(20);
}
}
71 changes: 63 additions & 8 deletions projects/RabbitMQ.Client/CreateChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;
using System.Threading.RateLimiting;

namespace RabbitMQ.Client
Expand All @@ -38,6 +39,9 @@ namespace RabbitMQ.Client
/// </summary>
public sealed class CreateChannelOptions
{
private ushort? _connectionConfigConsumerDispatchConcurrency;
private TimeSpan _connectionConfigContinuationTimeout;

/// <summary>
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
///
Expand All @@ -49,7 +53,7 @@ public sealed class CreateChannelOptions
/// <see cref="IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken)"/> to allow correlation
/// of the response with the correct message.
/// </summary>
public bool PublisherConfirmationsEnabled { get; set; } = false;
public readonly bool PublisherConfirmationsEnabled = false;

/// <summary>
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
Expand All @@ -59,7 +63,7 @@ public sealed class CreateChannelOptions
/// If the broker then sends a <c>basic.return</c> response for the message, this library can
/// then correctly handle the message.
/// </summary>
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
public readonly bool PublisherConfirmationTrackingEnabled = false;

/// <summary>
/// If the publisher confirmation tracking is enabled, this represents the rate limiter used to
Expand All @@ -68,7 +72,7 @@ public sealed class CreateChannelOptions
/// Defaults to a <see cref="ThrottlingRateLimiter"/> with a limit of 128 and a throttling percentage of 50% with a delay during throttling.
/// </summary>
/// <remarks>Setting the rate limiter to <c>null</c> disables the rate limiting entirely.</remarks>
public RateLimiter? OutstandingPublisherConfirmationsRateLimiter { get; set; } = new ThrottlingRateLimiter(128);
public readonly RateLimiter? OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(128);

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
Expand All @@ -80,11 +84,62 @@ public sealed class CreateChannelOptions
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// </summary>
public ushort? ConsumerDispatchConcurrency { get; set; } = null;
public readonly ushort? ConsumerDispatchConcurrency = null;

/// <summary>
/// The default channel options.
/// </summary>
public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
public CreateChannelOptions(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
RateLimiter? outstandingPublisherConfirmationsRateLimiter = null,
ushort? consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
{
PublisherConfirmationsEnabled = publisherConfirmationsEnabled;
PublisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
OutstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
ConsumerDispatchConcurrency = consumerDispatchConcurrency;
}

internal ushort InternalConsumerDispatchConcurrency
{
get
{
if (ConsumerDispatchConcurrency is not null)
{
return ConsumerDispatchConcurrency.Value;
}

if (_connectionConfigConsumerDispatchConcurrency is not null)
{
return _connectionConfigConsumerDispatchConcurrency.Value;
}

return Constants.DefaultConsumerDispatchConcurrency;
}
}

internal TimeSpan ContinuationTimeout => _connectionConfigContinuationTimeout;

internal CreateChannelOptions(ConnectionConfig connectionConfig)
{
_connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
_connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
}

private CreateChannelOptions WithConnectionConfig(ConnectionConfig connectionConfig)
{
_connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
_connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
return this;
}

internal static CreateChannelOptions CreateOrUpdate(CreateChannelOptions? createChannelOptions, ConnectionConfig config)
{
if (createChannelOptions is null)
{
return new CreateChannelOptions(config);
}
else
{
return createChannelOptions.WithConnectionConfig(config);
}
}
}
}
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace RabbitMQ.Client.Impl
{
internal sealed class AutorecoveringChannel : IChannel, IRecoverable
{
private readonly ChannelOptions _channelOptions;
private readonly CreateChannelOptions _createChannelOptions;
private readonly List<string> _recordedConsumerTags = new List<string>();

private AutorecoveringConnection _connection;
Expand Down Expand Up @@ -73,11 +73,11 @@ public TimeSpan ContinuationTimeout

public AutorecoveringChannel(AutorecoveringConnection conn,
RecoveryAwareChannel innerChannel,
ChannelOptions channelOptions)
CreateChannelOptions createChannelOptions)
{
_connection = conn;
_innerChannel = innerChannel;
_channelOptions = channelOptions;
_createChannelOptions = createChannelOptions;
}

public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
Expand Down Expand Up @@ -162,7 +162,7 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con

_connection = conn;

RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_channelOptions, cancellationToken)
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_createChannelOptions, cancellationToken)
.ConfigureAwait(false);

newChannel.TakeOver(_innerChannel);
Expand Down
17 changes: 6 additions & 11 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs
public IProtocol Protocol => Endpoint.Protocol;

public ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
ChannelOptions channelOptions,
CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken = default)
{
ISession session = InnerConnection.CreateSession();
return RecoveryAwareChannel.CreateAndOpenAsync(session, channelOptions, cancellationToken);
return RecoveryAwareChannel.CreateAndOpenAsync(session, createChannelOptions, cancellationToken);
}

public override string ToString()
Expand Down Expand Up @@ -251,21 +251,16 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? createChannelOptions = default,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();

options ??= CreateChannelOptions.Default;

ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);

var channelOptions = ChannelOptions.From(options, _config);

RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(channelOptions, cancellationToken)
createChannelOptions = CreateChannelOptions.CreateOrUpdate(createChannelOptions, _config);
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(createChannelOptions, cancellationToken)
.ConfigureAwait(false);

var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, channelOptions);
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, createChannelOptions);

await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
Expand Down
22 changes: 10 additions & 12 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ internal partial class Channel : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

public Channel(ISession session, ChannelOptions channelOptions)
public Channel(ISession session, CreateChannelOptions createChannelOptions)
{
ContinuationTimeout = channelOptions.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this, channelOptions.ConsumerDispatchConcurrency);
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this, createChannelOptions.InternalConsumerDispatchConcurrency);
Func<Exception, string, CancellationToken, Task> onExceptionAsync = (exception, context, cancellationToken) =>
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
_basicAcksAsyncWrapper = new AsyncEventingWrapper<BasicAckEventArgs>("OnBasicAck", onExceptionAsync);
Expand Down Expand Up @@ -359,12 +359,12 @@ protected bool Enqueue(IRpcContinuation k)
}
}

internal async Task<IChannel> OpenAsync(ChannelOptions channelOptions,
internal async Task<IChannel> OpenAsync(CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken)
{
ConfigurePublisherConfirmations(channelOptions.PublisherConfirmationsEnabled,
channelOptions.PublisherConfirmationTrackingEnabled,
channelOptions.OutstandingPublisherConfirmationsRateLimiter);
ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled,
createChannelOptions.PublisherConfirmationTrackingEnabled,
createChannelOptions.OutstandingPublisherConfirmationsRateLimiter);

bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
Expand Down Expand Up @@ -1493,13 +1493,11 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}

internal static Task<IChannel> CreateAndOpenAsync(CreateChannelOptions createChannelOptions,
ConnectionConfig connectionConfig, ISession session,
internal static Task<IChannel> CreateAndOpenAsync(CreateChannelOptions createChannelOptions, ISession session,
CancellationToken cancellationToken)
{
ChannelOptions channelOptions = ChannelOptions.From(createChannelOptions, connectionConfig);
var channel = new Channel(session, channelOptions);
return channel.OpenAsync(channelOptions, cancellationToken);
var channel = new Channel(session, createChannelOptions);
return channel.OpenAsync(createChannelOptions, cancellationToken);
}

/// <summary>
Expand Down
Loading

0 comments on commit 195059c

Please sign in to comment.