Skip to content

Commit

Permalink
Changes from self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 9, 2024
1 parent 1a145a5 commit 5e3054b
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 51 deletions.
9 changes: 4 additions & 5 deletions go/cmd/vtctldclient/command/vreplication/common/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
"sort"
"strings"

"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -143,7 +142,7 @@ func commandUpdateState(cmd *cobra.Command, args []string) error {
TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflowUpdateOptions.Workflow,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
TabletTypes: textutil.SimulatedNullTabletTypeSlice,
State: &state,
},
}
Expand Down
3 changes: 1 addition & 2 deletions go/cmd/vtctldclient/command/vreplication/workflow/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand Down Expand Up @@ -79,7 +78,7 @@ func commandUpdateState(cmd *cobra.Command, args []string) error {
TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: baseOptions.Workflow,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
TabletTypes: textutil.SimulatedNullTabletTypeSlice,
State: &state,
},
}
Expand Down
20 changes: 8 additions & 12 deletions go/cmd/vtctldclient/command/vreplication/workflow/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/ptr"
"vitess.io/vitess/go/textutil"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -65,7 +66,7 @@ var (
}
changes = true
} else {
updateOptions.TabletTypes = []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}
updateOptions.TabletTypes = textutil.SimulatedNullTabletTypeSlice
}
if cmd.Flags().Lookup("on-ddl").Changed { // Validate the provided value
changes = true
Expand All @@ -85,14 +86,6 @@ var (
func commandUpdate(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

// We've already validated any provided value, if one WAS provided.
// Now we need to do the mapping from the string representation to
// the enum value.
onddl := int32(textutil.SimulatedNullInt)
if val, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(updateOptions.OnDDL)]; ok {
onddl = val
}

tsp := tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN
if cmd.Flags().Lookup("tablet-types-in-order").Changed {
if updateOptions.TabletTypesInPreferenceOrder {
Expand All @@ -111,9 +104,12 @@ func commandUpdate(cmd *cobra.Command, args []string) error {
TabletSelectionPreference: &tsp,
},
}
if onddl != int32(textutil.SimulatedNullInt) {
v := binlogdatapb.OnDDLAction(onddl)
req.TabletRequest.OnDdl = &v

// We've already validated any provided value, if one WAS provided.
// Now we need to do the mapping from the string representation to
// the enum value.
if val, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(updateOptions.OnDDL)]; ok {
req.TabletRequest.OnDdl = ptr.Of(binlogdatapb.OnDDLAction(val))
}

resp, err := common.GetClient().WorkflowUpdate(common.GetCommandCtx(), req)
Expand Down
7 changes: 4 additions & 3 deletions go/textutil/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ const (
)

var (
delimitedListRegexp = regexp.MustCompile(`[ ,;]+`)
SimulatedNullStringSlice = []string{sqltypes.NULL.String()}
SimulatedNullInt = -1
delimitedListRegexp = regexp.MustCompile(`[ ,;]+`)
SimulatedNullStringSlice = []string{sqltypes.NULL.String()}
SimulatedNullTabletTypeSlice = []topodatapb.TabletType{topodatapb.TabletType(SimulatedNullInt)}
SimulatedNullInt = -1
)

// SplitDelimitedList splits a given string by comma, semi-colon or space, and returns non-empty strings
Expand Down
12 changes: 5 additions & 7 deletions go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ import (
"github.com/patrickmn/go-cache"

vreplcommon "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/vtenv"

"vitess.io/vitess/go/ptr"
"vitess.io/vitess/go/sets"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
Expand All @@ -54,6 +53,7 @@ import (
"vitess.io/vitess/go/vt/vtadmin/rbac"
"vitess.io/vitess/go/vt/vtadmin/sort"
"vitess.io/vitess/go/vt/vtadmin/vtadminproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtexplain"

Expand Down Expand Up @@ -1717,12 +1717,11 @@ func (api *API) StartWorkflow(ctx context.Context, req *vtadminpb.StartWorkflowR
return nil, err
}

running := binlogdatapb.VReplicationWorkflowState_Running
return c.Vtctld.WorkflowUpdate(ctx, &vtctldatapb.WorkflowUpdateRequest{
Keyspace: req.Keyspace,
TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: req.Workflow,
State: &running,
State: ptr.Of(binlogdatapb.VReplicationWorkflowState_Running),
},
})
}
Expand All @@ -1745,12 +1744,11 @@ func (api *API) StopWorkflow(ctx context.Context, req *vtadminpb.StopWorkflowReq
return nil, nil
}

stopped := binlogdatapb.VReplicationWorkflowState_Stopped
return c.Vtctld.WorkflowUpdate(ctx, &vtctldatapb.WorkflowUpdateRequest{
Keyspace: req.Keyspace,
TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: req.Workflow,
State: &stopped,
State: ptr.Of(binlogdatapb.VReplicationWorkflowState_Stopped),
},
})
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ import (

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/ptr"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/flagutil"
Expand Down Expand Up @@ -3805,7 +3806,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag
}
}
} else {
tabletTypes = []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}
tabletTypes = textutil.SimulatedNullTabletTypeSlice
}
onddl := int32(textutil.SimulatedNullInt) // To signify no value has been provided
if subFlags.Lookup("on-ddl").Changed { // Validate the provided value
Expand All @@ -3830,8 +3831,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag
TabletSelectionPreference: &tsp,
}
if onddl != int32(textutil.SimulatedNullInt) {
v := binlogdatapb.OnDDLAction(onddl)
rpcReq.(*tabletmanagerdatapb.UpdateVReplicationWorkflowRequest).OnDdl = &v
rpcReq.(*tabletmanagerdatapb.UpdateVReplicationWorkflowRequest).OnDdl = ptr.Of(binlogdatapb.OnDDLAction(onddl))
}
}
results, err = wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun, rpcReq, *shards) // Only update currently uses the new RPC path
Expand Down
11 changes: 4 additions & 7 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"vitess.io/vitess/go/ptr"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/concurrency"
Expand All @@ -42,7 +43,6 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)
Expand Down Expand Up @@ -495,15 +495,12 @@ func (mz *materializer) startStreams(ctx context.Context) error {
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias)
}
running := binlogdatapb.VReplicationWorkflowState_Running
if _, err := mz.tmc.UpdateVReplicationWorkflow(ctx, targetPrimary.Tablet, &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: mz.ms.Workflow,
State: &running,
State: ptr.Of(binlogdatapb.VReplicationWorkflowState_Running),
// Don't change anything else, so pass simulated NULLs.
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{
topodatapb.TabletType(textutil.SimulatedNullInt),
},
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: textutil.SimulatedNullTabletTypeSlice,
}); err != nil {
return vterrors.Wrap(err, "failed to update workflow")
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"vitess.io/vitess/go/ptr"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
Expand Down Expand Up @@ -322,7 +323,6 @@ func (rs *resharder) createStreams(ctx context.Context) error {
}

func (rs *resharder) startStreams(ctx context.Context) error {
running := binlogdatapb.VReplicationWorkflowState_Running
err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
targetPrimary := rs.targetPrimaries[target.ShardName()]
// This is the rare case where we truly want to update every stream/record
Expand All @@ -331,7 +331,7 @@ func (rs *resharder) startStreams(ctx context.Context) error {
// that we've created on the new shards as we're migrating them.
req := &tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest{
AllWorkflows: true,
State: &running,
State: ptr.Of(binlogdatapb.VReplicationWorkflowState_Running),
}
if _, err := rs.s.tmc.UpdateVReplicationWorkflows(ctx, targetPrimary.Tablet, req); err != nil {
return vterrors.Wrapf(err, "UpdateVReplicationWorkflows(%v, 'state='%s')",
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
source := row.AsBytes("source", []byte{})
state := row.AsString("state", "")
message := row.AsString("message", "")
if req.State != nil && *req.State == binlogdatapb.VReplicationWorkflowState_Running && strings.ToUpper(message) == workflow.Frozen {
if req.State != nil && *req.State == binlogdatapb.VReplicationWorkflowState_Running &&
strings.ToUpper(message) == workflow.Frozen {
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil},
vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "cannot start a workflow when it is frozen")
}
Expand Down
11 changes: 5 additions & 6 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/ptr"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
Expand Down Expand Up @@ -621,7 +622,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
Cells: []string{"zone3"},
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, // So keep the current value of replica
TabletTypes: textutil.SimulatedNullTabletTypeSlice, // So keep the current value of replica
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, "zone3", tabletTypes[0], vreplID),
Expand Down Expand Up @@ -672,7 +673,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
Workflow: workflow,
State: &stopped,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
TabletTypes: textutil.SimulatedNullTabletTypeSlice,
},
query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID),
Expand All @@ -683,7 +684,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
Workflow: workflow,
State: &running,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
TabletTypes: textutil.SimulatedNullTabletTypeSlice,
},
isCopying: true,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Copying', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
Expand Down Expand Up @@ -743,8 +744,6 @@ func TestUpdateVReplicationWorkflows(t *testing.T) {
tablet := tenv.addTablet(t, tabletUID, keyspace, shard)
defer tenv.deleteTablet(tablet.tablet)

hi := "hi"

tests := []struct {
name string
request *tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest
Expand All @@ -771,7 +770,7 @@ func TestUpdateVReplicationWorkflows(t *testing.T) {
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest{
AllWorkflows: true,
State: &running,
Message: &hi,
Message: ptr.Of("hi"),
StopPosition: &position,
},
query: fmt.Sprintf(`update /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ _vt.vreplication set state = 'Running', message = 'hi', stop_pos = '%s' where id in (%s)`, position, strings.Join(vreplIDs, ", ")),
Expand Down
6 changes: 3 additions & 3 deletions go/vt/wrangler/vexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,13 +409,13 @@ func TestWorkflowUpdate(t *testing.T) {
{
name: "no flags",
cells: nullSlice,
tabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
tabletTypes: textutil.SimulatedNullTabletTypeSlice,
wantErr: "no updates were provided; use --cells, --tablet-types, or --on-ddl to specify new values",
},
{
name: "only cells",
cells: []string{"zone1"},
tabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
tabletTypes: textutil.SimulatedNullTabletTypeSlice,
output: "The following workflow fields will be updated:\n cells=\"zone1\"\nOn the following tablets in the target keyspace for workflow wrWorkflow:\n zone1-0000000200 (target/-80)\n zone1-0000000210 (target/80-)\n",
},
{
Expand All @@ -427,7 +427,7 @@ func TestWorkflowUpdate(t *testing.T) {
{
name: "only on-ddl",
cells: nullSlice,
tabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
tabletTypes: textutil.SimulatedNullTabletTypeSlice,
onDDL: binlogdatapb.OnDDLAction_EXEC_IGNORE,
output: "The following workflow fields will be updated:\n on_ddl=\"EXEC_IGNORE\"\nOn the following tablets in the target keyspace for workflow wrWorkflow:\n zone1-0000000200 (target/-80)\n zone1-0000000210 (target/80-)\n",
},
Expand Down

0 comments on commit 5e3054b

Please sign in to comment.