From 0a998074d8b97cdb6f3c491af25f891ec22eb0bd Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 14 Mar 2023 17:06:36 +0800 Subject: [PATCH 1/4] support nonblock WriteSyncer --- config.go | 3 +++ log.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/config.go b/config.go index 0d262cd..28f18f3 100644 --- a/config.go +++ b/config.go @@ -70,6 +70,9 @@ type Config struct { // If this field is not set, the internal logger errors will be sent to the same file as in File field. // Note: if we want to output the logger errors to stderr, we can just set this field to "stderr" ErrorOutputPath string `toml:"error-output-path" json:"error-output-path"` + // Nonblock makes the write operation nonblock. + // Note: the log might loss if the write operation would block, and error is ignored. + Nonblock bool `toml:"nonblock" json:"nonblock"` } // ZapProperties records some information about zap. diff --git a/log.go b/log.go index 040ec6d..8773c37 100644 --- a/log.go +++ b/log.go @@ -97,6 +97,11 @@ func InitLoggerWithWriteSyncer(cfg *Config, output, errOutput zapcore.WriteSynce if err != nil { return nil, nil, err } + if cfg.Nonblock { + output = Nonblock(output) + errOutput = Nonblock(errOutput) + } + core := NewTextCore(encoder, output, level) opts = append(cfg.buildOptions(errOutput), opts...) lg := zap.New(core, opts...) @@ -108,6 +113,50 @@ func InitLoggerWithWriteSyncer(cfg *Config, output, errOutput zapcore.WriteSynce return lg, r, nil } +// Nonblock wraps a WriteSyncer make it safe for concurrent use, just like zapcore.Lock() +func Nonblock(ws zapcore.WriteSyncer) zapcore.WriteSyncer { + r := &nonblockWrapper{ + ws: ws, + writeCh: make(chan []byte, 4096), + syncCh: make(chan struct{}, 512), + } + go r.bgWorkLoop() + return r +} + +type nonblockWrapper struct { + ws zapcore.WriteSyncer + writeCh chan []byte + syncCh chan struct{} +} + +func (s *nonblockWrapper) bgWorkLoop() { + for { + select { + case bs := <-s.writeCh: + s.ws.Write(bs) + case <-s.syncCh: + s.ws.Sync() + } + } +} + +func (s *nonblockWrapper) Write(bs []byte) (int, error) { + select { + case s.writeCh <- bs: + default: + } + return len(bs), nil +} + +func (s *nonblockWrapper) Sync() error { + select { + case s.syncCh <- struct{}{}: + default: + } + return nil +} + // initFileLog initializes file based logging options. func initFileLog(cfg *FileLogConfig) (*lumberjack.Logger, error) { if st, err := os.Stat(cfg.Filename); err == nil { From bed80f5069964521fec8c7cd34f978fd1ea422b6 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 15 Mar 2023 23:43:29 +0800 Subject: [PATCH 2/4] change nonblock to timeout panic --- config.go | 6 ++--- log.go | 69 +++++++++++++++++++++++++++++++---------------------- log_test.go | 30 +++++++++++++++++++++++ 3 files changed, 74 insertions(+), 31 deletions(-) diff --git a/config.go b/config.go index 28f18f3..2d581d1 100644 --- a/config.go +++ b/config.go @@ -70,9 +70,9 @@ type Config struct { // If this field is not set, the internal logger errors will be sent to the same file as in File field. // Note: if we want to output the logger errors to stderr, we can just set this field to "stderr" ErrorOutputPath string `toml:"error-output-path" json:"error-output-path"` - // Nonblock makes the write operation nonblock. - // Note: the log might loss if the write operation would block, and error is ignored. - Nonblock bool `toml:"nonblock" json:"nonblock"` + // Timeout for writing log, if TiDB hang on writing log, make it panic. + // The value is seconds, 0 means no timeout + Timeout int `toml:"timeout" json:"timeout"` } // ZapProperties records some information about zap. diff --git a/log.go b/log.go index 8773c37..5f5205f 100644 --- a/log.go +++ b/log.go @@ -19,6 +19,7 @@ import ( "os" "sync" "sync/atomic" + "time" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -97,9 +98,9 @@ func InitLoggerWithWriteSyncer(cfg *Config, output, errOutput zapcore.WriteSynce if err != nil { return nil, nil, err } - if cfg.Nonblock { - output = Nonblock(output) - errOutput = Nonblock(errOutput) + if cfg.Timeout > 0 { + output = LockWithTimeout(output, cfg.Timeout) + errOutput = LockWithTimeout(errOutput, cfg.Timeout) } core := NewTextCore(encoder, output, level) @@ -113,48 +114,60 @@ func InitLoggerWithWriteSyncer(cfg *Config, output, errOutput zapcore.WriteSynce return lg, r, nil } -// Nonblock wraps a WriteSyncer make it safe for concurrent use, just like zapcore.Lock() -func Nonblock(ws zapcore.WriteSyncer) zapcore.WriteSyncer { - r := &nonblockWrapper{ +// LockWithTimeout wraps a WriteSyncer make it safe for concurrent use, just like zapcore.Lock() +// timeout seconds. +func LockWithTimeout(ws zapcore.WriteSyncer, timeout int) zapcore.WriteSyncer { + r := &lockWithTimeoutWrapper{ ws: ws, - writeCh: make(chan []byte, 4096), - syncCh: make(chan struct{}, 512), + lock: make(chan struct{}, 1), + t: time.NewTicker(time.Second), + timeout: timeout, } - go r.bgWorkLoop() return r } -type nonblockWrapper struct { +type lockWithTimeoutWrapper struct { ws zapcore.WriteSyncer - writeCh chan []byte - syncCh chan struct{} + lock chan struct{} + t *time.Ticker + timeout int } -func (s *nonblockWrapper) bgWorkLoop() { - for { +// getLockOrBlock returns true when get block success, and false otherwise. +func (s *lockWithTimeoutWrapper) getLockOrBlock() bool { + for i := 0; i < s.timeout; { select { - case bs := <-s.writeCh: - s.ws.Write(bs) - case <-s.syncCh: - s.ws.Sync() + case s.lock <- struct{}{}: + return true + case <-s.t.C: + i++ } } + return false } -func (s *nonblockWrapper) Write(bs []byte) (int, error) { - select { - case s.writeCh <- bs: - default: +func (s *lockWithTimeoutWrapper) unlock() { + <-s.lock +} + +func (s *lockWithTimeoutWrapper) Write(bs []byte) (int, error) { + succ := s.getLockOrBlock() + if !succ { + panic("write log hang") } - return len(bs), nil + defer s.unlock() + + return s.ws.Write(bs) } -func (s *nonblockWrapper) Sync() error { - select { - case s.syncCh <- struct{}{}: - default: +func (s *lockWithTimeoutWrapper) Sync() error { + succ := s.getLockOrBlock() + if !succ { + panic("sync log hang") } - return nil + defer s.unlock() + + return s.ws.Sync() } // initFileLog initializes file based logging options. diff --git a/log_test.go b/log_test.go index 58c7a37..a3b6161 100644 --- a/log_test.go +++ b/log_test.go @@ -125,3 +125,33 @@ type testingSink struct { // method is provided by the embedded buffer. func (s *testingSink) Close() error { return nil } func (s *testingSink) Sync() error { return nil } + +func TestTimeout(t *testing.T) { + sink := &testingSink{new(bytes.Buffer)} + ws := LockWithTimeout(sink, 3) + ws.Write([]byte("abc")) + ws.Sync() + require.Contains(t, sink.String(), `abc`) + + var h hang + ws = LockWithTimeout(zapcore.AddSync(h), 3) + panicCh := make(chan struct{}, 2) + for i := 0; i < 2; i++ { + go func() { + defer func() { + if x := recover(); x != nil { + panicCh <- struct{}{} + } + }() + ws.Write([]byte("abc")) // This should not make the caller hang + }() + } + <-panicCh +} + +type hang struct{} + +func (_ hang) Write(_ []byte) (int, error) { + <-make(chan struct{}) // block forever + return 0, nil +} From 5a7442f6bf79ccd2310abc98323d2cbf5eaa1aa0 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 16 Mar 2023 00:58:53 +0800 Subject: [PATCH 3/4] Update log.go --- log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/log.go b/log.go index 5f5205f..5d70e2f 100644 --- a/log.go +++ b/log.go @@ -133,7 +133,7 @@ type lockWithTimeoutWrapper struct { timeout int } -// getLockOrBlock returns true when get block success, and false otherwise. +// getLockOrBlock returns true when get lock success, and false otherwise. func (s *lockWithTimeoutWrapper) getLockOrBlock() bool { for i := 0; i < s.timeout; { select { From 92baffd073e4ea3900125c046fa06c29d1b67b1c Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 16 Mar 2023 19:35:08 +0800 Subject: [PATCH 4/4] address comment --- log.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/log.go b/log.go index 5d70e2f..01edeb8 100644 --- a/log.go +++ b/log.go @@ -16,6 +16,7 @@ package log import ( "errors" + "fmt" "os" "sync" "sync/atomic" @@ -153,7 +154,7 @@ func (s *lockWithTimeoutWrapper) unlock() { func (s *lockWithTimeoutWrapper) Write(bs []byte) (int, error) { succ := s.getLockOrBlock() if !succ { - panic("write log hang") + panic(fmt.Sprintf("Timeout of %ds when trying to write log", s.timeout)) } defer s.unlock() @@ -163,7 +164,7 @@ func (s *lockWithTimeoutWrapper) Write(bs []byte) (int, error) { func (s *lockWithTimeoutWrapper) Sync() error { succ := s.getLockOrBlock() if !succ { - panic("sync log hang") + panic(fmt.Sprintf("Timeout of %ds when trying to sync log", s.timeout)) } defer s.unlock()