From 30faa443874f185bed6b224505d0cfb41345ce6e Mon Sep 17 00:00:00 2001 From: liusong Date: Sun, 25 Jun 2023 23:25:40 +0800 Subject: [PATCH] improving code style --- buffer.go | 4 ++-- session.go | 14 +++++++++----- stream.go | 42 +++++++++++++++++++++++------------------- 3 files changed, 34 insertions(+), 26 deletions(-) diff --git a/buffer.go b/buffer.go index 3941ffc..f25eee6 100644 --- a/buffer.go +++ b/buffer.go @@ -31,7 +31,7 @@ var ( // BufferWriter used to write data to stream. type BufferWriter interface { //Len() return the current wrote size of buffer. - //It will traverse all underlying slices to compute the unread size, please dont's call frequently. + //It will traverse all underlying slices to compute the unread size, please don't call frequently. Len() int io.ByteWriter //Reserve `size` byte share memory space, user could use it implement zero copy write. @@ -49,7 +49,7 @@ type BufferReader interface { io.ByteReader //Len() return the current unread size of buffer. - //It will traverse all underlying slices to compute the unread size, please dont's call frequently. + //It will traverse all underlying slices to compute the unread size, please don't call frequently. Len() int //Read `size` bytes from share memory, which maybe block if size is greater than Len(). diff --git a/session.go b/session.go index a79bb6f..59d4c8b 100644 --- a/session.go +++ b/session.go @@ -220,7 +220,11 @@ func (s *Session) initProtocol() error { // IsClosed does a safe check to see if we have shutdown func (s *Session) IsClosed() bool { - return atomic.LoadUint32(&s.shutdown) == 1 + return s.getSessionShutdown() == 1 +} + +func (s *Session) getSessionShutdown() uint32 { + return atomic.LoadUint32(&s.shutdown) } // IsHealthy return whether the session is healthy @@ -247,7 +251,7 @@ func (s *Session) OpenStream() (*Stream, error) { if s.IsClosed() { return nil, s.shutdownErr } - if atomic.LoadUint32(&s.unhealthy) == 1 { + if !s.IsHealthy() { return nil, ErrSessionUnhealthy } @@ -485,7 +489,7 @@ func (s *Session) monitorLoop() { } func (s *Session) onEventData(buf []byte, conn eventConn) error { - if atomic.LoadUint32(&s.shutdown) == 1 { + if s.IsClosed() { return nil } consumed, err := s.handleEvents(buf) @@ -716,13 +720,13 @@ func (s *Session) GetMetrics() (PerformanceMetrics, StabilityMetrics, ShareMemor //so here we need ensure that the session hadn't shutdown. s.shutdownLock.Lock() var sendQueueCount, receiveQueueCount uint64 - if s.queueManager != nil && atomic.LoadUint32(&s.shutdown) == 0 { + if s.queueManager != nil && s.getSessionShutdown() == 0 { sendQueueCount = uint64(atomic.LoadInt64(s.queueManager.sendQueue.tail)) receiveQueueCount = uint64(atomic.LoadInt64(s.queueManager.recvQueue.tail)) } s.shutdownLock.Unlock() var smm ShareMemoryMetrics - if s.bufferManager != nil && atomic.LoadUint32(&s.shutdown) == 0 { + if s.bufferManager != nil && s.getSessionShutdown() == 0 { allShmBytes, inUsedShmBytes := uint32(0), uint32(0) for _, l := range s.bufferManager.lists { allShmBytes += (*l.cap) * (*l.capPerBuffer) diff --git a/stream.go b/stream.go index ae1b5fa..93fff98 100644 --- a/stream.go +++ b/stream.go @@ -108,8 +108,8 @@ func newStream(session *Session, id uint32) *Stream { return s } -//SetCallbacks used to set the StreamCallbacks. -//Notice: It was just called only once, or return the error named ErrStreamCallbackHadExisted. +// SetCallbacks used to set the StreamCallbacks. +// Notice: It was just called only once, or return the error named ErrStreamCallbackHadExisted. func (s *Stream) SetCallbacks(callback StreamCallbacks) error { if s.getCallbacks() != nil { return ErrStreamCallbackHadExisted @@ -139,7 +139,7 @@ func (s *Stream) readMore(minSize int) (err error) { return nil } - if recvLen == 0 && atomic.LoadUint32(&s.state) != uint32(streamOpened) { + if recvLen == 0 && !s.IsOpen() { return ErrEndOfStream } @@ -174,7 +174,7 @@ func (s *Stream) readMore(minSize int) (err error) { if s.recvBuf.Len() >= minSize { return nil } - if atomic.LoadUint32(&s.state) == uint32(streamHalfClosed) { + if s.getStreamState() == uint32(streamHalfClosed) { return ErrEndOfStream } return ErrStreamClosed @@ -201,7 +201,7 @@ func (s *Stream) Flush(endStream bool) error { return nil } atomic.AddUint64(&s.session.stats.outFlowBytes, uint64(s.sendBuf.Len())) - state := atomic.LoadUint32(&s.state) + state := s.getStreamState() if state != uint32(streamOpened) { s.sendBuf.recycle() return ErrStreamClosed @@ -270,8 +270,8 @@ func (s *Stream) writeFallback(streamStatus uint32, err error) error { return s.session.waitForSend(nil, data) } -//Close used to close the stream, which maybe block if there is StreamCallbacks running. -//if a stream was leaked, it's also mean that some share memory was leaked. +// Close used to close the stream, which maybe block if there is StreamCallbacks running. +// if a stream was leaked, it's also mean that some share memory was leaked. func (s *Stream) Close() error { if s.getCallbacks() != nil { atomic.StoreUint32(&s.callbackCloseState, uint32(callbackWaitExit)) @@ -287,7 +287,7 @@ func (s *Stream) Close() error { // close the stream. after close stream, any operation will return ErrStreamClosed. // unread data will be drained and released. func (s *Stream) close() error { - oldState := atomic.LoadUint32(&s.state) + oldState := s.getStreamState() if oldState == uint32(streamClosed) { return nil } @@ -301,13 +301,13 @@ func (s *Stream) close() error { s.safeCloseNotify() callback := s.getCallbacks() if callback != nil { - if atomic.LoadUint32(&s.session.shutdown) == 1 { + if s.session.IsClosed() { callback.OnRemoteClose() } else { callback.OnLocalClose() } } - if atomic.LoadUint32(&s.session.shutdown) == 1 { + if s.session.IsClosed() { return nil } // notify peer @@ -327,7 +327,7 @@ func (s *Stream) close() error { } func (s *Stream) clean() { - s.session.onStreamClose(s.id, streamState(atomic.LoadUint32(&s.state))) + s.session.onStreamClose(s.id, streamState(s.getStreamState())) s.pendingData.clear() s.recvBuf.recycle() s.sendBuf.recycle() @@ -345,7 +345,7 @@ func (s *Stream) halfClose() { // clean the stream's all status for reusing. func (s *Stream) reset() error { - if atomic.LoadUint32(&s.state) != uint32(streamOpened) { + if !s.IsOpen() { return ErrStreamClosed } // return error if has any unread data @@ -388,7 +388,7 @@ func (s *Stream) ReleaseReadAndReuse() { func (s *Stream) fillDataToReadBuffer(buf bufferSliceWrapper) error { s.pendingData.add(buf) //stream had closed, which maybe closed by user due to timeout. - if atomic.LoadUint32(&s.state) == uint32(streamClosed) { + if s.getStreamState() == uint32(streamClosed) { s.pendingData.clear() s.recvBuf.recycle() return nil @@ -446,9 +446,13 @@ func (s *Stream) SetWriteDeadline(t time.Time) error { return nil } -//IsOpen return whether the stream is open +// IsOpen return whether the stream is open func (s *Stream) IsOpen() bool { - return atomic.LoadUint32(&s.state) == uint32(streamOpened) + return s.getStreamState() == uint32(streamOpened) +} + +func (s *Stream) getStreamState() uint32 { + return atomic.LoadUint32(&s.state) } func (s *Stream) safeCloseNotify() { @@ -539,14 +543,14 @@ func (s *Stream) getCallbacks() StreamCallbacks { return nil } -//Low performance api, it just adapt to the interface net.Conn, which will copy data from read buffer to `p` -//please use BufferReader() API to implement zero copy read +// Low performance api, it just adapt to the interface net.Conn, which will copy data from read buffer to `p` +// please use BufferReader() API to implement zero copy read func (s *Stream) Read(p []byte) (int, error) { return s.copyRead(p) } -//Low performance api, it just adapt to the interface net.Conn, which will do copy data from `p` to write buffer -//please use BufferWriter() API to implement zero copy write +// Low performance api, it just adapt to the interface net.Conn, which will do copy data from `p` to write buffer +// please use BufferWriter() API to implement zero copy write func (s *Stream) Write(p []byte) (int, error) { return s.copyWriteAndFlush(p) }