diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh index a1852f37a0..6cb4451a13 100755 --- a/.ci/ubuntu/gha-setup.sh +++ b/.ci/ubuntu/gha-setup.sh @@ -40,7 +40,7 @@ function start_toxiproxy { if [[ $run_toxiproxy == 'true' ]] then - sudo ss -4nlp + # sudo ss -4nlp echo "[INFO] starting Toxiproxy server docker container" docker rm --force "$toxiproxy_docker_name" 2>/dev/null || echo "[INFO] $toxiproxy_docker_name was not running" docker run --pull always --detach \ diff --git a/Makefile b/Makefile index 7a47cc721f..a5ce1e9360 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,12 @@ build: test: dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed' - dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/Integration/Integration.csproj --logger 'console;verbosity=detailed' + dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \ + --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' $(CURDIR)/projects/Test/Integration/Integration.csproj --logger 'console;verbosity=detailed' + --environment 'RABBITMQ_TOXIPROXY_TESTS=true' \ + --environment 'PASSWORD=grapefruit' \ + --environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \ + "$(CURDIR)/projects/Test/Integration/Integration.csproj" --logger 'console;verbosity=detailed' dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed' # Note: diff --git a/projects/RabbitMQ.Client/FrameworkExtension/DictionaryExtension.cs b/projects/RabbitMQ.Client/FrameworkExtension/DictionaryExtension.cs index 2fcbaa2aa6..3619dfe724 100644 --- a/projects/RabbitMQ.Client/FrameworkExtension/DictionaryExtension.cs +++ b/projects/RabbitMQ.Client/FrameworkExtension/DictionaryExtension.cs @@ -1,4 +1,35 @@ -using System.Collections.Generic; +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System.Collections.Generic; namespace RabbitMQ { diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index e322949875..b28b55f3c3 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -54,20 +54,22 @@ public override Task _Private_ChannelCloseOkAsync(CancellationToken cancellation return ModelSendAsync(method, cancellationToken).AsTask(); } - public override void _Private_ChannelFlowOk(bool active) + public override Task _Private_ChannelFlowOkAsync(bool active, CancellationToken cancellationToken) { - ChannelSend(new ChannelFlowOk(active)); + var method = new ChannelFlowOk(active); + return ModelSendAsync(method, cancellationToken).AsTask(); } - public override void _Private_ConnectionCloseOk() + public override Task _Private_ConnectionCloseOkAsync(CancellationToken cancellationToken) { - ChannelSend(new ConnectionCloseOk()); + var method = new ConnectionCloseOk(); + return ModelSendAsync(method, cancellationToken).AsTask(); } public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple) { var method = new BasicAck(deliveryTag, multiple); - // TODO cancellation token? + // TODO use cancellation token return ModelSendAsync(method, CancellationToken.None); } @@ -85,101 +87,103 @@ public override Task BasicRejectAsync(ulong deliveryTag, bool requeue) return ModelSendAsync(method, CancellationToken.None).AsTask(); } - protected override bool DispatchAsynchronous(in IncomingCommand cmd) + protected override Task DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) { switch (cmd.CommandId) { case ProtocolCommandId.BasicDeliver: { HandleBasicDeliver(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.BasicAck: { HandleBasicAck(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.BasicCancel: { HandleBasicCancel(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.BasicCancelOk: { - return HandleBasicCancelOk(in cmd); + bool result = HandleBasicCancelOk(in cmd); + return Task.FromResult(result); } case ProtocolCommandId.BasicConsumeOk: { - return HandleBasicConsumeOk(in cmd); + bool result = HandleBasicConsumeOk(in cmd); + return Task.FromResult(result); } case ProtocolCommandId.BasicGetEmpty: { - return HandleBasicGetEmpty(in cmd); + bool result = HandleBasicGetEmpty(in cmd); + return Task.FromResult(result); } case ProtocolCommandId.BasicGetOk: { - return HandleBasicGetOk(in cmd); + bool result = HandleBasicGetOk(in cmd); + return Task.FromResult(result); } case ProtocolCommandId.BasicNack: { HandleBasicNack(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.BasicReturn: { HandleBasicReturn(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.ChannelClose: { - HandleChannelClose(in cmd); - return true; + return HandleChannelCloseAsync(cmd, cancellationToken); } case ProtocolCommandId.ChannelCloseOk: { HandleChannelCloseOk(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.ChannelFlow: { - HandleChannelFlow(in cmd); - return true; + return HandleChannelFlowAsync(cmd, cancellationToken); } case ProtocolCommandId.ConnectionBlocked: { HandleConnectionBlocked(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.ConnectionClose: { - HandleConnectionClose(in cmd); - return true; + return HandleConnectionCloseAsync(cmd, cancellationToken); } case ProtocolCommandId.ConnectionSecure: { HandleConnectionSecure(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.ConnectionStart: { HandleConnectionStart(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.ConnectionTune: { HandleConnectionTune(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.ConnectionUnblocked: { HandleConnectionUnblocked(in cmd); - return true; + return Task.FromResult(true); } case ProtocolCommandId.QueueDeclareOk: { - return HandleQueueDeclareOk(in cmd); + bool result = HandleQueueDeclareOk(in cmd); + return Task.FromResult(result); } - default: return false; + default: return Task.FromResult(false); } } } diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 2ba0bc0898..e01427b66b 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -92,7 +92,7 @@ protected ChannelBase(ConnectionConfig config, ISession session) _flowControlWrapper = new EventingWrapper("OnFlowControl", onException); _channelShutdownWrapper = new EventingWrapper("OnChannelShutdown", onException); _recoveryWrapper = new EventingWrapper("OnChannelRecovery", onException); - session.CommandReceived = HandleCommand; + session.CommandReceived = HandleCommandAsync; session.SessionShutdown += OnSessionShutdown; Session = session; } @@ -344,7 +344,7 @@ await ModelSendAsync(method, k.CancellationToken) } } - protected abstract bool DispatchAsynchronous(in IncomingCommand cmd); + protected abstract Task DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken); protected void Enqueue(IRpcContinuation k) { @@ -393,22 +393,16 @@ internal void FinishClose() m_connectionStartCell?.TrySetResult(null); } - private void HandleCommand(in IncomingCommand cmd) + private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - if (!DispatchAsynchronous(in cmd)) // Was asynchronous. Already processed. No need to process further. + // Was asynchronous. Already processed. No need to process further. + if (false == await DispatchCommandAsync(cmd, cancellationToken).ConfigureAwait(false)) { IRpcContinuation c = _continuationQueue.Next(); c.HandleCommand(in cmd); } } - // TODO REMOVE rabbitmq-dotnet-client-1472 - [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected void ChannelSend(in T method) where T : struct, IOutgoingAmqpMethod - { - Session.Transmit(in method); - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] protected ValueTask ModelSendAsync(in T method, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod { @@ -756,7 +750,7 @@ protected void HandleBasicReturn(in IncomingCommand cmd) } } - protected void HandleChannelClose(in IncomingCommand cmd) + protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { try { @@ -769,8 +763,9 @@ protected void HandleChannelClose(in IncomingCommand cmd) Session.Close(CloseReason, false); - // TODO async - _Private_ChannelCloseOkAsync(CancellationToken.None).EnsureCompleted(); + await _Private_ChannelCloseOkAsync(cancellationToken) + .ConfigureAwait(false); + return true; } finally { @@ -801,11 +796,11 @@ protected void HandleChannelCloseOk(in IncomingCommand cmd) } } - protected void HandleChannelFlow(in IncomingCommand cmd) + protected async Task HandleChannelFlowAsync(IncomingCommand cmd, CancellationToken cancellationToken) { try { - var active = new ChannelFlow(cmd.MethodSpan)._active; + bool active = new ChannelFlow(cmd.MethodSpan)._active; if (active) { _flowControlBlock.Set(); @@ -815,12 +810,15 @@ protected void HandleChannelFlow(in IncomingCommand cmd) _flowControlBlock.Reset(); } - _Private_ChannelFlowOk(active); + await _Private_ChannelFlowOkAsync(active, cancellationToken) + .ConfigureAwait(false); if (!_flowControlWrapper.IsEmpty) { _flowControlWrapper.Invoke(this, new FlowControlEventArgs(active)); } + + return true; } finally { @@ -841,7 +839,7 @@ protected void HandleConnectionBlocked(in IncomingCommand cmd) } } - protected void HandleConnectionClose(in IncomingCommand cmd) + protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { try { @@ -850,7 +848,8 @@ protected void HandleConnectionClose(in IncomingCommand cmd) try { Session.Connection.ClosedViaPeer(reason); - _Private_ConnectionCloseOk(); + await _Private_ConnectionCloseOkAsync(cancellationToken) + .ConfigureAwait(false); SetCloseReason(Session.Connection.CloseReason); } catch (IOException) @@ -863,6 +862,8 @@ protected void HandleConnectionClose(in IncomingCommand cmd) // Ignored. We're only trying to be polite by sending // the close-ok, after all. } + + return true; } finally { @@ -955,11 +956,9 @@ protected bool HandleQueueDeclareOk(in IncomingCommand cmd) public abstract Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken); - // TODO async - public abstract void _Private_ChannelFlowOk(bool active); + public abstract Task _Private_ChannelFlowOkAsync(bool active, CancellationToken cancellationToken); - // TODO async - public abstract void _Private_ConnectionCloseOk(); + public abstract Task _Private_ConnectionCloseOkAsync(CancellationToken cancellationToken); public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs index 47b8f1e2a6..2f6750fe17 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs @@ -39,7 +39,7 @@ namespace RabbitMQ.Client.Framing.Impl internal sealed partial class Connection { private TimeSpan _heartbeat; - private TimeSpan _heartbeatTimeSpan; + private TimeSpan _heartbeatWriteTimeSpan; private int _missedHeartbeats; private bool _heartbeatDetected; @@ -54,7 +54,7 @@ public TimeSpan Heartbeat _heartbeat = value; // timers fire at slightly below half the interval to avoid race // conditions - _heartbeatTimeSpan = TimeSpan.FromMilliseconds(_heartbeat.TotalMilliseconds / 4); + _heartbeatWriteTimeSpan = TimeSpan.FromMilliseconds(_heartbeat.TotalMilliseconds / 2); _frameHandler.ReadTimeout = TimeSpan.FromMilliseconds(_heartbeat.TotalMilliseconds * 2); } } @@ -80,7 +80,7 @@ private void NotifyHeartbeatListener() _heartbeatDetected = true; } - private void HeartbeatReadTimerCallback(object? state) + private async void HeartbeatReadTimerCallback(object? state) { if (_heartbeatReadTimer is null) { @@ -117,8 +117,8 @@ private void HeartbeatReadTimerCallback(object? state) if (shouldTerminate) { TerminateMainloop(); - // TODO hmmm - FinishCloseAsync(CancellationToken.None).EnsureCompleted(); + await FinishCloseAsync(_mainLoopCts.Token) + .ConfigureAwait(false); } else { @@ -137,7 +137,7 @@ private void HeartbeatReadTimerCallback(object? state) } } - private void HeartbeatWriteTimerCallback(object? state) + private async void HeartbeatWriteTimerCallback(object? state) { if (_heartbeatWriteTimer is null) { @@ -148,8 +148,9 @@ private void HeartbeatWriteTimerCallback(object? state) { if (false == _closed) { - Write(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame()); - _heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite); + await WriteAsync(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame(), _mainLoopCts.Token) + .ConfigureAwait(false); + _heartbeatWriteTimer?.Change((int)_heartbeatWriteTimeSpan.TotalMilliseconds, Timeout.Infinite); } } catch (ObjectDisposedException) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs index 44e4443c2c..077b3f19a9 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs @@ -100,19 +100,20 @@ private async Task ReceiveLoopAsync(CancellationToken mainLoopCancelllationToken while (_frameHandler.TryReadFrame(out InboundFrame frame)) { NotifyHeartbeatListener(); - ProcessFrame(frame); + await ProcessFrameAsync(frame, mainLoopCancelllationToken) + .ConfigureAwait(false); } // Done reading frames synchronously, go async InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync(mainLoopCancelllationToken) .ConfigureAwait(false); NotifyHeartbeatListener(); - ProcessFrame(asyncFrame); + await ProcessFrameAsync(asyncFrame, mainLoopCancelllationToken) + .ConfigureAwait(false); } } - // TODO async - private void ProcessFrame(InboundFrame frame) + private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cancellationToken) { bool shallReturnPayload = true; if (frame.Channel == 0) @@ -133,7 +134,8 @@ private void ProcessFrame(InboundFrame frame) // quiescing situation, even though technically we // should be ignoring everything except // connection.close-ok. - shallReturnPayload = _session0.HandleFrame(in frame); + shallReturnPayload = await _session0.HandleFrameAsync(frame, cancellationToken) + .ConfigureAwait(false); } } else @@ -150,7 +152,8 @@ private void ProcessFrame(InboundFrame frame) // Session itself may be quiescing this particular // channel, but that's none of our concern.) ISession session = _sessionManager.Lookup(frame.Channel); - shallReturnPayload = session.HandleFrame(in frame); + shallReturnPayload = await session.HandleFrameAsync(frame, cancellationToken) + .ConfigureAwait(false); } } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index b00225be38..797a534209 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -416,7 +416,8 @@ private async Task FinishCloseAsync(CancellationToken cancellationToken) _closed = true; MaybeStopHeartbeatTimers(); - await _frameHandler.CloseAsync(cancellationToken); + await _frameHandler.CloseAsync(cancellationToken) + .ConfigureAwait(false); _channel0.SetCloseReason(CloseReason); _channel0.FinishClose(); RabbitMqClientEventSource.Log.ConnectionClosed(); @@ -452,12 +453,6 @@ internal void OnCallbackException(CallbackExceptionEventArgs args) _callbackExceptionWrapper.Invoke(this, args); } - internal void Write(RentedMemory frames) - { - Activity.Current.SetNetworkTags(_frameHandler); - _frameHandler.Write(frames); - } - internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellationToken) { Activity.Current.SetNetworkTags(_frameHandler); @@ -473,11 +468,6 @@ public void Dispose() try { - /* - * TODO rabbitmq-dotnet-client-1472 - * this.Abort(InternalConstants.DefaultConnectionAbortTimeout); - * _mainLoopTask.Wait(); - */ if (IsOpen) { throw new InvalidOperationException("Connection must be closed before calling Dispose!"); diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs index 69f9a8076e..500dfbcfa0 100644 --- a/projects/RabbitMQ.Client/client/impl/ISession.cs +++ b/projects/RabbitMQ.Client/client/impl/ISession.cs @@ -36,7 +36,7 @@ namespace RabbitMQ.Client.Impl { - internal delegate void CommandReceivedAction(in IncomingCommand cmd); + internal delegate Task CommandReceivedAction(IncomingCommand cmd, CancellationToken cancellationToken); internal interface ISession { @@ -74,16 +74,10 @@ internal interface ISession void Close(ShutdownEventArgs reason, bool notify); - bool HandleFrame(in InboundFrame frame); + Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); void Notify(); - void Transmit(in T cmd) where T : struct, IOutgoingAmqpMethod; - - void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemory body) - where TMethod : struct, IOutgoingAmqpMethod - where THeader : IAmqpHeader; - ValueTask TransmitAsync(in T cmd, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod; ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/client/impl/MainSession.cs b/projects/RabbitMQ.Client/client/impl/MainSession.cs index 79423ca354..3aa3bef75c 100644 --- a/projects/RabbitMQ.Client/client/impl/MainSession.cs +++ b/projects/RabbitMQ.Client/client/impl/MainSession.cs @@ -53,7 +53,7 @@ public MainSession(Connection connection) : base(connection, 0) { } - public override bool HandleFrame(in InboundFrame frame) + public override Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken) { if (_closing) { @@ -64,7 +64,7 @@ public override bool HandleFrame(in InboundFrame frame) switch (Connection.Protocol.DecodeCommandIdFrom(frame.Payload.Span)) { case ProtocolCommandId.ConnectionClose: - return base.HandleFrame(in frame); + return base.HandleFrameAsync(frame, cancellationToken); case ProtocolCommandId.ConnectionCloseOk: // This is the reply (CloseOk) we were looking for // Call any listener attached to this session @@ -75,10 +75,10 @@ public override bool HandleFrame(in InboundFrame frame) // Either a non-method frame, or not what we were looking // for. Ignore it - we're quiescing. - return true; + return Task.FromResult(true); } - return base.HandleFrame(in frame); + return base.HandleFrameAsync(frame, cancellationToken); } /// Set channel 0 as quiescing @@ -133,23 +133,6 @@ public async Task SetSessionClosingAsync(bool closeIsServerInitiated) } } - public override void Transmit(in T cmd) - { - // Are we closing? - if (_closing) - { - if ((cmd.ProtocolCommandId != ProtocolCommandId.ConnectionCloseOk) && // is this not a close-ok? - (_closeIsServerInitiated || cmd.ProtocolCommandId != ProtocolCommandId.ConnectionClose)) // is this either server initiated or not a close? - { - // We shouldn't do anything since we are closing, not sending a connection-close-ok command - // and this is either a server-initiated close or not a connection-close command. - return; - } - } - - base.Transmit(in cmd); - } - public override ValueTask TransmitAsync(in T cmd, CancellationToken cancellationToken) { // Are we closing? diff --git a/projects/RabbitMQ.Client/client/impl/Session.cs b/projects/RabbitMQ.Client/client/impl/Session.cs index 0b4ba32e1c..1fdd285c87 100644 --- a/projects/RabbitMQ.Client/client/impl/Session.cs +++ b/projects/RabbitMQ.Client/client/impl/Session.cs @@ -29,8 +29,8 @@ // Copyright (c) 2007-2020 VMware, Inc. All rights reserved. //--------------------------------------------------------------------------- -using System.Diagnostics; - +using System.Threading; +using System.Threading.Tasks; using RabbitMQ.Client.Framing.Impl; namespace RabbitMQ.Client.Impl @@ -45,13 +45,14 @@ public Session(Connection connection, ushort channelNumber) : base(connection, c _assembler = new CommandAssembler(); } - public override bool HandleFrame(in InboundFrame frame) + public override async Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken) { bool shallReturnFramePayload = _assembler.HandleFrame(in frame, out IncomingCommand cmd); if (!cmd.IsEmpty) { - CommandReceived.Invoke(in cmd); + await CommandReceived.Invoke(cmd, cancellationToken) + .ConfigureAwait(false); } return shallReturnFramePayload; diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 961d43cf6e..ea91ac30d5 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -117,7 +117,7 @@ public void Close(ShutdownEventArgs reason, bool notify) } } - public abstract bool HandleFrame(in InboundFrame frame); + public abstract Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); public void Notify() { @@ -126,24 +126,12 @@ public void Notify() ShutdownEventArgs reason = CloseReason; if (reason is null) { - throw new Exception("Internal Error in Session.Close"); + throw new InvalidOperationException("Internal Error in Session.Close"); } OnSessionShutdown(reason); } - public virtual void Transmit(in T cmd) where T : struct, IOutgoingAmqpMethod - { - if (!IsOpen && cmd.ProtocolCommandId != client.framing.ProtocolCommandId.ChannelCloseOk) - { - ThrowAlreadyClosedException(); - } - - RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber); - RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); - Connection.Write(bytes); - } - public virtual ValueTask TransmitAsync(in T cmd, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod { if (!IsOpen && cmd.ProtocolCommandId != client.framing.ProtocolCommandId.ChannelCloseOk) @@ -156,21 +144,6 @@ public virtual ValueTask TransmitAsync(in T cmd, CancellationToken cancellati return Connection.WriteAsync(bytes, cancellationToken); } - public void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemory body) - where TMethod : struct, IOutgoingAmqpMethod - where THeader : IAmqpHeader - { - if (!IsOpen && cmd.ProtocolCommandId != ProtocolCommandId.ChannelCloseOk) - { - ThrowAlreadyClosedException(); - } - - RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, - ChannelNumber, Connection.MaxPayloadSize); - RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); - Connection.Write(bytes); - } - public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body, CancellationToken cancellationToken = default) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index d2f3927f6d..115ab15ecb 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -273,7 +273,6 @@ await _pipeReader.CompleteAsync() } finally { - _closingSemaphore.Release(); _closingSemaphore.Dispose(); _closed = true; } diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 82c5b5a869..11d59fbead 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -44,6 +44,7 @@ namespace Test.Integration { public class TestToxiproxy : IntegrationFixture { + private const string ProxyName = "rmq-localhost"; private const ushort ProxyPort = 55672; private readonly TimeSpan _heartbeatTimeout = TimeSpan.FromSeconds(1); private readonly Connection _proxyConnection; @@ -54,7 +55,7 @@ public TestToxiproxy(ITestOutputHelper output) : base(output) { if (AreToxiproxyTestsEnabled) { - _proxyConnection = new Connection(); + _proxyConnection = new Connection(resetAllToxicsAndProxiesOnClose: true); _proxyClient = _proxyConnection.Client(); // to start, assume everything is on localhost