Skip to content

Commit

Permalink
support lock with timeout WriteSyncer
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 authored Mar 17, 2023
2 parents 734d527 + 92baffd commit a0d097d
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 0 deletions.
3 changes: 3 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type Config struct {
// If this field is not set, the internal logger errors will be sent to the same file as in File field.
// Note: if we want to output the logger errors to stderr, we can just set this field to "stderr"
ErrorOutputPath string `toml:"error-output-path" json:"error-output-path"`
// Timeout for writing log, if TiDB hang on writing log, make it panic.
// The value is seconds, 0 means no timeout
Timeout int `toml:"timeout" json:"timeout"`
}

// ZapProperties records some information about zap.
Expand Down
63 changes: 63 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package log

import (
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -97,6 +99,11 @@ func InitLoggerWithWriteSyncer(cfg *Config, output, errOutput zapcore.WriteSynce
if err != nil {
return nil, nil, err
}
if cfg.Timeout > 0 {
output = LockWithTimeout(output, cfg.Timeout)
errOutput = LockWithTimeout(errOutput, cfg.Timeout)
}

core := NewTextCore(encoder, output, level)
opts = append(cfg.buildOptions(errOutput), opts...)
lg := zap.New(core, opts...)
Expand All @@ -108,6 +115,62 @@ func InitLoggerWithWriteSyncer(cfg *Config, output, errOutput zapcore.WriteSynce
return lg, r, nil
}

// LockWithTimeout wraps a WriteSyncer make it safe for concurrent use, just like zapcore.Lock()
// timeout seconds.
func LockWithTimeout(ws zapcore.WriteSyncer, timeout int) zapcore.WriteSyncer {
r := &lockWithTimeoutWrapper{
ws: ws,
lock: make(chan struct{}, 1),
t: time.NewTicker(time.Second),
timeout: timeout,
}
return r
}

type lockWithTimeoutWrapper struct {
ws zapcore.WriteSyncer
lock chan struct{}
t *time.Ticker
timeout int
}

// getLockOrBlock returns true when get lock success, and false otherwise.
func (s *lockWithTimeoutWrapper) getLockOrBlock() bool {
for i := 0; i < s.timeout; {
select {
case s.lock <- struct{}{}:
return true
case <-s.t.C:
i++
}
}
return false
}

func (s *lockWithTimeoutWrapper) unlock() {
<-s.lock
}

func (s *lockWithTimeoutWrapper) Write(bs []byte) (int, error) {
succ := s.getLockOrBlock()
if !succ {
panic(fmt.Sprintf("Timeout of %ds when trying to write log", s.timeout))
}
defer s.unlock()

return s.ws.Write(bs)
}

func (s *lockWithTimeoutWrapper) Sync() error {
succ := s.getLockOrBlock()
if !succ {
panic(fmt.Sprintf("Timeout of %ds when trying to sync log", s.timeout))
}
defer s.unlock()

return s.ws.Sync()
}

// initFileLog initializes file based logging options.
func initFileLog(cfg *FileLogConfig) (*lumberjack.Logger, error) {
if st, err := os.Stat(cfg.Filename); err == nil {
Expand Down
30 changes: 30 additions & 0 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,33 @@ type testingSink struct {
// method is provided by the embedded buffer.
func (s *testingSink) Close() error { return nil }
func (s *testingSink) Sync() error { return nil }

func TestTimeout(t *testing.T) {
sink := &testingSink{new(bytes.Buffer)}
ws := LockWithTimeout(sink, 3)
ws.Write([]byte("abc"))
ws.Sync()
require.Contains(t, sink.String(), `abc`)

var h hang
ws = LockWithTimeout(zapcore.AddSync(h), 3)
panicCh := make(chan struct{}, 2)
for i := 0; i < 2; i++ {
go func() {
defer func() {
if x := recover(); x != nil {
panicCh <- struct{}{}
}
}()
ws.Write([]byte("abc")) // This should not make the caller hang
}()
}
<-panicCh
}

type hang struct{}

func (_ hang) Write(_ []byte) (int, error) {
<-make(chan struct{}) // block forever
return 0, nil
}

0 comments on commit a0d097d

Please sign in to comment.