Skip to content

Commit

Permalink
removed all leaf level context timeout setting for non-streaming
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Sep 10, 2024
1 parent 5e1ec56 commit af6de49
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 62 deletions.
5 changes: 3 additions & 2 deletions go/test/endtoend/vtgate/queries/timeout/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func TestMain(m *testing.M) {

clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--queryserver-config-max-result-size", "1000000",
"--queryserver-config-query-timeout", "200s",
"--queryserver-config-query-pool-timeout", "200s")
"--queryserver-config-query-timeout", "2s",
"--queryserver-config-transaction-timeout", "3s",
"--queryserver-config-query-pool-timeout", "2s")
// Start Unsharded keyspace
ukeyspace := &cluster.Keyspace{
Name: uks,
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/vtgate/queries/timeout/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func TestQueryTimeoutWithDual(t *testing.T) {
assert.Error(t, err)
_, err = utils.ExecAllowError(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=15 */ sleep(0.001) from dual")
assert.NoError(t, err)
_, err = utils.ExecAllowError(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=0 */ sleep(5) from dual")
assert.NoError(t, err)
}

func TestQueryTimeoutWithTables(t *testing.T) {
Expand Down
20 changes: 12 additions & 8 deletions go/vt/vtgate/engine/dbddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ func (c *DBDDL) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[st
log.Errorf("'%s' database ddl plugin is not registered. Falling back to default plugin", name)
plugin = databaseCreatorPlugins[defaultDBDDLPlugin]
}
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, c.queryTimeout)
defer cancelFunc()

if c.queryTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(c.queryTimeout)*time.Millisecond)
defer cancel()
}

if c.create {
return c.createDatabase(ctx, vcursor, plugin)
Expand All @@ -125,9 +129,9 @@ func (c *DBDDL) createDatabase(ctx context.Context, vcursor VCursor, plugin DBDD
break
}
select {
case <-ctx.Done(): //context cancelled
case <-ctx.Done(): // context cancelled
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate create database: destination not resolved")
case <-time.After(500 * time.Millisecond): //timeout
case <-time.After(500 * time.Millisecond): // timeout
}
}
var queries []*querypb.BoundQuery
Expand All @@ -146,9 +150,9 @@ func (c *DBDDL) createDatabase(ctx context.Context, vcursor VCursor, plugin DBDD
if err != nil {
noErr = false
select {
case <-ctx.Done(): //context cancelled
case <-ctx.Done(): // context cancelled
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate create database: tablets not healthy")
case <-time.After(500 * time.Millisecond): //timeout
case <-time.After(500 * time.Millisecond): // timeout
}
break
}
Expand All @@ -167,9 +171,9 @@ func (c *DBDDL) dropDatabase(ctx context.Context, vcursor VCursor, plugin DBDDLP
}
for vcursor.KeyspaceAvailable(c.name) {
select {
case <-ctx.Done(): //context cancelled
case <-ctx.Done(): // context cancelled
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate drop database: keyspace still available in vschema")
case <-time.After(500 * time.Millisecond): //timeout
case <-time.After(500 * time.Millisecond): // timeout
}
}

Expand Down
3 changes: 0 additions & 3 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ type Delete struct {

// TryExecute performs a non-streaming exec.
func (del *Delete) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, del.QueryTimeout)
defer cancelFunc()

rss, bvs, err := del.findRoute(ctx, vcursor, bindVars)
if err != nil {
return nil, err
Expand Down
4 changes: 0 additions & 4 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,6 @@ func (t *noopVCursor) SetClientFoundRows(context.Context, bool) error {
func (t *noopVCursor) SetQueryTimeout(maxExecutionTime int64) {
}

func (t *noopVCursor) GetQueryTimeout(queryTimeoutFromComments int) int {
return queryTimeoutFromComments
}

func (t *noopVCursor) SetSkipQueryPlanCache(context.Context, bool) error {
panic("implement me")
}
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,6 @@ func (ins *Insert) RouteType() string {

// TryExecute performs a non-streaming exec.
func (ins *Insert) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, ins.QueryTimeout)
defer cancelFunc()

switch ins.Opcode {
case InsertUnsharded:
return ins.insertIntoUnshardedTable(ctx, vcursor, bindVars)
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtgate/engine/insert_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
Expand Down Expand Up @@ -93,9 +94,6 @@ func (ins *InsertSelect) RouteType() string {

// TryExecute performs a non-streaming exec.
func (ins *InsertSelect) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, ins.QueryTimeout)
defer cancelFunc()

if ins.Keyspace.Sharded {
return ins.execInsertSharded(ctx, vcursor, bindVars)
}
Expand All @@ -111,8 +109,11 @@ func (ins *InsertSelect) TryStreamExecute(ctx context.Context, vcursor VCursor,
}
return callback(res)
}
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, ins.QueryTimeout)
defer cancelFunc()
if ins.QueryTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(ins.QueryTimeout)*time.Millisecond)
defer cancel()
}

sharded := ins.Keyspace.Sharded
output := &sqltypes.Result{}
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,6 @@ type (
// This is used to select the right shard session to perform the vindex lookup query.
SetCommitOrder(co vtgatepb.CommitOrder)

// GetQueryTimeout gets the query timeout and takes in the query timeout from comments
GetQueryTimeout(queryTimeoutFromComment int) int

// SetQueryTimeout sets the query timeout
SetQueryTimeout(queryTimeout int64)

Expand Down
11 changes: 0 additions & 11 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,24 +130,13 @@ func (route *Route) GetTableName() string {

// TryExecute performs a non-streaming exec.
func (route *Route) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, route.QueryTimeout)
defer cancelFunc()
qr, err := route.executeInternal(ctx, vcursor, bindVars, wantfields)
if err != nil {
return nil, err
}
return qr.Truncate(route.TruncateColumnCount), nil
}

// addQueryTimeout adds a query timeout to the context it receives and returns the modified context along with the cancel function.
func addQueryTimeout(ctx context.Context, vcursor VCursor, queryTimeout int) (context.Context, context.CancelFunc) {
timeout := vcursor.Session().GetQueryTimeout(queryTimeout)
if timeout > 0 {
return context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
}
return ctx, func() {}
}

type cxtKey int

const (
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ func (s *Send) GetTableName() string {

// TryExecute implements Primitive interface
func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, s.QueryTimeout)
defer cancelFunc()

rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return nil, err
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ type Update struct {

// TryExecute performs a non-streaming exec.
func (upd *Update) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, upd.QueryTimeout)
defer cancelFunc()

rss, bvs, err := upd.findRoute(ctx, vcursor, bindVars)
if err != nil {
return nil, err
Expand Down
28 changes: 11 additions & 17 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,22 +873,6 @@ func (vc *vcursorImpl) SetQueryTimeout(maxExecutionTime int64) {
vc.safeSession.QueryTimeout = maxExecutionTime
}

// GetQueryTimeout implements the SessionActions interface
// The priority of adding query timeouts -
// 1. Query timeout comment directive.
// 2. If the comment directive is unspecified, then we use the session setting.
// 3. If the comment directive and session settings is unspecified, then we use the global default specified by a flag.
func (vc *vcursorImpl) GetQueryTimeout(queryTimeoutFromComments int) int {
if queryTimeoutFromComments != 0 {
return queryTimeoutFromComments
}
sessionQueryTimeout := int(vc.safeSession.GetQueryTimeout())
if sessionQueryTimeout != 0 {
return sessionQueryTimeout
}
return queryTimeout
}

// SetClientFoundRows implements the SessionActions interface
func (vc *vcursorImpl) SetClientFoundRows(_ context.Context, clientFoundRows bool) error {
vc.safeSession.GetOrCreateOptions().ClientFoundRows = clientFoundRows
Expand Down Expand Up @@ -935,7 +919,7 @@ func (vc *vcursorImpl) SetExecQueryTimeout(timeout *int) {
var execTimeout *int
if timeout != nil {
execTimeout = timeout
} else if sessionTimeout := vc.GetQueryTimeout(0); sessionTimeout > 0 {
} else if sessionTimeout := vc.getQueryTimeout(); sessionTimeout > 0 {
execTimeout = &sessionTimeout
}

Expand All @@ -955,6 +939,16 @@ func (vc *vcursorImpl) SetExecQueryTimeout(timeout *int) {
}
}

// getQueryTimeout returns timeout based on the priority
// session setting > global default specified by a flag.
func (vc *vcursorImpl) getQueryTimeout() int {
sessionQueryTimeout := int(vc.safeSession.GetQueryTimeout())
if sessionQueryTimeout != 0 {
return sessionQueryTimeout
}
return queryTimeout
}

// SetConsolidator implements the SessionActions interface
func (vc *vcursorImpl) SetConsolidator(consolidator querypb.ExecuteOptions_Consolidator) {
// Avoid creating session Options when they do not yet exist and the
Expand Down

0 comments on commit af6de49

Please sign in to comment.