Skip to content

Commit

Permalink
Add more e2e tests and helper func to get vindex
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Jan 5, 2025
1 parent 128198d commit 983a415
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 34 deletions.
38 changes: 32 additions & 6 deletions go/test/endtoend/vreplication/lookup_vindex_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,18 @@ func (lv *lookupVindex) create() {
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex create: %v", err)
waitForWorkflowState(lv.t, vc, fmt.Sprintf("%s.%s", lv.ownerTableKeyspace, lv.name), binlogdatapb.VReplicationWorkflowState_Running.String())
lv.expectWriteOnly(true)
lv.expectParamsAndOwner(true)
}

func (lv *lookupVindex) cancel() {
panic("not implemented")
args := []string{
"LookupVindex",
"--name", lv.name,
"--table-keyspace=" + lv.ownerTableKeyspace,
"cancel",
}
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex complete: %v", err)
}

func (lv *lookupVindex) externalize() {
Expand All @@ -83,7 +90,7 @@ func (lv *lookupVindex) externalize() {
}
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex externalize: %v", err)
lv.expectWriteOnly(false)
lv.expectParamsAndOwner(false)
}

func (lv *lookupVindex) internalize() {
Expand All @@ -96,20 +103,39 @@ func (lv *lookupVindex) internalize() {
}
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex internalize: %v", err)
lv.expectWriteOnly(true)
lv.expectParamsAndOwner(true)
}

func (lv *lookupVindex) complete() {
args := []string{
"LookupVindex",
"--name", lv.name,
"--table-keyspace=" + lv.ownerTableKeyspace,
"complete",
"--keyspace=" + lv.tableKeyspace,
}
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex complete: %v", err)
lv.expectParamsAndOwner(false)
}

func (lv *lookupVindex) show() error {
return nil
}

func (lv *lookupVindex) expectWriteOnly(expected bool) {
func (lv *lookupVindex) expectParamsAndOwner(expectedWriteOnlyParam bool) {
vschema, err := vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", lv.ownerTableKeyspace)
require.NoError(lv.t, err, "error executing GetVSchema: %v", err)
vdx := gjson.Get(vschema, fmt.Sprintf("vindexes.%s", lv.name))
require.NotNil(lv.t, vdx, "lookup vindex %s not found", lv.name)

expectedOwner, expectedFrom, expectedTo := lv.ownerTable, strings.Join(lv.columns, ","), "keyspace_id"
require.Equal(lv.t, expectedOwner, vdx.Get("owner").String(), "expected 'owner' parameter to be %s", expectedOwner)
require.Equal(lv.t, expectedFrom, vdx.Get("params.from").String(), "expected 'from' parameter to be %s", expectedFrom)
require.Equal(lv.t, expectedTo, vdx.Get("params.to").String(), "expected 'to' parameter to be %s", expectedTo)

want := ""
if expected {
if expectedWriteOnlyParam {
want = "true"
}
require.Equal(lv.t, want, vdx.Get("params.write_only").String(), "expected write_only parameter to be %s", want)
Expand Down
134 changes: 134 additions & 0 deletions go/test/endtoend/vreplication/lookup_vindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type lookupTestCase struct {
runningQuery string
postExternalizeQuery string
postInternalizeQuery string
postCompleteQuery string
cleanupQuery string
}

Expand All @@ -108,6 +109,7 @@ func TestLookupVindex(t *testing.T) {
runningQuery := "insert into t1 (c1, c2, val) values (4, 4, 'val4'), (5, 5, 'val5'), (6, 6, 'val6')"
postExternalizeQuery := "insert into t1 (c1, c2, val) values (7, 7, 'val7'), (8, 8, 'val8'), (9, 9, 'val9')"
postInternalizeQuery := "insert into t1 (c1, c2, val) values (10, 10, 'val10'), (11, 11, 'val11'), (12, 12, 'val12')"
postCompleteQuery := "insert into t1 (c1, c2, val) values (13, 13, 'val13'), (14, 14, 'val14'), (15, 15, 'val15')"
cleanupQuery := "delete from t1"

testCases := []lookupTestCase{
Expand Down Expand Up @@ -161,10 +163,42 @@ func TestLookupVindex(t *testing.T) {
tc.runningQuery = runningQuery
tc.postExternalizeQuery = postExternalizeQuery
tc.postInternalizeQuery = postInternalizeQuery
tc.postCompleteQuery = postCompleteQuery
tc.cleanupQuery = cleanupQuery
testLookupVindex(t, &tc)
})
}

for _, tc := range testCases {
// Modify the testcase name, here we cancel just after creating lookupvindex.
tc.name = tc.name + ", cancel"
t.Run(tc.name, func(t *testing.T) {
// Modify the name to avoid error on duplicate lookup vindex/table name.
tc.lv.name = tc.lv.name + "_cancel"

tc.initQuery = initQuery
tc.runningQuery = runningQuery
tc.postCompleteQuery = postCompleteQuery
tc.cleanupQuery = cleanupQuery
testLookupVindexCancel(t, &tc)
})
}

for _, tc := range testCases {
// Modify the testcase name, here we cancel just after externalizing lookupvindex.
tc.name = tc.name + ", externalize and cancel"
t.Run(tc.name, func(t *testing.T) {
// Modify the name to avoid error on duplicate lookup vindex/table name.
tc.lv.name = tc.lv.name + "_externalize_cancel"

tc.initQuery = initQuery
tc.runningQuery = runningQuery
tc.postExternalizeQuery = postExternalizeQuery
tc.postCompleteQuery = postCompleteQuery
tc.cleanupQuery = cleanupQuery
testLookupVindexCancelAfterExternalize(t, &tc)
})
}
}

func testLookupVindex(t *testing.T, tc *lookupTestCase) {
Expand Down Expand Up @@ -207,6 +241,106 @@ func testLookupVindex(t *testing.T, tc *lookupTestCase) {
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows)
})

t.Run("externalize after internalize", func(t *testing.T) {
tc.lv.externalize()
})

t.Run("complete", func(t *testing.T) {
tc.lv.complete()
totalRows += getNumRowsInQuery(t, tc.postCompleteQuery)
_, err := vtgateConn.ExecuteFetch(tc.postCompleteQuery, 1000, false)
require.NoError(t, err)
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows)
})

