Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ChannelOptions internal class #1712

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions projects/Applications/GH-1647/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@
Password = "guest"
};

var channelOptions = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true
};
var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);

var props = new BasicProperties();
byte[] msg = Encoding.UTF8.GetBytes("test");
Expand Down
4 changes: 3 additions & 1 deletion projects/Applications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer

publishTasks.Add(Task.Run(async () =>
{
using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);
using IChannel publishChannel = await publishConnection.CreateChannelAsync(createChannelOptions);
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;

for (int i = 0; i < ItemsPerBatch; i++)
Expand Down
13 changes: 6 additions & 7 deletions projects/Applications/PublisherConfirms/PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@
const int MESSAGE_COUNT = 50_000;
bool debug = false;

var channelOpts = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
};
var channelOpts = new CreateChannelOptions(
publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true,
outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
);

var props = new BasicProperties
{
Expand Down Expand Up @@ -177,7 +176,7 @@ async Task HandlePublishConfirmsAsynchronously()

await using IConnection connection = await CreateConnectionAsync();

channelOpts.PublisherConfirmationTrackingEnabled = false;
channelOpts = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false);
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);

// declare a server-named queue
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
/// </summary>
public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(5);

private TimeSpan _handshakeContinuationTimeout = TimeSpan.FromSeconds(10);
private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20);
private TimeSpan _handshakeContinuationTimeout = Constants.DefaultHandshakeContinuationTimeout;
private TimeSpan _continuationTimeout = Constants.DefaultContinuationTimeout;

// just here to hold the value that was set through the setter
private string? _clientProvidedName;
Expand Down
12 changes: 12 additions & 0 deletions projects/RabbitMQ.Client/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;

namespace RabbitMQ.Client
{
public static class Constants
Expand Down Expand Up @@ -97,5 +99,15 @@ public static class Constants
/// <c>basic.return</c> is sent via the broker.
/// </summary>
public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";

/// <summary>
/// The default timeout for initial AMQP handshake
/// </summary>
public static readonly TimeSpan DefaultHandshakeContinuationTimeout = TimeSpan.FromSeconds(10);

/// <summary>
/// The default timeout for RPC methods
/// </summary>
public static readonly TimeSpan DefaultContinuationTimeout = TimeSpan.FromSeconds(20);
}
}
71 changes: 63 additions & 8 deletions projects/RabbitMQ.Client/CreateChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;
using System.Threading.RateLimiting;

namespace RabbitMQ.Client
Expand All @@ -38,6 +39,9 @@ namespace RabbitMQ.Client
/// </summary>
public sealed class CreateChannelOptions
{
private ushort? _connectionConfigConsumerDispatchConcurrency;
private TimeSpan _connectionConfigContinuationTimeout;

/// <summary>
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
///
Expand All @@ -49,7 +53,7 @@ public sealed class CreateChannelOptions
/// <see cref="IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken)"/> to allow correlation
/// of the response with the correct message.
/// </summary>
public bool PublisherConfirmationsEnabled { get; set; } = false;
public readonly bool PublisherConfirmationsEnabled = false;
lukebakken marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
Expand All @@ -59,7 +63,7 @@ public sealed class CreateChannelOptions
/// If the broker then sends a <c>basic.return</c> response for the message, this library can
/// then correctly handle the message.
/// </summary>
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
public readonly bool PublisherConfirmationTrackingEnabled = false;

/// <summary>
/// If the publisher confirmation tracking is enabled, this represents the rate limiter used to
Expand All @@ -68,7 +72,7 @@ public sealed class CreateChannelOptions
/// Defaults to a <see cref="ThrottlingRateLimiter"/> with a limit of 128 and a throttling percentage of 50% with a delay during throttling.
/// </summary>
/// <remarks>Setting the rate limiter to <c>null</c> disables the rate limiting entirely.</remarks>
public RateLimiter? OutstandingPublisherConfirmationsRateLimiter { get; set; } = new ThrottlingRateLimiter(128);
public readonly RateLimiter? OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(128);

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
Expand All @@ -80,11 +84,62 @@ public sealed class CreateChannelOptions
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// </summary>
public ushort? ConsumerDispatchConcurrency { get; set; } = null;
public readonly ushort? ConsumerDispatchConcurrency = null;

/// <summary>
/// The default channel options.
/// </summary>
public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
public CreateChannelOptions(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
RateLimiter? outstandingPublisherConfirmationsRateLimiter = null,
ushort? consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
{
PublisherConfirmationsEnabled = publisherConfirmationsEnabled;
PublisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
OutstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
ConsumerDispatchConcurrency = consumerDispatchConcurrency;
}

internal ushort InternalConsumerDispatchConcurrency
{
get
{
if (ConsumerDispatchConcurrency is not null)
{
return ConsumerDispatchConcurrency.Value;
}

if (_connectionConfigConsumerDispatchConcurrency is not null)
{
return _connectionConfigConsumerDispatchConcurrency.Value;
}

return Constants.DefaultConsumerDispatchConcurrency;
}
}

internal TimeSpan ContinuationTimeout => _connectionConfigContinuationTimeout;

internal CreateChannelOptions(ConnectionConfig connectionConfig)
{
_connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
_connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
}

private CreateChannelOptions WithConnectionConfig(ConnectionConfig connectionConfig)
{
_connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
_connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
return this;
}

internal static CreateChannelOptions CreateOrUpdate(CreateChannelOptions? createChannelOptions, ConnectionConfig config)
{
if (createChannelOptions is null)
{
return new CreateChannelOptions(config);
}
else
{
return createChannelOptions.WithConnectionConfig(config);
}
}
}
}
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace RabbitMQ.Client.Impl
{
internal sealed class AutorecoveringChannel : IChannel, IRecoverable
{
private readonly ChannelOptions _channelOptions;
private readonly CreateChannelOptions _createChannelOptions;
private readonly List<string> _recordedConsumerTags = new List<string>();

private AutorecoveringConnection _connection;
Expand Down Expand Up @@ -73,11 +73,11 @@ public TimeSpan ContinuationTimeout

public AutorecoveringChannel(AutorecoveringConnection conn,
RecoveryAwareChannel innerChannel,
ChannelOptions channelOptions)
CreateChannelOptions createChannelOptions)
{
_connection = conn;
_innerChannel = innerChannel;
_channelOptions = channelOptions;
_createChannelOptions = createChannelOptions;
}

public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
Expand Down Expand Up @@ -162,7 +162,7 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con

_connection = conn;

RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_channelOptions, cancellationToken)
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_createChannelOptions, cancellationToken)
.ConfigureAwait(false);

