Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sync-ddl control #1130

Merged
merged 2 commits into from
Jan 19, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 33 additions & 32 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,49 +456,50 @@ ForLoop:
continue
}

var (
shouldSkip bool
schema, table string
)
sql := b.job.Query
var schema, table string
schema, table, err = s.schema.getSchemaTableAndDelete(b.job.BinlogInfo.SchemaVersion)
if err != nil {
err = errors.Trace(err)
break ForLoop
}

if s.filter.SkipSchemaAndTable(schema, table) {
log.Info("skip ddl by block allow filter", zap.String("schema", schema), zap.String("table", table),
zap.String("sql", sql), zap.Int64("commit ts", commitTS))
appendFakeBinlogIfNeeded(nil, commitTS)
continue
}

// shouldSkip is used specially for database dsyncers like tidb/mysql/oracle
// although we skip some ddls, but we still need to update table info
// ignore means whether we should should this ddl event after binlogFilter
var (
shouldSkip, ignore bool
stmt ast.StmtNode
)

if stmt, ignore, err = skipDDLEvent(sql, schema, table, p, s.binlogFilter); err != nil {
break ForLoop
} else if ignore {
log.Info("skip ddl by binlog event filter", zap.String("schema", schema), zap.String("table", table),
zap.String("sql", sql), zap.Int64("commit ts", commitTS))
// A empty sql force it to evict the downstream table info.
if s.cfg.DestDBType == "tidb" || s.cfg.DestDBType == "mysql" || s.cfg.DestDBType == "oracle" {
shouldSkip = true
} else {
if s.cfg.SyncDDL {
if s.filter.SkipSchemaAndTable(schema, table) {
log.Info("skip ddl by block allow filter", zap.String("schema", schema), zap.String("table", table),
zap.String("sql", sql), zap.Int64("commit ts", commitTS))
appendFakeBinlogIfNeeded(nil, commitTS)
continue
}
} else if !ignore && s.cfg.DestDBType == "oracle" {
if _, ok := stmt.(*ast.TruncateTableStmt); !ok {
err = errors.Errorf("unsupported ddl %s, you should skip commit ts %d", sql, commitTS)

// shouldSkip is used specially for database dsyncers like tidb/mysql/oracle
// although we skip some ddls, but we still need to update table info
// ignore means whether we should should this ddl event after binlogFilter
var (
ignore bool
stmt ast.StmtNode
)
if stmt, ignore, err = skipDDLEvent(sql, schema, table, p, s.binlogFilter); err != nil {
break ForLoop
} else if ignore {
log.Info("skip ddl by binlog event filter", zap.String("schema", schema), zap.String("table", table),
zap.String("sql", sql), zap.Int64("commit ts", commitTS))
// A empty sql force it to evict the downstream table info.
if s.cfg.DestDBType == "tidb" || s.cfg.DestDBType == "mysql" || s.cfg.DestDBType == "oracle" {
shouldSkip = true
} else {
appendFakeBinlogIfNeeded(nil, commitTS)
continue
}
} else if !ignore && s.cfg.DestDBType == "oracle" {
if _, ok := stmt.(*ast.TruncateTableStmt); !ok {
err = errors.Errorf("unsupported ddl %s, you should skip commit ts %d", sql, commitTS)
break ForLoop
}
}
}

if !s.cfg.SyncDDL {
} else {
log.Info("skip ddl by SyncDDL setting to false", zap.String("schema", schema), zap.String("table", table),
zap.String("sql", sql), zap.Int64("commit ts", commitTS))
// A empty sql force it to evict the downstream table info.
Expand Down