Skip to content

Commit

Permalink
wsjs: Ensure no goroutines leak after Close
Browse files Browse the repository at this point in the history
Closes #330
  • Loading branch information
nhooyr committed Oct 19, 2023
1 parent 7b1a6bb commit 77f7262
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 10 deletions.
4 changes: 2 additions & 2 deletions close.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ func CloseStatus(err error) StatusCode {
// Close will unblock all goroutines interacting with the connection once
// complete.
func (c *Conn) Close(code StatusCode, reason string) error {
defer c.wgWait()
defer c.wg.Wait()
return c.closeHandshake(code, reason)
}

// CloseNow closes the WebSocket connection without attempting a close handshake.
// Use when you do not want the overhead of the close handshake.
func (c *Conn) CloseNow() (err error) {
defer c.wgWait()
defer c.wg.Wait()
defer errd.Wrap(&err, "failed to close WebSocket")

if c.isClosed() {
Expand Down
7 changes: 1 addition & 6 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ const (
type Conn struct {
noCopy

wg sync.WaitGroup

subprotocol string
rwc io.ReadWriteCloser
client bool
Expand All @@ -72,6 +70,7 @@ type Conn struct {
writeHeaderBuf [8]byte
writeHeader header

wg sync.WaitGroup
closed chan struct{}
closeMu sync.Mutex
closeErr error
Expand Down Expand Up @@ -310,7 +309,3 @@ func (c *Conn) wgGo(fn func()) {
fn()
}()
}

func (c *Conn) wgWait() {
c.wg.Wait()
}
20 changes: 18 additions & 2 deletions ws_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Conn struct {
// read limit for a message in bytes.
msgReadLimit xsync.Int64

wg sync.WaitGroup
closingMu sync.Mutex
isReadClosed xsync.Int64
closeOnce sync.Once
Expand Down Expand Up @@ -223,6 +224,7 @@ func (c *Conn) write(ctx context.Context, typ MessageType, p []byte) error {
// or the connection is closed.
// It thus performs the full WebSocket close handshake.
func (c *Conn) Close(code StatusCode, reason string) error {
defer c.wg.Wait()
err := c.exportedClose(code, reason)
if err != nil {
return fmt.Errorf("failed to close WebSocket: %w", err)
Expand All @@ -236,6 +238,7 @@ func (c *Conn) Close(code StatusCode, reason string) error {
// note: No different from Close(StatusGoingAway, "") in WASM as there is no way to close
// a WebSocket without the close handshake.
func (c *Conn) CloseNow() error {
defer c.wg.Wait()
return c.Close(StatusGoingAway, "")
}

Expand Down Expand Up @@ -388,10 +391,15 @@ func (c *Conn) CloseRead(ctx context.Context) context.Context {
c.isReadClosed.Store(1)

ctx, cancel := context.WithCancel(ctx)
c.wg.Add(1)
go func() {
defer c.CloseNow()
defer c.wg.Done()
defer cancel()
c.read(ctx)
c.Close(StatusPolicyViolation, "unexpected data message")
_, _, err := c.read(ctx)
if err != nil {
c.Close(StatusPolicyViolation, "unexpected data message")
}
}()
return ctx
}
Expand Down Expand Up @@ -580,3 +588,11 @@ func (m *mu) unlock() {
type noCopy struct{}

func (*noCopy) Lock() {}

func (c *Conn) wgGo(fn func()) {

Check failure on line 592 in ws_js.go

View workflow job for this annotation

GitHub Actions / lint

func (*Conn).wgGo is unused (U1000)

Check failure on line 592 in ws_js.go

View workflow job for this annotation

GitHub Actions / lint

func (*Conn).wgGo is unused (U1000)
c.wg.Add(1)
go func() {
defer c.wg.Done()
fn()
}()
}

0 comments on commit 77f7262

Please sign in to comment.