Skip to content

Commit

Permalink
[release-19.0] Fix view tracking on sharded keyspace (#15436) (#15477)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshit Gangal <[email protected]>
Co-authored-by: Andres Taylor <[email protected]>
  • Loading branch information
3 people authored Mar 14, 2024
1 parent b719540 commit 791cb84
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 85 deletions.
16 changes: 11 additions & 5 deletions go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,23 @@ func WaitForKsError(t *testing.T, vtgateProcess cluster.VtgateProcess, ks string
var ok bool
errString, ok = ksErr.(string)
return ok
})
}, "Waiting for error")
return errString
}

// WaitForVschemaCondition waits for the condition to be true
func WaitForVschemaCondition(t *testing.T, vtgateProcess cluster.VtgateProcess, ks string, conditionMet func(t *testing.T, keyspace map[string]interface{}) bool) {
func WaitForVschemaCondition(
t *testing.T,
vtgateProcess cluster.VtgateProcess,
ks string,
conditionMet func(t *testing.T, keyspace map[string]interface{}) bool,
message string,
) {
timeout := time.After(60 * time.Second)
for {
select {
case <-timeout:
t.Fatalf("schema tracking did not met the condition within the time for keyspace: %s", ks)
t.Fatalf("schema tracking did not met the condition within the time for keyspace: %s\n%s", ks, message)
default:
res, err := vtgateProcess.ReadVSchema()
require.NoError(t, err, res)
Expand All @@ -290,12 +296,12 @@ func WaitForVschemaCondition(t *testing.T, vtgateProcess cluster.VtgateProcess,
}

// WaitForTableDeletions waits for a table to be deleted
func WaitForTableDeletions(ctx context.Context, t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) {
func WaitForTableDeletions(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) {
WaitForVschemaCondition(t, vtgateProcess, ks, func(t *testing.T, keyspace map[string]interface{}) bool {
tablesMap := keyspace["tables"]
_, isPresent := convertToMap(tablesMap)[tbl]
return !isPresent
})
}, "Waiting for table to be deleted")
}

// WaitForColumn waits for a table's column to be present
Expand Down
7 changes: 3 additions & 4 deletions go/test/endtoend/vtgate/foreignkey/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,12 +1078,11 @@ func TestCyclicFks(t *testing.T) {
// Drop the cyclic foreign key constraint.
utils.Exec(t, mcmp.VtConn, "alter table fk_t10 drop foreign key test_cyclic_fks")

// Wait for schema-tracking to be complete.
utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, unshardedKs, func(t *testing.T, keyspace map[string]interface{}) bool {
// Let's clean out the cycle so that the other tests don't fail
utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, unshardedKs, func(t *testing.T, keyspace map[string]any) bool {
_, fieldPresent := keyspace["error"]
return !fieldPresent
})

}, "wait for error to disappear")
}

func TestReplace(t *testing.T) {
Expand Down
4 changes: 1 addition & 3 deletions go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,10 +786,8 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
}
})
t.Run("waiting for vschema deletions to apply", func(t *testing.T) {
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
for _, tableName := range tableNames {
utils.WaitForTableDeletions(timeoutCtx, t, clusterInstance.VtgateProcess, keyspaceName, tableName)
utils.WaitForTableDeletions(t, clusterInstance.VtgateProcess, keyspaceName, tableName)
}
})
t.Run("creating tables", func(t *testing.T) {
Expand Down
13 changes: 0 additions & 13 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,19 +321,6 @@ func TestCreateIndex(t *testing.T) {
utils.Exec(t, conn, `create index i2 on ks.t1000 (id1)`)
}

func TestCreateView(t *testing.T) {
// The test wont work since we cant change the vschema without reloading the vtgate.
t.Skip()
conn, closer := start(t)
defer closer()
// Test that create view works and the output is as expected
utils.Exec(t, conn, `create view v1 as select * from t1`)
utils.Exec(t, conn, `insert into t1(id1, id2) values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)`)
// This wont work, since ALTER VSCHEMA ADD TABLE is only supported for unsharded keyspaces
utils.Exec(t, conn, "alter vschema add table v1")
utils.AssertMatches(t, conn, "select * from v1", `[[INT64(1) INT64(1)] [INT64(2) INT64(2)] [INT64(3) INT64(3)] [INT64(4) INT64(4)] [INT64(5) INT64(5)]]`)
}

func TestVersions(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vtgate/queries/misc/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func TestMain(m *testing.M) {
return 1
}

clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--enable-views")
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver-enable-views")

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
Expand Down
54 changes: 54 additions & 0 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"testing"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -317,3 +318,56 @@ func TestTransactionModeVar(t *testing.T) {
})
}
}

func TestAlterTableWithView(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 19, "vtgate")
mcmp, closer := start(t)
defer closer()

