Skip to content

Commit

Permalink
Merge pull request #1613 from bollhals/fix/InboundFrame
Browse files Browse the repository at this point in the history
change InboundFrame to a class
  • Loading branch information
lukebakken authored Jun 26, 2024
2 parents 3188515 + 004ce32 commit 56b3ebf
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public PublicationAddress? ReplyToAddress

public ReadOnlyBasicProperties(ReadOnlySpan<byte> span)
{
if (span.IsEmpty)
{
return;
}

int offset = 2;
ref readonly byte bits = ref span[0];
if (bits.IsBitSet(BasicProperties.ContentTypeBit)) { offset += WireFormatting.ReadShortstr(span.Slice(offset), out _contentType); }
Expand Down
77 changes: 33 additions & 44 deletions projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,9 @@ internal sealed class CommandAssembler
private const int MaxArrayOfBytesSize = 2_147_483_591;

private ProtocolCommandId _commandId;
private ReadOnlyMemory<byte> _methodMemory;
private byte[]? _rentedMethodArray;
private ReadOnlyMemory<byte> _headerMemory;
private byte[]? _rentedHeaderArray;
private ReadOnlyMemory<byte> _bodyMemory;
private byte[]? _rentedBodyArray;
private RentedMemory _methodMemory;
private RentedMemory _headerMemory;
private RentedMemory _bodyMemory;
private int _remainingBodyByteCount;
private int _offset;
private AssemblyState _state;
Expand All @@ -66,61 +63,49 @@ public CommandAssembler(uint maxBodyLength)
private void Reset()
{
_commandId = default;
_methodMemory = ReadOnlyMemory<byte>.Empty;
_rentedMethodArray = null;
_headerMemory = ReadOnlyMemory<byte>.Empty;
_rentedHeaderArray = null;
_bodyMemory = ReadOnlyMemory<byte>.Empty;
_rentedBodyArray = null;
_methodMemory = default;
_headerMemory = default;
_bodyMemory = default;
_remainingBodyByteCount = 0;
_offset = 0;
_state = AssemblyState.ExpectingMethod;
}

public bool HandleFrame(in InboundFrame frame, out IncomingCommand command)
public void HandleFrame(InboundFrame frame, out IncomingCommand command)
{
bool shallReturn = true;
switch (_state)
{
case AssemblyState.ExpectingMethod:
ParseMethodFrame(in frame);
shallReturn = false;
ParseMethodFrame(frame);
break;
case AssemblyState.ExpectingContentHeader:
shallReturn = ParseHeaderFrame(in frame);
ParseHeaderFrame(frame);
break;
case AssemblyState.ExpectingContentBody:
shallReturn = ParseBodyFrame(in frame);
ParseBodyFrame(frame);
break;
}

if (_state != AssemblyState.Complete)
{
command = IncomingCommand.Empty;
return shallReturn;
return;
}

RabbitMqClientEventSource.Log.CommandReceived();

var method = new RentedMemory(_methodMemory, _rentedMethodArray);
var header = new RentedMemory(_headerMemory, _rentedHeaderArray);
var body = new RentedMemory(_bodyMemory, _rentedBodyArray);

command = new IncomingCommand(_commandId, method, header, body);
command = new IncomingCommand(_commandId, _methodMemory, _headerMemory, _bodyMemory);
Reset();
return shallReturn;
}

private void ParseMethodFrame(in InboundFrame frame)
private void ParseMethodFrame(InboundFrame frame)
{
if (frame.Type != FrameType.FrameMethod)
{
throw new UnexpectedFrameException(frame.Type);
}

_rentedMethodArray = frame.TakeoverPayload();
_commandId = (ProtocolCommandId)NetworkOrderDeserializer.ReadUInt32(frame.Payload.Span);
_methodMemory = frame.Payload.Slice(4);
_methodMemory = frame.TakeoverPayload(Framing.Method.ArgumentsOffset);

switch (_commandId)
{
Expand All @@ -136,7 +121,7 @@ private void ParseMethodFrame(in InboundFrame frame)
}
}

private bool ParseHeaderFrame(in InboundFrame frame)
private void ParseHeaderFrame(InboundFrame frame)
{
if (frame.Type != FrameType.FrameHeader)
{
Expand All @@ -150,7 +135,7 @@ private bool ParseHeaderFrame(in InboundFrame frame)
throw new UnknownClassOrMethodException(classId, 0);
}

ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(Framing.Header.BodyLengthOffset));
if (totalBodyBytes > MaxArrayOfBytesSize)
{
throw new UnexpectedFrameException(frame.Type);
Expand All @@ -162,16 +147,21 @@ private bool ParseHeaderFrame(in InboundFrame frame)
throw new MalformedFrameException(message: msg, canShutdownCleanly: false);
}

_rentedHeaderArray = totalBodyBytes != 0 ? frame.TakeoverPayload() : Array.Empty<byte>();

_headerMemory = frame.Payload.Slice(12);
// There are always at least 2 bytes, even for empty ones
if (frame.Payload.Length <= Framing.Header.HeaderArgumentOffset + 2)
{
frame.TryReturnPayload();
}
else
{
_headerMemory = frame.TakeoverPayload(Framing.Header.HeaderArgumentOffset);
}

_remainingBodyByteCount = (int)totalBodyBytes;
UpdateContentBodyState();
return _rentedHeaderArray.Length == 0;
}

