Skip to content

Commit

Permalink
VTGate MoveTables Buffering: Fix panic when buffering is disabled (vi…
Browse files Browse the repository at this point in the history
…tessio#16922)

Signed-off-by: Rohit Nayak <[email protected]>
Signed-off-by: Harshit Gangal <[email protected]>
Co-authored-by: Harshit Gangal <[email protected]>
  • Loading branch information
rohit-nayak-ps and harshit-gangal authored Oct 11, 2024
1 parent d209847 commit f0062e6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
14 changes: 14 additions & 0 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,20 @@ func TestExecutorUnrecognized(t *testing.T) {
require.Error(t, err, "unrecognized statement: invalid statement'")
}

func TestExecutorDeniedErrorNoBuffer(t *testing.T) {
executor, sbc1, _, _, ctx := createExecutorEnv(t)
sbc1.EphemeralShardErr = errors.New("enforce denied tables")

vschemaWaitTimeout = 500 * time.Millisecond

session := NewAutocommitSession(&vtgatepb.Session{TargetString: "@primary"})
startExec := time.Now()
_, err := executor.Execute(ctx, nil, "TestExecutorDeniedErrorNoBuffer", session, "select * from user", nil)
require.NoError(t, err, "enforce denied tables not buffered")
endExec := time.Now()
require.GreaterOrEqual(t, endExec.Sub(startExec).Milliseconds(), int64(500))
}

// TestVSchemaStats makes sure the building and displaying of the
// VSchemaStats works.
func TestVSchemaStats(t *testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
type planExec func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, startTime time.Time) error
type txResult func(sqlparser.StatementType, *sqltypes.Result) error

var vschemaWaitTimeout = 30 * time.Second

func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated time.Time, timeout time.Duration) bool {
pollingInterval := 10 * time.Millisecond
waitCtx, cancel := context.WithTimeout(ctx, timeout)
Expand Down Expand Up @@ -104,7 +106,10 @@ func (e *Executor) newExecute(
// based on the buffering configuration. This way we should be able to perform the max retries
// within the given window of time for most queries and we should not end up waiting too long
// after the traffic switch fails or the buffer window has ended, retrying old queries.
timeout := e.resolver.scatterConn.gateway.buffer.GetConfig().MaxFailoverDuration / (MaxBufferingRetries - 1)
timeout := vschemaWaitTimeout
if e.resolver.scatterConn.gateway.buffer != nil && e.resolver.scatterConn.gateway.buffer.GetConfig() != nil {
timeout = e.resolver.scatterConn.gateway.buffer.GetConfig().MaxFailoverDuration / (MaxBufferingRetries - 1)
}
if waitForNewerVSchema(ctx, e, lastVSchemaCreated, timeout) {
vs = e.VSchema()
lastVSchemaCreated = vs.GetCreated()
Expand Down

0 comments on commit f0062e6

Please sign in to comment.