Skip to content

Commit

Permalink
Optimize the GetWorkflows RPC (#14212)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Oct 10, 2023
1 parent aacb1f5 commit fa4c980
Show file tree
Hide file tree
Showing 15 changed files with 1,259 additions and 1,128 deletions.
10 changes: 8 additions & 2 deletions go/cmd/vtctldclient/command/vreplication/common/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var showOptions = struct {
IncludeLogs bool
}{}

func GetShowCommand(opts *SubCommandsOpts) *cobra.Command {
cmd := &cobra.Command{
Use: "show",
Expand All @@ -36,15 +40,17 @@ func GetShowCommand(opts *SubCommandsOpts) *cobra.Command {
Args: cobra.NoArgs,
RunE: commandShow,
}
cmd.Flags().BoolVar(&showOptions.IncludeLogs, "include-logs", true, "Include recent logs for the workflow.")
return cmd
}

func commandShow(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.GetWorkflowsRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
IncludeLogs: showOptions.IncludeLogs,
}
resp, err := GetClient().GetWorkflows(GetCommandCtx(), req)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions go/cmd/vtctldclient/command/vreplication/workflow/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func commandGetWorkflows(cmd *cobra.Command, args []string) error {
ks := cmd.Flags().Arg(0)

resp, err := common.GetClient().GetWorkflows(common.GetCommandCtx(), &vtctldatapb.GetWorkflowsRequest{
Keyspace: ks,
ActiveOnly: !getWorkflowsOptions.ShowAll,
Keyspace: ks,
ActiveOnly: !getWorkflowsOptions.ShowAll,
IncludeLogs: workflowShowOptions.IncludeLogs,
})

if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions go/cmd/vtctldclient/command/vreplication/workflow/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func commandShow(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.GetWorkflowsRequest{
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
IncludeLogs: workflowShowOptions.IncludeLogs,
}
resp, err := common.GetClient().GetWorkflows(common.GetCommandCtx(), req)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,18 @@ var (
Keyspace string
Workflow string
}{}

workflowShowOptions = struct {
IncludeLogs bool
}{}
)

func registerCommands(root *cobra.Command) {
base.PersistentFlags().StringVarP(&baseOptions.Keyspace, "keyspace", "k", "", "Keyspace context for the workflow.")
base.MarkPersistentFlagRequired("keyspace")
root.AddCommand(base)

getWorkflows.Flags().BoolVar(&workflowShowOptions.IncludeLogs, "include-logs", true, "Include recent logs for the workflows.")
getWorkflows.Flags().BoolVarP(&getWorkflowsOptions.ShowAll, "show-all", "a", false, "Show all workflows instead of just active workflows.")
root.AddCommand(getWorkflows) // Yes this is supposed to be root as GetWorkflows is a top-level command.

Expand All @@ -60,6 +65,7 @@ func registerCommands(root *cobra.Command) {

show.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want the details for.")
show.MarkFlagRequired("workflow")
show.Flags().BoolVar(&workflowShowOptions.IncludeLogs, "include-logs", true, "Include recent logs for the workflow.")
base.AddCommand(show)

start.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want to start.")
Expand Down
9 changes: 6 additions & 3 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,13 @@ func waitForNoWorkflowLag(t *testing.T, vc *VitessCluster, keyspace, worfklow st
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "--", ksWorkflow, "show")
require.NoError(t, err)
lag, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")
// We don't need log records for this so pass --include-logs=false.
output, err := vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", keyspace, "show", "--workflow", worfklow, "--include-logs=false")
require.NoError(t, err)
// Confirm that we got no log records back.
require.NotEmpty(t, len(gjson.Get(output, "workflows.0.shard_streams.*.streams.0").String()), "workflow %q had no streams listed in the output: %s", ksWorkflow, output)
require.Equal(t, 0, len(gjson.Get(output, "workflows.0.shard_streams.*.streams.0.logs").Array()), "workflow %q returned log records when we expected none", ksWorkflow)
lag = gjson.Get(output, "workflows.0.max_v_replication_lag").Int()
if lag == 0 {
return
}
Expand Down
2,202 changes: 1,106 additions & 1,096 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

42 changes: 38 additions & 4 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion go/vt/sidecardb/schema/vreplication/vreplication_log.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ CREATE TABLE IF NOT EXISTS vreplication_log
`updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`message` text NOT NULL,
`count` bigint NOT NULL DEFAULT '1',
PRIMARY KEY (`id`)
PRIMARY KEY (`id`),
KEY `vrepl_id_idx` (`vrepl_id`)
) ENGINE = InnoDB
5 changes: 3 additions & 2 deletions go/vt/vtadmin/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,8 +731,9 @@ func (c *Cluster) findWorkflows(ctx context.Context, keyspaces []string, opts Fi
}

resp, err := c.Vtctld.GetWorkflows(ctx, &vtctldatapb.GetWorkflowsRequest{
Keyspace: ks,
ActiveOnly: opts.ActiveOnly,
Keyspace: ks,
ActiveOnly: opts.ActiveOnly,
IncludeLogs: true,
})
c.workflowReadPool.Release()

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2196,6 +2196,7 @@ func (s *VtctldServer) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWor

span.Annotate("keyspace", req.Keyspace)
span.Annotate("active_only", req.ActiveOnly)
span.Annotate("include_logs", req.IncludeLogs)

resp, err = s.ws.GetWorkflows(ctx, req)
return resp, err
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {
require.NoError(t, err)
sourceShard, err := env.topoServ.GetShardNames(ctx, ms.SourceKeyspace)
require.NoError(t, err)
want := fmt.Sprintf("shard_streams:{key:\"%s/%s\" value:{streams:{id:1 tablet:{cell:\"%s\" uid:200} source_shard:\"%s/%s\" position:\"MySQL56/9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97\" status:\"running\" info:\"VStream Lag: 0s\"}}}",
want := fmt.Sprintf("shard_streams:{key:\"%s/%s\" value:{streams:{id:1 tablet:{cell:\"%s\" uid:200} source_shard:\"%s/%s\" position:\"9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97\" status:\"running\" info:\"VStream Lag: 0s\"}}}",
ms.TargetKeyspace, targetShard[0], env.cell, ms.SourceKeyspace, sourceShard[0])

res, err := env.ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{
Expand Down Expand Up @@ -568,7 +568,7 @@ func TestMoveTablesNoRoutingRules(t *testing.T) {
require.NoError(t, err)
sourceShard, err := env.topoServ.GetShardNames(ctx, ms.SourceKeyspace)
require.NoError(t, err)
want := fmt.Sprintf("shard_streams:{key:\"%s/%s\" value:{streams:{id:1 tablet:{cell:\"%s\" uid:200} source_shard:\"%s/%s\" position:\"MySQL56/9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97\" status:\"running\" info:\"VStream Lag: 0s\"}}}",
want := fmt.Sprintf("shard_streams:{key:\"%s/%s\" value:{streams:{id:1 tablet:{cell:\"%s\" uid:200} source_shard:\"%s/%s\" position:\"9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97\" status:\"running\" info:\"VStream Lag: 0s\"}}}",
ms.TargetKeyspace, targetShard[0], env.cell, ms.SourceKeyspace, sourceShard[0])

res, err := env.ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{
Expand Down
65 changes: 51 additions & 14 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
Expand Down Expand Up @@ -336,10 +337,11 @@ func (s *Server) GetCellsWithTableReadsSwitched(
return cellsSwitched, cellsNotSwitched, nil
}

func (s *Server) GetWorkflow(ctx context.Context, keyspace, workflow string) (*vtctldatapb.Workflow, error) {
func (s *Server) GetWorkflow(ctx context.Context, keyspace, workflow string, includeLogs bool) (*vtctldatapb.Workflow, error) {
res, err := s.GetWorkflows(ctx, &vtctldatapb.GetWorkflowsRequest{
Keyspace: keyspace,
Workflow: workflow,
Keyspace: keyspace,
Workflow: workflow,
IncludeLogs: includeLogs,
})
if err != nil {
return nil, err
Expand All @@ -364,6 +366,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows

span.Annotate("keyspace", req.Keyspace)
span.Annotate("active_only", req.ActiveOnly)
span.Annotate("include_logs", req.IncludeLogs)

where := ""
predicates := []string{}
Expand Down Expand Up @@ -444,7 +447,22 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
return err
}

pos := row["pos"].ToString()
// The value in the pos column can be compressed and thus not
// have a valid GTID consisting of valid UTF-8 characters so we
// have to decode it so that it's properly decompressed first
// when needed.
pos, err := row.ToString("pos")
if err != nil {
return err
}
if pos != "" {
mpos, err := binlogplayer.DecodePosition(pos)
if err != nil {
return err
}
pos = mpos.String()
}

stopPos := row["stop_pos"].ToString()
state := row["state"].ToString()
dbName := row["db_name"].ToString()
Expand Down Expand Up @@ -627,20 +645,37 @@ SELECT
count
FROM
_vt.vreplication_log
WHERE vrepl_id IN %a
ORDER BY
vrepl_id ASC,
id ASC
`)
)

fetchStreamLogs := func(ctx context.Context, workflow *vtctldatapb.Workflow) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.scanWorkflow")
span, ctx := trace.NewSpan(ctx, "workflow.Server.fetchStreamLogs")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow", workflow.Name)

results, err := vx.WithWorkflow(workflow.Name).QueryContext(ctx, vrepLogQuery)
vreplIDs := make([]int64, 0, len(workflow.ShardStreams))
for _, shardStream := range maps.Values(workflow.ShardStreams) {
for _, stream := range shardStream.Streams {
vreplIDs = append(vreplIDs, stream.Id)
}
}
idsBV, err := sqltypes.BuildBindVariable(vreplIDs)
if err != nil {
return
}

query, err := sqlparser.ParseAndBind(vrepLogQuery, idsBV)
if err != nil {
return
}

results, err := vx.WithWorkflow(workflow.Name).QueryContext(ctx, query)
if err != nil {
// Note that we do not return here. If there are any query results
// in the map (i.e. some tablets returned successfully), we will
Expand Down Expand Up @@ -800,12 +835,14 @@ ORDER BY

workflows = append(workflows, workflow)

// Fetch logs for all streams associated with this workflow in the background.
fetchLogsWG.Add(1)
go func(ctx context.Context, workflow *vtctldatapb.Workflow) {
defer fetchLogsWG.Done()
fetchStreamLogs(ctx, workflow)
}(ctx, workflow)
if req.IncludeLogs {
// Fetch logs for all streams associated with this workflow in the background.
fetchLogsWG.Add(1)
go func(ctx context.Context, workflow *vtctldatapb.Workflow) {
defer fetchLogsWG.Done()
fetchStreamLogs(ctx, workflow)
}(ctx, workflow)
}
}

// Wait for all the log fetchers to finish.
Expand Down Expand Up @@ -1802,7 +1839,7 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
}
}

workflow, err := s.GetWorkflow(ctx, req.Keyspace, req.Workflow)
workflow, err := s.GetWorkflow(ctx, req.Keyspace, req.Workflow, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3117,7 +3154,7 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat
log.Infof("writes already switched no need to check lag")
return "", nil
}
wf, err := s.GetWorkflow(ctx, state.TargetKeyspace, state.Workflow)
wf, err := s.GetWorkflow(ctx, state.TargetKeyspace, state.Workflow, false)
if err != nil {
return "", err
}
Expand Down
1 change: 1 addition & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,7 @@ message GetWorkflowsRequest {
bool name_only = 3;
// If you only want a specific workflow then set this field.
string workflow = 4;
bool include_logs = 5;
}

message GetWorkflowsResponse {
Expand Down
6 changes: 6 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit fa4c980

Please sign in to comment.