Skip to content

Commit

Permalink
Ensure workflows in VDiff are in Running state to avoid unpredictable…
Browse files Browse the repository at this point in the history
… results from Starting a Stopped workflow, for example

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Apr 8, 2024
1 parent fe96898 commit a08bb2f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 22 deletions.
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vdiff2 "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
)

Expand Down Expand Up @@ -193,7 +194,7 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a
var err error
targetKeyspace, workflowName, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok, "invalid keyspace.workflow value: %s", ksWorkflow)

waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
if useVtctlclient {
// This will always result in us using a PRIMARY tablet, which is all
// we start in many e2e tests, but it avoids the tablet picker logic
Expand Down
48 changes: 27 additions & 21 deletions go/test/endtoend/vreplication/vdiff_online_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/proto/vtctldata"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// TestOnlineDDLVDiff is to run a vdiff on a table that is part of an OnlineDDL workflow.
Expand All @@ -41,24 +42,28 @@ func TestOnlineDDLVDiff(t *testing.T) {
execOnlineDDL(t, "direct", keyspace, createQuery)
defer execOnlineDDL(t, "direct", keyspace, dropQuery)

var done = make(chan bool)
go populate(ctx, done, insertTemplate, updateTemplate)

var output string
waitForAdditionalRows(t, keyspace, "temp", 100)
output = execOnlineDDL(t, "vitess --postpone-completion", keyspace, alterQuery)
uuid := strings.TrimSpace(output)
waitForAdditionalRows(t, keyspace, "temp", 200)
want := &expectedVDiff2Result{
state: "completed",
minimumRowsCompared: 200,
hasMismatch: false,
shards: []string{"0"},
}
doVtctldclientVDiff(t, keyspace, uuid, "zone1", want)

cancel()
<-done
t.Run("OnlineDDL VDiff", func(t *testing.T) {
var done = make(chan bool)
go populate(ctx, done, insertTemplate, updateTemplate)

waitForAdditionalRows(t, keyspace, "temp", 100)
output = execOnlineDDL(t, "vitess --postpone-completion", keyspace, alterQuery)
uuid := strings.TrimSpace(output)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, uuid), binlogdatapb.VReplicationWorkflowState_Running.String())
waitForAdditionalRows(t, keyspace, "temp", 200)
want := &expectedVDiff2Result{
state: "completed",
minimumRowsCompared: 200,
hasMismatch: false,
shards: []string{"0"},
}
doVtctldclientVDiff(t, keyspace, uuid, "zone1", want)

cancel()
<-done
})
}

func execOnlineDDL(t *testing.T, strategy, keyspace, query string) string {
Expand All @@ -75,16 +80,17 @@ func execOnlineDDL(t *testing.T, strategy, keyspace, query string) string {
log.Errorf("error unmarshalling response: %v", err)
return false
}
if len(response.Migrations) > 0 && response.Migrations[0].Status == vtctldata.SchemaMigration_RUNNING {
if len(response.Migrations) > 0 &&
(response.Migrations[0].Status == vtctldata.SchemaMigration_RUNNING ||
response.Migrations[0].Status == vtctldata.SchemaMigration_COMPLETE) {
return true
}
return false
}, 30*time.Second)
require.NoError(t, err)
uuid := strings.TrimSpace(output)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, uuid), binlogdatapb.VReplicationWorkflowState_Running.String())

}
return output
return uuid
}

func waitForAdditionalRows(t *testing.T, keyspace, table string, count int) {
Expand Down
34 changes: 34 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,16 @@ func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRe
req.TargetKeyspace, req.Workflow)
}

workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow)
if err != nil {
return nil, err
}
if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running {
log.Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow)
}

err = ts.ForAllTargets(func(target *MigrationTarget) error {
_, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq)
return err
Expand Down Expand Up @@ -3949,3 +3959,27 @@ func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCrea
}
return s.moveTablesCreate(ctx, moveTablesCreateRequest, binlogdatapb.VReplicationWorkflowType_Migrate)
}

// getWorkflowStatus gets the overall status of the workflow by checking the status of all the streams. If all streams are not
// in the same state, it returns the unknown state.
func (s *Server) getWorkflowStatus(ctx context.Context, keyspace string, workflow string) (binlogdatapb.VReplicationWorkflowState, error) {
workflowStatus := binlogdatapb.VReplicationWorkflowState_Unknown
wf, err := s.GetWorkflow(ctx, keyspace, workflow, false, nil)
if err != nil {
return workflowStatus, err
}
for _, shardStream := range wf.ShardStreams {
for _, stream := range shardStream.GetStreams() {
state, ok := binlogdatapb.VReplicationWorkflowState_value[stream.State]
if !ok {
return workflowStatus, fmt.Errorf("invalid state for stream %s of workflow %s.%s", stream.State, keyspace, workflow)
}
currentStatus := binlogdatapb.VReplicationWorkflowState(state)
if workflowStatus != binlogdatapb.VReplicationWorkflowState_Unknown && currentStatus != workflowStatus {
return binlogdatapb.VReplicationWorkflowState_Unknown, nil
}
workflowStatus = currentStatus
}
}
return workflowStatus, nil
}

0 comments on commit a08bb2f

Please sign in to comment.