Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix view tracking on sharded keyspace #15436

Merged
merged 8 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,23 @@ func WaitForKsError(t *testing.T, vtgateProcess cluster.VtgateProcess, ks string
var ok bool
errString, ok = ksErr.(string)
return ok
})
}, "Waiting for error")
return errString
}

// WaitForVschemaCondition waits for the condition to be true
func WaitForVschemaCondition(t *testing.T, vtgateProcess cluster.VtgateProcess, ks string, conditionMet func(t *testing.T, keyspace map[string]interface{}) bool) {
func WaitForVschemaCondition(
t *testing.T,
vtgateProcess cluster.VtgateProcess,
ks string,
conditionMet func(t *testing.T, keyspace map[string]interface{}) bool,
message string,
) {
timeout := time.After(60 * time.Second)
for {
select {
case <-timeout:
t.Fatalf("schema tracking did not met the condition within the time for keyspace: %s", ks)
t.Fatalf("schema tracking did not met the condition within the time for keyspace: %s\n%s", ks, message)
default:
res, err := vtgateProcess.ReadVSchema()
require.NoError(t, err, res)
Expand All @@ -305,12 +311,12 @@ func WaitForVschemaCondition(t *testing.T, vtgateProcess cluster.VtgateProcess,
}

// WaitForTableDeletions waits for a table to be deleted
func WaitForTableDeletions(ctx context.Context, t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) {
func WaitForTableDeletions(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) {
WaitForVschemaCondition(t, vtgateProcess, ks, func(t *testing.T, keyspace map[string]interface{}) bool {
tablesMap := keyspace["tables"]
_, isPresent := convertToMap(tablesMap)[tbl]
return !isPresent
})
}, "Waiting for table to be deleted")
}

// WaitForColumn waits for a table's column to be present
Expand Down
7 changes: 3 additions & 4 deletions go/test/endtoend/vtgate/foreignkey/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,12 +1175,11 @@ func TestCyclicFks(t *testing.T) {
// Drop the cyclic foreign key constraint.
utils.Exec(t, mcmp.VtConn, "alter table fk_t10 drop foreign key test_cyclic_fks")

// Wait for schema-tracking to be complete.
utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, unshardedKs, func(t *testing.T, keyspace map[string]interface{}) bool {
// Let's clean out the cycle so that the other tests don't fail
utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, unshardedKs, func(t *testing.T, keyspace map[string]any) bool {
_, fieldPresent := keyspace["error"]
return !fieldPresent
})

}, "wait for error to disappear")
}

func TestReplace(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,10 +786,8 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
}
})
t.Run("waiting for vschema deletions to apply", func(t *testing.T) {
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
for _, tableName := range tableNames {
utils.WaitForTableDeletions(timeoutCtx, t, clusterInstance.VtgateProcess, keyspaceName, tableName)
utils.WaitForTableDeletions(t, clusterInstance.VtgateProcess, keyspaceName, tableName)
}
})
t.Run("creating tables", func(t *testing.T) {
Expand Down
13 changes: 0 additions & 13 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,19 +321,6 @@ func TestCreateIndex(t *testing.T) {
utils.Exec(t, conn, `create index i2 on ks.t1000 (id1)`)
}

func TestCreateView(t *testing.T) {
// The test wont work since we cant change the vschema without reloading the vtgate.
t.Skip()
conn, closer := start(t)
defer closer()
// Test that create view works and the output is as expected
utils.Exec(t, conn, `create view v1 as select * from t1`)
utils.Exec(t, conn, `insert into t1(id1, id2) values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)`)
// This wont work, since ALTER VSCHEMA ADD TABLE is only supported for unsharded keyspaces
utils.Exec(t, conn, "alter vschema add table v1")
utils.AssertMatches(t, conn, "select * from v1", `[[INT64(1) INT64(1)] [INT64(2) INT64(2)] [INT64(3) INT64(3)] [INT64(4) INT64(4)] [INT64(5) INT64(5)]]`)
}

func TestVersions(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vtgate/queries/misc/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func TestMain(m *testing.M) {
return 1
}

clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--enable-views")
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver-enable-views")

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
Expand Down
54 changes: 54 additions & 0 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"testing"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -371,3 +372,56 @@ func TestAliasesInOuterJoinQueries(t *testing.T) {
mcmp.AssertMatches("select t1.id1 as t0, t1.id1 as t1, tbl.unq_col as col from t1 left outer join tbl on t1.id2 = tbl.nonunq_col order by t1.id2 limit 2", `[[INT64(1) INT64(1) INT64(42)] [INT64(42) INT64(42) NULL]]`)
mcmp.ExecWithColumnCompare("select t1.id1 as t0, t1.id1 as t1, tbl.unq_col as col from t1 left outer join tbl on t1.id2 = tbl.nonunq_col order by t1.id2 limit 2")
}

func TestAlterTableWithView(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 20, "vtgate")
mcmp, closer := start(t)
defer closer()

// Test that create/alter view works and the output is as expected
mcmp.Exec(`use ks_misc`)
mcmp.Exec(`create view v1 as select * from t1`)
var viewDef string
utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, keyspaceName, func(t *testing.T, ksMap map[string]any) bool {
views, ok := ksMap["views"]
if !ok {
return false
}
viewsMap := views.(map[string]any)
view, ok := viewsMap["v1"]
if ok {
viewDef = view.(string)
}
return ok
}, "Waiting for view creation")
mcmp.Exec(`insert into t1(id1, id2) values (1, 1)`)
mcmp.AssertMatches("select * from v1", `[[INT64(1) INT64(1)]]`)

// alter table add column
mcmp.Exec(`alter table t1 add column test bigint`)
time.Sleep(10 * time.Second)
mcmp.Exec(`alter view v1 as select * from t1`)

waitForChange := func(t *testing.T, ksMap map[string]any) bool {
// wait for the view definition to change
views := ksMap["views"]
viewsMap := views.(map[string]any)
newView := viewsMap["v1"]
if newView.(string) == viewDef {
return false
}
viewDef = newView.(string)
return true
}
utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, keyspaceName, waitForChange, "Waiting for alter view")

mcmp.AssertMatches("select * from v1", `[[INT64(1) INT64(1) NULL]]`)

// alter table remove column
mcmp.Exec(`alter table t1 drop column test`)
mcmp.Exec(`alter view v1 as select * from t1`)

utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, keyspaceName, waitForChange, "Waiting for alter view")