// Test that create/alter view works and the output is as expected
mcmp.Exec(`use ks_misc`)
mcmp.Exec(`create view v1 as select * from t1`)
var viewDef string
utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, keyspaceName, func(t *testing.T, ksMap map[string]any) bool {
views, ok := ksMap["views"]
if !ok {
return false
}
viewsMap := views.(map[string]any)
view, ok := viewsMap["v1"]
if ok {
viewDef = view.(string)
}
return ok
}, "Waiting for view creation")
mcmp.Exec(`insert into t1(id1, id2) values (1, 1)`)
mcmp.AssertMatches("select * from v1", `[[INT64(1) INT64(1)]]`)

// alter table add column
mcmp.Exec(`alter table t1 add column test bigint`)
time.Sleep(10 * time.Second)
mcmp.Exec(`alter view v1 as select * from t1`)

waitForChange := func(t *testing.T, ksMap map[string]any) bool {
// wait for the view definition to change
views := ksMap["views"]
viewsMap := views.(map[string]any)
newView := viewsMap["v1"]
if newView.(string) == viewDef {
return false
}
viewDef = newView.(string)
return true
}
utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, keyspaceName, waitForChange, "Waiting for alter view")

mcmp.AssertMatches("select * from v1", `[[INT64(1) INT64(1) NULL]]`)

// alter table remove column
mcmp.Exec(`alter table t1 drop column test`)
mcmp.Exec(`alter view v1 as select * from t1`)

utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, keyspaceName, waitForChange, "Waiting for alter view")

mcmp.AssertMatches("select * from v1", `[[INT64(1) INT64(1)]]`)
}
76 changes: 23 additions & 53 deletions go/vt/vtgate/planbuilder/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func buildDDLPlans(ctx context.Context, sql string, ddlStatement sqlparser.DDLSt
}
err = checkFKError(vschema, ddlStatement, keyspace)
case *sqlparser.CreateView:
destination, keyspace, err = buildCreateView(ctx, vschema, ddl, reservedVars, enableOnlineDDL, enableDirectDDL)
destination, keyspace, err = buildCreateViewCommon(ctx, vschema, reservedVars, enableOnlineDDL, enableDirectDDL, ddl.Select, ddl)
case *sqlparser.AlterView:
destination, keyspace, err = buildAlterView(ctx, vschema, ddl, reservedVars, enableOnlineDDL, enableDirectDDL)
destination, keyspace, err = buildCreateViewCommon(ctx, vschema, reservedVars, enableOnlineDDL, enableDirectDDL, ddl.Select, ddl)
case *sqlparser.DropView:
destination, keyspace, err = buildDropView(vschema, ddlStatement)
case *sqlparser.DropTable:
Expand Down Expand Up @@ -192,64 +192,43 @@ func findTableDestinationAndKeyspace(vschema plancontext.VSchema, ddlStatement s
return destination, keyspace, nil
}

