Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff: "show all" should only report vdiffs for the specified keyspace and workflow #14442

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could instead/additionally add a show all test case to TestPerformVDiffAction. I just pushed one. We can keep or remove the e2e test, up to you. IMO we can remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the unit test. I looked at those tests first, but wanted to actually test multiple vdiffs for concurrent workflows in the same keyspace and test if that works.

We don't seem to have any current e2e test that has multiple active workflows and vdiffs run on them. Hence the e2e test. I want to expand that soon to test the state changes in the concurrent workflows and run vdiffs on them, etc.

Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/tidwall/gjson"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
deepthi marked this conversation as resolved.
Show resolved Hide resolved
)

func TestMultipleConcurrentVDiffs(t *testing.T) {
cellName := "zone"
cells := []string{cellName}
vc = NewVitessCluster(t, t.Name(), cells, mainClusterConfig)

require.NotNil(t, vc)
allCellNames = cellName
defaultCellName := cellName
defaultCell = vc.Cells[defaultCellName]
sourceKeyspace := "product"
shardName := "0"

defer vc.TearDown(t)

cell := vc.Cells[cellName]
vc.AddKeyspace(t, []*Cell{cell}, sourceKeyspace, shardName, initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts)

vtgate = cell.Vtgates[0]
require.NotNil(t, vtgate)
err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKeyspace, shardName)
require.NoError(t, err)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKeyspace, shardName), 1, 30*time.Second)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

insertInitialData(t)
targetTabletId := 200
targetKeyspace := "customer"
vc.AddKeyspace(t, []*Cell{cell}, targetKeyspace, shardName, initialProductVSchema, initialProductSchema, 0, 0, targetTabletId, sourceKsOpts)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKeyspace, shardName), 1, 30*time.Second)

index := 1000
var loadCtx context.Context
var loadCancel context.CancelFunc
loadCtx, loadCancel = context.WithCancel(context.Background())
load := func(tableName string) {
query := "insert into %s(cid, name) values(%d, 'customer-%d')"
for {
select {
case <-loadCtx.Done():
log.Infof("load cancelled")
return
default:
index += 1
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
q := fmt.Sprintf(query, tableName, index, index)
vtgateConn.ExecuteFetch(q, 1000, false)
vtgateConn.Close()
}
time.Sleep(10 * time.Millisecond)
}
}
targetKs := vc.Cells[cellName].Keyspaces[targetKeyspace]
targetTab := targetKs.Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, targetTabletId)].Vttablet
require.NotNil(t, targetTab)

time.Sleep(15 * time.Second) // wait for some rows to be inserted.

createWorkflow := func(workflowName, tables string) {
mt := newMoveTables(vc, &moveTables{
workflowName: workflowName,
targetKeyspace: targetKeyspace,
sourceKeyspace: sourceKeyspace,
tables: tables,
}, moveTablesFlavorVtctld)
mt.Create()
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
catchup(t, targetTab, workflowName, "MoveTables")
}

createWorkflow("wf1", "customer")
createWorkflow("wf2", "customer2")

go load("customer")
go load("customer2")

var wg sync.WaitGroup
wg.Add(2)

doVdiff := func(workflowName, table string) {
defer wg.Done()
vdiff(t, targetKeyspace, workflowName, cellName, true, false, nil)
}
go doVdiff("wf1", "customer")
go doVdiff("wf2", "customer2")
wg.Wait()
loadCancel()

// confirm that show all shows the correct workflow and only that workflow.
output, err := vc.VtctldClient.ExecuteCommandWithOutput("VDiff", "--format", "json", "--workflow", "wf1", "--target-keyspace", "customer", "show", "all")
require.NoError(t, err)
log.Infof("VDiff output: %s", output)
count := gjson.Get(output, "..#").Int()
wf := gjson.Get(output, "0.Workflow").String()
ksName := gjson.Get(output, "0.Keyspace").String()
require.Equal(t, int64(1), count)
require.Equal(t, "wf1", wf)
require.Equal(t, "customer", ksName)
}
1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1781,7 +1781,6 @@ func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowReques
log.Errorf("Error executing vdiff show action: %v", output.err)
return nil, output.err
}