t.Run("cleanup", func(t *testing.T) {
_, err := vtgateConn.ExecuteFetch(tc.cleanupQuery, 1000, false)
require.NoError(t, err)
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, 0)
})
}

func testLookupVindexCancel(t *testing.T, tc *lookupTestCase) {
vtgateConn, cancel := getVTGateConn()
defer cancel()
var totalRows int
lv := tc.lv

t.Run("init data", func(t *testing.T) {
totalRows += getNumRowsInQuery(t, tc.initQuery)
_, err := vtgateConn.ExecuteFetch(tc.initQuery, 1000, false)
require.NoError(t, err)
})

t.Run("create", func(t *testing.T) {
tc.lv.create()
lks := lv.tableKeyspace
vindexName := lv.name
waitForRowCount(t, vtgateConn, lks, vindexName, totalRows)
totalRows += getNumRowsInQuery(t, tc.runningQuery)
_, err := vtgateConn.ExecuteFetch(tc.runningQuery, 1000, false)
require.NoError(t, err)
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows)
})

t.Run("cancel", func(t *testing.T) {
tc.lv.cancel()
// Expect true as we cancelled the LookupVindex before externalizing.
lv.expectParamsAndOwner(true)
totalRows += getNumRowsInQuery(t, tc.postCompleteQuery)
_, err := vtgateConn.ExecuteFetch(tc.postCompleteQuery, 1000, false)
require.NoError(t, err)
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows)
})

t.Run("cleanup", func(t *testing.T) {
_, err := vtgateConn.ExecuteFetch(tc.cleanupQuery, 1000, false)
require.NoError(t, err)
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, 0)
})
}

func testLookupVindexCancelAfterExternalize(t *testing.T, tc *lookupTestCase) {
vtgateConn, cancel := getVTGateConn()
defer cancel()
var totalRows int
lv := tc.lv

t.Run("init data", func(t *testing.T) {
totalRows += getNumRowsInQuery(t, tc.initQuery)
_, err := vtgateConn.ExecuteFetch(tc.initQuery, 1000, false)
require.NoError(t, err)
})

t.Run("create", func(t *testing.T) {
tc.lv.create()
lks := lv.tableKeyspace
vindexName := lv.name
waitForRowCount(t, vtgateConn, lks, vindexName, totalRows)
totalRows += getNumRowsInQuery(t, tc.runningQuery)
_, err := vtgateConn.ExecuteFetch(tc.runningQuery, 1000, false)
require.NoError(t, err)
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows)
})

t.Run("externalize", func(t *testing.T) {
tc.lv.externalize()
totalRows += getNumRowsInQuery(t, tc.postExternalizeQuery)
_, err := vtgateConn.ExecuteFetch(tc.postExternalizeQuery, 1000, false)
require.NoError(t, err)
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows)
})

t.Run("cancel", func(t *testing.T) {
tc.lv.cancel()
// Expect false as we cancelled the LookupVindex after externalizing.
lv.expectParamsAndOwner(false)
totalRows += getNumRowsInQuery(t, tc.postCompleteQuery)
_, err := vtgateConn.ExecuteFetch(tc.postCompleteQuery, 1000, false)
require.NoError(t, err)
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows)
})

