Skip to content

Commit

Permalink
feat(utils): add goroutine lock
Browse files Browse the repository at this point in the history
  • Loading branch information
sainnhe committed Jan 10, 2025
1 parent 7306cc8 commit 6d8f536
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 1 deletion.
23 changes: 23 additions & 0 deletions internal/utils/goroutine_lock/goroutine_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package goroutinelock implements goroutine locks.
package goroutinelock

import "sync"

// GoroutineWg is used to implement goroutine locks.
var GoroutineWg = &sync.WaitGroup{}
5 changes: 4 additions & 1 deletion pkg/remote/trans/netpollmux/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/cloudwego/netpoll"

goroutinelock "github.com/cloudwego/kitex/internal/utils/goroutine_lock"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/gofunc"
"github.com/cloudwego/kitex/pkg/kerrors"
Expand Down Expand Up @@ -365,7 +366,9 @@ func (t *svrTransHandler) GracefulShutdown(ctx context.Context) error {
}
return true
})
// 4. waiting all crrst packets received by client
// 4. waiting for goroutine locks to be released
goroutinelock.GoroutineWg.Wait()
// 5. waiting all crrst packets received by client
deadline := time.Now().Add(defaultExitWaitGracefulShutdownTime)
ticker := time.NewTicker(defaultExitWaitGracefulShutdownTime / 10)
defer ticker.Stop()
Expand Down
4 changes: 4 additions & 0 deletions pkg/remote/trans/nphttp2/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/cloudwego/netpoll"

goroutinelock "github.com/cloudwego/kitex/internal/utils/goroutine_lock"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/gofunc"
"github.com/cloudwego/kitex/pkg/kerrors"
Expand Down Expand Up @@ -416,6 +417,9 @@ func (t *svrTransHandler) GracefulShutdown(ctx context.Context) error {
svrTrans := elem.Value.(*SvrTrans)
svrTrans.tr.Close()
}
// Goroutine Locks must wait after all connections are closed, otherwise new connections might be created
// while waiting for goroutine locks.
goroutinelock.GoroutineWg.Wait()
t.mu.Unlock()
return nil
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/utils/goroutine_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package utils

import goroutinelock "github.com/cloudwego/kitex/internal/utils/goroutine_lock"

// GoroutineLock locks the goroutine so that graceful shutdown will wait until the lock is released
func GoroutineLock() {
goroutinelock.GoroutineWg.Add(1)
}

// GoroutineUnlock unlocks the goroutine to allow graceful shutdown to continue.
// NOTE: This function should be executed using defer to avoid panic in the middle and causing the lock to not be
// released.
func GoroutineUnlock() {
goroutinelock.GoroutineWg.Done()
}
40 changes: 40 additions & 0 deletions pkg/utils/goroutine_lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2021 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package utils_test

import (
"testing"
"time"

"github.com/cloudwego/kitex/internal/utils/goroutine_lock"

Check failure on line 23 in pkg/utils/goroutine_lock_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not properly formatted (goimports)
"github.com/cloudwego/kitex/pkg/utils"
)

func TestGoroutineLockAndUnlock(t *testing.T) {
t.Parallel()
startTime := time.Now()
go func() {
utils.GoroutineLock()
time.Sleep(time.Second)
utils.GoroutineUnlock()
}()
goroutinelock.GoroutineWg.Wait()
diff := time.Since(startTime)
if diff < time.Second {
t.Errorf("Expect diff >= 1s, get %v", diff)
}
}

0 comments on commit 6d8f536

Please sign in to comment.