Skip to content

Commit

Permalink
drainer:sync lob data to oracle (#1158)
Browse files Browse the repository at this point in the history
ref #1157
  • Loading branch information
cartersz authored May 5, 2022
1 parent fc14c4a commit 09d4a33
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 284 deletions.
77 changes: 50 additions & 27 deletions drainer/translator/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

const implicitColID = -1

func genDBInsert(schema string, ptable, table *model.TableInfo, row []byte) (names []string, args []interface{}, err error) {
func genDBInsert(schema string, ptable, table *model.TableInfo, row []byte, destDBType loader.DBType) (names []string, args []interface{}, err error) {
columns := writableColumns(table)

columnValues, err := insertRowToDatums(table, row)
Expand All @@ -46,7 +46,7 @@ func genDBInsert(schema string, ptable, table *model.TableInfo, row []byte) (nam
val = getDefaultOrZeroValue(ptable, col)
}

value, err := formatData(val, col.FieldType)
value, err := formatData(val, col.FieldType, destDBType)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand All @@ -58,7 +58,7 @@ func genDBInsert(schema string, ptable, table *model.TableInfo, row []byte) (nam
return names, args, nil
}

func genDBUpdate(schema string, ptable, table *model.TableInfo, row []byte, canAppendDefaultValue bool) (names []string, values []interface{}, oldValues []interface{}, err error) {
func genDBUpdate(schema string, ptable, table *model.TableInfo, row []byte, canAppendDefaultValue bool, destDBType loader.DBType) (names []string, values []interface{}, oldValues []interface{}, err error) {
columns := writableColumns(table)
updtDecoder := newUpdateDecoder(ptable, table, canAppendDefaultValue)

Expand All @@ -69,12 +69,12 @@ func genDBUpdate(schema string, ptable, table *model.TableInfo, row []byte, canA
return nil, nil, nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name)
}

_, oldValues, err = generateColumnAndValue(columns, oldColumnValues)
_, oldValues, err = generateColumnAndValue(columns, oldColumnValues, destDBType)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

updateColumns, values, err = generateColumnAndValue(columns, newColumnValues)
updateColumns, values, err = generateColumnAndValue(columns, newColumnValues, destDBType)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
Expand All @@ -84,7 +84,7 @@ func genDBUpdate(schema string, ptable, table *model.TableInfo, row []byte, canA
return
}

func genDBDelete(schema string, table *model.TableInfo, row []byte) (names []string, values []interface{}, err error) {
func genDBDelete(schema string, table *model.TableInfo, row []byte, destDBType loader.DBType) (names []string, values []interface{}, err error) {
columns := table.Columns
colsTypeMap := util.ToColumnTypeMap(columns)

Expand All @@ -93,7 +93,7 @@ func genDBDelete(schema string, table *model.TableInfo, row []byte) (names []str
return nil, nil, errors.Trace(err)
}

columns, values, err = generateColumnAndValue(columns, columnValues)
columns, values, err = generateColumnAndValue(columns, columnValues, destDBType)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down Expand Up @@ -144,33 +144,35 @@ func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBi

switch mutType {
case tipb.MutationType_Insert:
names, args, err := genDBInsert(schema, pinfo, info, row)
names, args, err := genDBInsert(schema, pinfo, info, row, loader.MysqlDB)
if err != nil {
return nil, errors.Annotate(err, "gen insert fail")
}

dml := &loader.DML{
Tp: loader.InsertDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
Tp: loader.InsertDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
DestDBType: loader.MysqlDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
dml.Values[name] = args[i]
}
case tipb.MutationType_Update:
names, args, oldArgs, err := genDBUpdate(schema, pinfo, info, row, canAppendDefaultValue)
names, args, oldArgs, err := genDBUpdate(schema, pinfo, info, row, canAppendDefaultValue, loader.MysqlDB)
if err != nil {
return nil, errors.Annotate(err, "gen update fail")
}

dml := &loader.DML{
Tp: loader.UpdateDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
OldValues: make(map[string]interface{}),
Tp: loader.UpdateDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
OldValues: make(map[string]interface{}),
DestDBType: loader.MysqlDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
Expand All @@ -179,16 +181,17 @@ func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBi
}

case tipb.MutationType_DeleteRow:
names, args, err := genDBDelete(schema, info, row)
names, args, err := genDBDelete(schema, info, row, loader.MysqlDB)
if err != nil {
return nil, errors.Annotate(err, "gen delete fail")
}

dml := &loader.DML{
Tp: loader.DeleteDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
Tp: loader.DeleteDMLType,
Database: schema,
Table: table,
Values: make(map[string]interface{}),
DestDBType: loader.MysqlDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
Expand Down Expand Up @@ -225,15 +228,15 @@ func genColumnNameList(columns []*model.ColumnInfo) (names []string) {
return
}

func generateColumnAndValue(columns []*model.ColumnInfo, columnValues map[int64]types.Datum) ([]*model.ColumnInfo, []interface{}, error) {
func generateColumnAndValue(columns []*model.ColumnInfo, columnValues map[int64]types.Datum, destDBType loader.DBType) ([]*model.ColumnInfo, []interface{}, error) {
var newColumn []*model.ColumnInfo
var newColumnsValues []interface{}

for _, col := range columns {
val, ok := columnValues[col.ID]
if ok {
newColumn = append(newColumn, col)
value, err := formatData(val, col.FieldType)
value, err := formatData(val, col.FieldType, destDBType)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand All @@ -245,13 +248,19 @@ func generateColumnAndValue(columns []*model.ColumnInfo, columnValues map[int64]
return newColumn, newColumnsValues, nil
}

func formatData(data types.Datum, ft types.FieldType) (types.Datum, error) {
func formatData(data types.Datum, ft types.FieldType, destDBType loader.DBType) (types.Datum, error) {
if data.GetValue() == nil {
return data, nil
}

switch ft.Tp {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeDuration, mysql.TypeNewDecimal, mysql.TypeJSON:
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeNewDecimal, mysql.TypeJSON:
data = types.NewDatum(fmt.Sprintf("%v", data.GetValue()))
case mysql.TypeDuration:
//only for oracle db
if destDBType == loader.OracleDB {
return types.Datum{}, errors.New("unsupported column type[time]")
}
data = types.NewDatum(fmt.Sprintf("%v", data.GetValue()))
case mysql.TypeEnum:
data = types.NewDatum(data.GetMysqlEnum().Value)
Expand All @@ -264,7 +273,21 @@ func formatData(data types.Datum, ft types.FieldType) (types.Datum, error) {
return types.Datum{}, err
}
data = types.NewUintDatum(val)
case mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
//only for oracle db
if destDBType == loader.OracleDB && isBlob(ft) {
data = types.NewBytesDatum(data.GetBytes())
}
}

return data, nil
}

func isBlob(ft types.FieldType) bool {
stype := types.TypeToStr(ft.Tp, ft.Charset)
switch stype {
case "blob", "tinyblob", "mediumblob", "longblob":
return true
}
return false
}
9 changes: 6 additions & 3 deletions drainer/translator/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TiBinlogToOracleTxn(infoGetter TableInfoGetter, schema string, table string

switch mutType {
case tipb.MutationType_Insert:
names, args, err := genDBInsert(schema, pinfo, info, row)
names, args, err := genDBInsert(schema, pinfo, info, row, loader.OracleDB)
if err != nil {
return nil, errors.Annotate(err, "gen insert fail")
}
Expand All @@ -77,13 +77,14 @@ func TiBinlogToOracleTxn(infoGetter TableInfoGetter, schema string, table string
Table: downStreamTable,
Values: make(map[string]interface{}),
UpColumnsInfoMap: tableIDColumnsMap[mut.GetTableId()],
DestDBType: loader.OracleDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
dml.Values[strings.ToUpper(name)] = args[i]
}
case tipb.MutationType_Update:
names, args, oldArgs, err := genDBUpdate(schema, pinfo, info, row, canAppendDefaultValue)
names, args, oldArgs, err := genDBUpdate(schema, pinfo, info, row, canAppendDefaultValue, loader.OracleDB)
if err != nil {
return nil, errors.Annotate(err, "gen update fail")
}
Expand All @@ -95,6 +96,7 @@ func TiBinlogToOracleTxn(infoGetter TableInfoGetter, schema string, table string
Values: make(map[string]interface{}),
OldValues: make(map[string]interface{}),
UpColumnsInfoMap: tableIDColumnsMap[mut.GetTableId()],
DestDBType: loader.OracleDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
Expand All @@ -103,7 +105,7 @@ func TiBinlogToOracleTxn(infoGetter TableInfoGetter, schema string, table string
}

case tipb.MutationType_DeleteRow:
names, args, err := genDBDelete(schema, info, row)
names, args, err := genDBDelete(schema, info, row, loader.OracleDB)
if err != nil {
return nil, errors.Annotate(err, "gen delete fail")
}
Expand All @@ -114,6 +116,7 @@ func TiBinlogToOracleTxn(infoGetter TableInfoGetter, schema string, table string
Table: downStreamTable,
Values: make(map[string]interface{}),
UpColumnsInfoMap: tableIDColumnsMap[mut.GetTableId()],
DestDBType: loader.OracleDB,
}
txn.DMLs = append(txn.DMLs, dml)
for i, name := range names {
Expand Down
9 changes: 5 additions & 4 deletions drainer/translator/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/util/codec"
tipb "github.com/pingcap/tipb/go-binlog"

"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb-binlog/pkg/util"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)
Expand Down Expand Up @@ -137,7 +138,7 @@ func genInsert(schema string, ptable, table *model.TableInfo, row []byte) (event
val = getDefaultOrZeroValue(ptable, col)
}

value, err := formatData(val, col.FieldType)
value, err := formatData(val, col.FieldType, loader.DBTypeUnknown)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -173,11 +174,11 @@ func genUpdate(schema string, ptable, table *model.TableInfo, row []byte, canApp
for _, col := range columns {
val, ok := newColumnValues[col.ID]
if ok {
oldValue, err := formatData(oldColumnValues[col.ID], col.FieldType)
oldValue, err := formatData(oldColumnValues[col.ID], col.FieldType, loader.DBTypeUnknown)
if err != nil {
return nil, errors.Trace(err)
}
newValue, err := formatData(val, col.FieldType)
newValue, err := formatData(val, col.FieldType, loader.DBTypeUnknown)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -217,7 +218,7 @@ func genDelete(schema string, table *model.TableInfo, row []byte) (event *pb.Eve
for _, col := range columns {
val, ok := columnValues[col.ID]
if ok {
value, err := formatData(val, col.FieldType)
value, err := formatData(val, col.FieldType, loader.DBTypeUnknown)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/Shopify/sarama v1.30.0
github.com/dustin/go-humanize v1.0.0
github.com/go-sql-driver/mysql v1.6.0
github.com/godror/godror v0.29.0
github.com/godror/godror v0.33.0
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
Expand Down Expand Up @@ -71,7 +71,8 @@ require (
github.com/eapache/queue v1.1.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/go-logfmt/logfmt v0.5.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/godror/knownpb v0.1.0 // indirect
github.com/golang/glog v1.0.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ github.com/Shopify/sarama v1.30.0 h1:TOZL6r37xJBDEMLx4yjB77jxbZYXPaDow08TSK6vIL0
github.com/Shopify/sarama v1.30.0/go.mod h1:zujlQQx1kzHsh4jfV1USnptCQrHAEZ2Hk8fTKCulPVs=
github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae h1:ePgznFqEG1v3AjMklnK8H7BSc++FDSo7xfK9K7Af+0Y=
github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae/go.mod h1:/cvHQkZ1fst0EmZnA5dFtiQdWCNCFYzb+uE2vqVgvx0=
github.com/UNO-SOFT/knownpb v0.0.2/go.mod h1:p80FhK7Efqtw1I44+KdbwHKT2Fg2KluTHKtkGN8YXfE=
github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -218,8 +217,11 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0 h1:TrB8swr/68K7m9CcGut2g3UOihhbcbiMAYiuTXdEih4=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand All @@ -228,8 +230,8 @@ github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godror/godror v0.29.0 h1:J5PiWMy7glh4cZnExYk5ryAYx0c972YQUavh/ml+wlM=
github.com/godror/godror v0.29.0/go.mod h1:dwNYusI/Ug2JlbJuVvQQMhzlxVEJeq+MwaXwTYlDyC8=
github.com/godror/godror v0.33.0 h1:ZK1W7GohHVDPoLp/37U9QCSHARnYB4vVxNJya+CyWQ4=
github.com/godror/godror v0.33.0/go.mod h1:qHYnDISFm/h0vM+HDwg0LpyoLvxRKFRSwvhYF7ufjZ8=
github.com/godror/knownpb v0.1.0 h1:dJPK8s/I3PQzGGaGcUStL2zIaaICNzKKAK8BzP1uLio=
github.com/godror/knownpb v0.1.0/go.mod h1:4nRFbQo1dDuwKnblRXDxrfCFYeT4hjg3GjMqef58eRE=
github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand Down
Loading

0 comments on commit 09d4a33

Please sign in to comment.