t.Run("cleanup", func(t *testing.T) {
_, err := vtgateConn.ExecuteFetch(tc.cleanupQuery, 1000, false)
require.NoError(t, err)
Expand Down
35 changes: 10 additions & 25 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,14 +567,9 @@ func (s *Server) LookupVindexComplete(ctx context.Context, req *vtctldatapb.Look
span.Annotate("name", req.Name)
span.Annotate("table_keyspace", req.TableKeyspace)

// Find the lookup vindex by name.
sourceVschema, err := s.ts.GetVSchema(ctx, req.Keyspace)
vindex, _, err := getVindexAndVSchema(ctx, s.ts, req.Keyspace, req.Name)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for the %s keyspace", req.Keyspace)
}
vindex := sourceVschema.Vindexes[req.Name]
if vindex == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vindex %s not found in the %s keyspace", req.Name, req.Keyspace)
return nil, err
}

targetShards, err := s.ts.GetServingShards(ctx, req.TableKeyspace)
Expand Down Expand Up @@ -693,14 +688,9 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L
span.Annotate("name", req.Name)
span.Annotate("table_keyspace", req.TableKeyspace)

// Find the lookup vindex by name.
sourceVschema, err := s.ts.GetVSchema(ctx, req.Keyspace)
vindex, sourceVSchema, err := getVindexAndVSchema(ctx, s.ts, req.Keyspace, req.Name)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for the %s keyspace", req.Keyspace)
}
vindex := sourceVschema.Vindexes[req.Name]
if vindex == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vindex %s not found in the %s keyspace", req.Name, req.Keyspace)
return nil, err
}

targetShards, err := s.ts.GetServingShards(ctx, req.TableKeyspace)
Expand Down Expand Up @@ -778,7 +768,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L

// Remove the write_only param and save the source vschema.
delete(vindex.Params, "write_only")
if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVschema); err != nil {
if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil {
return nil, err
}
return resp, s.ts.RebuildSrvVSchema(ctx, nil)
Expand All @@ -794,14 +784,9 @@ func (s *Server) LookupVindexInternalize(ctx context.Context, req *vtctldatapb.L
span.Annotate("name", req.Name)
span.Annotate("table_keyspace", req.TableKeyspace)

// Find the lookup vindex by name.
sourceVschema, err := s.ts.GetVSchema(ctx, req.Keyspace)
vindex, sourceVSchema, err := getVindexAndVSchema(ctx, s.ts, req.Keyspace, req.Name)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for the %s keyspace", req.Keyspace)
}
vindex := sourceVschema.Vindexes[req.Name]
if vindex == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vindex %s not found in the %s keyspace", req.Name, req.Keyspace)
return nil, err
}

targetShards, err := s.ts.GetServingShards(ctx, req.TableKeyspace)
Expand Down Expand Up @@ -865,7 +850,7 @@ func (s *Server) LookupVindexInternalize(ctx context.Context, req *vtctldatapb.L

// Make the vindex back to write_only and save the source vschema.
vindex.Params["write_only"] = "true"
if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVschema); err != nil {
if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil {
return nil, err
}
return resp, s.ts.RebuildSrvVSchema(ctx, nil)
Expand Down Expand Up @@ -1013,7 +998,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}
if len(tables) > 0 {
err = validateSourceTablesExist(ctx, sourceKeyspace, ksTables, tables)
err = validateSourceTablesExist(sourceKeyspace, ksTables, tables)
if err != nil {
return nil, err
}
Expand All @@ -1025,7 +1010,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}
}
if len(req.ExcludeTables) > 0 {
err = validateSourceTablesExist(ctx, sourceKeyspace, ksTables, req.ExcludeTables)
err = validateSourceTablesExist(sourceKeyspace, ksTables, req.ExcludeTables)
if err != nil {
return nil, err
}
Expand Down
17 changes: 16 additions & 1 deletion go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
Expand Down Expand Up @@ -1028,7 +1029,7 @@ func applyTargetShards(ts *trafficSwitcher, targetShards []string) error {

// validateSourceTablesExist validates that tables provided are present
// in the source keyspace.
func validateSourceTablesExist(ctx context.Context, sourceKeyspace string, ksTables, tables []string) error {
func validateSourceTablesExist(sourceKeyspace string, ksTables, tables []string) error {
var missingTables []string
for _, table := range tables {
if schema.IsInternalOperationTableName(table) {
Expand All @@ -1051,3 +1052,17 @@ func validateSourceTablesExist(ctx context.Context, sourceKeyspace string, ksTab
}
return nil
}

// getVindexAndVSchema gets the vindex (from VSchema) and VSchema with the
// provided vindex name and keyspace.
func getVindexAndVSchema(ctx context.Context, ts *topo.Server, keyspace string, vindexName string) (*vschemapb.Vindex, *vschemapb.Keyspace, error) {
vschema, err := ts.GetVSchema(ctx, keyspace)
if err != nil {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for the %s keyspace", keyspace)
}
vindex := vschema.Vindexes[vindexName]
if vindex == nil {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vindex %s not found in the %s keyspace", vindexName, keyspace)
}
return vindex, vschema, nil
}
3 changes: 1 addition & 2 deletions go/vt/vtctl/workflow/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ func startEtcd(t *testing.T) string {
}

func TestValidateSourceTablesExist(t *testing.T) {
ctx := context.Background()
ks := "source_keyspace"
ksTables := []string{"table1", "table2"}

Expand All @@ -272,7 +271,7 @@ func TestValidateSourceTablesExist(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := validateSourceTablesExist(ctx, ks, ksTables, tc.tables)
err := validateSourceTablesExist(ks, ksTables, tc.tables)
if tc.errContains != "" {
assert.ErrorContains(t, err, tc.errContains)
} else {
Expand Down

0 comments on commit 983a415

Please sign in to comment.