From 47debd4950a03e7df73b5966381879c604c6fb75 Mon Sep 17 00:00:00 2001 From: Havret Date: Mon, 14 Oct 2019 19:19:31 +0200 Subject: [PATCH] Expose amqp objects state as public (#388) * Expose Link State * Expose Session State * Expose Connection state --- src/Connection.cs | 160 ++++++++++++++++++++++++++++++++-------------- src/Link.cs | 49 ++++++++++++-- src/Session.cs | 112 ++++++++++++++++++++++---------- 3 files changed, 231 insertions(+), 90 deletions(-) diff --git a/src/Connection.cs b/src/Connection.cs index cd5c89dd..a69873ac 100644 --- a/src/Connection.cs +++ b/src/Connection.cs @@ -23,29 +23,83 @@ namespace Amqp using Amqp.Handler; using Amqp.Sasl; using Amqp.Types; + + /// + /// The state of a connection. + /// + public enum ConnectionState + { + /// + /// The connection is started. + /// + Start, + + /// + /// Header frame was sent. + /// + HeaderSent, + + /// + /// The connection is opening. + /// + OpenPipe, + + /// + /// Header frame was received. + /// + HeaderReceived, + + /// + /// Header frame exchanged. + /// + HeaderExchanged, + + /// + /// Open frame was sent. + /// + OpenSent, + + /// + /// Open frame was received. + /// + OpenReceived, + + /// + /// The connection is opened. + /// + Opened, + + /// + /// Close frame received. + /// + CloseReceived, + + /// + /// Close frame sent. + /// + CloseSent, + + /// + /// The connection is opening or closing. + /// + OpenClosePipe, + + /// + /// The connection is closing. + /// + ClosePipe, + + /// + /// The connection is closed. + /// + End + } /// /// The Connection class represents an AMQP connection. /// public partial class Connection : AmqpObject { - enum State - { - Start, - HeaderSent, - OpenPipe, - HeaderReceived, - HeaderExchanged, - OpenSent, - OpenReceived, - Opened, - CloseReceived, - CloseSent, - OpenClosePipe, - ClosePipe, - End - } - /// /// A flag to disable server certificate validation when TLS is used. /// @@ -61,7 +115,7 @@ enum State Session[] localSessions; Session[] remoteSessions; ushort channelMax; - State state; + ConnectionState state; uint maxFrameSize; uint remoteMaxFrameSize; ITransport writer; @@ -174,7 +228,7 @@ internal Connection(IBufferManager bufferManager, AmqpSettings amqpSettings, Add this.SendHeader(); this.SendOpen(open); - this.state = State.OpenPipe; + this.state = ConnectionState.OpenPipe; } /// @@ -185,6 +239,14 @@ public static ConnectionFactory Factory get { return new ConnectionFactory(); } } + /// + /// Gets the connection state. + /// + public ConnectionState ConnectionState + { + get { return this.state; } + } + internal IBufferManager BufferManager { get; @@ -253,7 +315,7 @@ internal ushort AddSession(Session session) internal void SendCommand(ushort channel, DescribedList command) { - if (command.Descriptor.Code == Codec.Close.Code || this.state < State.CloseSent) + if (command.Descriptor.Code == Codec.Close.Code || this.state < ConnectionState.CloseSent) { ByteBuffer buffer = this.AllocateBuffer(Frame.CmdBufferSize); Frame.Encode(buffer, FrameType.Amqp, channel, command); @@ -322,24 +384,24 @@ protected override bool OnClose(Error error) { lock (this.ThisLock) { - State newState = State.Start; - if (this.state == State.OpenPipe ) + ConnectionState newState = ConnectionState.Start; + if (this.state == ConnectionState.OpenPipe ) { - newState = State.OpenClosePipe; + newState = ConnectionState.OpenClosePipe; } - else if (state == State.OpenSent) + else if (state == ConnectionState.OpenSent) { - newState = State.ClosePipe; + newState = ConnectionState.ClosePipe; } - else if (this.state == State.Opened) + else if (this.state == ConnectionState.Opened) { - newState = State.CloseSent; + newState = ConnectionState.CloseSent; } - else if (this.state == State.CloseReceived) + else if (this.state == ConnectionState.CloseReceived) { - newState = State.End; + newState = ConnectionState.End; } - else if (this.state == State.End) + else if (this.state == ConnectionState.End) { return true; } @@ -351,7 +413,7 @@ protected override bool OnClose(Error error) this.SendClose(error); this.state = newState; - return this.state == State.End; + return this.state == ConnectionState.End; } } @@ -416,7 +478,7 @@ void Connect(SaslProfile saslProfile, Open open) // after getting the transport, move state to open pipe before starting the pump this.SendHeader(); this.SendOpen(open); - this.state = State.OpenPipe; + this.state = ConnectionState.OpenPipe; this.reader = new Pump(this, transport); this.reader.Start(); @@ -424,7 +486,7 @@ void Connect(SaslProfile saslProfile, Open open) void ThrowIfClosed(string operation) { - if (this.state >= State.CloseSent) + if (this.state >= ConnectionState.CloseSent) { throw new AmqpException(this.Error ?? new Error(ErrorCode.IllegalState) @@ -474,13 +536,13 @@ void OnOpen(Open open) lock (this.ThisLock) { - if (this.state == State.OpenSent) + if (this.state == ConnectionState.OpenSent) { - this.state = State.Opened; + this.state = ConnectionState.Opened; } - else if (this.state == State.ClosePipe) + else if (this.state == ConnectionState.ClosePipe) { - this.state = State.CloseSent; + this.state = ConnectionState.CloseSent; } else { @@ -522,11 +584,11 @@ void OnClose(Close close) lock (this.ThisLock) { - if (this.state == State.Opened) + if (this.state == ConnectionState.Opened) { this.SendClose(null); } - else if (this.state == State.CloseSent) + else if (this.state == ConnectionState.CloseSent) { } else @@ -535,7 +597,7 @@ void OnClose(Close close) Fx.Format(SRAmqp.AmqpIllegalOperationState, "OnClose", this.state)); } - this.state = State.End; + this.state = ConnectionState.End; this.OnEnded(close.Error); } } @@ -632,13 +694,13 @@ internal bool OnHeader(ProtocolHeader header) lock (this.ThisLock) { - if (this.state == State.OpenPipe) + if (this.state == ConnectionState.OpenPipe) { - this.state = State.OpenSent; + this.state = ConnectionState.OpenSent; } - else if (this.state == State.OpenClosePipe) + else if (this.state == ConnectionState.OpenClosePipe) { - this.state = State.ClosePipe; + this.state = ConnectionState.ClosePipe; } else { @@ -717,7 +779,7 @@ void OnException(Exception exception) amqpException.Error : new Error(ErrorCode.InternalError) { Description = exception.Message }; - if (this.state < State.CloseSent) + if (this.state < ConnectionState.CloseSent) { // send close and shutdown the transport. try @@ -729,16 +791,16 @@ void OnException(Exception exception) } } - this.state = State.End; + this.state = ConnectionState.End; this.OnEnded(error); } internal void OnIoException(Exception exception) { Trace.WriteLine(TraceLevel.Error, "I/O: {0}", exception.ToString()); - if (this.state != State.End) + if (this.state != ConnectionState.End) { - this.state = State.End; + this.state = ConnectionState.End; this.CloseCalled = true; Error error = new Error(ErrorCode.ConnectionForced) { Description = exception.Message }; this.OnEnded(error); @@ -964,7 +1026,7 @@ void PumpThread() } byte[] sizeBuffer = new byte[FixedWidth.UInt]; - while (this.connection.state != State.End) + while (this.connection.state != ConnectionState.End) { try { diff --git a/src/Link.cs b/src/Link.cs index ea437fd7..3c985992 100644 --- a/src/Link.cs +++ b/src/Link.cs @@ -21,15 +21,49 @@ namespace Amqp using Amqp.Framing; using Amqp.Handler; - enum LinkState + /// + /// The state of a link. + /// + public enum LinkState { + /// + /// The link is started. + /// Start, + + /// + /// Attach frame was sent. + /// AttachSent, + + /// + /// Attach frame was received. + /// AttachReceived, + + /// + /// The link is attached. + /// Attached, + + /// + /// The link is detaching. + /// DetachPipe, + + /// + /// Detach frame was sent. + /// DetachSent, + + /// + /// Detach frame was received. + /// DetachReceived, + + /// + /// The link is closed. + /// End } @@ -94,6 +128,14 @@ public Session Session { get { return this.session; } } + + /// + /// Gets the link state. + /// + public LinkState LinkState + { + get { return this.state; } + } internal object ThisLock { @@ -105,11 +147,6 @@ internal bool IsDetaching get { return this.state >= LinkState.DetachPipe; } } - internal LinkState LinkState - { - get { return this.state; } - } - /// /// Detaches the link endpoint without closing it. /// diff --git a/src/Session.cs b/src/Session.cs index 23624868..c891d921 100644 --- a/src/Session.cs +++ b/src/Session.cs @@ -21,24 +21,58 @@ namespace Amqp using Amqp.Framing; using Amqp.Handler; using Amqp.Types; + + /// + /// The state of a session. + /// + public enum SessionState + { + /// + /// The session is started. + /// + Start, + + /// + /// Begin frame was sent. + /// + BeginSent, + + /// + /// Begin frame was received. + /// + BeginReceived, + + /// + /// The session is opened. + /// + Opened, + + /// + /// End frame received. + /// + EndReceived, + + /// + /// End frame was sent. + /// + EndSent, + + /// + /// The session is closing. + /// + EndPipe, + + /// + /// The session is closed. + /// + End + } /// /// The Session class represents an AMQP session. /// public partial class Session : AmqpObject { - enum State - { - Start, - BeginSent, - BeginReceived, - Opened, - EndReceived, - EndSent, - EndPipe, - End - } - internal const uint defaultWindowSize = 2048; readonly Connection connection; readonly OnBegin onBegin; @@ -46,7 +80,7 @@ enum State uint handleMax; Link[] localLinks; Link[] remoteLinks; - State state; + SessionState state; private readonly object lockObject = new object(); @@ -92,7 +126,7 @@ public Session(Connection connection, Begin begin, OnBegin onBegin) this.outgoingList = new LinkedList(); this.channel = connection.AddSession(this); - this.state = State.BeginSent; + this.state = SessionState.BeginSent; this.SendBegin(begin); } @@ -104,6 +138,14 @@ public Connection Connection get { return this.connection; } } + /// + /// Get the session state. + /// + public SessionState SessionState + { + get { return state; } + } + object ThisLock { get { return this.lockObject; } @@ -120,9 +162,9 @@ internal void Abort(Error error) this.Error = error; this.AbortLinks(error); - if (this.state != State.End) + if (this.state != SessionState.End) { - this.state = State.End; + this.state = SessionState.End; this.NotifyClosed(error); } } @@ -247,7 +289,7 @@ internal void SendFlow(Flow flow) internal void SendCommand(DescribedList command) { - if (command.Descriptor.Code == Codec.End.Code || this.state < State.EndSent) + if (command.Descriptor.Code == Codec.End.Code || this.state < SessionState.EndSent) { this.connection.SendCommand(this.channel, command); } @@ -263,13 +305,13 @@ internal void OnBegin(ushort remoteChannel, Begin begin) lock (this.ThisLock) { - if (this.state == State.BeginSent) + if (this.state == SessionState.BeginSent) { - this.state = State.Opened; + this.state = SessionState.Opened; } - else if (this.state == State.EndPipe) + else if (this.state == SessionState.EndPipe) { - this.state = State.EndSent; + this.state = SessionState.EndSent; } else { @@ -304,14 +346,14 @@ internal bool OnEnd(End end) lock (this.ThisLock) { - if (this.state == State.EndSent) + if (this.state == SessionState.EndSent) { - this.state = State.End; + this.state = SessionState.End; } - else if (this.state == State.Opened) + else if (this.state == SessionState.Opened) { this.SendEnd(); - this.state = State.End; + this.state = SessionState.End; } else { @@ -328,7 +370,7 @@ internal bool OnEnd(End end) internal void OnCommand(DescribedList command, ByteBuffer buffer) { - Fx.Assert(this.state != State.EndReceived && this.state != State.End, + Fx.Assert(this.state != SessionState.EndReceived && this.state != SessionState.End, "Session is ending or ended and cannot receive commands."); if (command.Descriptor.Code == Codec.Attach.Code) { @@ -368,21 +410,21 @@ protected override bool OnClose(Error error) lock (this.ThisLock) { - if (this.state == State.End) + if (this.state == SessionState.End) { return true; } - else if (this.state == State.BeginSent) + else if (this.state == SessionState.BeginSent) { - this.state = State.EndPipe; + this.state = SessionState.EndPipe; } - else if (this.state == State.Opened) + else if (this.state == SessionState.Opened) { - this.state = State.EndSent; + this.state = SessionState.EndSent; } - else if (this.state == State.EndReceived) + else if (this.state == SessionState.EndReceived) { - this.state = State.End; + this.state = SessionState.End; } else { @@ -391,7 +433,7 @@ protected override bool OnClose(Error error) } this.SendEnd(); - return this.state == State.End; + return this.state == SessionState.End; } } @@ -634,7 +676,7 @@ void OnDispose(Dispose dispose) void ThrowIfEnded(string operation) { - if (this.state >= State.EndSent) + if (this.state >= SessionState.EndSent) { throw new AmqpException(this.Error ?? new Error(ErrorCode.IllegalState)