Skip to content

Commit

Permalink
Merge pull request #1708 from rabbitmq/lukebakken/more-todos
Browse files Browse the repository at this point in the history
Address some more TODOs
  • Loading branch information
lukebakken authored Oct 22, 2024
2 parents 86827c3 + 38d7f80 commit 9a822fa
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 94 deletions.
2 changes: 0 additions & 2 deletions projects/RabbitMQ.Client.OAuth2/OAuth2Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ public async Task<IToken> RequestTokenAsync(CancellationToken cancellationToken

if (token is null)
{
// TODO specific exception?
throw new InvalidOperationException("token is null");
}

Expand Down Expand Up @@ -274,7 +273,6 @@ public async Task<IToken> RefreshTokenAsync(IToken token,

if (refreshedToken is null)
{
// TODO specific exception?
throw new InvalidOperationException("refreshed token is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public class CallbackExceptionEventArgs : BaseExceptionEventArgs
private const string ContextString = "context";
private const string ConsumerString = "consumer";

// TODO Why is this public when there is a build method?
public CallbackExceptionEventArgs(IDictionary<string, object> detail, Exception exception, CancellationToken cancellationToken = default)
: base(detail, exception, cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
namespace RabbitMQ.Client.Exceptions
{
/// <summary>
/// TODO WHY IS THIS UNREFERENCED?
/// Thrown when the channel receives an RPC reply that it wasn't expecting.
/// </summary>
[Serializable]
Expand Down
29 changes: 8 additions & 21 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.ConsumerDispatching;
using RabbitMQ.Client.Events;
Expand All @@ -43,18 +42,17 @@ namespace RabbitMQ.Client.Impl
{
internal sealed class AutorecoveringChannel : IChannel, IRecoverable
{
private readonly ChannelOptions _channelOptions;
private readonly List<string> _recordedConsumerTags = new List<string>();

private AutorecoveringConnection _connection;
private RecoveryAwareChannel _innerChannel;
private bool _disposed;
private readonly List<string> _recordedConsumerTags = new List<string>();

private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
private RateLimiter? _outstandingPublisherConfirmationsRateLimiter = null;

private bool _usesTransactions;
private ushort _consumerDispatchConcurrency;

internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher;

Expand All @@ -73,20 +71,13 @@ public TimeSpan ContinuationTimeout
set => InnerChannel.ContinuationTimeout = value;
}

// TODO just pass create channel options
public AutorecoveringChannel(AutorecoveringConnection conn,
RecoveryAwareChannel innerChannel,
ushort consumerDispatchConcurrency,
bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
RateLimiter? outstandingPublisherConfirmationsRateLimiter)
ChannelOptions channelOptions)
{
_connection = conn;
_innerChannel = innerChannel;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
_channelOptions = channelOptions;
}

public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
Expand Down Expand Up @@ -171,13 +162,9 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con

_connection = conn;

RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
_publisherConfirmationsEnabled,
_publisherConfirmationTrackingEnabled,
_outstandingPublisherConfirmationsRateLimiter,
_consumerDispatchConcurrency,
cancellationToken)
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_channelOptions, cancellationToken)
.ConfigureAwait(false);

newChannel.TakeOver(_innerChannel);

if (_prefetchCountConsumer != 0)
Expand Down
36 changes: 9 additions & 27 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
Expand Down Expand Up @@ -185,22 +184,12 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs

public IProtocol Protocol => Endpoint.Protocol;

// TODO pass channel creation options?
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
RateLimiter? outstandingPublisherConfirmationsRateLimiter = null,
ushort? consumerDispatchConcurrency = null,
public ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
ChannelOptions channelOptions,
CancellationToken cancellationToken = default)
{
ISession session = InnerConnection.CreateSession();
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
return (RecoveryAwareChannel)await result.OpenAsync(
publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
outstandingPublisherConfirmationsRateLimiter,
cancellationToken)
.ConfigureAwait(false);
return RecoveryAwareChannel.CreateAndOpenAsync(session, channelOptions, cancellationToken);
}

public override string ToString()
Expand Down Expand Up @@ -271,23 +260,16 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d

ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);

RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.OutstandingPublisherConfirmationsRateLimiter,
cdc,
cancellationToken)
var channelOptions = ChannelOptions.From(options, _config);

RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(channelOptions, cancellationToken)
.ConfigureAwait(false);

// TODO just pass create channel options
var autorecoveringChannel = new AutorecoveringChannel(this,
recoveryAwareChannel,
cdc,
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.OutstandingPublisherConfirmationsRateLimiter);
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, channelOptions);

await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);

return autorecoveringChannel;
}

Expand Down
27 changes: 16 additions & 11 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.ConsumerDispatching;
using RabbitMQ.Client.Events;
Expand All @@ -62,11 +61,10 @@ internal partial class Channel : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

public Channel(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null)
public Channel(ISession session, ChannelOptions channelOptions)
{
ContinuationTimeout = config.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency));
ContinuationTimeout = channelOptions.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this, channelOptions.ConsumerDispatchConcurrency);
Func<Exception, string, CancellationToken, Task> onExceptionAsync = (exception, context, cancellationToken) =>
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
_basicAcksAsyncWrapper = new AsyncEventingWrapper<BasicAckEventArgs>("OnBasicAck", onExceptionAsync);
Expand Down Expand Up @@ -361,14 +359,12 @@ protected bool Enqueue(IRpcContinuation k)
}
}

internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
RateLimiter? outstandingPublisherConfirmationsRateLimiter,
internal async Task<IChannel> OpenAsync(ChannelOptions channelOptions,
CancellationToken cancellationToken)
{
ConfigurePublisherConfirmations(publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
outstandingPublisherConfirmationsRateLimiter);
ConfigurePublisherConfirmations(channelOptions.PublisherConfirmationsEnabled,
channelOptions.PublisherConfirmationTrackingEnabled,
channelOptions.OutstandingPublisherConfirmationsRateLimiter);

bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
Expand Down Expand Up @@ -1497,6 +1493,15 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}

internal static Task<IChannel> CreateAndOpenAsync(CreateChannelOptions createChannelOptions,
ConnectionConfig connectionConfig, ISession session,
CancellationToken cancellationToken)
{
ChannelOptions channelOptions = ChannelOptions.From(createChannelOptions, connectionConfig);
var channel = new Channel(session, channelOptions);
return channel.OpenAsync(channelOptions, cancellationToken);
}

/// <summary>
/// Returning <c>true</c> from this method means that the command was server-originated,
/// and handled already.
Expand Down
90 changes: 90 additions & 0 deletions projects/RabbitMQ.Client/Impl/ChannelOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;
using System.Threading.RateLimiting;

namespace RabbitMQ.Client.Impl
{
internal sealed class ChannelOptions
{
private readonly bool _publisherConfirmationEnabled;
private readonly bool _publisherConfirmationTrackingEnabled;
private readonly ushort _consumerDispatchConcurrency;
private readonly RateLimiter? _outstandingPublisherConfirmationsRateLimiter;
private readonly TimeSpan _continuationTimeout;

public ChannelOptions(bool publisherConfirmationEnabled,
bool publisherConfirmationTrackingEnabled,
ushort consumerDispatchConcurrency,
RateLimiter? outstandingPublisherConfirmationsRateLimiter,
TimeSpan continuationTimeout)
{
_publisherConfirmationEnabled = publisherConfirmationEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
_continuationTimeout = continuationTimeout;
}

public bool PublisherConfirmationsEnabled => _publisherConfirmationEnabled;

public bool PublisherConfirmationTrackingEnabled => _publisherConfirmationTrackingEnabled;

public ushort ConsumerDispatchConcurrency => _consumerDispatchConcurrency;

public RateLimiter? OutstandingPublisherConfirmationsRateLimiter => _outstandingPublisherConfirmationsRateLimiter;

public TimeSpan ContinuationTimeout => _continuationTimeout;

public static ChannelOptions From(CreateChannelOptions createChannelOptions,
ConnectionConfig connectionConfig)
{
ushort cdc = createChannelOptions.ConsumerDispatchConcurrency.GetValueOrDefault(
connectionConfig.ConsumerDispatchConcurrency);

return new ChannelOptions(createChannelOptions.PublisherConfirmationsEnabled,
createChannelOptions.PublisherConfirmationTrackingEnabled,
cdc,
createChannelOptions.OutstandingPublisherConfirmationsRateLimiter,
connectionConfig.ContinuationTimeout);
}

public static ChannelOptions From(ConnectionConfig connectionConfig)
{
return new ChannelOptions(publisherConfirmationEnabled: false,
publisherConfirmationTrackingEnabled: false,
consumerDispatchConcurrency: Constants.DefaultConsumerDispatchConcurrency,
outstandingPublisherConfirmationsRateLimiter: null,
continuationTimeout: connectionConfig.ContinuationTimeout);
}
}
}
16 changes: 4 additions & 12 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)

_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
_channel0 = new Channel(_config, _session0);
_channel0 = new Channel(_session0, ChannelOptions.From(config));

ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
{
Expand Down Expand Up @@ -263,23 +263,15 @@ await CloseAsync(ea, true,
}
}

public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
public Task<IChannel> CreateChannelAsync(CreateChannelOptions? createChannelOptions = default,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();

options ??= CreateChannelOptions.Default;
createChannelOptions ??= CreateChannelOptions.Default;
ISession session = CreateSession();

// TODO channel CreateChannelAsync() to combine ctor and OpenAsync
var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency);
IChannel ch = await channel.OpenAsync(
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.OutstandingPublisherConfirmationsRateLimiter,
cancellationToken)
.ConfigureAwait(false);
return ch;
return Channel.CreateAndOpenAsync(createChannelOptions, _config, session, cancellationToken);
}

internal ISession CreateSession()
Expand Down
13 changes: 10 additions & 3 deletions projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@

using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;

namespace RabbitMQ.Client.Impl
{
internal sealed class RecoveryAwareChannel : Channel
{
public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
: base(config, session, consumerDispatchConcurrency)
public RecoveryAwareChannel(ISession session, ChannelOptions channelOptions)
: base(session, channelOptions)
{
ActiveDeliveryTagOffset = 0;
MaxSeenDeliveryTag = 0;
Expand Down Expand Up @@ -104,5 +103,13 @@ public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
return default;
}
}

internal static async ValueTask<RecoveryAwareChannel> CreateAndOpenAsync(ISession session, ChannelOptions channelOptions,
CancellationToken cancellationToken)
{
var result = new RecoveryAwareChannel(session, channelOptions);
return (RecoveryAwareChannel)await result.OpenAsync(channelOptions, cancellationToken)
.ConfigureAwait(false);
}
}
}
Loading

0 comments on commit 9a822fa

Please sign in to comment.