Skip to content

Commit

Permalink
Merge pull request #699 from cameron-p-m/fix/allow-setting-logger
Browse files Browse the repository at this point in the history
Allow logger override
  • Loading branch information
lance6716 authored Jun 11, 2022
2 parents 33ea963 + f703fba commit 62e8407
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 48 deletions.
12 changes: 6 additions & 6 deletions canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/go-mysql-org/go-mysql/schema"
"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/siddontang/go-log/log"
)

// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
Expand Down Expand Up @@ -72,7 +71,7 @@ func NewCanal(cfg *Config) (*Canal, error) {
if c.cfg.DiscardNoMetaRowEvent {
c.errorTablesGetTime = make(map[string]time.Time)
}
c.master = &masterInfo{}
c.master = &masterInfo{logger: c.cfg.Logger}

c.delay = new(uint32)

Expand Down Expand Up @@ -222,14 +221,14 @@ func (c *Canal) run() error {
close(c.dumpDoneCh)

if err != nil {
log.Errorf("canal dump mysql err: %v", err)
c.cfg.Logger.Errorf("canal dump mysql err: %v", err)
return errors.Trace(err)
}
}

if err := c.runSyncBinlog(); err != nil {
if errors.Cause(err) != context.Canceled {
log.Errorf("canal start sync binlog err: %v", err)
c.cfg.Logger.Errorf("canal start sync binlog err: %v", err)
return errors.Trace(err)
}
}
Expand All @@ -238,7 +237,7 @@ func (c *Canal) run() error {
}

func (c *Canal) Close() {
log.Infof("closing canal")
c.cfg.Logger.Infof("closing canal")
c.m.Lock()
defer c.m.Unlock()

Expand Down Expand Up @@ -353,7 +352,7 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
c.errorTablesGetTime[key] = time.Now()
c.tableLock.Unlock()
// log error and return ErrMissingTableMeta
log.Errorf("canal get table meta err: %v", errors.Trace(err))
c.cfg.Logger.Errorf("canal get table meta err: %v", errors.Trace(err))
return nil, schema.ErrMissingTableMeta
}
return nil, err
Expand Down Expand Up @@ -427,6 +426,7 @@ func (c *Canal) prepareSyncer() error {
DisableRetrySync: c.cfg.DisableRetrySync,
TimestampStringLocation: c.cfg.TimestampStringLocation,
TLSConfig: c.cfg.TLSConfig,
Logger: c.cfg.Logger,
}

if strings.Contains(c.cfg.Addr, "/") {
Expand Down
8 changes: 8 additions & 0 deletions canal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"crypto/tls"
"io/ioutil"
"math/rand"
"os"
"time"

"github.com/BurntSushi/toml"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
"github.com/siddontang/go-log/log"
)

type DumpConfig struct {
Expand Down Expand Up @@ -86,6 +88,9 @@ type Config struct {

// Set TLS config
TLSConfig *tls.Config

//Set Logger
Logger *log.Logger
}

func NewConfigWithFile(name string) (*Config, error) {
Expand Down Expand Up @@ -124,5 +129,8 @@ func NewDefaultConfig() *Config {
c.Dump.DiscardErr = true
c.Dump.SkipMasterData = false

streamHandler, _ := log.NewStreamHandler(os.Stdout)
c.Logger = log.NewDefault(streamHandler)

return c
}
13 changes: 6 additions & 7 deletions canal/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/go-mysql-org/go-mysql/schema"
"github.com/pingcap/errors"
"github.com/shopspring/decimal"
"github.com/siddontang/go-log/log"
)

type dumpParseHandler struct {
Expand Down Expand Up @@ -49,7 +48,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
e == schema.ErrMissingTableMeta {
return nil
}
log.Errorf("get %s.%s information err: %v", db, table, err)
h.c.cfg.Logger.Errorf("get %s.%s information err: %v", db, table, err)
return errors.Trace(err)
}

Expand Down Expand Up @@ -163,13 +162,13 @@ func (c *Canal) dump() error {
if err != nil {
return errors.Trace(err)
}
log.Infof("skip master data, get current binlog position %v", pos)
c.cfg.Logger.Infof("skip master data, get current binlog position %v", pos)
h.name = pos.Name
h.pos = uint64(pos.Pos)
}

start := time.Now()
log.Info("try dump MySQL and parse")
c.cfg.Logger.Info("try dump MySQL and parse")
if err := c.dumper.DumpAndParse(h); err != nil {
return errors.Trace(err)
}
Expand All @@ -185,7 +184,7 @@ func (c *Canal) dump() error {
c.master.UpdateGTIDSet(h.gset)
startPos = h.gset
}
log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s",
c.cfg.Logger.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s",
time.Since(start).Seconds(), startPos)
return nil
}
Expand All @@ -196,12 +195,12 @@ func (c *Canal) tryDump() error {
if (len(pos.Name) > 0 && pos.Pos > 0) ||
(gset != nil && gset.String() != "") {
// we will sync with binlog name and position
log.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset)
c.cfg.Logger.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset)
return nil
}