func buildAlterView(ctx context.Context, vschema plancontext.VSchema, ddl *sqlparser.AlterView, reservedVars *sqlparser.ReservedVars, enableOnlineDDL, enableDirectDDL bool) (key.Destination, *vindexes.Keyspace, error) {
// For Alter View, we require that the view exist and the select query can be satisfied within the keyspace itself
func buildCreateViewCommon(
ctx context.Context,
vschema plancontext.VSchema,
reservedVars *sqlparser.ReservedVars,
enableOnlineDDL, enableDirectDDL bool,
ddlSelect sqlparser.SelectStatement,
ddl sqlparser.DDLStatement,
) (key.Destination, *vindexes.Keyspace, error) {
// For Create View, we require that the keyspace exist and the select query can be satisfied within the keyspace itself
// We should remove the keyspace name from the table name, as the database name in MySQL might be different than the keyspace name
destination, keyspace, err := findTableDestinationAndKeyspace(vschema, ddl)
if err != nil {
return nil, nil, err
}

selectPlan, err := createInstructionFor(ctx, sqlparser.String(ddl.Select), ddl.Select, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
// because we don't trust the schema tracker to have up-to-date info, we don't want to expand any SELECT * here
var expressions []sqlparser.SelectExprs
_ = sqlparser.VisitAllSelects(ddlSelect, func(p *sqlparser.Select, idx int) error {
expressions = append(expressions, sqlparser.CloneSelectExprs(p.SelectExprs))
return nil
})
selectPlan, err := createInstructionFor(ctx, sqlparser.String(ddlSelect), ddlSelect, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
if err != nil {
return nil, nil, err
}
selPlanKs := selectPlan.primitive.GetKeyspaceName()
if keyspace.Name != selPlanKs {
return nil, nil, vterrors.VT12001(ViewDifferentKeyspace)
}
if vschema.IsViewsEnabled() {
if keyspace == nil {
return nil, nil, vterrors.VT09005()
}
return destination, keyspace, nil
}
isRoutePlan, opCode := tryToGetRoutePlan(selectPlan.primitive)
if !isRoutePlan {
return nil, nil, vterrors.VT12001(ViewComplex)
}
if opCode != engine.Unsharded && opCode != engine.EqualUnique && opCode != engine.Scatter {
return nil, nil, vterrors.VT12001(ViewComplex)
}
_ = sqlparser.SafeRewrite(ddl.Select, nil, func(cursor *sqlparser.Cursor) bool {
switch tableName := cursor.Node().(type) {
case sqlparser.TableName:
cursor.Replace(sqlparser.TableName{
Name: tableName.Name,
})
}
return true

_ = sqlparser.VisitAllSelects(ddlSelect, func(p *sqlparser.Select, idx int) error {
p.SelectExprs = expressions[idx]
return nil
})
return destination, keyspace, nil
}

func buildCreateView(ctx context.Context, vschema plancontext.VSchema, ddl *sqlparser.CreateView, reservedVars *sqlparser.ReservedVars, enableOnlineDDL, enableDirectDDL bool) (key.Destination, *vindexes.Keyspace, error) {
// For Create View, we require that the keyspace exist and the select query can be satisfied within the keyspace itself
// We should remove the keyspace name from the table name, as the database name in MySQL might be different than the keyspace name
destination, keyspace, _, err := vschema.TargetDestination(ddl.ViewName.Qualifier.String())
if err != nil {
return nil, nil, err
}
ddl.ViewName.Qualifier = sqlparser.NewIdentifierCS("")
sqlparser.RemoveKeyspace(ddl)

selectPlan, err := createInstructionFor(ctx, sqlparser.String(ddl.Select), ddl.Select, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
if err != nil {
return nil, nil, err
}
selPlanKs := selectPlan.primitive.GetKeyspaceName()
if keyspace.Name != selPlanKs {
return nil, nil, vterrors.VT12001(ViewDifferentKeyspace)
}
if vschema.IsViewsEnabled() {
if keyspace == nil {
return nil, nil, vterrors.VT09005()
Expand All @@ -263,15 +242,6 @@ func buildCreateView(ctx context.Context, vschema plancontext.VSchema, ddl *sqlp
if opCode != engine.Unsharded && opCode != engine.EqualUnique && opCode != engine.Scatter {
return nil, nil, vterrors.VT12001(ViewComplex)
}
_ = sqlparser.SafeRewrite(ddl.Select, nil, func(cursor *sqlparser.Cursor) bool {
switch tableName := cursor.Node().(type) {
case sqlparser.TableName:
cursor.Replace(sqlparser.TableName{
Name: tableName.Name,
})
}
return true
})
return destination, keyspace, nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/testdata/ddl_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view tmp_view as select user_id, col1, col2 from authoritative"
"Query": "create view tmp_view as select * from authoritative"
},
"TablesUsed": [
"user.tmp_view"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view view_a as select user_id, col1, col2 from authoritative"
"Query": "create view view_a as select * from authoritative"
},
"TablesUsed": [
"user.view_a"
Expand All @@ -144,7 +144,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view view_a as select a.user_id as user_id, a.col1 as col1, a.col2 as col2, b.user_id as user_id, b.col1 as col1, b.col2 as col2 from authoritative as a join authoritative as b on a.user_id = b.user_id"
"Query": "create view view_a as select * from authoritative as a join authoritative as b on a.user_id = b.user_id"
},
"TablesUsed": [
"user.view_a"
Expand All @@ -163,7 +163,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view view_a as select user_id, col1, col2 from authoritative as a"
"Query": "create view view_a as select a.* from authoritative as a"
},
"TablesUsed": [
"user.view_a"
Expand Down Expand Up @@ -201,7 +201,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view view_a as select `user`.id, a.user_id as user_id, a.col1 as col1, a.col2 as col2, `user`.col1 from authoritative as a join `user` on a.user_id = `user`.id"
"Query": "create view view_a as select `user`.id, a.*, `user`.col1 from authoritative as a join `user` on a.user_id = `user`.id"
},
"TablesUsed": [
"user.view_a"
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/testdata/view_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"Name": "user",
"Sharded": true
},
"Query": "alter view user_extra as select * from `user`.`user`"
"Query": "alter view user_extra as select * from `user`"
},
"TablesUsed": [
"user.user_extra"
Expand All @@ -35,7 +35,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view view_ac as select user_id, col1, col2 from authoritative"
"Query": "create view view_ac as select * from authoritative"
},
"TablesUsed": [
"user.view_ac"
Expand Down

0 comments on commit 791cb84

Please sign in to comment.