private bool ParseBodyFrame(in InboundFrame frame)
private void ParseBodyFrame(InboundFrame frame)
{
if (frame.Type != FrameType.FrameBody)
{
Expand All @@ -184,27 +174,26 @@ private bool ParseBodyFrame(in InboundFrame frame)
throw new MalformedFrameException($"Overlong content body received - {_remainingBodyByteCount} bytes remaining, {payloadLength} bytes received");
}

if (_rentedBodyArray is null)
if (_bodyMemory.RentedArray is null)
{
// check for single frame payload for an early exit
if (payloadLength == _remainingBodyByteCount)
{
_rentedBodyArray = frame.TakeoverPayload();
_bodyMemory = frame.Payload;
_bodyMemory = frame.TakeoverPayload(0);
_state = AssemblyState.Complete;
return false;
return;
}

// Is returned by IncomingCommand.ReturnPayload in Session.HandleFrame
_rentedBodyArray = ArrayPool<byte>.Shared.Rent(_remainingBodyByteCount);
_bodyMemory = new ReadOnlyMemory<byte>(_rentedBodyArray, 0, _remainingBodyByteCount);
var rentedBodyArray = ArrayPool<byte>.Shared.Rent(_remainingBodyByteCount);
_bodyMemory = new RentedMemory(new ReadOnlyMemory<byte>(rentedBodyArray, 0, _remainingBodyByteCount), rentedBodyArray);
}

frame.Payload.Span.CopyTo(_rentedBodyArray.AsSpan(_offset));
frame.Payload.Span.CopyTo(_bodyMemory.RentedArray.AsSpan(_offset));
frame.TryReturnPayload();
_remainingBodyByteCount -= payloadLength;
_offset += payloadLength;
UpdateContentBodyState();
return true;
}

private void UpdateContentBodyState()
Expand Down
20 changes: 9 additions & 11 deletions projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,29 +123,30 @@ await FinishCloseAsync(cts.Token)

private async Task ReceiveLoopAsync(CancellationToken mainLoopCancellationToken)
{
InboundFrame frame = new InboundFrame();

while (false == _closed)
{
mainLoopCancellationToken.ThrowIfCancellationRequested();

while (_frameHandler.TryReadFrame(out InboundFrame frame))
while (_frameHandler.TryReadFrame(frame))
{
NotifyHeartbeatListener();
await ProcessFrameAsync(frame, mainLoopCancellationToken)
.ConfigureAwait(false);
}

// Done reading frames synchronously, go async
InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync(mainLoopCancellationToken)
await _frameHandler.ReadFrameAsync(frame, mainLoopCancellationToken)
.ConfigureAwait(false);
NotifyHeartbeatListener();
await ProcessFrameAsync(asyncFrame, mainLoopCancellationToken)
.ConfigureAwait(false);
await ProcessFrameAsync(frame, mainLoopCancellationToken)
.ConfigureAwait(false);
}
}

private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cancellationToken)
{
bool shallReturnPayload = true;
if (frame.Channel == 0)
{
if (frame.Type == FrameType.FrameHeartbeat)
Expand All @@ -164,7 +165,7 @@ private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cance
// quiescing situation, even though technically we
// should be ignoring everything except
// connection.close-ok.
shallReturnPayload = await _session0.HandleFrameAsync(frame, cancellationToken)
await _session0.HandleFrameAsync(frame, cancellationToken)
.ConfigureAwait(false);
}
}
Expand All @@ -182,15 +183,12 @@ private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cance
// Session itself may be quiescing this particular
// channel, but that's none of our concern.)
ISession session = _sessionManager.Lookup(frame.Channel);
shallReturnPayload = await session.HandleFrameAsync(frame, cancellationToken)
await session.HandleFrameAsync(frame, cancellationToken)
.ConfigureAwait(false);
}
}

if (shallReturnPayload)
{
frame.ReturnPayload();
}
frame.TryReturnPayload();
}

///<remarks>
Expand Down
Loading

0 comments on commit 56b3ebf

Please sign in to comment.