Skip to content

Commit

Permalink
Merge pull request #31 from liu-song/main
Browse files Browse the repository at this point in the history
improve code style
  • Loading branch information
zhquzzuli authored Jul 13, 2023
2 parents ac16ec2 + 30faa44 commit 9f40696
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 26 deletions.
4 changes: 2 additions & 2 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
// BufferWriter used to write data to stream.
type BufferWriter interface {
//Len() return the current wrote size of buffer.
//It will traverse all underlying slices to compute the unread size, please dont's call frequently.
//It will traverse all underlying slices to compute the unread size, please don't call frequently.
Len() int
io.ByteWriter
//Reserve `size` byte share memory space, user could use it implement zero copy write.
Expand All @@ -49,7 +49,7 @@ type BufferReader interface {
io.ByteReader

//Len() return the current unread size of buffer.
//It will traverse all underlying slices to compute the unread size, please dont's call frequently.
//It will traverse all underlying slices to compute the unread size, please don't call frequently.
Len() int

//Read `size` bytes from share memory, which maybe block if size is greater than Len().
Expand Down
14 changes: 9 additions & 5 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ func (s *Session) initProtocol() error {

// IsClosed does a safe check to see if we have shutdown
func (s *Session) IsClosed() bool {
return atomic.LoadUint32(&s.shutdown) == 1
return s.getSessionShutdown() == 1
}

func (s *Session) getSessionShutdown() uint32 {
return atomic.LoadUint32(&s.shutdown)
}

// IsHealthy return whether the session is healthy
Expand All @@ -247,7 +251,7 @@ func (s *Session) OpenStream() (*Stream, error) {
if s.IsClosed() {
return nil, s.shutdownErr
}
if atomic.LoadUint32(&s.unhealthy) == 1 {
if !s.IsHealthy() {
return nil, ErrSessionUnhealthy
}

Expand Down Expand Up @@ -485,7 +489,7 @@ func (s *Session) monitorLoop() {
}

func (s *Session) onEventData(buf []byte, conn eventConn) error {
if atomic.LoadUint32(&s.shutdown) == 1 {
if s.IsClosed() {
return nil
}
consumed, err := s.handleEvents(buf)
Expand Down Expand Up @@ -716,13 +720,13 @@ func (s *Session) GetMetrics() (PerformanceMetrics, StabilityMetrics, ShareMemor
//so here we need ensure that the session hadn't shutdown.
s.shutdownLock.Lock()
var sendQueueCount, receiveQueueCount uint64
if s.queueManager != nil && atomic.LoadUint32(&s.shutdown) == 0 {
if s.queueManager != nil && s.getSessionShutdown() == 0 {
sendQueueCount = uint64(atomic.LoadInt64(s.queueManager.sendQueue.tail))
receiveQueueCount = uint64(atomic.LoadInt64(s.queueManager.recvQueue.tail))
}
s.shutdownLock.Unlock()
var smm ShareMemoryMetrics
if s.bufferManager != nil && atomic.LoadUint32(&s.shutdown) == 0 {
if s.bufferManager != nil && s.getSessionShutdown() == 0 {
allShmBytes, inUsedShmBytes := uint32(0), uint32(0)
for _, l := range s.bufferManager.lists {
allShmBytes += (*l.cap) * (*l.capPerBuffer)
Expand Down
42 changes: 23 additions & 19 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func newStream(session *Session, id uint32) *Stream {
return s
}

//SetCallbacks used to set the StreamCallbacks.
//Notice: It was just called only once, or return the error named ErrStreamCallbackHadExisted.
// SetCallbacks used to set the StreamCallbacks.
// Notice: It was just called only once, or return the error named ErrStreamCallbackHadExisted.
func (s *Stream) SetCallbacks(callback StreamCallbacks) error {
if s.getCallbacks() != nil {
return ErrStreamCallbackHadExisted
Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *Stream) readMore(minSize int) (err error) {
return nil
}

if recvLen == 0 && atomic.LoadUint32(&s.state) != uint32(streamOpened) {
if recvLen == 0 && !s.IsOpen() {
return ErrEndOfStream
}

Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *Stream) readMore(minSize int) (err error) {
if s.recvBuf.Len() >= minSize {
return nil
}
if atomic.LoadUint32(&s.state) == uint32(streamHalfClosed) {
if s.getStreamState() == uint32(streamHalfClosed) {
return ErrEndOfStream
}
return ErrStreamClosed
Expand All @@ -201,7 +201,7 @@ func (s *Stream) Flush(endStream bool) error {
return nil
}
atomic.AddUint64(&s.session.stats.outFlowBytes, uint64(s.sendBuf.Len()))
state := atomic.LoadUint32(&s.state)
state := s.getStreamState()
if state != uint32(streamOpened) {
s.sendBuf.recycle()
return ErrStreamClosed
Expand Down Expand Up @@ -270,8 +270,8 @@ func (s *Stream) writeFallback(streamStatus uint32, err error) error {
return s.session.waitForSend(nil, data)
}

//Close used to close the stream, which maybe block if there is StreamCallbacks running.
//if a stream was leaked, it's also mean that some share memory was leaked.
// Close used to close the stream, which maybe block if there is StreamCallbacks running.
// if a stream was leaked, it's also mean that some share memory was leaked.
func (s *Stream) Close() error {
if s.getCallbacks() != nil {
atomic.StoreUint32(&s.callbackCloseState, uint32(callbackWaitExit))
Expand All @@ -287,7 +287,7 @@ func (s *Stream) Close() error {
// close the stream. after close stream, any operation will return ErrStreamClosed.
// unread data will be drained and released.
func (s *Stream) close() error {
oldState := atomic.LoadUint32(&s.state)
oldState := s.getStreamState()
if oldState == uint32(streamClosed) {
return nil
}
Expand All @@ -301,13 +301,13 @@ func (s *Stream) close() error {
s.safeCloseNotify()
callback := s.getCallbacks()
if callback != nil {
if atomic.LoadUint32(&s.session.shutdown) == 1 {
if s.session.IsClosed() {
callback.OnRemoteClose()
} else {
callback.OnLocalClose()
}
}
if atomic.LoadUint32(&s.session.shutdown) == 1 {
if s.session.IsClosed() {
return nil
}
// notify peer
Expand All @@ -327,7 +327,7 @@ func (s *Stream) close() error {
}

func (s *Stream) clean() {
s.session.onStreamClose(s.id, streamState(atomic.LoadUint32(&s.state)))
s.session.onStreamClose(s.id, streamState(s.getStreamState()))
s.pendingData.clear()
s.recvBuf.recycle()
s.sendBuf.recycle()
Expand All @@ -345,7 +345,7 @@ func (s *Stream) halfClose() {

// clean the stream's all status for reusing.
func (s *Stream) reset() error {
if atomic.LoadUint32(&s.state) != uint32(streamOpened) {
if !s.IsOpen() {
return ErrStreamClosed
}
// return error if has any unread data
Expand Down Expand Up @@ -388,7 +388,7 @@ func (s *Stream) ReleaseReadAndReuse() {
func (s *Stream) fillDataToReadBuffer(buf bufferSliceWrapper) error {
s.pendingData.add(buf)
//stream had closed, which maybe closed by user due to timeout.
if atomic.LoadUint32(&s.state) == uint32(streamClosed) {
if s.getStreamState() == uint32(streamClosed) {
s.pendingData.clear()
s.recvBuf.recycle()
return nil
Expand Down Expand Up @@ -446,9 +446,13 @@ func (s *Stream) SetWriteDeadline(t time.Time) error {
return nil
}

//IsOpen return whether the stream is open
// IsOpen return whether the stream is open
func (s *Stream) IsOpen() bool {
return atomic.LoadUint32(&s.state) == uint32(streamOpened)
return s.getStreamState() == uint32(streamOpened)
}

func (s *Stream) getStreamState() uint32 {
return atomic.LoadUint32(&s.state)
}

func (s *Stream) safeCloseNotify() {
Expand Down Expand Up @@ -539,14 +543,14 @@ func (s *Stream) getCallbacks() StreamCallbacks {
return nil
}

//Low performance api, it just adapt to the interface net.Conn, which will copy data from read buffer to `p`
//please use BufferReader() API to implement zero copy read
// Low performance api, it just adapt to the interface net.Conn, which will copy data from read buffer to `p`
// please use BufferReader() API to implement zero copy read
func (s *Stream) Read(p []byte) (int, error) {
return s.copyRead(p)
}

//Low performance api, it just adapt to the interface net.Conn, which will do copy data from `p` to write buffer
//please use BufferWriter() API to implement zero copy write
// Low performance api, it just adapt to the interface net.Conn, which will do copy data from `p` to write buffer
// please use BufferWriter() API to implement zero copy write
func (s *Stream) Write(p []byte) (int, error) {
return s.copyWriteAndFlush(p)
}
Expand Down

0 comments on commit 9f40696

Please sign in to comment.