newChannel.TakeOver(_innerChannel);
Expand Down
17 changes: 6 additions & 11 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs
public IProtocol Protocol => Endpoint.Protocol;

public ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
ChannelOptions channelOptions,
CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken = default)
{
ISession session = InnerConnection.CreateSession();
return RecoveryAwareChannel.CreateAndOpenAsync(session, channelOptions, cancellationToken);
return RecoveryAwareChannel.CreateAndOpenAsync(session, createChannelOptions, cancellationToken);
}

public override string ToString()
Expand Down Expand Up @@ -251,21 +251,16 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? createChannelOptions = default,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();

options ??= CreateChannelOptions.Default;

ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);

var channelOptions = ChannelOptions.From(options, _config);

RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(channelOptions, cancellationToken)
createChannelOptions = CreateChannelOptions.CreateOrUpdate(createChannelOptions, _config);
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(createChannelOptions, cancellationToken)
.ConfigureAwait(false);

var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, channelOptions);
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, createChannelOptions);

await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
Expand Down
22 changes: 10 additions & 12 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ internal partial class Channel : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

public Channel(ISession session, ChannelOptions channelOptions)
public Channel(ISession session, CreateChannelOptions createChannelOptions)
{
ContinuationTimeout = channelOptions.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this, channelOptions.ConsumerDispatchConcurrency);
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this, createChannelOptions.InternalConsumerDispatchConcurrency);
Func<Exception, string, CancellationToken, Task> onExceptionAsync = (exception, context, cancellationToken) =>
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
_basicAcksAsyncWrapper = new AsyncEventingWrapper<BasicAckEventArgs>("OnBasicAck", onExceptionAsync);
Expand Down Expand Up @@ -359,12 +359,12 @@ protected bool Enqueue(IRpcContinuation k)
}
}

internal async Task<IChannel> OpenAsync(ChannelOptions channelOptions,
internal async Task<IChannel> OpenAsync(CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken)
{
ConfigurePublisherConfirmations(channelOptions.PublisherConfirmationsEnabled,
channelOptions.PublisherConfirmationTrackingEnabled,
channelOptions.OutstandingPublisherConfirmationsRateLimiter);
ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled,
createChannelOptions.PublisherConfirmationTrackingEnabled,
createChannelOptions.OutstandingPublisherConfirmationsRateLimiter);

bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
Expand Down Expand Up @@ -1493,13 +1493,11 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}

internal static Task<IChannel> CreateAndOpenAsync(CreateChannelOptions createChannelOptions,
ConnectionConfig connectionConfig, ISession session,
internal static Task<IChannel> CreateAndOpenAsync(CreateChannelOptions createChannelOptions, ISession session,
CancellationToken cancellationToken)
{
ChannelOptions channelOptions = ChannelOptions.From(createChannelOptions, connectionConfig);
var channel = new Channel(session, channelOptions);
return channel.OpenAsync(channelOptions, cancellationToken);
var channel = new Channel(session, createChannelOptions);
return channel.OpenAsync(createChannelOptions, cancellationToken);
}

/// <summary>
Expand Down
Loading
Loading