Skip to content

Commit

Permalink
Expose amqp objects state as public (#388)
Browse files Browse the repository at this point in the history
* Expose Link State

* Expose Session State

* Expose Connection state
  • Loading branch information
Havret authored and xinchen10 committed Oct 14, 2019
1 parent 4ad528d commit 47debd4
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 90 deletions.
160 changes: 111 additions & 49 deletions src/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,83 @@ namespace Amqp
using Amqp.Handler;
using Amqp.Sasl;
using Amqp.Types;

/// <summary>
/// The state of a connection.
/// </summary>
public enum ConnectionState
{
/// <summary>
/// The connection is started.
/// </summary>
Start,

/// <summary>
/// Header frame was sent.
/// </summary>
HeaderSent,

/// <summary>
/// The connection is opening.
/// </summary>
OpenPipe,

/// <summary>
/// Header frame was received.
/// </summary>
HeaderReceived,

/// <summary>
/// Header frame exchanged.
/// </summary>
HeaderExchanged,

/// <summary>
/// Open frame was sent.
/// </summary>
OpenSent,

/// <summary>
/// Open frame was received.
/// </summary>
OpenReceived,

/// <summary>
/// The connection is opened.
/// </summary>
Opened,

/// <summary>
/// Close frame received.
/// </summary>
CloseReceived,

/// <summary>
/// Close frame sent.
/// </summary>
CloseSent,

/// <summary>
/// The connection is opening or closing.
/// </summary>
OpenClosePipe,

/// <summary>
/// The connection is closing.
/// </summary>
ClosePipe,

/// <summary>
/// The connection is closed.
/// </summary>
End
}

/// <summary>
/// The Connection class represents an AMQP connection.
/// </summary>
public partial class Connection : AmqpObject
{
enum State
{
Start,
HeaderSent,
OpenPipe,
HeaderReceived,
HeaderExchanged,
OpenSent,
OpenReceived,
Opened,
CloseReceived,
CloseSent,
OpenClosePipe,
ClosePipe,
End
}

/// <summary>
/// A flag to disable server certificate validation when TLS is used.
/// </summary>
Expand All @@ -61,7 +115,7 @@ enum State
Session[] localSessions;
Session[] remoteSessions;
ushort channelMax;
State state;
ConnectionState state;
uint maxFrameSize;
uint remoteMaxFrameSize;
ITransport writer;
Expand Down Expand Up @@ -174,7 +228,7 @@ internal Connection(IBufferManager bufferManager, AmqpSettings amqpSettings, Add

this.SendHeader();
this.SendOpen(open);
this.state = State.OpenPipe;
this.state = ConnectionState.OpenPipe;
}

/// <summary>
Expand All @@ -185,6 +239,14 @@ public static ConnectionFactory Factory
get { return new ConnectionFactory(); }
}

/// <summary>
/// Gets the connection state.
/// </summary>
public ConnectionState ConnectionState
{
get { return this.state; }
}

internal IBufferManager BufferManager
{
get;
Expand Down Expand Up @@ -253,7 +315,7 @@ internal ushort AddSession(Session session)

internal void SendCommand(ushort channel, DescribedList command)
{
if (command.Descriptor.Code == Codec.Close.Code || this.state < State.CloseSent)
if (command.Descriptor.Code == Codec.Close.Code || this.state < ConnectionState.CloseSent)
{
ByteBuffer buffer = this.AllocateBuffer(Frame.CmdBufferSize);
Frame.Encode(buffer, FrameType.Amqp, channel, command);
Expand Down Expand Up @@ -322,24 +384,24 @@ protected override bool OnClose(Error error)
{
lock (this.ThisLock)
{
State newState = State.Start;
if (this.state == State.OpenPipe )
ConnectionState newState = ConnectionState.Start;
if (this.state == ConnectionState.OpenPipe )
{
newState = State.OpenClosePipe;
newState = ConnectionState.OpenClosePipe;
}
else if (state == State.OpenSent)
else if (state == ConnectionState.OpenSent)
{
newState = State.ClosePipe;
newState = ConnectionState.ClosePipe;
}
else if (this.state == State.Opened)
else if (this.state == ConnectionState.Opened)
{
newState = State.CloseSent;
newState = ConnectionState.CloseSent;
}
else if (this.state == State.CloseReceived)
else if (this.state == ConnectionState.CloseReceived)
{
newState = State.End;
newState = ConnectionState.End;
}
else if (this.state == State.End)
else if (this.state == ConnectionState.End)
{
return true;
}
Expand All @@ -351,7 +413,7 @@ protected override bool OnClose(Error error)

this.SendClose(error);
this.state = newState;
return this.state == State.End;
return this.state == ConnectionState.End;
}
}

Expand Down Expand Up @@ -416,15 +478,15 @@ void Connect(SaslProfile saslProfile, Open open)
// after getting the transport, move state to open pipe before starting the pump
this.SendHeader();
this.SendOpen(open);
this.state = State.OpenPipe;
this.state = ConnectionState.OpenPipe;

this.reader = new Pump(this, transport);
this.reader.Start();
}

void ThrowIfClosed(string operation)
{
if (this.state >= State.CloseSent)
if (this.state >= ConnectionState.CloseSent)
{
throw new AmqpException(this.Error ??
new Error(ErrorCode.IllegalState)
Expand Down Expand Up @@ -474,13 +536,13 @@ void OnOpen(Open open)

lock (this.ThisLock)
{
if (this.state == State.OpenSent)
if (this.state == ConnectionState.OpenSent)
{
this.state = State.Opened;
this.state = ConnectionState.Opened;
}
else if (this.state == State.ClosePipe)
else if (this.state == ConnectionState.ClosePipe)
{
this.state = State.CloseSent;
this.state = ConnectionState.CloseSent;
}
else
{
Expand Down Expand Up @@ -522,11 +584,11 @@ void OnClose(Close close)

lock (this.ThisLock)
{
if (this.state == State.Opened)
if (this.state == ConnectionState.Opened)
{
this.SendClose(null);
}
else if (this.state == State.CloseSent)
else if (this.state == ConnectionState.CloseSent)
{
}
else
Expand All @@ -535,7 +597,7 @@ void OnClose(Close close)
Fx.Format(SRAmqp.AmqpIllegalOperationState, "OnClose", this.state));
}

this.state = State.End;
this.state = ConnectionState.End;
this.OnEnded(close.Error);
}
}
Expand Down Expand Up @@ -632,13 +694,13 @@ internal bool OnHeader(ProtocolHeader header)

lock (this.ThisLock)
{
if (this.state == State.OpenPipe)
if (this.state == ConnectionState.OpenPipe)
{
this.state = State.OpenSent;
this.state = ConnectionState.OpenSent;
}
else if (this.state == State.OpenClosePipe)
else if (this.state == ConnectionState.OpenClosePipe)
{
this.state = State.ClosePipe;
this.state = ConnectionState.ClosePipe;
}
else
{
Expand Down Expand Up @@ -717,7 +779,7 @@ void OnException(Exception exception)
amqpException.Error :
new Error(ErrorCode.InternalError) { Description = exception.Message };

if (this.state < State.CloseSent)
if (this.state < ConnectionState.CloseSent)
{
// send close and shutdown the transport.
try
Expand All @@ -729,16 +791,16 @@ void OnException(Exception exception)
}
}

this.state = State.End;
this.state = ConnectionState.End;
this.OnEnded(error);
}

internal void OnIoException(Exception exception)
{
Trace.WriteLine(TraceLevel.Error, "I/O: {0}", exception.ToString());
if (this.state != State.End)
if (this.state != ConnectionState.End)
{
this.state = State.End;
this.state = ConnectionState.End;
this.CloseCalled = true;
Error error = new Error(ErrorCode.ConnectionForced) { Description = exception.Message };
this.OnEnded(error);
Expand Down Expand Up @@ -964,7 +1026,7 @@ void PumpThread()
}

byte[] sizeBuffer = new byte[FixedWidth.UInt];
while (this.connection.state != State.End)
while (this.connection.state != ConnectionState.End)
{
try
{
Expand Down
49 changes: 43 additions & 6 deletions src/Link.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,49 @@ namespace Amqp
using Amqp.Framing;
using Amqp.Handler;

enum LinkState
/// <summary>
/// The state of a link.
/// </summary>
public enum LinkState
{
/// <summary>
/// The link is started.
/// </summary>
Start,

/// <summary>
/// Attach frame was sent.
/// </summary>
AttachSent,

/// <summary>
/// Attach frame was received.
/// </summary>
AttachReceived,

/// <summary>
/// The link is attached.
/// </summary>
Attached,

/// <summary>
/// The link is detaching.
/// </summary>
DetachPipe,

/// <summary>
/// Detach frame was sent.
/// </summary>
DetachSent,

/// <summary>
/// Detach frame was received.
/// </summary>
DetachReceived,

/// <summary>
/// The link is closed.
/// </summary>
End
}

Expand Down Expand Up @@ -94,6 +128,14 @@ public Session Session
{
get { return this.session; }
}

/// <summary>
/// Gets the link state.
/// </summary>
public LinkState LinkState
{
get { return this.state; }
}

internal object ThisLock
{
Expand All @@ -105,11 +147,6 @@ internal bool IsDetaching
get { return this.state >= LinkState.DetachPipe; }
}

internal LinkState LinkState
{
get { return this.state; }
}

/// <summary>
/// Detaches the link endpoint without closing it.
/// </summary>
Expand Down
Loading

0 comments on commit 47debd4

Please sign in to comment.