Skip to content

Commit

Permalink
infoschemaV2: support create&drop&update table/schema for v2 (#51424)
Browse files Browse the repository at this point in the history
ref #50959
  • Loading branch information
ywqzzy authored Mar 6, 2024
1 parent 9b255d5 commit 2830714
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 51 deletions.
5 changes: 3 additions & 2 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// We can fall back to full load, don't need to return the error.
logutil.BgLogger().Error("failed to load schema diff", zap.Error(err))
}

// full load.
schemas, err := do.fetchAllSchemasWithTables(m)
if err != nil {
return nil, false, currentSchemaVersion, nil, err
Expand All @@ -303,7 +303,8 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
if err != nil {
return nil, false, currentSchemaVersion, nil, err
}

// clear data
do.infoCache.Data = infoschema.NewData()
newISBuilder, err := infoschema.NewBuilder(do, do.sysFacHack, do.infoCache.Data).InitWithDBInfos(schemas, policies, resourceGroups, neededSchemaVersion)
if err != nil {
return nil, false, currentSchemaVersion, nil, err
Expand Down
109 changes: 97 additions & 12 deletions pkg/infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
case model.ActionCreateSchema:
return nil, b.applyCreateSchema(m, diff)
case model.ActionDropSchema:
return b.applyDropSchema(diff.SchemaID), nil
return b.applyDropSchema(diff), nil
case model.ActionRecoverSchema:
return b.applyRecoverSchema(m, diff)
case model.ActionModifySchemaCharsetAndCollate:
Expand Down Expand Up @@ -399,9 +399,9 @@ func (b *Builder) dropTableForUpdate(newTableID, oldTableID int64, dbInfo *model
)
}
oldDBInfo := b.getSchemaAndCopyIfNecessary(oldRoDBInfo.Name.L)
tmpIDs = b.applyDropTable(oldDBInfo, oldTableID, tmpIDs)
tmpIDs = b.applyDropTable(diff, oldDBInfo, oldTableID, tmpIDs)
} else {
tmpIDs = b.applyDropTable(dbInfo, oldTableID, tmpIDs)
tmpIDs = b.applyDropTable(diff, dbInfo, oldTableID, tmpIDs)
}

if oldTableID != newTableID {
Expand All @@ -413,6 +413,9 @@ func (b *Builder) dropTableForUpdate(newTableID, oldTableID int64, dbInfo *model
}

func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
if b.enableV2 {
return b.applyTableUpdateV2(m, diff)
}
roDBInfo, ok := b.infoSchema.SchemaByID(diff.SchemaID)
if !ok {
return nil, ErrDatabaseNotExists.GenWithStackByArgs(
Expand Down Expand Up @@ -440,6 +443,34 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
return tblIDs, nil
}

// TODO: more UT to check the correctness.
func (b *Builder) applyTableUpdateV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
oldDBInfo, ok := b.infoschemaV2.SchemaByID(diff.SchemaID)
if !ok {
return nil, ErrDatabaseNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", diff.SchemaID),
)
}

oldTableID, newTableID := b.getTableIDs(diff)
b.updateBundleForTableUpdate(diff, newTableID, oldTableID)

tblIDs, allocs, err := b.dropTableForUpdate(newTableID, oldTableID, oldDBInfo, diff)
if err != nil {
return nil, err
}

if tableIDIsValid(newTableID) {
// All types except DropTableOrView.
var err error
tblIDs, err = b.applyCreateTable(m, oldDBInfo, newTableID, allocs, diff.Type, tblIDs, diff.Version)
if err != nil {
return nil, errors.Trace(err)
}
}
return tblIDs, nil
}

