diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 1d247689..6c68c2f0 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -216,7 +216,6 @@ func (p *persistentStorage) initializeFromKVStorage(dbPath string, storage kv.St p.gcTs = gcTs p.upperBound = UpperBoundMeta{ FinishedDDLTs: 0, - SchemaVersion: 0, ResolvedTs: gcTs, } writeUpperBoundMeta(p.db, p.upperBound) @@ -1142,14 +1141,23 @@ func updateDatabaseInfoAndTableInfo( delete(databaseInfo.Tables, tableID) } - createTable := func(schemaID int64, tableID int64) { + createTable := func(schemaID int64, tableID int64, tableName string) { addTableToDB(schemaID, tableID) tableMap[tableID] = &BasicTableInfo{ SchemaID: schemaID, - Name: event.TableInfo.Name.O, + Name: tableName, } } + getTableName := func(tableID int64) string { + tableInfo, ok := tableMap[tableID] + if !ok { + log.Panic("table not found", + zap.Int64("tableID", tableID)) + } + return tableInfo.Name + } + dropTable := func(schemaID int64, tableID int64) { removeTableFromDB(schemaID, tableID) delete(tableMap, tableID) @@ -1169,7 +1177,7 @@ func updateDatabaseInfoAndTableInfo( } delete(databaseMap, event.CurrentSchemaID) case model.ActionCreateTable, model.ActionRecoverTable: - createTable(event.CurrentSchemaID, event.CurrentTableID) + createTable(event.CurrentSchemaID, event.CurrentTableID, event.TableInfo.Name.O) if isPartitionTable(event.TableInfo) { partitionInfo := make(BasicPartitionInfo) for _, id := range getAllPartitionIDs(event.TableInfo) { @@ -1191,7 +1199,7 @@ func updateDatabaseInfoAndTableInfo( // ignore case model.ActionTruncateTable: dropTable(event.CurrentSchemaID, event.PrevTableID) - createTable(event.CurrentSchemaID, event.CurrentTableID) + createTable(event.CurrentSchemaID, event.CurrentTableID, event.TableInfo.Name.O) if isPartitionTable(event.TableInfo) { delete(partitionMap, event.PrevTableID) partitionInfo := make(BasicPartitionInfo) @@ -1245,8 +1253,9 @@ func updateDatabaseInfoAndTableInfo( zap.Int64s("droppedIDs", droppedIDs)) } targetPartitionID := droppedIDs[0] + normalTableName := getTableName(event.PrevTableID) dropTable(event.PrevSchemaID, event.PrevTableID) - createTable(event.PrevSchemaID, targetPartitionID) + createTable(event.PrevSchemaID, targetPartitionID, normalTableName) delete(partitionMap[event.CurrentTableID], targetPartitionID) partitionMap[event.CurrentTableID][event.PrevTableID] = nil case model.ActionCreateTables: diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 91c75fa5..276fdce1 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -16,140 +16,43 @@ package schemastore import ( "fmt" "math" - "os" "reflect" "testing" - "github.com/cockroachdb/pebble" "github.com/pingcap/log" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/tidb/pkg/meta/model" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/stretchr/testify/require" "go.uber.org/zap" ) -// import ( -// "encoding/json" -// "fmt" -// "os" -// "testing" - -// "github.com/cockroachdb/pebble" -// "github.com/pingcap/log" -// "github.com/pingcap/ticdc/heartbeatpb" -// commonEvent "github.com/pingcap/ticdc/pkg/common/event" -// "github.com/pingcap/ticdc/pkg/filter" -// "github.com/pingcap/tidb/pkg/meta/model" -// "github.com/pingcap/tiflow/pkg/config" -// 
"github.com/stretchr/testify/require" -// "go.uber.org/zap" -// ) - -func loadPersistentStorageForTest(db *pebble.DB, gcTs uint64, upperBound UpperBoundMeta) *persistentStorage { - p := &persistentStorage{ - pdCli: nil, - kvStorage: nil, - db: db, - gcTs: gcTs, - upperBound: upperBound, - tableMap: make(map[int64]*BasicTableInfo), - partitionMap: make(map[int64]BasicPartitionInfo), - databaseMap: make(map[int64]*BasicDatabaseInfo), - tablesDDLHistory: make(map[int64][]uint64), - tableTriggerDDLHistory: make([]uint64, 0), - tableInfoStoreMap: make(map[int64]*versionedTableInfoStore), - tableRegisteredCount: make(map[int64]int), - } - p.initializeFromDisk() - return p -} - -// create an empty persistent storage at dbPath -func newEmptyPersistentStorageForTest(dbPath string) *persistentStorage { - if err := os.RemoveAll(dbPath); err != nil { - log.Panic("remove path fail", zap.Error(err)) - } - db, err := pebble.Open(dbPath, &pebble.Options{}) - if err != nil { - log.Panic("create database fail", zap.Error(err)) - } - gcTs := uint64(0) - upperBound := UpperBoundMeta{ - FinishedDDLTs: 0, - SchemaVersion: 0, - ResolvedTs: 0, - } - return loadPersistentStorageForTest(db, gcTs, upperBound) -} - -// load a persistent storage from dbPath -func loadPersistentStorageFromPathForTest(dbPath string, maxFinishedDDLTs uint64) *persistentStorage { - db, err := pebble.Open(dbPath, &pebble.Options{}) - if err != nil { - log.Panic("create database fail", zap.Error(err)) - } - gcTs := uint64(0) - upperBound := UpperBoundMeta{ - FinishedDDLTs: maxFinishedDDLTs, - SchemaVersion: 0, - ResolvedTs: 0, - } - return loadPersistentStorageForTest(db, gcTs, upperBound) -} - -// // create a persistent storage with initial db info and table info -// func newPersistentStorageForTest(dbPath string, gcTs uint64, initialDBInfos map[int64]*model.DBInfo) *persistentStorage { -// db, err := pebble.Open(dbPath, &pebble.Options{}) -// if err != nil { -// log.Panic("create database fail") -// } -// if len(initialDBInfos) > 0 { -// mockWriteKVSnapOnDisk(db, gcTs, initialDBInfos) -// } -// upperBound := UpperBoundMeta{ -// FinishedDDLTs: 0, -// SchemaVersion: 0, -// ResolvedTs: gcTs, -// } -// writeUpperBoundMeta(db, upperBound) -// return loadPersistentStorageForTest(db, gcTs, upperBound) -// } - -// func mockWriteKVSnapOnDisk(db *pebble.DB, snapTs uint64, dbInfos map[int64]*model.DBInfo) { -// batch := db.NewBatch() -// defer batch.Close() -// for _, dbInfo := range dbInfos { -// writeSchemaInfoToBatch(batch, snapTs, dbInfo) -// for _, tableInfo := range dbInfo.Tables { -// tableInfoValue, err := json.Marshal(tableInfo) -// if err != nil { -// log.Panic("marshal table info fail", zap.Error(err)) -// } -// writeTableInfoToBatch(batch, snapTs, dbInfo, tableInfoValue) -// } -// } -// if err := batch.Commit(pebble.NoSync); err != nil { -// log.Panic("commit batch fail", zap.Error(err)) -// } -// writeGcTs(db, snapTs) -// } - func TestApplyDDLJobs(t *testing.T) { + type PhysicalTableQueryTestCase struct { + snapTs uint64 + tableFilter filter.Filter + result []commonEvent.Table + } var testCases = []struct { - ddlJobs []*model.Job - tableMap map[int64]*BasicTableInfo - partitionMap map[int64]BasicPartitionInfo - databaseMap map[int64]*BasicDatabaseInfo - tablesDDLHistory map[int64][]uint64 - tableTriggerDDLHistory []uint64 + initailDBInfos []mockDBInfo + ddlJobs []*model.Job + tableMap map[int64]*BasicTableInfo + partitionMap map[int64]BasicPartitionInfo + databaseMap map[int64]*BasicDatabaseInfo + tablesDDLHistory map[int64][]uint64 + 
tableTriggerDDLHistory []uint64 + physicalTableQueryTestCases []PhysicalTableQueryTestCase }{ // test drop schema can clear table info and partition info { + nil, func() []*model.Job { return []*model.Job{ - buildCreateSchemaJob(100, "test", 1000), // create schema 100 - buildCreateTableJob(100, 200, "t1", 1010), // create table 200 - buildCreatePartitionTableJob(100, 300, "t1", []int64{301, 302, 303}, 1020), // create partition table 300 - buildDropSchemaJob(100, 1030), // drop schema 100 + buildCreateSchemaJobForTest(100, "test", 1000), // create schema 100 + buildCreateTableJobForTest(100, 200, "t1", 1010), // create table 200 + buildCreatePartitionTableJobForTest(100, 300, "t1", []int64{301, 302, 303}, 1020), // create partition table 300 + buildDropSchemaJobForTest(100, 1030), // drop schema 100 } }(), nil, @@ -162,16 +65,18 @@ func TestApplyDDLJobs(t *testing.T) { 303: {1020, 1030}, }, []uint64{1000, 1010, 1020, 1030}, + nil, }, // test create table/drop table/truncate table { + nil, func() []*model.Job { return []*model.Job{ - buildCreateSchemaJob(100, "test", 1000), // create schema 100 - buildCreateTableJob(100, 200, "t1", 1010), // create table 200 - buildCreateTableJob(100, 201, "t2", 1020), // create table 201 - buildDropTableJob(100, 201, 1030), // drop table 201 - buildTruncateTableJob(100, 200, 202, "t1", 1040), // truncate table 200 to 202 + buildCreateSchemaJobForTest(100, "test", 1000), // create schema 100 + buildCreateTableJobForTest(100, 200, "t1", 1010), // create table 200 + buildCreateTableJobForTest(100, 201, "t2", 1020), // create table 201 + buildDropTableJobForTest(100, 201, 1030), // drop table 201 + buildTruncateTableJobForTest(100, 200, 202, "t1", 1040), // truncate table 200 to 202 } }(), map[int64]*BasicTableInfo{ @@ -195,17 +100,73 @@ func TestApplyDDLJobs(t *testing.T) { 202: {1040}, }, []uint64{1000, 1010, 1020, 1030, 1040}, + []PhysicalTableQueryTestCase{ + { + snapTs: 1010, + result: []commonEvent.Table{ + { + SchemaID: 100, + TableID: 200, + SchemaTableName: &commonEvent.SchemaTableName{ + SchemaName: "test", + TableName: "t1", + }, + }, + }, + }, + { + snapTs: 1020, + result: []commonEvent.Table{ + { + SchemaID: 100, + TableID: 200, + SchemaTableName: &commonEvent.SchemaTableName{ + SchemaName: "test", + TableName: "t1", + }, + }, + { + SchemaID: 100, + TableID: 201, + SchemaTableName: &commonEvent.SchemaTableName{ + SchemaName: "test", + TableName: "t2", + }, + }, + }, + }, + { + snapTs: 1040, + tableFilter: buildTableFilterByNameForTest("test", "t1"), + result: []commonEvent.Table{ + { + SchemaID: 100, + TableID: 202, + SchemaTableName: &commonEvent.SchemaTableName{ + SchemaName: "test", + TableName: "t1", + }, + }, + }, + }, + { + snapTs: 1040, + tableFilter: buildTableFilterByNameForTest("test", "t2"), + result: []commonEvent.Table{}, + }, + }, }, // test partition table related ddl { + nil, func() []*model.Job { return []*model.Job{ - buildCreateSchemaJob(100, "test", 1000), // create schema 100 - buildCreatePartitionTableJob(100, 200, "t1", []int64{201, 202, 203}, 1010), // create partition table 200 - buildTruncatePartitionTableJob(100, 200, 300, "t1", []int64{204, 205, 206}, 1020), // truncate partition table 200 to 300 - buildAddPartitionJob(100, 300, "t1", []int64{204, 205, 206, 207}, 1030), // add partition 207 - buildDropPartitionJob(100, 300, "t1", []int64{205, 206, 207}, 1040), // drop partition 204 - buildTruncatePartitionJob(100, 300, "t1", []int64{206, 207, 208}, 1050), // truncate partition 205 to 208 + 
buildCreateSchemaJobForTest(100, "test", 1000), // create schema 100 + buildCreatePartitionTableJobForTest(100, 200, "t1", []int64{201, 202, 203}, 1010), // create partition table 200 + buildTruncatePartitionTableJobForTest(100, 200, 300, "t1", []int64{204, 205, 206}, 1020), // truncate partition table 200 to 300 + buildAddPartitionJobForTest(100, 300, "t1", []int64{204, 205, 206, 207}, 1030), // add partition 207 + buildDropPartitionJobForTest(100, 300, "t1", []int64{205, 206, 207}, 1040), // drop partition 204 + buildTruncatePartitionJobForTest(100, 300, "t1", []int64{206, 207, 208}, 1050), // truncate partition 205 to 208 } }(), map[int64]*BasicTableInfo{ @@ -240,12 +201,359 @@ func TestApplyDDLJobs(t *testing.T) { 208: {1050}, }, []uint64{1000, 1010, 1020, 1030, 1040, 1050}, + []PhysicalTableQueryTestCase{ + { + snapTs: 1010, + result: []commonEvent.Table{ + { + SchemaID: 100, + TableID: 201, + SchemaTableName: &commonEvent.SchemaTableName{ + SchemaName: "test", + TableName: "t1", + }, + }, + { + SchemaID: 100, + TableID: 202, + SchemaTableName: &commonEvent.SchemaTableName{ + SchemaName: "test", + TableName: "t1", + }, + }, + { + SchemaID: 100, + TableID: 203, + SchemaTableName: &commonEvent.SchemaTableName{ + SchemaName: "test", + TableName: "t1", + }, + }, + }, + }, + { + snapTs: 1050, + result: []commonEvent.Table{ + { + SchemaID: 100, + TableID: 206, + SchemaTableName: &commonEvent.SchemaTableName{ + SchemaName: "test", + TableName: "t1", + }, + }, + { + SchemaID: 100, + TableID: 207, + SchemaTableName: &commonEvent.SchemaTableName{ + SchemaName: "test", + TableName: "t1", + }, + }, + { + SchemaID: 100, + TableID: 208, + SchemaTableName: &commonEvent.SchemaTableName{ + SchemaName: "test", + TableName: "t1", + }, + }, + }, + }, + }, + }, + // test exchange partition + { + []mockDBInfo{ + { + dbInfo: &model.DBInfo{ + ID: 100, + Name: pmodel.NewCIStr("test"), + }, + }, + { + dbInfo: &model.DBInfo{ + ID: 105, + Name: pmodel.NewCIStr("test2"), + }, + }, + }, + func() []*model.Job { + return []*model.Job{ + buildCreatePartitionTableJobForTest(100, 200, "t1", []int64{201, 202, 203}, 1010), // create partition table 200 + buildCreateTableJobForTest(105, 300, "t2", 1020), // create table 300 + buildExchangePartitionJobForTest(105, 300, 200, "t1", []int64{201, 202, 300}, 1030), // exchange partition 203 with table 300 + } + }(), + map[int64]*BasicTableInfo{ + 200: { + SchemaID: 100, + Name: "t1", + }, + 203: { + SchemaID: 105, + Name: "t2", + }, + }, + map[int64]BasicPartitionInfo{ + 200: { + 201: nil, + 202: nil, + 300: nil, + }, + }, + map[int64]*BasicDatabaseInfo{ + 100: { + Name: "test", + Tables: map[int64]bool{ + 200: true, + }, + }, + 105: { + Name: "test2", + Tables: map[int64]bool{ + 203: true, + }, + }, + }, + map[int64][]uint64{ + 300: {1020, 1030}, + 201: {1010}, + 202: {1010}, + 203: {1010, 1030}, + }, + []uint64{1010, 1020, 1030}, + nil, + }, + // test rename table + { + []mockDBInfo{ + { + dbInfo: &model.DBInfo{ + ID: 100, + Name: pmodel.NewCIStr("test"), + }, + }, + { + dbInfo: &model.DBInfo{ + ID: 105, + Name: pmodel.NewCIStr("test2"), + }, + }, + }, + func() []*model.Job { + return []*model.Job{ + buildCreateTableJobForTest(100, 300, "t1", 1010), // create table 300 + buildRenameTableJobForTest(105, 300, "t2", 1020), // rename table 300 to schema 105 + } + }(), + map[int64]*BasicTableInfo{ + 300: { + SchemaID: 105, + Name: "t2", + }, + }, + nil, + map[int64]*BasicDatabaseInfo{ + 100: { + Name: "test", + Tables: map[int64]bool{}, + }, + 105: { + Name: "test2", + Tables: 
map[int64]bool{ + 300: true, + }, + }, + }, + map[int64][]uint64{ + 300: {1010, 1020}, + }, + []uint64{1010, 1020}, + nil, + }, + // test rename partition table + { + []mockDBInfo{ + { + dbInfo: &model.DBInfo{ + ID: 100, + Name: pmodel.NewCIStr("test"), + }, + }, + { + dbInfo: &model.DBInfo{ + ID: 105, + Name: pmodel.NewCIStr("test2"), + }, + }, + }, + func() []*model.Job { + return []*model.Job{ + buildCreatePartitionTableJobForTest(100, 300, "t1", []int64{301, 302, 303}, 1010), // create partition table 300 + buildRenamePartitionTableJobForTest(105, 300, "t2", []int64{301, 302, 303}, 1020), // rename table 300 to schema 105 + } + }(), + map[int64]*BasicTableInfo{ + 300: { + SchemaID: 105, + Name: "t2", + }, + }, + map[int64]BasicPartitionInfo{ + 300: { + 301: nil, + 302: nil, + 303: nil, + }, + }, + map[int64]*BasicDatabaseInfo{ + 100: { + Name: "test", + Tables: map[int64]bool{}, + }, + 105: { + Name: "test2", + Tables: map[int64]bool{ + 300: true, + }, + }, + }, + map[int64][]uint64{ + 301: {1010, 1020}, + 302: {1010, 1020}, + 303: {1010, 1020}, + }, + []uint64{1010, 1020}, + nil, + }, + // test create tables + { + []mockDBInfo{ + { + dbInfo: &model.DBInfo{ + ID: 100, + Name: pmodel.NewCIStr("test"), + }, + }, + }, + func() []*model.Job { + return []*model.Job{ + buildCreateTablesJobForTest(100, []int64{301, 302, 303}, []string{"t1", "t2", "t3"}, 1010), // create tables 301, 302, 303 + } + }(), + map[int64]*BasicTableInfo{ + 301: { + SchemaID: 100, + Name: "t1", + }, + 302: { + SchemaID: 100, + Name: "t2", + }, + 303: { + SchemaID: 100, + Name: "t3", + }, + }, + nil, + map[int64]*BasicDatabaseInfo{ + 100: { + Name: "test", + Tables: map[int64]bool{ + 301: true, + 302: true, + 303: true, + }, + }, + }, + map[int64][]uint64{ + 301: {1010}, + 302: {1010}, + 303: {1010}, + }, + []uint64{1010}, + nil, + }, + // test create tables for partition table + { + []mockDBInfo{ + { + dbInfo: &model.DBInfo{ + ID: 100, + Name: pmodel.NewCIStr("test"), + }, + }, + }, + func() []*model.Job { + return []*model.Job{ + buildCreatePartitionTablesJobForTest(100, + []int64{300, 400, 500}, + []string{"t1", "t2", "t3"}, + [][]int64{{301, 302, 303}, {401, 402, 403}, {501, 502, 503}}, + 1010), // create partition tables 300, 400, 500 + } + }(), + map[int64]*BasicTableInfo{ + 300: { + SchemaID: 100, + Name: "t1", + }, + 400: { + SchemaID: 100, + Name: "t2", + }, + 500: { + SchemaID: 100, + Name: "t3", + }, + }, + map[int64]BasicPartitionInfo{ + 300: { + 301: nil, + 302: nil, + 303: nil, + }, + 400: { + 401: nil, + 402: nil, + 403: nil, + }, + 500: { + 501: nil, + 502: nil, + 503: nil, + }, + }, + map[int64]*BasicDatabaseInfo{ + 100: { + Name: "test", + Tables: map[int64]bool{ + 300: true, + 400: true, + 500: true, + }, + }, + }, + map[int64][]uint64{ + 301: {1010}, + 302: {1010}, + 303: {1010}, + 401: {1010}, + 402: {1010}, + 403: {1010}, + 501: {1010}, + 502: {1010}, + 503: {1010}, + }, + []uint64{1010}, + nil, }, } for _, tt := range testCases { dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) - pStorage := newEmptyPersistentStorageForTest(dbPath) + pStorage := newPersistentStorageForTest(dbPath, tt.initialDBInfos) checkState := func(fromDisk bool) { if (tt.tableMap != nil && !reflect.DeepEqual(tt.tableMap, pStorage.tableMap)) || (tt.tableMap == nil && len(pStorage.tableMap) != 0) { @@ -272,6 +580,20 @@ func TestApplyDDLJobs(t *testing.T) { log.Warn("tableTriggerDDLHistory not equal", zap.Any("ddlJobs", tt.ddlJobs), zap.Any("expected", tt.tableTriggerDDLHistory), zap.Any("actual", pStorage.tableTriggerDDLHistory), 
zap.Bool("fromDisk", fromDisk)) t.Fatalf("tableTriggerDDLHistory not equal") } + for _, testCase := range tt.physicalTableQueryTestCases { + allPhysicalTables, err := pStorage.getAllPhysicalTables(testCase.snapTs, testCase.tableFilter) + require.Nil(t, err) + if !compareUnorderedTableSlices(testCase.result, allPhysicalTables) { + log.Warn("getAllPhysicalTables result wrong", + zap.Any("ddlJobs", tt.ddlJobs), + zap.Uint64("snapTs", testCase.snapTs), + zap.Any("tableFilter", testCase.tableFilter), + zap.Any("expected", testCase.result), + zap.Any("actual", allPhysicalTables), + zap.Bool("fromDisk", fromDisk)) + t.Fatalf("getAllPhysicalTables result wrong") + } + } } for _, job := range tt.ddlJobs { err := pStorage.handleDDLJob(job) @@ -2218,168 +2540,3 @@ func TestApplyDDLJobs(t *testing.T) { // // TODO: test obsolete data can be removed // } - -// func TestGetAllPhysicalTables(t *testing.T) { -// dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) -// err := os.RemoveAll(dbPath) -// require.Nil(t, err) - -// schemaID := int64(300) -// gcTs := uint64(600) -// tableID1 := int64(100) -// tableID2 := tableID1 + 100 - -// databaseInfo := make(map[int64]*model.DBInfo) -// databaseInfo[schemaID] = &model.DBInfo{ -// ID: schemaID, -// Name: model.NewCIStr("test"), -// Tables: []*model.TableInfo{ -// { -// ID: tableID1, -// Name: model.NewCIStr("t1"), -// }, -// { -// ID: tableID2, -// Name: model.NewCIStr("t2"), -// }, -// }, -// } -// pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo) - -// // create table t3 -// tableID3 := tableID2 + 100 -// { -// job := &model.Job{ -// Type: model.ActionCreateTable, -// SchemaID: schemaID, -// TableID: tableID3, -// BinlogInfo: &model.HistoryInfo{ -// SchemaVersion: 501, -// TableInfo: &model.TableInfo{ -// ID: tableID3, -// Name: model.NewCIStr("t3"), -// }, - -// FinishedTS: 601, -// }, -// } -// pStorage.handleDDLJob(job) -// } - -// // drop table t2 -// { -// job := &model.Job{ -// Type: model.ActionDropTable, -// SchemaID: schemaID, -// TableID: tableID2, -// BinlogInfo: &model.HistoryInfo{ -// SchemaVersion: 503, -// TableInfo: nil, -// FinishedTS: 603, -// }, -// } -// pStorage.handleDDLJob(job) -// } - -// // create partition table t4 -// tableID4 := tableID3 + 100 -// partitionID1 := tableID4 + 100 -// partitionID2 := tableID4 + 200 -// partitionID3 := tableID4 + 300 -// { -// job := &model.Job{ -// Type: model.ActionCreateTable, -// SchemaID: schemaID, -// TableID: tableID4, -// BinlogInfo: &model.HistoryInfo{ -// SchemaVersion: 503, -// TableInfo: &model.TableInfo{ -// ID: tableID4, -// Name: model.NewCIStr("t4"), -// Partition: &model.PartitionInfo{ -// Definitions: []model.PartitionDefinition{ -// { -// ID: partitionID1, -// }, -// { -// ID: partitionID2, -// }, -// { -// ID: partitionID3, -// }, -// }, -// }, -// }, -// FinishedTS: 609, -// }, -// } -// pStorage.handleDDLJob(job) -// } - -// // drop partition table t4 -// { -// job := &model.Job{ -// Type: model.ActionDropTable, -// SchemaID: schemaID, -// TableID: tableID4, -// BinlogInfo: &model.HistoryInfo{ -// SchemaVersion: 505, -// TableInfo: &model.TableInfo{ -// ID: tableID4, -// Name: model.NewCIStr("t4"), -// Partition: &model.PartitionInfo{ -// Definitions: []model.PartitionDefinition{ -// { -// ID: partitionID1, -// }, -// { -// ID: partitionID2, -// }, -// { -// ID: partitionID3, -// }, -// }, -// }, -// }, -// FinishedTS: 611, -// }, -// } -// pStorage.handleDDLJob(job) -// } - -// { -// allPhysicalTables, err := pStorage.getAllPhysicalTables(600, nil) -// 
require.Nil(t, err) -// require.Equal(t, 2, len(allPhysicalTables)) -// } - -// { -// allPhysicalTables, err := pStorage.getAllPhysicalTables(601, nil) -// require.Nil(t, err) -// require.Equal(t, 3, len(allPhysicalTables)) -// } - -// { -// allPhysicalTables, err := pStorage.getAllPhysicalTables(603, nil) -// require.Nil(t, err) -// require.Equal(t, 2, len(allPhysicalTables)) -// } - -// { -// allPhysicalTables, err := pStorage.getAllPhysicalTables(605, nil) -// require.Nil(t, err) -// require.Equal(t, 2, len(allPhysicalTables)) -// } - -// { -// allPhysicalTables, err := pStorage.getAllPhysicalTables(609, nil) -// require.Nil(t, err) -// require.Equal(t, 5, len(allPhysicalTables)) -// } - -// { -// allPhysicalTables, err := pStorage.getAllPhysicalTables(611, nil) -// require.Nil(t, err) -// require.Equal(t, 2, len(allPhysicalTables)) -// } -// } diff --git a/logservice/schemastore/test_utils.go b/logservice/schemastore/test_utils.go index f396d1fb..5b1c228c 100644 --- a/logservice/schemastore/test_utils.go +++ b/logservice/schemastore/test_utils.go @@ -14,11 +14,146 @@ package schemastore import ( + "encoding/json" + "fmt" + "os" + "reflect" + "sort" + "strings" + + "github.com/cockroachdb/pebble" + "github.com/pingcap/log" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/tidb/pkg/meta/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" + "go.uber.org/zap" ) -func buildCreateSchemaJob(schemaID int64, schemaName string, finishedTs uint64) *model.Job { +func loadPersistentStorageForTest(db *pebble.DB, gcTs uint64, upperBound UpperBoundMeta) *persistentStorage { + p := &persistentStorage{ + pdCli: nil, + kvStorage: nil, + db: db, + gcTs: gcTs, + upperBound: upperBound, + tableMap: make(map[int64]*BasicTableInfo), + partitionMap: make(map[int64]BasicPartitionInfo), + databaseMap: make(map[int64]*BasicDatabaseInfo), + tablesDDLHistory: make(map[int64][]uint64), + tableTriggerDDLHistory: make([]uint64, 0), + tableInfoStoreMap: make(map[int64]*versionedTableInfoStore), + tableRegisteredCount: make(map[int64]int), + } + p.initializeFromDisk() + return p +} + +// create a persistent storage at dbPath with initialDBInfos +func newPersistentStorageForTest(dbPath string, initialDBInfos []mockDBInfo) *persistentStorage { + if err := os.RemoveAll(dbPath); err != nil { + log.Panic("remove path fail", zap.Error(err)) + } + db, err := pebble.Open(dbPath, &pebble.Options{}) + if err != nil { + log.Panic("create database fail", zap.Error(err)) + } + gcTs := uint64(0) + if len(initialDBInfos) > 0 { + mockWriteKVSnapOnDisk(db, gcTs, initialDBInfos) + } + upperBound := UpperBoundMeta{ + FinishedDDLTs: gcTs, + ResolvedTs: gcTs, + } + writeUpperBoundMeta(db, upperBound) + return loadPersistentStorageForTest(db, gcTs, upperBound) +} + +// load a persistent storage from dbPath +func loadPersistentStorageFromPathForTest(dbPath string, maxFinishedDDLTs uint64) *persistentStorage { + db, err := pebble.Open(dbPath, &pebble.Options{}) + if err != nil { + log.Panic("create database fail", zap.Error(err)) + } + gcTs := uint64(0) + upperBound := UpperBoundMeta{ + FinishedDDLTs: maxFinishedDDLTs, + ResolvedTs: maxFinishedDDLTs, + } + writeUpperBoundMeta(db, upperBound) + return loadPersistentStorageForTest(db, gcTs, upperBound) +} + +type mockDBInfo struct { + dbInfo *model.DBInfo + tables []*model.TableInfo +} + +func mockWriteKVSnapOnDisk(db *pebble.DB, snapTs uint64, dbInfos []mockDBInfo) { + batch := 
db.NewBatch() + defer batch.Close() + for _, dbInfo := range dbInfos { + writeSchemaInfoToBatch(batch, snapTs, dbInfo.dbInfo) + for _, tableInfo := range dbInfo.tables { + tableInfoValue, err := json.Marshal(tableInfo) + if err != nil { + log.Panic("marshal table info fail", zap.Error(err)) + } + writeTableInfoToBatch(batch, snapTs, dbInfo.dbInfo, tableInfoValue) + } + } + if err := batch.Commit(pebble.NoSync); err != nil { + log.Panic("commit batch fail", zap.Error(err)) + } + writeGcTs(db, snapTs) +} + +func compareUnorderedTableSlices(slice1, slice2 []commonEvent.Table) bool { + if len(slice1) != len(slice2) { + return false + } + + sort.Slice(slice1, func(i, j int) bool { + if slice1[i].SchemaID == slice1[j].SchemaID { + return slice1[i].TableID < slice1[j].TableID + } + return slice1[i].SchemaID < slice1[j].SchemaID + }) + + sort.Slice(slice2, func(i, j int) bool { + if slice2[i].SchemaID == slice2[j].SchemaID { + return slice2[i].TableID < slice2[j].TableID + } + return slice2[i].SchemaID < slice2[j].SchemaID + }) + + for i := range slice1 { + if slice1[i].SchemaID != slice2[i].SchemaID || + slice1[i].TableID != slice2[i].TableID || + !reflect.DeepEqual(slice1[i].SchemaTableName, slice2[i].SchemaTableName) { + return false + } + } + + return true +} + +func buildTableFilterByNameForTest(schemaName, tableName string) filter.Filter { + filterRule := fmt.Sprintf("%s.%s", schemaName, tableName) + log.Info("filterRule", zap.String("filterRule", filterRule)) + filterConfig := &config.FilterConfig{ + Rules: []string{filterRule}, + } + tableFilter, err := filter.NewFilter(filterConfig, "", false) + if err != nil { + log.Panic("build filter failed", zap.Error(err)) + } + return tableFilter +} + +func buildCreateSchemaJobForTest(schemaID int64, schemaName string, finishedTs uint64) *model.Job { return &model.Job{ Type: model.ActionCreateSchema, SchemaID: schemaID, @@ -32,7 +167,7 @@ func buildCreateSchemaJob(schemaID int64, schemaName string, finishedTs uint64) } } -func buildDropSchemaJob(schemaID int64, finishedTs uint64) *model.Job { +func buildDropSchemaJobForTest(schemaID int64, finishedTs uint64) *model.Job { return &model.Job{ Type: model.ActionDropSchema, SchemaID: schemaID, @@ -45,7 +180,7 @@ func buildDropSchemaJob(schemaID int64, finishedTs uint64) *model.Job { } } -func buildCreateTableJob(schemaID, tableID int64, tableName string, finishedTs uint64) *model.Job { +func buildCreateTableJobForTest(schemaID, tableID int64, tableName string, finishedTs uint64) *model.Job { return &model.Job{ Type: model.ActionCreateTable, SchemaID: schemaID, @@ -60,43 +195,78 @@ func buildCreateTableJob(schemaID, tableID int64, tableName string, finishedTs u } } -func buildCreatePartitionTableJob(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { - partitionDefinitions := make([]model.PartitionDefinition, 0, len(partitionIDs)) - for _, partitionID := range partitionIDs { - partitionDefinitions = append(partitionDefinitions, model.PartitionDefinition{ - ID: partitionID, +func buildCreateTablesJobForTest(schemaID int64, tableIDs []int64, tableNames []string, finishedTs uint64) *model.Job { + multiTableInfos := make([]*model.TableInfo, 0, len(tableIDs)) + queries := make([]string, 0, len(tableIDs)) + for i, id := range tableIDs { + multiTableInfos = append(multiTableInfos, &model.TableInfo{ + ID: id, + Name: pmodel.NewCIStr(tableNames[i]), }) + queries = append(queries, fmt.Sprintf("create table %s(a int primary key)", tableNames[i])) } return &model.Job{ - Type: 
model.ActionCreateTable, + Type: model.ActionCreateTables, SchemaID: schemaID, - TableID: tableID, + Query: strings.Join(queries, ";"), BinlogInfo: &model.HistoryInfo{ - TableInfo: &model.TableInfo{ - ID: tableID, - Name: pmodel.NewCIStr(tableName), - Partition: &model.PartitionInfo{ - Definitions: partitionDefinitions, - }, + MultipleTableInfos: multiTableInfos, + FinishedTS: finishedTs, + }, + } +} + +func buildCreatePartitionTablesJobForTest(schemaID int64, tableIDs []int64, tableNames []string, partitionIDLists [][]int64, finishedTs uint64) *model.Job { + multiTableInfos := make([]*model.TableInfo, 0, len(tableIDs)) + queries := make([]string, 0, len(tableIDs)) + for i, id := range tableIDs { + partitionDefinitions := make([]model.PartitionDefinition, 0, len(partitionIDLists[i])) + for _, partitionID := range partitionIDLists[i] { + partitionDefinitions = append(partitionDefinitions, model.PartitionDefinition{ + ID: partitionID, + }) + } + multiTableInfos = append(multiTableInfos, &model.TableInfo{ + ID: id, + Name: pmodel.NewCIStr(tableNames[i]), + Partition: &model.PartitionInfo{ + Definitions: partitionDefinitions, }, - FinishedTS: finishedTs, + }) + queries = append(queries, fmt.Sprintf("create table %s(a int primary key)", tableNames[i])) + } + return &model.Job{ + Type: model.ActionCreateTables, + SchemaID: schemaID, + Query: strings.Join(queries, ";"), + BinlogInfo: &model.HistoryInfo{ + MultipleTableInfos: multiTableInfos, + FinishedTS: finishedTs, + }, + } +} -func buildDropTableJob(schemaID, tableID int64, finishedTs uint64) *model.Job { +func buildRenameTableJobForTest(schemaID, tableID int64, tableName string, finishedTs uint64) *model.Job { return &model.Job{ - Type: model.ActionDropTable, + Type: model.ActionRenameTable, SchemaID: schemaID, TableID: tableID, BinlogInfo: &model.HistoryInfo{ + TableInfo: &model.TableInfo{ + ID: tableID, + Name: pmodel.NewCIStr(tableName), + }, FinishedTS: finishedTs, }, } } -// Note: `partitionIDs` must include all partition IDs of the original table. 
-func buildDropPartitionTableJob(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { +func buildRenamePartitionTableJobForTest(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { + return buildPartitionTableRelatedJobForTest(model.ActionRenameTable, schemaID, tableID, tableName, partitionIDs, finishedTs) +} + +// most partition table related jobs have the same structure +func buildPartitionTableRelatedJobForTest(jobType model.ActionType, schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { partitionDefinitions := make([]model.PartitionDefinition, 0, len(partitionIDs)) for _, partitionID := range partitionIDs { partitionDefinitions = append(partitionDefinitions, model.PartitionDefinition{ @@ -104,7 +274,7 @@ func buildDropPartitionTableJob(schemaID, tableID int64, tableName string, parti }) } return &model.Job{ - Type: model.ActionDropTable, + Type: jobType, SchemaID: schemaID, TableID: tableID, BinlogInfo: &model.HistoryInfo{ @@ -120,28 +290,27 @@ func buildDropPartitionTableJob(schemaID, tableID int64, tableName string, parti } } -func buildTruncateTableJob(schemaID, oldTableID, newTableID int64, tableName string, finishedTs uint64) *model.Job { +func buildCreatePartitionTableJobForTest(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { + return buildPartitionTableRelatedJobForTest(model.ActionCreateTable, schemaID, tableID, tableName, partitionIDs, finishedTs) +} + +func buildDropTableJobForTest(schemaID, tableID int64, finishedTs uint64) *model.Job { return &model.Job{ - Type: model.ActionTruncateTable, + Type: model.ActionDropTable, SchemaID: schemaID, - TableID: oldTableID, + TableID: tableID, BinlogInfo: &model.HistoryInfo{ - TableInfo: &model.TableInfo{ - ID: newTableID, - Name: pmodel.NewCIStr(tableName), - }, FinishedTS: finishedTs, }, } } -func buildTruncatePartitionTableJob(schemaID, oldTableID, newTableID int64, tableName string, newPartitionIDs []int64, finishedTs uint64) *model.Job { - partitionDefinitions := make([]model.PartitionDefinition, 0, len(newPartitionIDs)) - for _, partitionID := range newPartitionIDs { - partitionDefinitions = append(partitionDefinitions, model.PartitionDefinition{ - ID: partitionID, - }) - } +// Note: `partitionIDs` must include all partition IDs of the original table. +func buildDropPartitionTableJobForTest(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { + return buildPartitionTableRelatedJobForTest(model.ActionDropTable, schemaID, tableID, tableName, partitionIDs, finishedTs) +} + +func buildTruncateTableJobForTest(schemaID, oldTableID, newTableID int64, tableName string, finishedTs uint64) *model.Job { return &model.Job{ Type: model.ActionTruncateTable, SchemaID: schemaID, @@ -150,30 +319,26 @@ func buildTruncatePartitionTableJob(schemaID, oldTableID, newTableID int64, tabl TableInfo: &model.TableInfo{ ID: newTableID, Name: pmodel.NewCIStr(tableName), - Partition: &model.PartitionInfo{ - Definitions: partitionDefinitions, - }, }, FinishedTS: finishedTs, }, } } -// Note: `partitionIDs` must include all partition IDs of the table after add partition. 
-func buildAddPartitionJob(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { - partitionDefinitions := make([]model.PartitionDefinition, 0, len(partitionIDs)) - for _, partitionID := range partitionIDs { +func buildTruncatePartitionTableJobForTest(schemaID, oldTableID, newTableID int64, tableName string, newPartitionIDs []int64, finishedTs uint64) *model.Job { + partitionDefinitions := make([]model.PartitionDefinition, 0, len(newPartitionIDs)) + for _, partitionID := range newPartitionIDs { partitionDefinitions = append(partitionDefinitions, model.PartitionDefinition{ ID: partitionID, }) } return &model.Job{ - Type: model.ActionAddTablePartition, + Type: model.ActionTruncateTable, SchemaID: schemaID, - TableID: tableID, + TableID: oldTableID, BinlogInfo: &model.HistoryInfo{ TableInfo: &model.TableInfo{ - ID: tableID, + ID: newTableID, Name: pmodel.NewCIStr(tableName), Partition: &model.PartitionInfo{ Definitions: partitionDefinitions, @@ -184,33 +349,30 @@ func buildAddPartitionJob(schemaID, tableID int64, tableName string, partitionID } } +// Note: `partitionIDs` must include all partition IDs of the table after add partition. +func buildAddPartitionJobForTest(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { + return buildPartitionTableRelatedJobForTest(model.ActionAddTablePartition, schemaID, tableID, tableName, partitionIDs, finishedTs) +} + // Note: `partitionIDs` must include all partition IDs of the table after drop partition. -func buildDropPartitionJob(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { - partitionDefinitions := make([]model.PartitionDefinition, 0, len(partitionIDs)) - for _, partitionID := range partitionIDs { - partitionDefinitions = append(partitionDefinitions, model.PartitionDefinition{ - ID: partitionID, - }) - } - return &model.Job{ - Type: model.ActionDropTablePartition, - SchemaID: schemaID, - TableID: tableID, - BinlogInfo: &model.HistoryInfo{ - TableInfo: &model.TableInfo{ - ID: tableID, - Name: pmodel.NewCIStr(tableName), - Partition: &model.PartitionInfo{ - Definitions: partitionDefinitions, - }, - }, - FinishedTS: finishedTs, - }, - } +func buildDropPartitionJobForTest(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { + return buildPartitionTableRelatedJobForTest(model.ActionDropTablePartition, schemaID, tableID, tableName, partitionIDs, finishedTs) } // Note: `partitionIDs` must include all partition IDs of the table after truncate partition. -func buildTruncatePartitionJob(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { +func buildTruncatePartitionJobForTest(schemaID, tableID int64, tableName string, partitionIDs []int64, finishedTs uint64) *model.Job { + return buildPartitionTableRelatedJobForTest(model.ActionTruncateTablePartition, schemaID, tableID, tableName, partitionIDs, finishedTs) +} + +// Note: `partitionIDs` must include all partition IDs of the table after exchange partition. 
+func buildExchangePartitionJobForTest( + normalSchemaID int64, + normalTableID int64, + partitionTableID int64, + partitionTableName string, + partitionIDs []int64, + finishedTs uint64, +) *model.Job { partitionDefinitions := make([]model.PartitionDefinition, 0, len(partitionIDs)) for _, partitionID := range partitionIDs { partitionDefinitions = append(partitionDefinitions, model.PartitionDefinition{ @@ -218,13 +380,13 @@ func buildTruncatePartitionJob(schemaID, tableID int64, tableName string, partit }) } return &model.Job{ - Type: model.ActionTruncateTablePartition, - SchemaID: schemaID, - TableID: tableID, + Type: model.ActionExchangeTablePartition, + SchemaID: normalSchemaID, + TableID: normalTableID, BinlogInfo: &model.HistoryInfo{ TableInfo: &model.TableInfo{ - ID: tableID, - Name: pmodel.NewCIStr(tableName), + ID: partitionTableID, + Name: pmodel.NewCIStr(partitionTableName), Partition: &model.PartitionInfo{ Definitions: partitionDefinitions, },
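
Reviewer note on the exchange-partition fix in persist_storage.go: the old code named the swapped-in table via `event.TableInfo.Name.O`, but for `ActionExchangeTablePartition` that field carries the partition table's name, so the former partition ended up misnamed. The new `getTableName(event.PrevTableID)` call captures the normal table's name before `dropTable` removes it. Below is a minimal sketch of the intended bookkeeping, using simplified stand-in maps rather than the real `persistentStorage` state (the `main` harness and `struct{}` sets are illustrative only):

```go
package main

import "fmt"

// BasicTableInfo mirrors only the fields the diff touches.
type BasicTableInfo struct {
	SchemaID int64
	Name     string
}

func main() {
	// Before the exchange (matching the new test case): table 300 is a normal
	// table "t2" in schema 105; table 200 is a partition table "t1" in schema
	// 100 with physical partitions 201, 202, 203.
	tableMap := map[int64]*BasicTableInfo{
		200: {SchemaID: 100, Name: "t1"},
		300: {SchemaID: 105, Name: "t2"},
	}
	partitionMap := map[int64]map[int64]struct{}{
		200: {201: {}, 202: {}, 203: {}},
	}

	prevTableID := int64(300)       // the normal table named in the job
	targetPartitionID := int64(203) // the partition being swapped out

	// The fix: read the normal table's name *before* dropping it, then reuse
	// it for the partition that becomes a normal table.
	normalTableName := tableMap[prevTableID].Name
	prevSchemaID := tableMap[prevTableID].SchemaID
	delete(tableMap, prevTableID)
	tableMap[targetPartitionID] = &BasicTableInfo{SchemaID: prevSchemaID, Name: normalTableName}

	// The old normal table ID takes the partition's place under table 200.
	delete(partitionMap[200], targetPartitionID)
	partitionMap[200][prevTableID] = struct{}{}

	fmt.Printf("table %d: %+v\n", targetPartitionID, *tableMap[targetPartitionID])
	// Prints: table 203: {SchemaID:105 Name:t2}, matching the expected
	// tableMap in the new exchange-partition test case.
}
```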
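A smaller observation on the new test utilities: `compareUnorderedTableSlices` sorts both of its arguments in place, so each test case's expected `result` slice is reordered as a side effect of the comparison (harmless for these tests, but worth knowing when reusing the helper). Here is a standalone sketch of the same sort-then-compare idea that copies its inputs first; the `table` type and `equalUnordered` name are illustrative, not part of the patch:

```go
package main

import (
	"fmt"
	"sort"
)

type table struct{ schemaID, tableID int64 }

// equalUnordered reports whether a and b hold the same tables regardless of
// order. It sorts copies, leaving the callers' slices untouched.
func equalUnordered(a, b []table) bool {
	if len(a) != len(b) {
		return false
	}
	a2 := append([]table(nil), a...)
	b2 := append([]table(nil), b...)
	byID := func(s []table) func(i, j int) bool {
		return func(i, j int) bool {
			if s[i].schemaID != s[j].schemaID {
				return s[i].schemaID < s[j].schemaID
			}
			return s[i].tableID < s[j].tableID
		}
	}
	sort.Slice(a2, byID(a2))
	sort.Slice(b2, byID(b2))
	for i := range a2 {
		if a2[i] != b2[i] {
			return false
		}
	}
	return true
}

func main() {
	x := []table{{100, 202}, {100, 200}}
	y := []table{{100, 200}, {100, 202}}
	fmt.Println(equalUnordered(x, y)) // true; x and y keep their original order
}
```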