From 28307147f2b86c3a29bd914406fa03250e118158 Mon Sep 17 00:00:00 2001 From: EasonBall <592838129@qq.com> Date: Wed, 6 Mar 2024 14:38:06 +0800 Subject: [PATCH] infoschemaV2: support create&drop&update table/schema for v2 (#51424) ref pingcap/tidb#50959 --- pkg/domain/domain.go | 5 +- pkg/infoschema/builder.go | 109 ++++++++++++++++++++++++++---- pkg/infoschema/infoschema_test.go | 66 ++++++++++-------- pkg/infoschema/infoschema_v2.go | 51 +++++++++++--- 4 files changed, 180 insertions(+), 51 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 991ba561fad5a..e11520f860fa9 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -288,7 +288,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i // We can fall back to full load, don't need to return the error. logutil.BgLogger().Error("failed to load schema diff", zap.Error(err)) } - + // full load. schemas, err := do.fetchAllSchemasWithTables(m) if err != nil { return nil, false, currentSchemaVersion, nil, err @@ -303,7 +303,8 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i if err != nil { return nil, false, currentSchemaVersion, nil, err } - + // clear data + do.infoCache.Data = infoschema.NewData() newISBuilder, err := infoschema.NewBuilder(do, do.sysFacHack, do.infoCache.Data).InitWithDBInfos(schemas, policies, resourceGroups, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err diff --git a/pkg/infoschema/builder.go b/pkg/infoschema/builder.go index badfc00139aec..29a7f160872ae 100644 --- a/pkg/infoschema/builder.go +++ b/pkg/infoschema/builder.go @@ -65,7 +65,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro case model.ActionCreateSchema: return nil, b.applyCreateSchema(m, diff) case model.ActionDropSchema: - return b.applyDropSchema(diff.SchemaID), nil + return b.applyDropSchema(diff), nil case model.ActionRecoverSchema: return b.applyRecoverSchema(m, diff) case model.ActionModifySchemaCharsetAndCollate: @@ -399,9 +399,9 @@ func (b *Builder) dropTableForUpdate(newTableID, oldTableID int64, dbInfo *model ) } oldDBInfo := b.getSchemaAndCopyIfNecessary(oldRoDBInfo.Name.L) - tmpIDs = b.applyDropTable(oldDBInfo, oldTableID, tmpIDs) + tmpIDs = b.applyDropTable(diff, oldDBInfo, oldTableID, tmpIDs) } else { - tmpIDs = b.applyDropTable(dbInfo, oldTableID, tmpIDs) + tmpIDs = b.applyDropTable(diff, dbInfo, oldTableID, tmpIDs) } if oldTableID != newTableID { @@ -413,6 +413,9 @@ func (b *Builder) dropTableForUpdate(newTableID, oldTableID int64, dbInfo *model } func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + if b.enableV2 { + return b.applyTableUpdateV2(m, diff) + } roDBInfo, ok := b.infoSchema.SchemaByID(diff.SchemaID) if !ok { return nil, ErrDatabaseNotExists.GenWithStackByArgs( @@ -440,6 +443,34 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 return tblIDs, nil } +// TODO: more UT to check the correctness. +func (b *Builder) applyTableUpdateV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + oldDBInfo, ok := b.infoschemaV2.SchemaByID(diff.SchemaID) + if !ok { + return nil, ErrDatabaseNotExists.GenWithStackByArgs( + fmt.Sprintf("(Schema ID %d)", diff.SchemaID), + ) + } + + oldTableID, newTableID := b.getTableIDs(diff) + b.updateBundleForTableUpdate(diff, newTableID, oldTableID) + + tblIDs, allocs, err := b.dropTableForUpdate(newTableID, oldTableID, oldDBInfo, diff) + if err != nil { + return nil, err + } + + if tableIDIsValid(newTableID) { + // All types except DropTableOrView. + var err error + tblIDs, err = b.applyCreateTable(m, oldDBInfo, newTableID, allocs, diff.Type, tblIDs, diff.Version) + if err != nil { + return nil, errors.Trace(err) + } + } + return tblIDs, nil +} + func filterAllocators(diff *model.SchemaDiff, oldAllocs autoid.Allocators) autoid.Allocators { var newAllocs autoid.Allocators switch diff.Type { @@ -521,8 +552,11 @@ func (b *Builder) applyModifySchemaDefaultPlacement(m *meta.Meta, diff *model.Sc return nil } -func (b *Builder) applyDropSchema(schemaID int64) []int64 { - di, ok := b.infoSchema.SchemaByID(schemaID) +func (b *Builder) applyDropSchema(diff *model.SchemaDiff) []int64 { + if b.enableV2 { + return b.applyDropSchemaV2(diff) + } + di, ok := b.infoSchema.SchemaByID(diff.SchemaID) if !ok { return nil } @@ -543,11 +577,56 @@ func (b *Builder) applyDropSchema(schemaID int64) []int64 { di = di.Clone() for _, id := range tableIDs { b.deleteBundle(b.infoSchema, id) - b.applyDropTable(di, id, nil) + b.applyDropTable(diff, di, id, nil) + } + return tableIDs +} + +func (b *Builder) applyDropSchemaV2(diff *model.SchemaDiff) []int64 { + di, ok := b.infoschemaV2.SchemaByID(diff.SchemaID) + if !ok { + return nil + } + + b.infoData.deleteDB(di.Name) + tableIDs := make([]int64, 0, len(di.Tables)) + for _, tbl := range di.Tables { + tableIDs = appendAffectedIDs(tableIDs, tbl) + } + + di = di.Clone() + for _, id := range tableIDs { + b.deleteBundle(b.infoSchema, id) + b.applyDropTableV2(diff, di, id, nil) } return tableIDs } +func (b *Builder) applyDropTableV2(diff *model.SchemaDiff, dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 { + // Remove the table in temporaryTables + if b.infoSchemaMisc.temporaryTableIDs != nil { + delete(b.infoSchemaMisc.temporaryTableIDs, tableID) + } + + table, ok := b.infoschemaV2.TableByID(tableID) + + if !ok { + return nil + } + + b.infoData.delete(tableItem{ + dbName: dbInfo.Name.L, + dbID: dbInfo.ID, + tableName: table.Meta().Name.L, + tableID: table.Meta().ID, + schemaVersion: diff.Version, + }) + + // The old DBInfo still holds a reference to old table info, we need to remove it. + b.deleteReferredForeignKeys(dbInfo, tableID) + return affected +} + func (b *Builder) applyRecoverSchema(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { if di, ok := b.infoSchema.SchemaByID(diff.SchemaID); ok { return nil, ErrDatabaseExists.GenWithStackByArgs( @@ -726,7 +805,10 @@ func ConvertOldVersionUTF8ToUTF8MB4IfNeed(tbInfo *model.TableInfo) { } } -func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 { +func (b *Builder) applyDropTable(diff *model.SchemaDiff, dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 { + if b.enableV2 { + return b.applyDropTableV2(diff, dbInfo, tableID, affected) + } bucketIdx := tableBucketIdx(tableID) sortedTbls := b.infoSchema.sortedTablesBuckets[bucketIdx] idx := sortedTbls.searchTable(tableID) @@ -745,8 +827,12 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [ if b.infoSchema.temporaryTableIDs != nil { delete(b.infoSchema.temporaryTableIDs, tableID) } - // The old DBInfo still holds a reference to old table info, we need to remove it. + b.deleteReferredForeignKeys(dbInfo, tableID) + return affected +} + +func (b *Builder) deleteReferredForeignKeys(dbInfo *model.DBInfo, tableID int64) { for i, tblInfo := range dbInfo.Tables { if tblInfo.ID == tableID { if i == len(dbInfo.Tables)-1 { @@ -758,7 +844,6 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [ break } } - return affected } // Build builds and returns the built infoschema. @@ -781,8 +866,8 @@ func (b *Builder) InitWithOldInfoSchema(oldSchema InfoSchema) (*Builder, error) } var oldIS *infoSchema - if proxy, ok := oldSchema.(*infoschemaV2); ok { - oldIS = proxy.infoSchema + if schemaV2, ok := oldSchema.(*infoschemaV2); ok { + oldIS = schemaV2.infoSchema } else { oldIS = oldSchema.(*infoSchema) } @@ -914,8 +999,8 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF b.addTemporaryTable(tblInfo.ID) } } - b.addDB(schemaVersion, di, schTbls) + return nil } diff --git a/pkg/infoschema/infoschema_test.go b/pkg/infoschema/infoschema_test.go index 5193a8d764a8a..5fcb16d26dc21 100644 --- a/pkg/infoschema/infoschema_test.go +++ b/pkg/infoschema/infoschema_test.go @@ -927,6 +927,7 @@ type infoschemaTestContext struct { t *testing.T re autoid.Requirement ctx context.Context + data *infoschema.Data is infoschema.InfoSchema } @@ -949,7 +950,7 @@ func (tc *infoschemaTestContext) createSchema() { tc.dbInfo = dbInfo // init infoschema - builder, err := infoschema.NewBuilder(tc.re, nil, nil).InitWithDBInfos([]*model.DBInfo{tc.dbInfo}, nil, nil, 1) + builder, err := infoschema.NewBuilder(tc.re, nil, tc.data).InitWithDBInfos([]*model.DBInfo{}, nil, nil, 1) require.NoError(tc.t, err) tc.is = builder.Build() } @@ -958,7 +959,7 @@ func (tc *infoschemaTestContext) runCreateSchema() { // create schema tc.createSchema() - tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: tc.dbInfo.ID}, func(tc *infoschemaTestContext) { + tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: tc.dbInfo.ID}, func(tc *infoschemaTestContext) { dbInfo, ok := tc.is.SchemaByID(tc.dbInfo.ID) require.True(tc.t, ok) require.Equal(tc.t, dbInfo.Name, tc.dbInfo.Name) @@ -977,11 +978,11 @@ func (tc *infoschemaTestContext) dropSchema() { func (tc *infoschemaTestContext) runDropSchema() { // create schema tc.runCreateSchema() - // drop schema tc.dropSchema() - tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionDropSchema, SchemaID: tc.dbInfo.ID}, func(tc *infoschemaTestContext) { - _, ok := tc.is.SchemaByID(tc.dbInfo.ID) + + tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionDropSchema, SchemaID: tc.dbInfo.ID, Version: 100}, func(tc *infoschemaTestContext) { + _, ok := tc.is.SchemaByName(tc.dbInfo.Name) require.False(tc.t, ok) }) } @@ -1024,7 +1025,7 @@ func (tc *infoschemaTestContext) runCreateTable(tblName string) int64 { // create table tblID := tc.createTable(tblName) - tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: tc.dbInfo.ID, TableID: tblID}, func(tc *infoschemaTestContext) { + tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: tc.dbInfo.ID, TableID: tblID}, func(tc *infoschemaTestContext) { tbl, ok := tc.is.TableByID(tblID) require.True(tc.t, ok) require.Equal(tc.t, tbl.Meta().Name.O, tblName) @@ -1047,7 +1048,7 @@ func (tc *infoschemaTestContext) runDropTable(tblName string) { // dropTable tc.dropTable(tblName, tblID) - tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionDropTable, SchemaID: tc.dbInfo.ID, TableID: tblID}, func(tc *infoschemaTestContext) { + tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionDropTable, SchemaID: tc.dbInfo.ID, TableID: tblID}, func(tc *infoschemaTestContext) { tbl, ok := tc.is.TableByID(tblID) require.False(tc.t, ok) require.Nil(tc.t, tbl) @@ -1070,7 +1071,7 @@ func (tc *infoschemaTestContext) runAddColumn(tblName string) { require.NoError(tc.t, err) tc.addColumn(tbl.Meta()) - tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionAddColumn, SchemaID: tc.dbInfo.ID, TableID: tbl.Meta().ID}, func(tc *infoschemaTestContext) { + tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionAddColumn, SchemaID: tc.dbInfo.ID, TableID: tbl.Meta().ID}, func(tc *infoschemaTestContext) { tbl, ok := tc.is.TableByID(tbl.Meta().ID) require.True(tc.t, ok) require.Equal(tc.t, 2, len(tbl.Cols())) @@ -1103,7 +1104,7 @@ func (tc *infoschemaTestContext) runModifyColumn(tblName string) { require.NoError(tc.t, err) tc.modifyColumn(tbl.Meta()) - tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionModifyColumn, SchemaID: tc.dbInfo.ID, TableID: tbl.Meta().ID}, func(tc *infoschemaTestContext) { + tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionModifyColumn, SchemaID: tc.dbInfo.ID, TableID: tbl.Meta().ID}, func(tc *infoschemaTestContext) { tbl, ok := tc.is.TableByID(tbl.Meta().ID) require.True(tc.t, ok) require.Equal(tc.t, "test", tbl.Cols()[0].Comment) @@ -1122,11 +1123,11 @@ func (tc *infoschemaTestContext) modifyColumn(tblInfo *model.TableInfo) { require.NoError(tc.t, err) } -func (tc *infoschemaTestContext) applyDiffAddCheck(diff *model.SchemaDiff, checkFn func(tc *infoschemaTestContext)) { +func (tc *infoschemaTestContext) applyDiffAndCheck(diff *model.SchemaDiff, checkFn func(tc *infoschemaTestContext)) { txn, err := tc.re.Store().Begin() require.NoError(tc.t, err) - builder, err := infoschema.NewBuilder(tc.re, nil, nil).InitWithOldInfoSchema(tc.is) + builder, err := infoschema.NewBuilder(tc.re, nil, tc.data).InitWithOldInfoSchema(tc.is) require.NoError(tc.t, err) // applyDiff _, err = builder.ApplyDiff(meta.NewMeta(txn), diff) @@ -1147,23 +1148,30 @@ func TestApplyDiff(t *testing.T) { require.NoError(t, err) }() - tc := &infoschemaTestContext{ - t: t, - re: re, - ctx: kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), - } + for i := 0; i < 2; i++ { + if i == 0 { + // enable infoschema v2. + variable.SchemaCacheSize.Store(1000000) + } - tc.runCreateSchema() - tc.clear() - tc.runDropSchema() - tc.clear() - tc.runCreateTable("test") - tc.clear() - tc.runDropTable("test") - tc.clear() - - tc.runCreateTable("test") - tc.runModifyTable("test", model.ActionAddColumn) - tc.runModifyTable("test", model.ActionModifyColumn) - // TODO check all actions.. + tc := &infoschemaTestContext{ + t: t, + re: re, + ctx: kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), + data: infoschema.NewData(), + } + tc.runCreateSchema() + tc.clear() + tc.runDropSchema() + tc.clear() + tc.runCreateTable("test") + tc.clear() + tc.runDropTable("test") + tc.clear() + + tc.runCreateTable("test") + tc.runModifyTable("test", model.ActionAddColumn) + tc.runModifyTable("test", model.ActionModifyColumn) + } + // TODO(ywqzzy): check all actions. } diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go index e362be15dbbbb..c303d53997220 100644 --- a/pkg/infoschema/infoschema_v2.go +++ b/pkg/infoschema/infoschema_v2.go @@ -78,12 +78,12 @@ type Data struct { // But this mapping should be synced with byName. byID *btree.BTreeG[tableItem] - tableCache fifo.Cache[tableCacheKey, table.Table] - // For the SchemaByName API, sorted by {dbName, schemaVersion} => model.DBInfo // Stores the full data in memory. schemaMap *btree.BTreeG[schemaItem] + tableCache fifo.Cache[tableCacheKey, table.Table] + // sorted by both SchemaVersion and timestamp in descending order, assume they have same order mu struct { sync.RWMutex @@ -97,7 +97,6 @@ type Data struct { func (isd *Data) getVersionByTS(ts uint64) (int64, bool) { isd.mu.RLock() defer isd.mu.RUnlock() - return isd.getVersionByTSNoLock(ts) } @@ -162,6 +161,30 @@ func (isd *Data) addDB(schemaVersion int64, dbInfo *model.DBInfo) { isd.schemaMap.Set(schemaItem{schemaVersion: schemaVersion, dbInfo: dbInfo}) } +func (isd *Data) delete(item tableItem) { + isd.tableCache.Remove(tableCacheKey{item.tableID, item.schemaVersion}) +} + +func (isd *Data) deleteDB(name model.CIStr) { + dbInfo, schemaVersion := isd.schemaByName(name) + isd.schemaMap.Delete(schemaItem{schemaVersion: schemaVersion, dbInfo: dbInfo}) +} + +func (isd *Data) schemaByName(name model.CIStr) (res *model.DBInfo, schemaVersion int64) { + var dbInfo model.DBInfo + dbInfo.Name = name + + isd.schemaMap.Descend(schemaItem{dbInfo: &dbInfo, schemaVersion: math.MaxInt64}, func(item schemaItem) bool { + if item.Name() != name.L { + return false + } + res = item.dbInfo + schemaVersion = item.schemaVersion + return false + }) + return res, schemaVersion +} + func compareByID(a, b tableItem) bool { if a.tableID < b.tableID { return true @@ -263,11 +286,12 @@ func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) { // Maybe the table is evicted? need to reload. ret, err := loadTableInfo(is.r, is.Data, id, itm.dbID, is.ts, is.schemaVersion) - if err == nil { - is.tableCache.Set(key, ret) - return ret, true + if err != nil || ret == nil { + return nil, false } - return nil, false + + is.tableCache.Set(key, ret) + return ret, true } func isSpecialDB(dbName string) bool { @@ -454,7 +478,18 @@ func loadTableInfo(r autoid.Requirement, infoData *Data, tblID, dbID int64, ts u // TODO load table panic!!! panic(err) } - tblInfo := res.(*model.TableInfo) // TODO: it could be missing!!! + if res == nil { + return nil, err + } + tblInfo := res.(*model.TableInfo) + + // table removed. + if tblInfo == nil { + return nil, errors.Trace(ErrTableNotExists.GenWithStackByArgs( + fmt.Sprintf("(Schema ID %d)", dbID), + fmt.Sprintf("(Table ID %d)", tblID), + )) + } ConvertCharsetCollateToLowerCaseIfNeed(tblInfo) ConvertOldVersionUTF8ToUTF8MB4IfNeed(tblInfo)