Skip to content

Commit

Permalink
Remove more synchronous code
Browse files Browse the repository at this point in the history
Part of the fix to #1472

* Remove `ISession.Transmit`
* Change `HeartbeatWriteTimerCallback` to be `async` and use `WriteAsync`

* Make `HeartbeatReadTimerCallback` async

* Ensure toxiproxy is reset at the end of the test run.

* Make frame / command dispatch async

* Make a couple more methods async

* Update Makefile

* Do not release a SemaphoreSlim that is about to be disposed.

* Fix toxiproxy duplicated entity for good, hopefully

* Add two missing `.ConfigureAwait(false)` calls

* Add missing `.ConfigureAwait(false)` call
  • Loading branch information
lukebakken committed Feb 24, 2024
1 parent 9bf0e5c commit 3a3d40c
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 145 deletions.
2 changes: 1 addition & 1 deletion .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ function start_toxiproxy
{
if [[ $run_toxiproxy == 'true' ]]
then
sudo ss -4nlp
# sudo ss -4nlp
echo "[INFO] starting Toxiproxy server docker container"
docker rm --force "$toxiproxy_docker_name" 2>/dev/null || echo "[INFO] $toxiproxy_docker_name was not running"
docker run --pull always --detach \
Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ build:

test:
dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed'
dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/Integration/Integration.csproj --logger 'console;verbosity=detailed'
dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' $(CURDIR)/projects/Test/Integration/Integration.csproj --logger 'console;verbosity=detailed'
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' \
--environment 'PASSWORD=grapefruit' \
--environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \
"$(CURDIR)/projects/Test/Integration/Integration.csproj" --logger 'console;verbosity=detailed'
dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed'

# Note:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,35 @@
using System.Collections.Generic;
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System.Collections.Generic;

namespace RabbitMQ
{
Expand Down
62 changes: 33 additions & 29 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,22 @@ public override Task _Private_ChannelCloseOkAsync(CancellationToken cancellation
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override void _Private_ChannelFlowOk(bool active)
public override Task _Private_ChannelFlowOkAsync(bool active, CancellationToken cancellationToken)
{
ChannelSend(new ChannelFlowOk(active));
var method = new ChannelFlowOk(active);
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override void _Private_ConnectionCloseOk()
public override Task _Private_ConnectionCloseOkAsync(CancellationToken cancellationToken)
{
ChannelSend(new ConnectionCloseOk());
var method = new ConnectionCloseOk();
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple)
{
var method = new BasicAck(deliveryTag, multiple);
// TODO cancellation token?
// TODO use cancellation token
return ModelSendAsync(method, CancellationToken.None);
}

Expand All @@ -85,101 +87,103 @@ public override Task BasicRejectAsync(ulong deliveryTag, bool requeue)
return ModelSendAsync(method, CancellationToken.None).AsTask();
}

protected override bool DispatchAsynchronous(in IncomingCommand cmd)
protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
switch (cmd.CommandId)
{
case ProtocolCommandId.BasicDeliver:
{
HandleBasicDeliver(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.BasicAck:
{
HandleBasicAck(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.BasicCancel:
{
HandleBasicCancel(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.BasicCancelOk:
{
return HandleBasicCancelOk(in cmd);
bool result = HandleBasicCancelOk(in cmd);
return Task.FromResult(result);
}
case ProtocolCommandId.BasicConsumeOk:
{
return HandleBasicConsumeOk(in cmd);
bool result = HandleBasicConsumeOk(in cmd);
return Task.FromResult(result);
}
case ProtocolCommandId.BasicGetEmpty:
{
return HandleBasicGetEmpty(in cmd);
bool result = HandleBasicGetEmpty(in cmd);
return Task.FromResult(result);
}
case ProtocolCommandId.BasicGetOk:
{
return HandleBasicGetOk(in cmd);
bool result = HandleBasicGetOk(in cmd);
return Task.FromResult(result);
}
case ProtocolCommandId.BasicNack:
{
HandleBasicNack(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.BasicReturn:
{
HandleBasicReturn(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.ChannelClose:
{
HandleChannelClose(in cmd);
return true;
return HandleChannelCloseAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelCloseOk:
{
HandleChannelCloseOk(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.ChannelFlow:
{
HandleChannelFlow(in cmd);
return true;
return HandleChannelFlowAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionBlocked:
{
HandleConnectionBlocked(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.ConnectionClose:
{
HandleConnectionClose(in cmd);
return true;
return HandleConnectionCloseAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionSecure:
{
HandleConnectionSecure(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.ConnectionStart:
{
HandleConnectionStart(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.ConnectionTune:
{
HandleConnectionTune(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.ConnectionUnblocked:
{
HandleConnectionUnblocked(in cmd);
return true;
return Task.FromResult(true);
}
case ProtocolCommandId.QueueDeclareOk:
{
return HandleQueueDeclareOk(in cmd);
bool result = HandleQueueDeclareOk(in cmd);
return Task.FromResult(result);
}
default: return false;
default: return Task.FromResult(false);
}
}
}
Expand Down
45 changes: 22 additions & 23 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected ChannelBase(ConnectionConfig config, ISession session)
_flowControlWrapper = new EventingWrapper<FlowControlEventArgs>("OnFlowControl", onException);
_channelShutdownWrapper = new EventingWrapper<ShutdownEventArgs>("OnChannelShutdown", onException);
_recoveryWrapper = new EventingWrapper<EventArgs>("OnChannelRecovery", onException);
session.CommandReceived = HandleCommand;
session.CommandReceived = HandleCommandAsync;
session.SessionShutdown += OnSessionShutdown;
Session = session;
}
Expand Down Expand Up @@ -344,7 +344,7 @@ await ModelSendAsync(method, k.CancellationToken)
}
}

protected abstract bool DispatchAsynchronous(in IncomingCommand cmd);
protected abstract Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken);

protected void Enqueue(IRpcContinuation k)
{
Expand Down Expand Up @@ -393,22 +393,16 @@ internal void FinishClose()
m_connectionStartCell?.TrySetResult(null);
}

private void HandleCommand(in IncomingCommand cmd)
private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
if (!DispatchAsynchronous(in cmd)) // Was asynchronous. Already processed. No need to process further.
// Was asynchronous. Already processed. No need to process further.
if (false == await DispatchCommandAsync(cmd, cancellationToken).ConfigureAwait(false))
{
IRpcContinuation c = _continuationQueue.Next();
c.HandleCommand(in cmd);
}
}

// TODO REMOVE rabbitmq-dotnet-client-1472
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected void ChannelSend<T>(in T method) where T : struct, IOutgoingAmqpMethod
{
Session.Transmit(in method);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected ValueTask ModelSendAsync<T>(in T method, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod
{
Expand Down Expand Up @@ -756,7 +750,7 @@ protected void HandleBasicReturn(in IncomingCommand cmd)
}
}

protected void HandleChannelClose(in IncomingCommand cmd)
protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
try
{
Expand All @@ -769,8 +763,9 @@ protected void HandleChannelClose(in IncomingCommand cmd)

Session.Close(CloseReason, false);

// TODO async
_Private_ChannelCloseOkAsync(CancellationToken.None).EnsureCompleted();
await _Private_ChannelCloseOkAsync(cancellationToken)
.ConfigureAwait(false);
return true;
}
finally
{
Expand Down Expand Up @@ -801,11 +796,11 @@ protected void HandleChannelCloseOk(in IncomingCommand cmd)
}
}

protected void HandleChannelFlow(in IncomingCommand cmd)
protected async Task<bool> HandleChannelFlowAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
try
{
var active = new ChannelFlow(cmd.MethodSpan)._active;
bool active = new ChannelFlow(cmd.MethodSpan)._active;
if (active)
{
_flowControlBlock.Set();
Expand All @@ -815,12 +810,15 @@ protected void HandleChannelFlow(in IncomingCommand cmd)
_flowControlBlock.Reset();
}

_Private_ChannelFlowOk(active);
await _Private_ChannelFlowOkAsync(active, cancellationToken)
.ConfigureAwait(false);

if (!_flowControlWrapper.IsEmpty)
{
_flowControlWrapper.Invoke(this, new FlowControlEventArgs(active));
}

return true;
}
finally
{
Expand All @@ -841,7 +839,7 @@ protected void HandleConnectionBlocked(in IncomingCommand cmd)
}
}

protected void HandleConnectionClose(in IncomingCommand cmd)
protected async Task<bool> HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
try
{
Expand All @@ -850,7 +848,8 @@ protected void HandleConnectionClose(in IncomingCommand cmd)
try
{
Session.Connection.ClosedViaPeer(reason);
_Private_ConnectionCloseOk();
await _Private_ConnectionCloseOkAsync(cancellationToken)
.ConfigureAwait(false);
SetCloseReason(Session.Connection.CloseReason);
}
catch (IOException)
Expand All @@ -863,6 +862,8 @@ protected void HandleConnectionClose(in IncomingCommand cmd)
// Ignored. We're only trying to be polite by sending
// the close-ok, after all.
}

return true;
}
finally
{
Expand Down Expand Up @@ -955,11 +956,9 @@ protected bool HandleQueueDeclareOk(in IncomingCommand cmd)

public abstract Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken);

// TODO async
public abstract void _Private_ChannelFlowOk(bool active);
public abstract Task _Private_ChannelFlowOkAsync(bool active, CancellationToken cancellationToken);

// TODO async
public abstract void _Private_ConnectionCloseOk();
public abstract Task _Private_ConnectionCloseOkAsync(CancellationToken cancellationToken);

public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple);

Expand Down
Loading

0 comments on commit 3a3d40c

Please sign in to comment.