Skip to content

Commit

Permalink
Query serving: incorporate mirror query results in log stats, fix mir…
Browse files Browse the repository at this point in the history
…ror query max lag bug (vitessio#16879)

Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
maxenglander authored Oct 6, 2024
1 parent 8dd1083 commit b6be2ee
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 79 deletions.
17 changes: 14 additions & 3 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,10 @@ type loggingVCursor struct {

parser *sqlparser.Parser

handleMirrorClonesFn func(context.Context) VCursor
onMirrorClonesFn func(context.Context) VCursor
onExecuteMultiShardFn func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool)
onStreamExecuteMultiFn func(context.Context, Primitive, string, []*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, bool, bool, func(*sqltypes.Result) error)
onRecordMirrorStatsFn func(time.Duration, time.Duration, error)
}

func (f *loggingVCursor) HasCreatedTempTable() {
Expand Down Expand Up @@ -564,8 +565,8 @@ func (f *loggingVCursor) CloneForReplicaWarming(ctx context.Context) VCursor {
}

// CloneForMirroring returns the VCursor produced by the configured
// mirror-clone hook, passing ctx through to it. It panics when no hook has
// been set.
func (f *loggingVCursor) CloneForMirroring(ctx context.Context) VCursor {
if f.handleMirrorClonesFn != nil {
return f.handleMirrorClonesFn(ctx)
if f.onMirrorClonesFn != nil {
return f.onMirrorClonesFn(ctx)
}
panic("no mirror clones available")
}
Expand Down Expand Up @@ -886,6 +887,12 @@ func (t *loggingVCursor) SQLParser() *sqlparser.Parser {
return t.parser
}

// RecordMirrorStats forwards the source execution time, target execution
// time and target error to onRecordMirrorStatsFn. When that hook is unset the
// call does nothing.
func (t *loggingVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) {
	hook := t.onRecordMirrorStatsFn
	if hook == nil {
		// No observer registered; nothing to record.
		return
	}
	hook(sourceExecTime, targetExecTime, targetErr)
}

// VExplainLogging is a no-op on noopVCursor.
func (t *noopVCursor) VExplainLogging() {}
// DisableLogging is a no-op on noopVCursor.
func (t *noopVCursor) DisableLogging() {}
func (t *noopVCursor) GetVExplainLogs() []ExecuteEntry {
Expand All @@ -896,6 +903,10 @@ func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) {
return nil, nil
}

// RecordMirrorStats implements VCursor. It is a no-op: the supplied source
// and target execution times and the target error are discarded.
func (t *noopVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) {
}

func expectResult(t *testing.T, result, want *sqltypes.Result) {
t.Helper()
fieldsResult := fmt.Sprintf("%v", result.Fields)
Expand Down
87 changes: 65 additions & 22 deletions go/vt/vtgate/engine/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ import (

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)

// errMirrorTargetQueryTookTooLong is the target error reported to
// RecordMirrorStats when the mirror target has not delivered a result within
// maxMirrorTargetLag of the source primitive finishing.
var errMirrorTargetQueryTookTooLong = vterrors.Errorf(vtrpc.Code_ABORTED, "Mirror target query took too long")

type (
// percentBasedMirror represents the instructions to execute an
// authoritative primitive and, based on whether a die-roll exceeds a
Expand All @@ -34,6 +38,11 @@ type (
primitive Primitive
target Primitive
}

// mirrorResult carries the outcome of a mirror target execution from the
// mirroring goroutine back to the caller over a channel.
mirrorResult struct {
// execTime is the elapsed time measured around the target execution.
execTime time.Duration
// err is the error returned by the target execution; nil on success.
err error
}
)

const (
Expand Down Expand Up @@ -74,26 +83,44 @@ func (m *percentBasedMirror) TryExecute(ctx context.Context, vcursor VCursor, bi
return vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields)
}

mirrorCh := make(chan any)
mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag)
mirrorCh := make(chan mirrorResult, 1)
mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx)
defer mirrorCtxCancel()

go func() {
defer close(mirrorCh)
mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx)
// TODO(maxeng) handle error.
_, _ = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields)
targetStartTime := time.Now()
_, targetErr := mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields)
mirrorCh <- mirrorResult{
execTime: time.Since(targetStartTime),
err: targetErr,
}
}()

var (
sourceExecTime, targetExecTime time.Duration
targetErr error
)

sourceStartTime := time.Now()
r, err := vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields)
sourceExecTime = time.Since(sourceStartTime)

// Cancel the mirror context if it continues executing too long.
select {
case <-mirrorCh:
// Mirroring completed within the allowed time.
case <-mirrorCtx.Done():
// Mirroring took too long and was canceled.
case r := <-mirrorCh:
// Mirror target finished on time.
targetExecTime = r.execTime
targetErr = r.err
case <-time.After(maxMirrorTargetLag):
// Mirror target took too long.
mirrorCtxCancel()
targetExecTime = sourceExecTime + maxMirrorTargetLag
targetErr = errMirrorTargetQueryTookTooLong
}

vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr)

return r, err
}

Expand All @@ -102,30 +129,46 @@ func (m *percentBasedMirror) TryStreamExecute(ctx context.Context, vcursor VCurs
return vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback)
}

mirrorCh := make(chan any)
mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag)
mirrorCh := make(chan mirrorResult, 1)
mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx)
defer mirrorCtxCancel()

go func() {
defer close(mirrorCh)
mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx)
// TODO(maxeng) handle error.
_ = mirrorVCursor.StreamExecutePrimitive(
mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result,
) error {
return nil
})
mirrorStartTime := time.Now()
targetErr := mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error {
return nil
})
mirrorCh <- mirrorResult{
execTime: time.Since(mirrorStartTime),
err: targetErr,
}
}()

var (
sourceExecTime, targetExecTime time.Duration
targetErr error
)

sourceStartTime := time.Now()
err := vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback)
sourceExecTime = time.Since(sourceStartTime)

// Cancel the mirror context if it continues executing too long.
select {
case <-mirrorCh:
// Mirroring completed within the allowed time.
case <-mirrorCtx.Done():
// Mirroring took too long and was canceled.
case r := <-mirrorCh:
// Mirror target finished on time.
targetExecTime = r.execTime
targetErr = r.err
case <-time.After(maxMirrorTargetLag):
// Mirror target took too long.
mirrorCtxCancel()
targetExecTime = sourceExecTime + maxMirrorTargetLag
targetErr = errMirrorTargetQueryTookTooLong
}

vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr)

return err
}

Expand Down
Loading

0 comments on commit b6be2ee

Please sign in to comment.