Skip to content

Commit

Permalink
Fix liveness goroutine leak in TaskListManager (#6563)
Browse files Browse the repository at this point in the history
What changed?

Liveness.Stop should not call shutdownFn because there is not IsLive check. Even adding IsLive check in Stop would be confusing since the function literally means that shut down liveness only.
Liveness.eventLoop should not call Stop() which caused the deadlock (cyclic wait).
Why?

Liveness can cause deadlock which caused a goroutine leak

How did you test it?
Unit Test with go routine leak detector
  • Loading branch information
shijiesheng authored Dec 16, 2024
1 parent c2a8701 commit 1b1afb9
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
7 changes: 5 additions & 2 deletions service/matching/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type (

var _ common.Daemon = (*Liveness)(nil)

// NewLiveness creates a Liveness daemon that calls the broadcastShutdownFn if it does not receive MarkAlive() within ttl
// NOTE: livesness needs to be stopped explicitly to avoid go routine leak
func NewLiveness(timeSource clock.TimeSource, ttl time.Duration, broadcastShutdownFn func()) *Liveness {
return &Liveness{
status: common.DaemonStatusInitialized,
Expand All @@ -72,13 +74,13 @@ func (l *Liveness) Start() {
go l.eventLoop(checkTimer)
}

// Stop ONLY shuts down liveness does not block on broadcastShutdownFn
func (l *Liveness) Stop() {
if !atomic.CompareAndSwapInt32(&l.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

close(l.stopCh)
l.broadcastShutdownFn()
l.wg.Wait()
}

Expand All @@ -90,7 +92,8 @@ func (l *Liveness) eventLoop(ticker clock.Ticker) {
select {
case <-ticker.Chan():
if !l.IsAlive() {
l.Stop()
go l.broadcastShutdownFn() // do not block shutdown
return
}

case <-l.stopCh:
Expand Down
5 changes: 4 additions & 1 deletion service/matching/liveness/liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ func (s *livenessSuite) SetupTest() {
s.ttl = 500 * time.Millisecond
s.timeSource = clock.NewMockedTimeSource()
s.shutdownFlag = 0
s.liveness = NewLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
s.liveness = NewLiveness(s.timeSource, s.ttl, func() {
atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1)
s.liveness.Stop()
})
}

func (s *livenessSuite) TestIsAlive_No() {
Expand Down
2 changes: 2 additions & 0 deletions service/matching/tasklist/task_list_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"go.uber.org/goleak"
"golang.org/x/sync/errgroup"

"github.com/uber/cadence/client/history"
Expand Down Expand Up @@ -300,6 +301,7 @@ func TestDescribeTaskList(t *testing.T) {
}

func TestCheckIdleTaskList(t *testing.T) {
defer goleak.VerifyNone(t)
cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname", getIsolationgroupsHelper)
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)

Expand Down

0 comments on commit 1b1afb9

Please sign in to comment.