Skip to content

Commit

Permalink
Remove more synchronous code.
Browse files Browse the repository at this point in the history
Part of the fix for #1472

* Remove `Close` from `ChannelBase`

* Remove unreachable methods in `ChannelBase`

* Remove `_Private_ChannelClose`

* Convert `ConnectionTuneOk` into `ConnectionTuneOkAsync`

* Making `HandleChannleClose` async
  • Loading branch information
lukebakken committed Feb 19, 2024
1 parent 2c54f96 commit 7c4eb52
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 147 deletions.
16 changes: 6 additions & 10 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.client.framing;
Expand All @@ -43,19 +42,16 @@ public Channel(ConnectionConfig config, ISession session) : base(config, session
{
}

public override void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat)
public override Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
{
ChannelSend(new ConnectionTuneOk(channelMax, frameMax, heartbeat));
var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId)
public override Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken)
{
ChannelSend(new ChannelClose(replyCode, replyText, classId, methodId));
}

public override void _Private_ChannelCloseOk()
{
ChannelSend(new ChannelCloseOk());
var method = new ChannelCloseOk();
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override void _Private_ChannelFlowOk(bool active)
Expand Down
14 changes: 0 additions & 14 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,20 +201,6 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph
_innerChannel.RunRecoveryEventHandlers(this);
}

public void Close(ushort replyCode, string replyText, bool abort)
{
ThrowIfDisposed();
try
{
_innerChannel.Close(replyCode, replyText, abort);
}
finally
{
_connection.DeleteRecordedChannel(this,
channelsSemaphoreHeld: false, recordedEntitiesSemaphoreHeld: false);
}
}

public Task CloseAsync(ushort replyCode, string replyText, bool abort)
{
ThrowIfDisposed();
Expand Down
128 changes: 7 additions & 121 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,52 +211,6 @@ protected void TakeOver(ChannelBase other)
_recoveryWrapper.Takeover(other._recoveryWrapper);
}

public void Close(ushort replyCode, string replyText, bool abort)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText);
var k = new ShutdownContinuation();
ChannelShutdown += k.OnConnectionShutdown;

try
{
ConsumerDispatcher.Quiesce();

if (SetCloseReason(reason))
{
_Private_ChannelClose(reason.ReplyCode, reason.ReplyText, reason.ClassId, reason.MethodId);
}

k.Wait(TimeSpan.FromMilliseconds(10000));

ConsumerDispatcher.WaitForShutdown();
}
catch (AlreadyClosedException)
{
if (!abort)
{
throw;
}
}
catch (IOException)
{
if (!abort)
{
throw;
}
}
catch (Exception)
{
if (!abort)
{
throw;
}
}
finally
{
ChannelShutdown -= k.OnConnectionShutdown;
}
}

public Task CloseAsync(ushort replyCode, string replyText, bool abort)
{
var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText);
Expand Down Expand Up @@ -448,63 +402,7 @@ private void HandleCommand(in IncomingCommand cmd)
}
}

protected void ChannelRpc<TMethod>(in TMethod method, ProtocolCommandId returnCommandId)
where TMethod : struct, IOutgoingAmqpMethod
{
var k = new SimpleBlockingRpcContinuation();
IncomingCommand reply = default;
_rpcSemaphore.Wait();
try
{
Enqueue(k);
Session.Transmit(in method);
k.GetReply(ContinuationTimeout, out reply);

if (reply.CommandId != returnCommandId)
{
throw new UnexpectedMethodException(reply.CommandId, returnCommandId);
}
}
finally
{
reply.ReturnBuffers();
_rpcSemaphore.Release();
}
}

protected TReturn ChannelRpc<TMethod, TReturn>(in TMethod method, ProtocolCommandId returnCommandId, Func<RentedMemory, TReturn> createFunc)
where TMethod : struct, IOutgoingAmqpMethod
{
IncomingCommand reply = default;
try
{
var k = new SimpleBlockingRpcContinuation();

_rpcSemaphore.Wait();
try
{
Enqueue(k);
Session.Transmit(in method);
k.GetReply(ContinuationTimeout, out reply);
}
finally
{
_rpcSemaphore.Release();
}

if (reply.CommandId != returnCommandId)
{
throw new UnexpectedMethodException(reply.CommandId, returnCommandId);
}

return createFunc(reply.Method);
}
finally
{
reply.ReturnBuffers();
}
}

// TODO REMOVE rabbitmq-dotnet-client-1472
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected void ChannelSend<T>(in T method) where T : struct, IOutgoingAmqpMethod
{
Expand All @@ -517,19 +415,6 @@ protected ValueTask ModelSendAsync<T>(in T method, CancellationToken cancellatio
return Session.TransmitAsync(in method, cancellationToken);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body)
where TMethod : struct, IOutgoingAmqpMethod
where THeader : IAmqpHeader
{
if (!_flowControlBlock.IsSet)
{
_flowControlBlock.Wait();
}

Session.Transmit(in method, in header, body);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
where TMethod : struct, IOutgoingAmqpMethod
Expand Down Expand Up @@ -620,7 +505,7 @@ protected virtual void Dispose(bool disposing)
// dispose unmanaged resources
}

public abstract void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat);
public abstract Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken);

protected void HandleBasicAck(in IncomingCommand cmd)
{
Expand Down Expand Up @@ -884,7 +769,8 @@ protected void HandleChannelClose(in IncomingCommand cmd)

Session.Close(CloseReason, false);

_Private_ChannelCloseOk();
// TODO async
_Private_ChannelCloseOkAsync(CancellationToken.None).EnsureCompleted();
}
finally
{
Expand Down Expand Up @@ -1067,12 +953,12 @@ protected bool HandleQueueDeclareOk(in IncomingCommand cmd)
}
}

public abstract void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId);

public abstract void _Private_ChannelCloseOk();
public abstract Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken);

// TODO async
public abstract void _Private_ChannelFlowOk(bool active);

// TODO async
public abstract void _Private_ConnectionCloseOk();

public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple);
Expand Down
3 changes: 1 addition & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
uint heartbeatInSeconds = NegotiatedMaxValue((uint)_config.HeartbeatInterval.TotalSeconds, (uint)connectionTune.m_heartbeatInSeconds);
Heartbeat = TimeSpan.FromSeconds(heartbeatInSeconds);

// TODO cancellationToken / async
_channel0.ConnectionTuneOk(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds);
await _channel0.ConnectionTuneOkAsync(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds, cancellationToken);

// TODO check for cancellation
MaybeStartCredentialRefresher();
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ private async Task ReceiveLoopAsync(CancellationToken mainLoopCancelllationToken
}
}

// TODO async
private void ProcessFrame(InboundFrame frame)
{
bool shallReturnPayload = true;
Expand Down

0 comments on commit 7c4eb52

Please sign in to comment.