From 38d7f804357296fbe9a92e54d38e8b317f76b8c1 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 22 Oct 2024 11:19:26 -0700 Subject: [PATCH] Address some more TODOs * Create `ChannelOptions` to encapsulate common channel options. * Combine creation and opening of channels into CreateAndOpenAsync --- .../RabbitMQ.Client.OAuth2/OAuth2Client.cs | 2 - .../Events/CallbackExceptionEventArgs.cs | 1 - .../Exceptions/UnexpectedMethodException.cs | 1 - .../Impl/AutorecoveringChannel.cs | 29 ++---- .../Impl/AutorecoveringConnection.cs | 36 ++------ projects/RabbitMQ.Client/Impl/Channel.cs | 27 +++--- .../RabbitMQ.Client/Impl/ChannelOptions.cs | 90 +++++++++++++++++++ projects/RabbitMQ.Client/Impl/Connection.cs | 16 +--- .../Impl/RecoveryAwareChannel.cs | 13 ++- .../Test/Integration/TestPublisherConfirms.cs | 15 ---- .../SequentialIntegrationFixture.cs | 1 - 11 files changed, 137 insertions(+), 94 deletions(-) create mode 100644 projects/RabbitMQ.Client/Impl/ChannelOptions.cs diff --git a/projects/RabbitMQ.Client.OAuth2/OAuth2Client.cs b/projects/RabbitMQ.Client.OAuth2/OAuth2Client.cs index 38a304896..7d71685a7 100644 --- a/projects/RabbitMQ.Client.OAuth2/OAuth2Client.cs +++ b/projects/RabbitMQ.Client.OAuth2/OAuth2Client.cs @@ -245,7 +245,6 @@ public async Task RequestTokenAsync(CancellationToken cancellationToken if (token is null) { - // TODO specific exception? throw new InvalidOperationException("token is null"); } @@ -274,7 +273,6 @@ public async Task RefreshTokenAsync(IToken token, if (refreshedToken is null) { - // TODO specific exception? throw new InvalidOperationException("refreshed token is null"); } diff --git a/projects/RabbitMQ.Client/Events/CallbackExceptionEventArgs.cs b/projects/RabbitMQ.Client/Events/CallbackExceptionEventArgs.cs index 3d5b16a82..c2bfb2f35 100644 --- a/projects/RabbitMQ.Client/Events/CallbackExceptionEventArgs.cs +++ b/projects/RabbitMQ.Client/Events/CallbackExceptionEventArgs.cs @@ -78,7 +78,6 @@ public class CallbackExceptionEventArgs : BaseExceptionEventArgs private const string ContextString = "context"; private const string ConsumerString = "consumer"; - // TODO Why is this public when there is a build method? public CallbackExceptionEventArgs(IDictionary detail, Exception exception, CancellationToken cancellationToken = default) : base(detail, exception, cancellationToken) { diff --git a/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs b/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs index 7da8804e9..5e925053d 100644 --- a/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs +++ b/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs @@ -35,7 +35,6 @@ namespace RabbitMQ.Client.Exceptions { /// - /// TODO WHY IS THIS UNREFERENCED? /// Thrown when the channel receives an RPC reply that it wasn't expecting. /// [Serializable] diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 36c3c7dee..d14fb7e5e 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -33,7 +33,6 @@ using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; -using System.Threading.RateLimiting; using System.Threading.Tasks; using RabbitMQ.Client.ConsumerDispatching; using RabbitMQ.Client.Events; @@ -43,18 +42,17 @@ namespace RabbitMQ.Client.Impl { internal sealed class AutorecoveringChannel : IChannel, IRecoverable { + private readonly ChannelOptions _channelOptions; + private readonly List _recordedConsumerTags = new List(); + private AutorecoveringConnection _connection; private RecoveryAwareChannel _innerChannel; private bool _disposed; - private readonly List _recordedConsumerTags = new List(); private ushort _prefetchCountConsumer; private ushort _prefetchCountGlobal; - private bool _publisherConfirmationsEnabled = false; - private bool _publisherConfirmationTrackingEnabled = false; - private RateLimiter? _outstandingPublisherConfirmationsRateLimiter = null; + private bool _usesTransactions; - private ushort _consumerDispatchConcurrency; internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher; @@ -73,20 +71,13 @@ public TimeSpan ContinuationTimeout set => InnerChannel.ContinuationTimeout = value; } - // TODO just pass create channel options public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel, - ushort consumerDispatchConcurrency, - bool publisherConfirmationsEnabled, - bool publisherConfirmationTrackingEnabled, - RateLimiter? outstandingPublisherConfirmationsRateLimiter) + ChannelOptions channelOptions) { _connection = conn; _innerChannel = innerChannel; - _consumerDispatchConcurrency = consumerDispatchConcurrency; - _publisherConfirmationsEnabled = publisherConfirmationsEnabled; - _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; - _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter; + _channelOptions = channelOptions; } public event AsyncEventHandler BasicAcksAsync @@ -171,13 +162,9 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con _connection = conn; - RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync( - _publisherConfirmationsEnabled, - _publisherConfirmationTrackingEnabled, - _outstandingPublisherConfirmationsRateLimiter, - _consumerDispatchConcurrency, - cancellationToken) + RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_channelOptions, cancellationToken) .ConfigureAwait(false); + newChannel.TakeOver(_innerChannel); if (_prefetchCountConsumer != 0) diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index 1c7adb0aa..d8c746912 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -33,7 +33,6 @@ using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; -using System.Threading.RateLimiting; using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; @@ -185,22 +184,12 @@ public event AsyncEventHandler RecoveringConsumerAs public IProtocol Protocol => Endpoint.Protocol; - // TODO pass channel creation options? - public async ValueTask CreateNonRecoveringChannelAsync( - bool publisherConfirmationsEnabled = false, - bool publisherConfirmationTrackingEnabled = false, - RateLimiter? outstandingPublisherConfirmationsRateLimiter = null, - ushort? consumerDispatchConcurrency = null, + public ValueTask CreateNonRecoveringChannelAsync( + ChannelOptions channelOptions, CancellationToken cancellationToken = default) { ISession session = InnerConnection.CreateSession(); - var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency); - return (RecoveryAwareChannel)await result.OpenAsync( - publisherConfirmationsEnabled, - publisherConfirmationTrackingEnabled, - outstandingPublisherConfirmationsRateLimiter, - cancellationToken) - .ConfigureAwait(false); + return RecoveryAwareChannel.CreateAndOpenAsync(session, channelOptions, cancellationToken); } public override string ToString() @@ -271,23 +260,16 @@ public async Task CreateChannelAsync(CreateChannelOptions? options = d ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency); - RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync( - options.PublisherConfirmationsEnabled, - options.PublisherConfirmationTrackingEnabled, - options.OutstandingPublisherConfirmationsRateLimiter, - cdc, - cancellationToken) + var channelOptions = ChannelOptions.From(options, _config); + + RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(channelOptions, cancellationToken) .ConfigureAwait(false); - // TODO just pass create channel options - var autorecoveringChannel = new AutorecoveringChannel(this, - recoveryAwareChannel, - cdc, - options.PublisherConfirmationsEnabled, - options.PublisherConfirmationTrackingEnabled, - options.OutstandingPublisherConfirmationsRateLimiter); + var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, channelOptions); + await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); + return autorecoveringChannel; } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index c5c2decad..6d27a0571 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -37,7 +37,6 @@ using System.Runtime.CompilerServices; using System.Text; using System.Threading; -using System.Threading.RateLimiting; using System.Threading.Tasks; using RabbitMQ.Client.ConsumerDispatching; using RabbitMQ.Client.Events; @@ -62,11 +61,10 @@ internal partial class Channel : IChannel, IRecoverable internal readonly IConsumerDispatcher ConsumerDispatcher; - public Channel(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null) + public Channel(ISession session, ChannelOptions channelOptions) { - ContinuationTimeout = config.ContinuationTimeout; - ConsumerDispatcher = new AsyncConsumerDispatcher(this, - perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency)); + ContinuationTimeout = channelOptions.ContinuationTimeout; + ConsumerDispatcher = new AsyncConsumerDispatcher(this, channelOptions.ConsumerDispatchConcurrency); Func onExceptionAsync = (exception, context, cancellationToken) => OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken)); _basicAcksAsyncWrapper = new AsyncEventingWrapper("OnBasicAck", onExceptionAsync); @@ -361,14 +359,12 @@ protected bool Enqueue(IRpcContinuation k) } } - internal async Task OpenAsync(bool publisherConfirmationsEnabled, - bool publisherConfirmationTrackingEnabled, - RateLimiter? outstandingPublisherConfirmationsRateLimiter, + internal async Task OpenAsync(ChannelOptions channelOptions, CancellationToken cancellationToken) { - ConfigurePublisherConfirmations(publisherConfirmationsEnabled, - publisherConfirmationTrackingEnabled, - outstandingPublisherConfirmationsRateLimiter); + ConfigurePublisherConfirmations(channelOptions.PublisherConfirmationsEnabled, + channelOptions.PublisherConfirmationTrackingEnabled, + channelOptions.OutstandingPublisherConfirmationsRateLimiter); bool enqueued = false; var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); @@ -1497,6 +1493,15 @@ await ModelSendAsync(in method, k.CancellationToken) } } + internal static Task CreateAndOpenAsync(CreateChannelOptions createChannelOptions, + ConnectionConfig connectionConfig, ISession session, + CancellationToken cancellationToken) + { + ChannelOptions channelOptions = ChannelOptions.From(createChannelOptions, connectionConfig); + var channel = new Channel(session, channelOptions); + return channel.OpenAsync(channelOptions, cancellationToken); + } + /// /// Returning true from this method means that the command was server-originated, /// and handled already. diff --git a/projects/RabbitMQ.Client/Impl/ChannelOptions.cs b/projects/RabbitMQ.Client/Impl/ChannelOptions.cs new file mode 100644 index 000000000..fc6b3fb8f --- /dev/null +++ b/projects/RabbitMQ.Client/Impl/ChannelOptions.cs @@ -0,0 +1,90 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Threading.RateLimiting; + +namespace RabbitMQ.Client.Impl +{ + internal sealed class ChannelOptions + { + private readonly bool _publisherConfirmationEnabled; + private readonly bool _publisherConfirmationTrackingEnabled; + private readonly ushort _consumerDispatchConcurrency; + private readonly RateLimiter? _outstandingPublisherConfirmationsRateLimiter; + private readonly TimeSpan _continuationTimeout; + + public ChannelOptions(bool publisherConfirmationEnabled, + bool publisherConfirmationTrackingEnabled, + ushort consumerDispatchConcurrency, + RateLimiter? outstandingPublisherConfirmationsRateLimiter, + TimeSpan continuationTimeout) + { + _publisherConfirmationEnabled = publisherConfirmationEnabled; + _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; + _consumerDispatchConcurrency = consumerDispatchConcurrency; + _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter; + _continuationTimeout = continuationTimeout; + } + + public bool PublisherConfirmationsEnabled => _publisherConfirmationEnabled; + + public bool PublisherConfirmationTrackingEnabled => _publisherConfirmationTrackingEnabled; + + public ushort ConsumerDispatchConcurrency => _consumerDispatchConcurrency; + + public RateLimiter? OutstandingPublisherConfirmationsRateLimiter => _outstandingPublisherConfirmationsRateLimiter; + + public TimeSpan ContinuationTimeout => _continuationTimeout; + + public static ChannelOptions From(CreateChannelOptions createChannelOptions, + ConnectionConfig connectionConfig) + { + ushort cdc = createChannelOptions.ConsumerDispatchConcurrency.GetValueOrDefault( + connectionConfig.ConsumerDispatchConcurrency); + + return new ChannelOptions(createChannelOptions.PublisherConfirmationsEnabled, + createChannelOptions.PublisherConfirmationTrackingEnabled, + cdc, + createChannelOptions.OutstandingPublisherConfirmationsRateLimiter, + connectionConfig.ContinuationTimeout); + } + + public static ChannelOptions From(ConnectionConfig connectionConfig) + { + return new ChannelOptions(publisherConfirmationEnabled: false, + publisherConfirmationTrackingEnabled: false, + consumerDispatchConcurrency: Constants.DefaultConsumerDispatchConcurrency, + outstandingPublisherConfirmationsRateLimiter: null, + continuationTimeout: connectionConfig.ContinuationTimeout); + } + } +} diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index f8071d0e5..ed2f30c74 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -78,7 +78,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) _sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize); _session0 = new MainSession(this, config.MaxInboundMessageBodySize); - _channel0 = new Channel(_config, _session0); + _channel0 = new Channel(_session0, ChannelOptions.From(config)); ClientProperties = new Dictionary(_config.ClientProperties) { @@ -263,23 +263,15 @@ await CloseAsync(ea, true, } } - public async Task CreateChannelAsync(CreateChannelOptions? options = default, + public Task CreateChannelAsync(CreateChannelOptions? createChannelOptions = default, CancellationToken cancellationToken = default) { EnsureIsOpen(); - options ??= CreateChannelOptions.Default; + createChannelOptions ??= CreateChannelOptions.Default; ISession session = CreateSession(); - // TODO channel CreateChannelAsync() to combine ctor and OpenAsync - var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency); - IChannel ch = await channel.OpenAsync( - options.PublisherConfirmationsEnabled, - options.PublisherConfirmationTrackingEnabled, - options.OutstandingPublisherConfirmationsRateLimiter, - cancellationToken) - .ConfigureAwait(false); - return ch; + return Channel.CreateAndOpenAsync(createChannelOptions, _config, session, cancellationToken); } internal ISession CreateSession() diff --git a/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs b/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs index 6aefc7bd3..ba6081c98 100644 --- a/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs +++ b/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs @@ -31,14 +31,13 @@ using System.Threading; using System.Threading.Tasks; -using RabbitMQ.Client.Framing; namespace RabbitMQ.Client.Impl { internal sealed class RecoveryAwareChannel : Channel { - public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null) - : base(config, session, consumerDispatchConcurrency) + public RecoveryAwareChannel(ISession session, ChannelOptions channelOptions) + : base(session, channelOptions) { ActiveDeliveryTagOffset = 0; MaxSeenDeliveryTag = 0; @@ -104,5 +103,13 @@ public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, return default; } } + + internal static async ValueTask CreateAndOpenAsync(ISession session, ChannelOptions channelOptions, + CancellationToken cancellationToken) + { + var result = new RecoveryAwareChannel(session, channelOptions); + return (RecoveryAwareChannel)await result.OpenAsync(channelOptions, cancellationToken) + .ConfigureAwait(false); + } } } diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs index 301e30b0f..4926aae6b 100644 --- a/projects/Test/Integration/TestPublisherConfirms.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -48,21 +48,6 @@ public TestPublisherConfirms(ITestOutputHelper output) _messageBody = GetRandomBody(4096); } - /* - * TODO figure out how to actually test basic.nack and basic.return - [Fact] - public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse() - { - return TestWaitForConfirmsAsync(2000, async (ch) => - { - RecoveryAwareChannel actualChannel = ((AutorecoveringChannel)ch).InnerChannel; - await actualChannel.HandleAckNack(10UL, false, true); - using var cts = new CancellationTokenSource(ShortSpan); - Assert.False(await ch.WaitForConfirmsAsync(cts.Token)); - }); - } - */ - [Fact] public async Task TestWaitForConfirmsWithEventsAsync() { diff --git a/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs b/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs index eaadbdd3b..5eb8eca28 100644 --- a/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs +++ b/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs @@ -50,7 +50,6 @@ public async Task BlockAsync() public async Task BlockAndPublishAsync() { - // TODO fix publisher confirmation tracking to time out so this test succeeds await using IChannel ch = await _conn.CreateChannelAsync(); await BlockAsync(); await ch.BasicPublishAsync(exchange: "amq.direct",