func filterAllocators(diff *model.SchemaDiff, oldAllocs autoid.Allocators) autoid.Allocators {
var newAllocs autoid.Allocators
switch diff.Type {
Expand Down Expand Up @@ -521,8 +552,11 @@ func (b *Builder) applyModifySchemaDefaultPlacement(m *meta.Meta, diff *model.Sc
return nil
}

func (b *Builder) applyDropSchema(schemaID int64) []int64 {
di, ok := b.infoSchema.SchemaByID(schemaID)
func (b *Builder) applyDropSchema(diff *model.SchemaDiff) []int64 {
if b.enableV2 {
return b.applyDropSchemaV2(diff)
}
di, ok := b.infoSchema.SchemaByID(diff.SchemaID)
if !ok {
return nil
}
Expand All @@ -543,11 +577,56 @@ func (b *Builder) applyDropSchema(schemaID int64) []int64 {
di = di.Clone()
for _, id := range tableIDs {
b.deleteBundle(b.infoSchema, id)
b.applyDropTable(di, id, nil)
b.applyDropTable(diff, di, id, nil)
}
return tableIDs
}

func (b *Builder) applyDropSchemaV2(diff *model.SchemaDiff) []int64 {
di, ok := b.infoschemaV2.SchemaByID(diff.SchemaID)
if !ok {
return nil
}

b.infoData.deleteDB(di.Name)
tableIDs := make([]int64, 0, len(di.Tables))
for _, tbl := range di.Tables {
tableIDs = appendAffectedIDs(tableIDs, tbl)
}

di = di.Clone()
for _, id := range tableIDs {
b.deleteBundle(b.infoSchema, id)
b.applyDropTableV2(diff, di, id, nil)
}
return tableIDs
}

func (b *Builder) applyDropTableV2(diff *model.SchemaDiff, dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 {
// Remove the table in temporaryTables
if b.infoSchemaMisc.temporaryTableIDs != nil {
delete(b.infoSchemaMisc.temporaryTableIDs, tableID)
}

table, ok := b.infoschemaV2.TableByID(tableID)

if !ok {
return nil
}

b.infoData.delete(tableItem{
dbName: dbInfo.Name.L,
dbID: dbInfo.ID,
tableName: table.Meta().Name.L,
tableID: table.Meta().ID,
schemaVersion: diff.Version,
})

// The old DBInfo still holds a reference to old table info, we need to remove it.
b.deleteReferredForeignKeys(dbInfo, tableID)
return affected
}

func (b *Builder) applyRecoverSchema(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
if di, ok := b.infoSchema.SchemaByID(diff.SchemaID); ok {
return nil, ErrDatabaseExists.GenWithStackByArgs(
Expand Down Expand Up @@ -726,7 +805,10 @@ func ConvertOldVersionUTF8ToUTF8MB4IfNeed(tbInfo *model.TableInfo) {
}
}

func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 {
func (b *Builder) applyDropTable(diff *model.SchemaDiff, dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 {
if b.enableV2 {
return b.applyDropTableV2(diff, dbInfo, tableID, affected)
}
bucketIdx := tableBucketIdx(tableID)
sortedTbls := b.infoSchema.sortedTablesBuckets[bucketIdx]
idx := sortedTbls.searchTable(tableID)
Expand All @@ -745,8 +827,12 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [
if b.infoSchema.temporaryTableIDs != nil {
delete(b.infoSchema.temporaryTableIDs, tableID)
}

// The old DBInfo still holds a reference to old table info, we need to remove it.
b.deleteReferredForeignKeys(dbInfo, tableID)
return affected
}

func (b *Builder) deleteReferredForeignKeys(dbInfo *model.DBInfo, tableID int64) {
for i, tblInfo := range dbInfo.Tables {
if tblInfo.ID == tableID {
if i == len(dbInfo.Tables)-1 {
Expand All @@ -758,7 +844,6 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [
break
}
}
return affected
}

// Build builds and returns the built infoschema.
Expand All @@ -781,8 +866,8 @@ func (b *Builder) InitWithOldInfoSchema(oldSchema InfoSchema) (*Builder, error)
}

var oldIS *infoSchema
if proxy, ok := oldSchema.(*infoschemaV2); ok {
oldIS = proxy.infoSchema
if schemaV2, ok := oldSchema.(*infoschemaV2); ok {
oldIS = schemaV2.infoSchema
} else {
oldIS = oldSchema.(*infoSchema)
}
Expand Down Expand Up @@ -914,8 +999,8 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF
b.addTemporaryTable(tblInfo.ID)
}
}

b.addDB(schemaVersion, di, schTbls)

return nil
}

Expand Down
66 changes: 37 additions & 29 deletions pkg/infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,7 @@ type infoschemaTestContext struct {
t *testing.T
re autoid.Requirement
ctx context.Context
data *infoschema.Data
is infoschema.InfoSchema
}

Expand All @@ -949,7 +950,7 @@ func (tc *infoschemaTestContext) createSchema() {
tc.dbInfo = dbInfo

// init infoschema
builder, err := infoschema.NewBuilder(tc.re, nil, nil).InitWithDBInfos([]*model.DBInfo{tc.dbInfo}, nil, nil, 1)
builder, err := infoschema.NewBuilder(tc.re, nil, tc.data).InitWithDBInfos([]*model.DBInfo{}, nil, nil, 1)
require.NoError(tc.t, err)
tc.is = builder.Build()
}
Expand All @@ -958,7 +959,7 @@ func (tc *infoschemaTestContext) runCreateSchema() {
// create schema
tc.createSchema()

tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: tc.dbInfo.ID}, func(tc *infoschemaTestContext) {
tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: tc.dbInfo.ID}, func(tc *infoschemaTestContext) {
dbInfo, ok := tc.is.SchemaByID(tc.dbInfo.ID)
require.True(tc.t, ok)
require.Equal(tc.t, dbInfo.Name, tc.dbInfo.Name)
Expand All @@ -977,11 +978,11 @@ func (tc *infoschemaTestContext) dropSchema() {
func (tc *infoschemaTestContext) runDropSchema() {
// create schema
tc.runCreateSchema()

// drop schema
tc.dropSchema()
tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionDropSchema, SchemaID: tc.dbInfo.ID}, func(tc *infoschemaTestContext) {
_, ok := tc.is.SchemaByID(tc.dbInfo.ID)

tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionDropSchema, SchemaID: tc.dbInfo.ID, Version: 100}, func(tc *infoschemaTestContext) {
_, ok := tc.is.SchemaByName(tc.dbInfo.Name)
require.False(tc.t, ok)
})
}
Expand Down Expand Up @@ -1024,7 +1025,7 @@ func (tc *infoschemaTestContext) runCreateTable(tblName string) int64 {
// create table
tblID := tc.createTable(tblName)

tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: tc.dbInfo.ID, TableID: tblID}, func(tc *infoschemaTestContext) {
tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: tc.dbInfo.ID, TableID: tblID}, func(tc *infoschemaTestContext) {
tbl, ok := tc.is.TableByID(tblID)
require.True(tc.t, ok)
require.Equal(tc.t, tbl.Meta().Name.O, tblName)
Expand All @@ -1047,7 +1048,7 @@ func (tc *infoschemaTestContext) runDropTable(tblName string) {

// dropTable
tc.dropTable(tblName, tblID)
tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionDropTable, SchemaID: tc.dbInfo.ID, TableID: tblID}, func(tc *infoschemaTestContext) {
tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionDropTable, SchemaID: tc.dbInfo.ID, TableID: tblID}, func(tc *infoschemaTestContext) {
tbl, ok := tc.is.TableByID(tblID)
require.False(tc.t, ok)
require.Nil(tc.t, tbl)
Expand All @@ -1070,7 +1071,7 @@ func (tc *infoschemaTestContext) runAddColumn(tblName string) {
require.NoError(tc.t, err)

tc.addColumn(tbl.Meta())
tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionAddColumn, SchemaID: tc.dbInfo.ID, TableID: tbl.Meta().ID}, func(tc *infoschemaTestContext) {
tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionAddColumn, SchemaID: tc.dbInfo.ID, TableID: tbl.Meta().ID}, func(tc *infoschemaTestContext) {
tbl, ok := tc.is.TableByID(tbl.Meta().ID)
require.True(tc.t, ok)
require.Equal(tc.t, 2, len(tbl.Cols()))
Expand Down Expand Up @@ -1103,7 +1104,7 @@ func (tc *infoschemaTestContext) runModifyColumn(tblName string) {
require.NoError(tc.t, err)

tc.modifyColumn(tbl.Meta())
tc.applyDiffAddCheck(&model.SchemaDiff{Type: model.ActionModifyColumn, SchemaID: tc.dbInfo.ID, TableID: tbl.Meta().ID}, func(tc *infoschemaTestContext) {
tc.applyDiffAndCheck(&model.SchemaDiff{Type: model.ActionModifyColumn, SchemaID: tc.dbInfo.ID, TableID: tbl.Meta().ID}, func(tc *infoschemaTestContext) {
tbl, ok := tc.is.TableByID(tbl.Meta().ID)
require.True(tc.t, ok)
require.Equal(tc.t, "test", tbl.Cols()[0].Comment)
Expand All @@ -1122,11 +1123,11 @@ func (tc *infoschemaTestContext) modifyColumn(tblInfo *model.TableInfo) {
require.NoError(tc.t, err)
}

func (tc *infoschemaTestContext) applyDiffAddCheck(diff *model.SchemaDiff, checkFn func(tc *infoschemaTestContext)) {
func (tc *infoschemaTestContext) applyDiffAndCheck(diff *model.SchemaDiff, checkFn func(tc *infoschemaTestContext)) {
txn, err := tc.re.Store().Begin()
require.NoError(tc.t, err)

builder, err := infoschema.NewBuilder(tc.re, nil, nil).InitWithOldInfoSchema(tc.is)
builder, err := infoschema.NewBuilder(tc.re, nil, tc.data).InitWithOldInfoSchema(tc.is)
require.NoError(tc.t, err)
// applyDiff
_, err = builder.ApplyDiff(meta.NewMeta(txn), diff)
Expand All @@ -1147,23 +1148,30 @@ func TestApplyDiff(t *testing.T) {
require.NoError(t, err)
}()

tc := &infoschemaTestContext{
t: t,
re: re,
ctx: kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL),
}
for i := 0; i < 2; i++ {
if i == 0 {
// enable infoschema v2.
variable.SchemaCacheSize.Store(1000000)
}

tc.runCreateSchema()
tc.clear()
tc.runDropSchema()
tc.clear()
tc.runCreateTable("test")
tc.clear()
tc.runDropTable("test")
tc.clear()

tc.runCreateTable("test")
tc.runModifyTable("test", model.ActionAddColumn)
tc.runModifyTable("test", model.ActionModifyColumn)
// TODO check all actions..
tc := &infoschemaTestContext{
t: t,
re: re,
ctx: kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL),
data: infoschema.NewData(),
}
tc.runCreateSchema()
tc.clear()
tc.runDropSchema()
tc.clear()
tc.runCreateTable("test")
tc.clear()
tc.runDropTable("test")
tc.clear()

tc.runCreateTable("test")
tc.runModifyTable("test", model.ActionAddColumn)
tc.runModifyTable("test", model.ActionModifyColumn)
}
// TODO(ywqzzy): check all actions.
}
Loading

0 comments on commit 2830714

Please sign in to comment.