return &vtctldatapb.VDiffShowResponse{
TabletResponses: output.responses,
}, nil
Expand Down
16 changes: 13 additions & 3 deletions go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
DeleteAction VDiffAction = "delete"
AllActionArg = "all"
LastActionArg = "last"

maxVDiffsToReport = 100
Copy link
Contributor

@mattlord mattlord Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth adding a flag for this. But we can do it later if needed.

)

var (
Expand Down Expand Up @@ -267,13 +269,13 @@ func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlog

func (vde *Engine) handleShowAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error {
var qr *sqltypes.Result
var err error
vdiffUUID := ""

if req.ActionArg == LastActionArg {
query, err := sqlparser.ParseAndBind(sqlGetMostRecentVDiff,
query, err := sqlparser.ParseAndBind(sqlGetMostRecentVDiffByKeyspaceWorkflow,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
sqltypes.Int64BindVariable(1),
)
if err != nil {
return err
Expand Down Expand Up @@ -322,7 +324,15 @@ func (vde *Engine) handleShowAction(ctx context.Context, dbClient binlogplayer.D
}
switch req.ActionArg {
case AllActionArg:
if qr, err = dbClient.ExecuteFetch(sqlGetAllVDiffs, -1); err != nil {
query, err := sqlparser.ParseAndBind(sqlGetMostRecentVDiffByKeyspaceWorkflow,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
sqltypes.Int64BindVariable(maxVDiffsToReport),
)
if err != nil {
return err
}
if qr, err = dbClient.ExecuteFetch(query, -1); err != nil {
return err
}
resp.Output = sqltypes.ResultToProto3(qr)
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,38 @@ func TestPerformVDiffAction(t *testing.T) {
},
},
},
{
name: "show last",
req: &tabletmanagerdatapb.VDiffRequest{
Action: string(ShowAction),
ActionArg: "last",
Keyspace: keyspace,
Workflow: workflow,
},
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit %d",
encodeString(keyspace), encodeString(workflow), 1),
result: noResults,
},
},
},
{
name: "show all",
req: &tabletmanagerdatapb.VDiffRequest{
Action: string(ShowAction),
ActionArg: "all",
Keyspace: keyspace,
Workflow: workflow,
},
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit %d",
encodeString(keyspace), encodeString(workflow), maxVDiffsToReport),
result: noResults,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
9 changes: 4 additions & 5 deletions go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ const (
and vdt.state in ('completed', 'stopped')`
sqlRetryVDiff = `update _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) set vd.state = 'pending',
vd.last_error = '', vdt.state = 'pending' where vd.id = %a and (vd.state = 'error' or vdt.state = 'error')`
sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %a and workflow = %a and vdiff_uuid = %a"
sqlGetMostRecentVDiff = "select * from _vt.vdiff where keyspace = %a and workflow = %a order by id desc limit 1"
sqlGetVDiffByID = "select * from _vt.vdiff where id = %a"
sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %a and workflow = %a and vdiff_uuid = %a"
sqlGetMostRecentVDiffByKeyspaceWorkflow = "select * from _vt.vdiff where keyspace = %a and workflow = %a order by id desc limit %a"
sqlGetVDiffByID = "select * from _vt.vdiff where id = %a"
sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
left join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id)
where vd.keyspace = %a and vd.workflow = %a`
sqlDeleteVDiffByUUID = `delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
Expand All @@ -48,7 +48,6 @@ const (
sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'"
sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a"
sqlGetVDiffIDsByKeyspaceWorkflow = "select id as id from _vt.vdiff where keyspace = %a and workflow = %a"
sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc"
sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a"
sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)"

Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,15 @@
"RetryMax": 0,
"Tags": []
},
"vdiff_multiple_movetables_test.go": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMultipleConcurrentVDiffs"],
"Command": [],
"Manual": false,
"Shard": "vreplication_partial_movetables_basic",
"RetryMax": 0,
"Tags": []
},
"vreplication_movetables_buffering": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesBuffering"],
Expand Down
Loading