From 7c4eb525e6025a3e83eba470cf14c7ec47a912fa Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 19 Feb 2024 09:57:13 -0800 Subject: [PATCH] Remove more synchronous code. Part of the fix for #1472 * Remove `Close` from `ChannelBase` * Remove unreachable methods in `ChannelBase` * Remove `_Private_ChannelClose` * Convert `ConnectionTuneOk` into `ConnectionTuneOkAsync` * Making `HandleChannleClose` async --- .../RabbitMQ.Client/client/framing/Channel.cs | 16 +-- .../client/impl/AutorecoveringChannel.cs | 14 -- .../client/impl/ChannelBase.cs | 128 +----------------- .../client/impl/Connection.Commands.cs | 3 +- .../client/impl/Connection.Receive.cs | 1 + 5 files changed, 15 insertions(+), 147 deletions(-) diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 75cb3989ca..e322949875 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -29,7 +29,6 @@ // Copyright (c) 2007-2020 VMware, Inc. All rights reserved. //--------------------------------------------------------------------------- -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.client.framing; @@ -43,19 +42,16 @@ public Channel(ConnectionConfig config, ISession session) : base(config, session { } - public override void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat) + public override Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) { - ChannelSend(new ConnectionTuneOk(channelMax, frameMax, heartbeat)); + var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat); + return ModelSendAsync(method, cancellationToken).AsTask(); } - public override void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId) + public override Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken) { - ChannelSend(new ChannelClose(replyCode, replyText, classId, methodId)); - } - - public override void _Private_ChannelCloseOk() - { - ChannelSend(new ChannelCloseOk()); + var method = new ChannelCloseOk(); + return ModelSendAsync(method, cancellationToken).AsTask(); } public override void _Private_ChannelFlowOk(bool active) diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 7832dfdde4..707b65843b 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -201,20 +201,6 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph _innerChannel.RunRecoveryEventHandlers(this); } - public void Close(ushort replyCode, string replyText, bool abort) - { - ThrowIfDisposed(); - try - { - _innerChannel.Close(replyCode, replyText, abort); - } - finally - { - _connection.DeleteRecordedChannel(this, - channelsSemaphoreHeld: false, recordedEntitiesSemaphoreHeld: false); - } - } - public Task CloseAsync(ushort replyCode, string replyText, bool abort) { ThrowIfDisposed(); diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 9b1f09438f..2ba0bc0898 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -211,52 +211,6 @@ protected void TakeOver(ChannelBase other) _recoveryWrapper.Takeover(other._recoveryWrapper); } - public void Close(ushort replyCode, string replyText, bool abort) - { - var reason = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText); - var k = new ShutdownContinuation(); - ChannelShutdown += k.OnConnectionShutdown; - - try - { - ConsumerDispatcher.Quiesce(); - - if (SetCloseReason(reason)) - { - _Private_ChannelClose(reason.ReplyCode, reason.ReplyText, reason.ClassId, reason.MethodId); - } - - k.Wait(TimeSpan.FromMilliseconds(10000)); - - ConsumerDispatcher.WaitForShutdown(); - } - catch (AlreadyClosedException) - { - if (!abort) - { - throw; - } - } - catch (IOException) - { - if (!abort) - { - throw; - } - } - catch (Exception) - { - if (!abort) - { - throw; - } - } - finally - { - ChannelShutdown -= k.OnConnectionShutdown; - } - } - public Task CloseAsync(ushort replyCode, string replyText, bool abort) { var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText); @@ -448,63 +402,7 @@ private void HandleCommand(in IncomingCommand cmd) } } - protected void ChannelRpc(in TMethod method, ProtocolCommandId returnCommandId) - where TMethod : struct, IOutgoingAmqpMethod - { - var k = new SimpleBlockingRpcContinuation(); - IncomingCommand reply = default; - _rpcSemaphore.Wait(); - try - { - Enqueue(k); - Session.Transmit(in method); - k.GetReply(ContinuationTimeout, out reply); - - if (reply.CommandId != returnCommandId) - { - throw new UnexpectedMethodException(reply.CommandId, returnCommandId); - } - } - finally - { - reply.ReturnBuffers(); - _rpcSemaphore.Release(); - } - } - - protected TReturn ChannelRpc(in TMethod method, ProtocolCommandId returnCommandId, Func createFunc) - where TMethod : struct, IOutgoingAmqpMethod - { - IncomingCommand reply = default; - try - { - var k = new SimpleBlockingRpcContinuation(); - - _rpcSemaphore.Wait(); - try - { - Enqueue(k); - Session.Transmit(in method); - k.GetReply(ContinuationTimeout, out reply); - } - finally - { - _rpcSemaphore.Release(); - } - - if (reply.CommandId != returnCommandId) - { - throw new UnexpectedMethodException(reply.CommandId, returnCommandId); - } - - return createFunc(reply.Method); - } - finally - { - reply.ReturnBuffers(); - } - } - + // TODO REMOVE rabbitmq-dotnet-client-1472 [MethodImpl(MethodImplOptions.AggressiveInlining)] protected void ChannelSend(in T method) where T : struct, IOutgoingAmqpMethod { @@ -517,19 +415,6 @@ protected ValueTask ModelSendAsync(in T method, CancellationToken cancellatio return Session.TransmitAsync(in method, cancellationToken); } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected void ChannelSend(in TMethod method, in THeader header, ReadOnlyMemory body) - where TMethod : struct, IOutgoingAmqpMethod - where THeader : IAmqpHeader - { - if (!_flowControlBlock.IsSet) - { - _flowControlBlock.Wait(); - } - - Session.Transmit(in method, in header, body); - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlyMemory body, CancellationToken cancellationToken) where TMethod : struct, IOutgoingAmqpMethod @@ -620,7 +505,7 @@ protected virtual void Dispose(bool disposing) // dispose unmanaged resources } - public abstract void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat); + public abstract Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken); protected void HandleBasicAck(in IncomingCommand cmd) { @@ -884,7 +769,8 @@ protected void HandleChannelClose(in IncomingCommand cmd) Session.Close(CloseReason, false); - _Private_ChannelCloseOk(); + // TODO async + _Private_ChannelCloseOkAsync(CancellationToken.None).EnsureCompleted(); } finally { @@ -1067,12 +953,12 @@ protected bool HandleQueueDeclareOk(in IncomingCommand cmd) } } - public abstract void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId); - - public abstract void _Private_ChannelCloseOk(); + public abstract Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken); + // TODO async public abstract void _Private_ChannelFlowOk(bool active); + // TODO async public abstract void _Private_ConnectionCloseOk(); public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs index f0e236d57a..8caa3e2843 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs @@ -180,8 +180,7 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken) uint heartbeatInSeconds = NegotiatedMaxValue((uint)_config.HeartbeatInterval.TotalSeconds, (uint)connectionTune.m_heartbeatInSeconds); Heartbeat = TimeSpan.FromSeconds(heartbeatInSeconds); - // TODO cancellationToken / async - _channel0.ConnectionTuneOk(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds); + await _channel0.ConnectionTuneOkAsync(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds, cancellationToken); // TODO check for cancellation MaybeStartCredentialRefresher(); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs index 32cdd2b9bf..44e4443c2c 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs @@ -111,6 +111,7 @@ private async Task ReceiveLoopAsync(CancellationToken mainLoopCancelllationToken } } + // TODO async private void ProcessFrame(InboundFrame frame) { bool shallReturnPayload = true;