Skip to content

Commit

Permalink
abolish an extra goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
andriibeee committed Nov 13, 2024
1 parent 1253b77 commit ad3f0c2
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 39 deletions.
6 changes: 0 additions & 6 deletions close.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,6 @@ func (c *Conn) waitGoroutines() error {
t := time.NewTimer(time.Second * 15)
defer t.Stop()

select {
case <-c.timeoutLoopDone:
case <-t.C:
return errors.New("failed to wait for timeoutLoop goroutine to exit")
}

c.closeReadMu.Lock()
closeRead := c.closeReadCtx != nil
c.closeReadMu.Unlock()
Expand Down
63 changes: 36 additions & 27 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ type Conn struct {
br *bufio.Reader
bw *bufio.Writer

readTimeout chan context.Context
writeTimeout chan context.Context
timeoutLoopDone chan struct{}
readTimeoutCloser atomic.Value
writeTimeoutCloser atomic.Value

// Read state.
readMu *mu
Expand Down Expand Up @@ -104,10 +103,6 @@ func newConn(cfg connConfig) *Conn {
br: cfg.br,
bw: cfg.bw,

readTimeout: make(chan context.Context),
writeTimeout: make(chan context.Context),
timeoutLoopDone: make(chan struct{}),

closed: make(chan struct{}),
activePings: make(map[string]chan<- struct{}),
}
Expand All @@ -133,8 +128,6 @@ func newConn(cfg connConfig) *Conn {
c.close()
})

go c.timeoutLoop()

return c
}

Expand Down Expand Up @@ -164,26 +157,42 @@ func (c *Conn) close() error {
return err
}

func (c *Conn) timeoutLoop() {
defer close(c.timeoutLoopDone)
func (c *Conn) setupWriteTimeout(ctx context.Context) {
hammerTime := context.AfterFunc(ctx, func() {

Check failure on line 161 in conn.go

View workflow job for this annotation

GitHub Actions / bench

undefined: context.AfterFunc
c.close()
})

readCtx := context.Background()
writeCtx := context.Background()
if closer := c.writeTimeoutCloser.Swap(hammerTime); closer != nil {
if fn, ok := closer.(func() bool); ok {
fn()
}
}
}

for {
select {
case <-c.closed:
return

case writeCtx = <-c.writeTimeout:
case readCtx = <-c.readTimeout:

case <-readCtx.Done():
c.close()
return
case <-writeCtx.Done():
c.close()
return
func (c *Conn) clearWriteTimeout() {
if closer := c.writeTimeoutCloser.Load(); closer != nil {
if fn, ok := closer.(func() bool); ok {
fn()
}
}
}

func (c *Conn) setupReadTimeout(ctx context.Context) {
hammerTime := context.AfterFunc(ctx, func() {

Check failure on line 181 in conn.go

View workflow job for this annotation

GitHub Actions / bench

undefined: context.AfterFunc
defer c.close()
})

if closer := c.readTimeoutCloser.Swap(hammerTime); closer != nil {
if fn, ok := closer.(func() bool); ok {
fn()
}
}
}

func (c *Conn) clearReadTimeout() {
if closer := c.readTimeoutCloser.Load(); closer != nil {
if fn, ok := closer.(func() bool); ok {
fn()
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ func (c *Conn) readFrameHeader(ctx context.Context) (header, error) {
select {
case <-c.closed:
return header{}, net.ErrClosed
case c.readTimeout <- ctx:
default:
c.setupReadTimeout(ctx)
}

h, err := readFrameHeader(c.br, c.readHeaderBuf[:])
Expand All @@ -239,7 +240,8 @@ func (c *Conn) readFrameHeader(ctx context.Context) (header, error) {
select {
case <-c.closed:
return header{}, net.ErrClosed
case c.readTimeout <- context.Background():
default:
c.clearReadTimeout()
}

return h, nil
Expand All @@ -249,7 +251,8 @@ func (c *Conn) readFramePayload(ctx context.Context, p []byte) (int, error) {
select {
case <-c.closed:
return 0, net.ErrClosed
case c.readTimeout <- ctx:
default:
c.setupReadTimeout(ctx)
}

n, err := io.ReadFull(c.br, p)
Expand All @@ -267,7 +270,8 @@ func (c *Conn) readFramePayload(ctx context.Context, p []byte) (int, error) {
select {
case <-c.closed:
return n, net.ErrClosed
case c.readTimeout <- context.Background():
default:
c.clearReadTimeout()
}

return n, err
Expand Down
6 changes: 4 additions & 2 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ func (c *Conn) writeFrame(ctx context.Context, fin bool, flate bool, opcode opco
select {
case <-c.closed:
return 0, net.ErrClosed
case c.writeTimeout <- ctx:
default:
c.setupWriteTimeout(ctx)
}

defer func() {
Expand Down Expand Up @@ -309,7 +310,8 @@ func (c *Conn) writeFrame(ctx context.Context, fin bool, flate bool, opcode opco
return n, nil
}
return n, net.ErrClosed
case c.writeTimeout <- context.Background():
default:
c.clearWriteTimeout()
}

return n, nil
Expand Down

0 comments on commit ad3f0c2

Please sign in to comment.