Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer:sync lob data to oracle #1158

Merged
merged 7 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions drainer/sync/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,15 @@ func CreateLoader(
enableDispatch bool,
enableCausility bool,
) (ld loader.Loader, err error) {

destDBTypeInt := loader.DBTypeUnknown
if destDBType == "oracle" {
destDBTypeInt = loader.OracleDB
} else if destDBType == "tidb" || destDBType == "mysql" {
destDBTypeInt = loader.MysqlDB
}
var opts []loader.Option
opts = append(opts, loader.DestinationDBType(destDBType), loader.WorkerCount(worker), loader.BatchSize(batchSize),
loader.SaveAppliedTS(destDBType == "tidb" || destDBType == "oracle"), loader.SetloopBackSyncInfo(info))
opts = append(opts, loader.DestinationDBType(destDBTypeInt), loader.WorkerCount(worker), loader.BatchSize(batchSize),
loader.SaveAppliedTS(destDBTypeInt == loader.MysqlDB || destDBTypeInt == loader.OracleDB), loader.SetloopBackSyncInfo(info))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From L60-63, the destDBTypeInt is either MysqlDB or OracleDB, then destDBTypeInt == loader.MysqlDB || destDBTypeInt == loader.OracleDB is always true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when downstream is mysql or oracle DB, we need to save appliedTS, if it is not , we do not need to save appliedTS.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic has been changed. When destDBType is mysql we shouldn't save applied ts while for tidb we should save.

if queryHistogramVec != nil {
opts = append(opts, loader.Metrics(&loader.MetricsGroup{
QueryHistogramVec: queryHistogramVec,
Expand Down
76 changes: 49 additions & 27 deletions drainer/translator/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

const implicitColID = -1

func genDBInsert(schema string, ptable, table *model.TableInfo, row []byte) (names []string, args []interface{}, err error) {
func genDBInsert(schema string, ptable, table *model.TableInfo, row []byte, destDBType int) (names []string, args []interface{}, err error) {
columns := writableColumns(table)

columnValues, err := insertRowToDatums(table, row)
Expand All @@ -46,7 +46,7 @@ func genDBInsert(schema string, ptable, table *model.TableInfo, row []byte) (nam
val = getDefaultOrZeroValue(ptable, col)
}

value, err := formatData(val, col.FieldType)
value, err := formatData(val, col.FieldType, destDBType)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand All @@ -58,7 +58,7 @@ func genDBInsert(schema string, ptable, table *model.TableInfo, row []byte) (nam
return names, args, nil
}

func genDBUpdate(schema string, ptable, table *model.TableInfo, row []byte, canAppendDefaultValue bool) (names []string, values []interface{}, oldValues []interface{}, err error) {
func genDBUpdate(schema string, ptable, table *model.TableInfo, row []byte, canAppendDefaultValue bool, destDBType int) (names []string, values []interface{}, oldValues []interface{}, err error) {
columns := writableColumns(table)
updtDecoder := newUpdateDecoder(ptable, table, canAppendDefaultValue)

Expand All @@ -69,12 +69,12 @@ func genDBUpdate(schema string, ptable, table *model.TableInfo, row []byte, canA
return nil, nil, nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name)
}

_, oldValues, err = generateColumnAndValue(columns, oldColumnValues)
_, oldValues, err = generateColumnAndValue(columns, oldColumnValues, destDBType)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

updateColumns, values, err = generateColumnAndValue(columns, newColumnValues)
updateColumns, values, err = generateColumnAndValue(columns, newColumnValues, destDBType)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
Expand All @@ -84,7 +84,7 @@ func genDBUpdate(schema string, ptable, table *model.TableInfo, row []byte, canA
return
}

func genDBDelete(schema string, table *model.TableInfo, row []byte) (names []string, values []interface{}, err error) {
func genDBDelete(schema string, table *model.TableInfo, row []byte, destDBType int) (names []string, values []interface{}, err error) {
columns := table.Columns
colsTypeMap := util.ToColumnTypeMap(columns)

Expand All @@ -93,7 +93,7 @@ func genDBDelete(schema string, table *model.TableInfo, row []byte) (names []str
return nil, nil, errors.Trace(err)
}

columns, values, err = generateColumnAndValue(columns, columnValues)
columns, values, err = generateColumnAndValue(columns, columnValues, destDBType)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down Expand Up @@ -144,33 +144,35 @@ func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBi

switch mutType {
case tipb.MutationType_Insert:
names, args, err := genDBInsert(schema, pinfo, info, row)
names, args, err := genDBInsert(schema, pinfo, info, row, loader.MysqlDB)
if err != nil {
return nil, errors.Annotate(err, "gen insert fail")
}

dml := &loader.DML{
Tp: loader.InsertDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
Tp: loader.InsertDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
DestDBType: loader.MysqlDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
dml.Values[name] = args[i]
}
case tipb.MutationType_Update:
names, args, oldArgs, err := genDBUpdate(schema, pinfo, info, row, canAppendDefaultValue)
names, args, oldArgs, err := genDBUpdate(schema, pinfo, info, row, canAppendDefaultValue, loader.MysqlDB)
if err != nil {
return nil, errors.Annotate(err, "gen update fail")
}

dml := &loader.DML{
Tp: loader.UpdateDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
OldValues: make(map[string]interface{}),
Tp: loader.UpdateDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
OldValues: make(map[string]interface{}),
DestDBType: loader.MysqlDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
Expand All @@ -179,16 +181,17 @@ func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBi
}

case tipb.MutationType_DeleteRow:
names, args, err := genDBDelete(schema, info, row)
names, args, err := genDBDelete(schema, info, row, loader.MysqlDB)
if err != nil {
return nil, errors.Annotate(err, "gen delete fail")
}

dml := &loader.DML{
Tp: loader.DeleteDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
Tp: loader.DeleteDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
DestDBType: loader.MysqlDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
Expand Down Expand Up @@ -225,15 +228,15 @@ func genColumnNameList(columns []*model.ColumnInfo) (names []string) {
return
}

func generateColumnAndValue(columns []*model.ColumnInfo, columnValues map[int64]types.Datum) ([]*model.ColumnInfo, []interface{}, error) {
func generateColumnAndValue(columns []*model.ColumnInfo, columnValues map[int64]types.Datum, destDBType int) ([]*model.ColumnInfo, []interface{}, error) {
var newColumn []*model.ColumnInfo
var newColumnsValues []interface{}

for _, col := range columns {
val, ok := columnValues[col.ID]
if ok {
newColumn = append(newColumn, col)
value, err := formatData(val, col.FieldType)
value, err := formatData(val, col.FieldType, destDBType)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand All @@ -245,13 +248,19 @@ func generateColumnAndValue(columns []*model.ColumnInfo, columnValues map[int64]
return newColumn, newColumnsValues, nil
}

func formatData(data types.Datum, ft types.FieldType) (types.Datum, error) {
func formatData(data types.Datum, ft types.FieldType, destDBType int) (types.Datum, error) {
if data.GetValue() == nil {
return data, nil
}

switch ft.Tp {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeDuration, mysql.TypeNewDecimal, mysql.TypeJSON:
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeNewDecimal, mysql.TypeJSON:
data = types.NewDatum(fmt.Sprintf("%v", data.GetValue()))
case mysql.TypeDuration:
//only for oracle db
if destDBType == loader.OracleDB {
return data, errors.New("unsupported column type[time]")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return data, errors.New("unsupported column type[time]")
return types.Datum{}, errors.New("unsupported column type[time]")

}
data = types.NewDatum(fmt.Sprintf("%v", data.GetValue()))
case mysql.TypeEnum:
data = types.NewDatum(data.GetMysqlEnum().Value)
Expand All @@ -264,7 +273,20 @@ func formatData(data types.Datum, ft types.FieldType) (types.Datum, error) {
return types.Datum{}, err
}
data = types.NewUintDatum(val)
case mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
//only for oracle db
if destDBType == loader.OracleDB && isBlob(ft) {
data = types.NewBytesDatum(data.GetBytes())
}
}

return data, nil
}

func isBlob(ft types.FieldType) bool {
stype := types.TypeToStr(ft.Tp, ft.Charset)
if stype == "blob" || stype == "tinyblob" || stype == "mediumblob" || stype == "longblob" {
glorv marked this conversation as resolved.
Show resolved Hide resolved
glorv marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if stype == "blob" || stype == "tinyblob" || stype == "mediumblob" || stype == "longblob" {
switch stype:
case "blob", "tinyblob", "mediumblob", "longblob":
return true

return true
}
return false
}
9 changes: 6 additions & 3 deletions drainer/translator/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TiBinlogToOracleTxn(infoGetter TableInfoGetter, schema string, table string

switch mutType {
case tipb.MutationType_Insert:
names, args, err := genDBInsert(schema, pinfo, info, row)
names, args, err := genDBInsert(schema, pinfo, info, row, loader.OracleDB)
if err != nil {
return nil, errors.Annotate(err, "gen insert fail")
}
Expand All @@ -77,13 +77,14 @@ func TiBinlogToOracleTxn(infoGetter TableInfoGetter, schema string, table string
Table: downStreamTable,
Values: make(map[string]interface{}),
UpColumnsInfoMap: tableIDColumnsMap[mut.GetTableId()],
DestDBType: loader.OracleDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
dml.Values[strings.ToUpper(name)] = args[i]
}
case tipb.MutationType_Update:
names, args, oldArgs, err := genDBUpdate(schema, pinfo, info, row, canAppendDefaultValue)
names, args, oldArgs, err := genDBUpdate(schema, pinfo, info, row, canAppendDefaultValue, loader.OracleDB)
if err != nil {
return nil, errors.Annotate(err, "gen update fail")
}
Expand All @@ -95,6 +96,7 @@ func TiBinlogToOracleTxn(infoGetter TableInfoGetter, schema string, table string
Values: make(map[string]interface{}),
OldValues: make(map[string]interface{}),
UpColumnsInfoMap: tableIDColumnsMap[mut.GetTableId()],
DestDBType: loader.OracleDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
Expand All @@ -103,7 +105,7 @@ func TiBinlogToOracleTxn(infoGetter TableInfoGetter, schema string, table string
}

case tipb.MutationType_DeleteRow:
names, args, err := genDBDelete(schema, info, row)
names, args, err := genDBDelete(schema, info, row, loader.OracleDB)
if err != nil {
return nil, errors.Annotate(err, "gen delete fail")
}
Expand All @@ -114,6 +116,7 @@ func TiBinlogToOracleTxn(infoGetter TableInfoGetter, schema string, table string
Table: downStreamTable,
Values: make(map[string]interface{}),
UpColumnsInfoMap: tableIDColumnsMap[mut.GetTableId()],
DestDBType: loader.OracleDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
Expand Down
10 changes: 6 additions & 4 deletions drainer/translator/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"strings"
"time"

"github.com/pingcap/tidb-binlog/pkg/loader"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this to L34~35


//nolint
"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -137,7 +139,7 @@ func genInsert(schema string, ptable, table *model.TableInfo, row []byte) (event
val = getDefaultOrZeroValue(ptable, col)
}

value, err := formatData(val, col.FieldType)
value, err := formatData(val, col.FieldType, loader.DBTypeUnknown)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -173,11 +175,11 @@ func genUpdate(schema string, ptable, table *model.TableInfo, row []byte, canApp
for _, col := range columns {
val, ok := newColumnValues[col.ID]
if ok {
oldValue, err := formatData(oldColumnValues[col.ID], col.FieldType)
oldValue, err := formatData(oldColumnValues[col.ID], col.FieldType, loader.DBTypeUnknown)
if err != nil {
return nil, errors.Trace(err)
}
newValue, err := formatData(val, col.FieldType)
newValue, err := formatData(val, col.FieldType, loader.DBTypeUnknown)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -217,7 +219,7 @@ func genDelete(schema string, table *model.TableInfo, row []byte) (event *pb.Eve
for _, col := range columns {
val, ok := columnValues[col.ID]
if ok {
value, err := formatData(val, col.FieldType)
value, err := formatData(val, col.FieldType, loader.DBTypeUnknown)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/Shopify/sarama v1.30.0
github.com/dustin/go-humanize v1.0.0
github.com/go-sql-driver/mysql v1.6.0
github.com/godror/godror v0.29.0
github.com/godror/godror v0.33.0
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
Expand Down Expand Up @@ -71,7 +71,8 @@ require (
github.com/eapache/queue v1.1.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/go-logfmt/logfmt v0.5.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/godror/knownpb v0.1.0 // indirect
github.com/golang/glog v1.0.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ github.com/Shopify/sarama v1.30.0 h1:TOZL6r37xJBDEMLx4yjB77jxbZYXPaDow08TSK6vIL0
github.com/Shopify/sarama v1.30.0/go.mod h1:zujlQQx1kzHsh4jfV1USnptCQrHAEZ2Hk8fTKCulPVs=
github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae h1:ePgznFqEG1v3AjMklnK8H7BSc++FDSo7xfK9K7Af+0Y=
github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae/go.mod h1:/cvHQkZ1fst0EmZnA5dFtiQdWCNCFYzb+uE2vqVgvx0=
github.com/UNO-SOFT/knownpb v0.0.2/go.mod h1:p80FhK7Efqtw1I44+KdbwHKT2Fg2KluTHKtkGN8YXfE=
github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -218,8 +217,11 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0 h1:TrB8swr/68K7m9CcGut2g3UOihhbcbiMAYiuTXdEih4=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand All @@ -228,8 +230,8 @@ github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godror/godror v0.29.0 h1:J5PiWMy7glh4cZnExYk5ryAYx0c972YQUavh/ml+wlM=
github.com/godror/godror v0.29.0/go.mod h1:dwNYusI/Ug2JlbJuVvQQMhzlxVEJeq+MwaXwTYlDyC8=
github.com/godror/godror v0.33.0 h1:ZK1W7GohHVDPoLp/37U9QCSHARnYB4vVxNJya+CyWQ4=
github.com/godror/godror v0.33.0/go.mod h1:qHYnDISFm/h0vM+HDwg0LpyoLvxRKFRSwvhYF7ufjZ8=
github.com/godror/knownpb v0.1.0 h1:dJPK8s/I3PQzGGaGcUStL2zIaaICNzKKAK8BzP1uLio=
github.com/godror/knownpb v0.1.0/go.mod h1:4nRFbQo1dDuwKnblRXDxrfCFYeT4hjg3GjMqef58eRE=
github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand Down
Loading