if c.dumper == nil {
log.Info("skip dump, no mysqldump")
c.cfg.Logger.Info("skip dump, no mysqldump")
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions canal/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,28 @@ type masterInfo struct {
gset mysql.GTIDSet

timestamp uint32

logger *log.Logger
}

func (m *masterInfo) Update(pos mysql.Position) {
log.Debugf("update master position %s", pos)
m.logger.Debugf("update master position %s", pos)

m.Lock()
m.pos = pos
m.Unlock()
}

func (m *masterInfo) UpdateTimestamp(ts uint32) {
log.Debugf("update master timestamp %d", ts)
m.logger.Debugf("update master timestamp %d", ts)

m.Lock()
m.timestamp = ts
m.Unlock()
}

func (m *masterInfo) UpdateGTIDSet(gset mysql.GTIDSet) {
log.Debugf("update master gtid set %s", gset)
m.logger.Debugf("update master gtid set %s", gset)

m.Lock()
m.gset = gset
Expand Down
18 changes: 9 additions & 9 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/siddontang/go-log/log"
)

func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
Expand All @@ -22,15 +21,15 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
if err != nil {
return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err)
}
log.Infof("start sync binlog at binlog file %v", pos)
c.cfg.Logger.Infof("start sync binlog at binlog file %v", pos)
return s, nil
} else {
gsetClone := gset.Clone()
s, err := c.syncer.StartSyncGTID(gset)
if err != nil {
return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err)
}
log.Infof("start sync binlog at GTID set %v", gsetClone)
c.cfg.Logger.Infof("start sync binlog at GTID set %v", gsetClone)
return s, nil
}
}
Expand Down Expand Up @@ -65,7 +64,7 @@ func (c *Canal) runSyncBinlog() error {
switch e := ev.Event.(type) {
case *replication.RotateEvent:
fakeRotateLogName = string(e.NextLogName)
log.Infof("received fake rotate event, next log name is %s", e.NextLogName)
c.cfg.Logger.Infof("received fake rotate event, next log name is %s", e.NextLogName)
}

continue
Expand All @@ -76,6 +75,7 @@ func (c *Canal) runSyncBinlog() error {
pos := c.master.Position()

curPos := pos.Pos

// next binlog pos
pos.Pos = ev.Header.LogPos

Expand All @@ -92,7 +92,7 @@ func (c *Canal) runSyncBinlog() error {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
log.Infof("rotate binlog to %s", pos)
c.cfg.Logger.Infof("rotate binlog to %s", pos)
savePos = true
force = true
if err = c.eventHandler.OnRotate(e); err != nil {
Expand All @@ -107,7 +107,7 @@ func (c *Canal) runSyncBinlog() error {
if e != ErrExcludedTable &&
e != schema.ErrTableNotExist &&
e != schema.ErrMissingTableMeta {
log.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func (c *Canal) runSyncBinlog() error {
case *replication.QueryEvent:
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
if err != nil {
log.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
continue
}
for _, stmt := range stmts {
Expand Down Expand Up @@ -230,7 +230,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) {

func (c *Canal) updateTable(db, table string) (err error) {
c.ClearTableCache([]byte(db), []byte(table))
log.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
c.cfg.Logger.Infof("table structure changed, clear table cache: %s.%s\n", db, table)
if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist {
return errors.Trace(err)
}
Expand Down Expand Up @@ -291,7 +291,7 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
if curPos.Compare(pos) >= 0 {
return nil
} else {
log.Debugf("master pos is %v, wait catching %v", curPos, pos)
c.cfg.Logger.Debugf("master pos is %v, wait catching %v", curPos, pos)
time.Sleep(100 * time.Millisecond)
}
}
Expand Down
Loading

0 comments on commit 62e8407

Please sign in to comment.