Skip to content

Commit

Permalink
test(LookupVindex): Add tests for internalize and complete
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Dec 29, 2024
1 parent 7038c78 commit 128198d
Show file tree
Hide file tree
Showing 6 changed files with 592 additions and 7 deletions.
13 changes: 13 additions & 0 deletions go/test/endtoend/vreplication/lookup_vindex_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ func (lv *lookupVindex) externalize() {
lv.expectWriteOnly(false)
}

func (lv *lookupVindex) internalize() {
args := []string{
"LookupVindex",
"--name", lv.name,
"--table-keyspace=" + lv.ownerTableKeyspace,
"internalize",
"--keyspace=" + lv.tableKeyspace,
}
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex internalize: %v", err)
lv.expectWriteOnly(true)
}

func (lv *lookupVindex) show() error {
return nil
}
Expand Down
11 changes: 11 additions & 0 deletions go/test/endtoend/vreplication/lookup_vindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type lookupTestCase struct {
initQuery string
runningQuery string
postExternalizeQuery string
postInternalizeQuery string
cleanupQuery string
}

Expand All @@ -106,6 +107,7 @@ func TestLookupVindex(t *testing.T) {
initQuery := "insert into t1 (c1, c2, val) values (1, 1, 'val1'), (2, 2, 'val2'), (3, 3, 'val3')"
runningQuery := "insert into t1 (c1, c2, val) values (4, 4, 'val4'), (5, 5, 'val5'), (6, 6, 'val6')"
postExternalizeQuery := "insert into t1 (c1, c2, val) values (7, 7, 'val7'), (8, 8, 'val8'), (9, 9, 'val9')"
postInternalizeQuery := "insert into t1 (c1, c2, val) values (10, 10, 'val10'), (11, 11, 'val11'), (12, 12, 'val12')"
cleanupQuery := "delete from t1"

testCases := []lookupTestCase{
Expand Down Expand Up @@ -158,6 +160,7 @@ func TestLookupVindex(t *testing.T) {
tc.initQuery = initQuery
tc.runningQuery = runningQuery
tc.postExternalizeQuery = postExternalizeQuery
tc.postInternalizeQuery = postInternalizeQuery
tc.cleanupQuery = cleanupQuery
testLookupVindex(t, &tc)
})
Expand Down Expand Up @@ -196,6 +199,14 @@ func testLookupVindex(t *testing.T, tc *lookupTestCase) {
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows)
})

t.Run("internalize", func(t *testing.T) {
tc.lv.internalize()
totalRows += getNumRowsInQuery(t, tc.postInternalizeQuery)
_, err := vtgateConn.ExecuteFetch(tc.postInternalizeQuery, 1000, false)
require.NoError(t, err)
waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows)
})

t.Run("cleanup", func(t *testing.T) {
_, err := vtgateConn.ExecuteFetch(tc.cleanupQuery, 1000, false)
require.NoError(t, err)
Expand Down
8 changes: 8 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,14 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string
vdx = gjson.Get(customerVSchema, fmt.Sprintf("vindexes.%s", vindexName))
require.NotNil(t, vdx, "lookup vindex %s not found", vindexName)
require.NotEqual(t, "true", vdx.Get("params.write_only").String(), "did not expect write_only parameter to be true")

err = vc.VtctldClient.ExecuteCommand("LookupVindex", "--name", vindexName, "--table-keyspace=product", "internalize", "--keyspace=customer")
require.NoError(t, err, "error executing LookupVindex internalize: %v", err)
customerVSchema, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "customer")
require.NoError(t, err, "error executing GetVSchema: %v", err)
vdx = gjson.Get(customerVSchema, fmt.Sprintf("vindexes.%s", vindexName))
require.NotNil(t, vdx, "lookup vindex %s not found", vindexName)
require.Equal(t, "true", vdx.Get("params.write_only").String(), "expected write_only parameter to be true")
})
}

Expand Down
7 changes: 5 additions & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ const (
lockTablesCycles = 2
// Time to wait between LOCK TABLES cycles on the sources during SwitchWrites.
lockTablesCycleDelay = time.Duration(100 * time.Millisecond)

SqlFreezeWorkflow = "update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s"
SqlUnfreezeWorkflow = "update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s"
)

var (
Expand Down Expand Up @@ -762,7 +765,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L
return vterrors.Wrapf(err, "failed to stop workflow %s on shard %s/%s", req.Name, tabletInfo.Keyspace, tabletInfo.Shard)
}
// Mark workflow as frozen.
query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", Frozen,
query := fmt.Sprintf(SqlFreezeWorkflow, Frozen,
encodeString(tabletInfo.DbName()), encodeString(req.Name))
_, err = s.tmc.VReplicationExec(ctx, tabletInfo.Tablet, query)
return err
Expand Down Expand Up @@ -849,7 +852,7 @@ func (s *Server) LookupVindexInternalize(ctx context.Context, req *vtctldatapb.L
if err != nil {
return err
}
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
query := fmt.Sprintf(SqlUnfreezeWorkflow,
encodeString(tabletInfo.DbName()), encodeString(req.Name))
_, err = s.tmc.VReplicationExec(ctx, tabletInfo.Tablet, query)
return err
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ type fakeTMClient struct {
getSchemaCounts map[string]int
// Used to confirm the number of times WorkflowDelete was called.
workflowDeleteCalls int
// Used to confirm the number of times UpdateVReplicationWorkflow with state as Stopped was called.
workflowStopCalls int
}

func newFakeTMClient() *fakeTMClient {
Expand Down Expand Up @@ -580,5 +582,8 @@ func (tmc *fakeTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *
func (tmc *fakeTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
if *req.State == binlogdatapb.VReplicationWorkflowState_Stopped {
tmc.workflowStopCalls++
}
return tmc.tablets[int(tablet.Alias.Uid)].tm.UpdateVReplicationWorkflow(ctx, req)
}
Loading

0 comments on commit 128198d

Please sign in to comment.