Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Sep 12, 2024
1 parent 6994eb4 commit f76ae88
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
13 changes: 6 additions & 7 deletions go/test/endtoend/vtgate/queries/aggregation/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package aggregation
import (
_ "embed"
"flag"
"fmt"
"os"
"testing"
"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/test/endtoend/utils"

Expand Down Expand Up @@ -54,7 +54,7 @@ func TestMain(m *testing.M) {
// Start topo server
err := clusterInstance.StartTopo()
if err != nil {
return 1
log.Fatalf("topo err: %v", err.Error())
}

// Start keyspace
Expand All @@ -63,27 +63,26 @@ func TestMain(m *testing.M) {
SchemaSQL: schemaSQL,
VSchema: vschema,
}
clusterInstance.VtGateExtraArgs = []string{"--schema_change_signal"}
clusterInstance.VtGateExtraArgs = []string{"--schema_change_signal", "--log_operator_traffic=true"}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-schema-change-signal"}
err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 0, false)
if err != nil {
return 1
log.Fatalf("Error starting keyspace: %v", err.Error())
}

clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--enable_system_settings=true")
// Start vtgate
err = clusterInstance.StartVtgate()
if err != nil {
return 1
log.Fatalf("Error starting vtgate: %v", err.Error())
}

vtParams = clusterInstance.GetVTParams(keyspaceName)

// create mysql instance and connection parameters
conn, closer, err := utils.NewMySQL(clusterInstance, keyspaceName, schemaSQL)
if err != nil {
fmt.Println(err)
return 1
log.Fatalf("Error creating mysql instance: %v", err)
}
defer closer()
mysqlParams = conn
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func NewExecutor(
plans: plans,
warmingReadsPercent: warmingReadsPercent,
warmingReadsChannel: make(chan bool, warmingReadsConcurrency),
logOperatorTraffic: logOperatorTraffic,
}

vschemaacl.Init()
Expand Down
39 changes: 33 additions & 6 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,16 +516,22 @@ func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Pr
if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError {
continue
}
if vc.logOperatorTraffic {
stats := vc.logStats.PrimitiveStats[int(primitive.GetID())]
stats.NoOfCalls++
stats.Rows = append(stats.Rows, len(res.Rows))
}
vc.logOpTraffic(primitive, res)
return res, err
}
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available")
}

func (vc *vcursorImpl) logOpTraffic(primitive engine.Primitive, res *sqltypes.Result) {
//if vc.logOperatorTraffic {
key := int(primitive.GetID())
stats := vc.logStats.PrimitiveStats[key]
stats.NoOfCalls++
stats.Rows = append(stats.Rows, len(res.Rows))
vc.logStats.PrimitiveStats[key] = stats
//}
}

func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
// clone the vcursorImpl with a new session.
newVC := vc.cloneWithAutocommitSession()
Expand All @@ -534,12 +540,26 @@ func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive
if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError {
continue
}
vc.logOpTraffic(primitive, res)
return res, err
}
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available")
}

func (vc *vcursorImpl) wrapCallback(callback func(*sqltypes.Result) error, primitive engine.Primitive) func(*sqltypes.Result) error {
if !vc.logOperatorTraffic {
return callback
}

return func(result *sqltypes.Result) error {
vc.logOpTraffic(primitive, result)
return callback(result)
}
}

func (vc *vcursorImpl) StreamExecutePrimitive(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
callback = vc.wrapCallback(callback, primitive)

for try := 0; try < MaxBufferingRetries; try++ {
err := primitive.TryStreamExecute(ctx, vc, bindVars, wantfields, callback)
if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError {
Expand All @@ -551,6 +571,8 @@ func (vc *vcursorImpl) StreamExecutePrimitive(ctx context.Context, primitive eng
}

func (vc *vcursorImpl) StreamExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(result *sqltypes.Result) error) error {
callback = vc.wrapCallback(callback, primitive)

// clone the vcursorImpl with a new session.
newVC := vc.cloneWithAutocommitSession()
for try := 0; try < MaxBufferingRetries; try++ {
Expand Down Expand Up @@ -617,12 +639,14 @@ func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P

qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.resultsObserver)
vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)

vc.logOpTraffic(primitive, qr)
return qr, errs
}

// StreamExecuteMulti is the streaming version of ExecuteMultiShard.
func (vc *vcursorImpl) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, rollbackOnError bool, autocommit bool, callback func(reply *sqltypes.Result) error) []error {
callback = vc.wrapCallback(callback, primitive)

noOfShards := len(rss)
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(noOfShards))
err := vc.markSavepoint(ctx, rollbackOnError && (noOfShards > 1), map[string]*querypb.BindVariable{})
Expand Down Expand Up @@ -654,6 +678,7 @@ func (vc *vcursorImpl) ExecuteStandalone(ctx context.Context, primitive engine.P
// The autocommit flag is always set to false because we currently don't
// execute DMLs through ExecuteStandalone.
qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.safeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows, vc.resultsObserver)
vc.logOpTraffic(primitive, qr)
return qr, vterrors.Aggregate(errs)
}

Expand Down Expand Up @@ -1432,6 +1457,7 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso
warnings: vc.warnings,
pv: vc.pv,
resultsObserver: nullResultsObserver{},
logOperatorTraffic: vc.logOperatorTraffic,
}

v.marginComments.Trailing += "/* warming read */"
Expand Down Expand Up @@ -1464,6 +1490,7 @@ func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor {
warnings: vc.warnings,
pv: vc.pv,
resultsObserver: nullResultsObserver{},
logOperatorTraffic: vc.logOperatorTraffic,
}

v.marginComments.Trailing += "/* mirror query */"
Expand Down

0 comments on commit f76ae88

Please sign in to comment.