mcmp.AssertMatches("select * from v1", `[[INT64(1) INT64(1)]]`)
}
76 changes: 23 additions & 53 deletions go/vt/vtgate/planbuilder/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func buildDDLPlans(ctx context.Context, sql string, ddlStatement sqlparser.DDLSt
}
err = checkFKError(vschema, ddlStatement, keyspace)
case *sqlparser.CreateView:
destination, keyspace, err = buildCreateView(ctx, vschema, ddl, reservedVars, enableOnlineDDL, enableDirectDDL)
destination, keyspace, err = buildCreateViewCommon(ctx, vschema, reservedVars, enableOnlineDDL, enableDirectDDL, ddl.Select, ddl)
case *sqlparser.AlterView:
destination, keyspace, err = buildAlterView(ctx, vschema, ddl, reservedVars, enableOnlineDDL, enableDirectDDL)
destination, keyspace, err = buildCreateViewCommon(ctx, vschema, reservedVars, enableOnlineDDL, enableDirectDDL, ddl.Select, ddl)
case *sqlparser.DropView:
destination, keyspace, err = buildDropView(vschema, ddlStatement)
case *sqlparser.DropTable:
Expand Down Expand Up @@ -192,64 +192,43 @@ func findTableDestinationAndKeyspace(vschema plancontext.VSchema, ddlStatement s
return destination, keyspace, nil
}

func buildAlterView(ctx context.Context, vschema plancontext.VSchema, ddl *sqlparser.AlterView, reservedVars *sqlparser.ReservedVars, enableOnlineDDL, enableDirectDDL bool) (key.Destination, *vindexes.Keyspace, error) {
// For Alter View, we require that the view exist and the select query can be satisfied within the keyspace itself
func buildCreateViewCommon(
ctx context.Context,
vschema plancontext.VSchema,
reservedVars *sqlparser.ReservedVars,
enableOnlineDDL, enableDirectDDL bool,
ddlSelect sqlparser.SelectStatement,
ddl sqlparser.DDLStatement,
) (key.Destination, *vindexes.Keyspace, error) {
// For Create View, we require that the keyspace exist and the select query can be satisfied within the keyspace itself
// We should remove the keyspace name from the table name, as the database name in MySQL might be different than the keyspace name
destination, keyspace, err := findTableDestinationAndKeyspace(vschema, ddl)
if err != nil {
return nil, nil, err
}

selectPlan, err := createInstructionFor(ctx, sqlparser.String(ddl.Select), ddl.Select, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
// because we don't trust the schema tracker to have up-to-date info, we don't want to expand any SELECT * here
var expressions []sqlparser.SelectExprs
_ = sqlparser.VisitAllSelects(ddlSelect, func(p *sqlparser.Select, idx int) error {
expressions = append(expressions, sqlparser.CloneSelectExprs(p.SelectExprs))
return nil
})
selectPlan, err := createInstructionFor(ctx, sqlparser.String(ddlSelect), ddlSelect, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
if err != nil {
return nil, nil, err
}
selPlanKs := selectPlan.primitive.GetKeyspaceName()
if keyspace.Name != selPlanKs {
return nil, nil, vterrors.VT12001(ViewDifferentKeyspace)
}
if vschema.IsViewsEnabled() {
if keyspace == nil {
return nil, nil, vterrors.VT09005()
}
return destination, keyspace, nil
}
isRoutePlan, opCode := tryToGetRoutePlan(selectPlan.primitive)
if !isRoutePlan {
return nil, nil, vterrors.VT12001(ViewComplex)
}
if opCode != engine.Unsharded && opCode != engine.EqualUnique && opCode != engine.Scatter {
return nil, nil, vterrors.VT12001(ViewComplex)
}
_ = sqlparser.SafeRewrite(ddl.Select, nil, func(cursor *sqlparser.Cursor) bool {
switch tableName := cursor.Node().(type) {
case sqlparser.TableName:
cursor.Replace(sqlparser.TableName{
Name: tableName.Name,
})
}
return true

_ = sqlparser.VisitAllSelects(ddlSelect, func(p *sqlparser.Select, idx int) error {
p.SelectExprs = expressions[idx]
return nil
})
return destination, keyspace, nil
}

func buildCreateView(ctx context.Context, vschema plancontext.VSchema, ddl *sqlparser.CreateView, reservedVars *sqlparser.ReservedVars, enableOnlineDDL, enableDirectDDL bool) (key.Destination, *vindexes.Keyspace, error) {
// For Create View, we require that the keyspace exist and the select query can be satisfied within the keyspace itself
// We should remove the keyspace name from the table name, as the database name in MySQL might be different than the keyspace name
destination, keyspace, _, err := vschema.TargetDestination(ddl.ViewName.Qualifier.String())
if err != nil {
return nil, nil, err
}
ddl.ViewName.Qualifier = sqlparser.NewIdentifierCS("")
sqlparser.RemoveKeyspace(ddl)

selectPlan, err := createInstructionFor(ctx, sqlparser.String(ddl.Select), ddl.Select, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
if err != nil {
return nil, nil, err
}
selPlanKs := selectPlan.primitive.GetKeyspaceName()
if keyspace.Name != selPlanKs {
return nil, nil, vterrors.VT12001(ViewDifferentKeyspace)
}
if vschema.IsViewsEnabled() {
if keyspace == nil {
return nil, nil, vterrors.VT09005()
Expand All @@ -263,15 +242,6 @@ func buildCreateView(ctx context.Context, vschema plancontext.VSchema, ddl *sqlp
if opCode != engine.Unsharded && opCode != engine.EqualUnique && opCode != engine.Scatter {
return nil, nil, vterrors.VT12001(ViewComplex)
}
_ = sqlparser.SafeRewrite(ddl.Select, nil, func(cursor *sqlparser.Cursor) bool {
switch tableName := cursor.Node().(type) {
case sqlparser.TableName:
cursor.Replace(sqlparser.TableName{
Name: tableName.Name,
})
}
return true
})
return destination, keyspace, nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/testdata/ddl_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view tmp_view as select user_id, col1, col2 from authoritative"
"Query": "create view tmp_view as select * from authoritative"
},
"TablesUsed": [
"user.tmp_view"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view view_a as select user_id, col1, col2 from authoritative"
"Query": "create view view_a as select * from authoritative"
},
"TablesUsed": [
"user.view_a"
Expand All @@ -144,7 +144,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view view_a as select a.user_id, a.col1, a.col2, b.user_id, b.col1, b.col2 from authoritative as a join authoritative as b on a.user_id = b.user_id"
"Query": "create view view_a as select * from authoritative as a join authoritative as b on a.user_id = b.user_id"
},
"TablesUsed": [
"user.view_a"
Expand All @@ -163,7 +163,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view view_a as select user_id, col1, col2 from authoritative as a"
"Query": "create view view_a as select a.* from authoritative as a"
},
"TablesUsed": [
"user.view_a"
Expand Down Expand Up @@ -201,7 +201,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view view_a as select `user`.id, a.user_id, a.col1, a.col2, `user`.col1 from authoritative as a join `user` on a.user_id = `user`.id"
"Query": "create view view_a as select `user`.id, a.*, `user`.col1 from authoritative as a join `user` on a.user_id = `user`.id"
},
"TablesUsed": [
"user.view_a"
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/testdata/view_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"Name": "user",
"Sharded": true
},
"Query": "alter view user_extra as select * from `user`.`user`"
"Query": "alter view user_extra as select * from `user`"
},
"TablesUsed": [
"user.user_extra"
Expand All @@ -35,7 +35,7 @@
"Name": "user",
"Sharded": true
},
"Query": "create view view_ac as select user_id, col1, col2 from authoritative"
"Query": "create view view_ac as select * from authoritative"
},
"TablesUsed": [
"user.view_ac"
Expand Down
Loading