Skip to content

Commit

Permalink
boost: load query filters configs and check for boost
Browse files Browse the repository at this point in the history
boost: cleanup
  • Loading branch information
benetis committed Mar 27, 2024
1 parent bb85bb2 commit fe4f52b
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 57 deletions.
4 changes: 3 additions & 1 deletion go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var (

mysqlPort = flag.Int("mysql_port", 3306, "mysql port")

configPath = flag.String("boost_configs_path", "./boost_query_configs.yaml", "path to the boost query configurations file")

ts *topo.Server
resilientServer *srvtopo.ResilientServer
)
Expand Down Expand Up @@ -214,7 +216,7 @@ func main() {
vtgate.QueryLogHandler = "/debug/vtgate/querylog"
vtgate.QueryLogzHandler = "/debug/vtgate/querylogz"
vtgate.QueryzHandler = "/debug/vtgate/queryz"
vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait)
vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait, configPath)

// vtctld configuration and init
vtctld.InitVtctld(ts)
Expand Down
4 changes: 3 additions & 1 deletion go/cmd/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ var (
tabletTypesToWait = flag.String("tablet_types_to_wait", "", "wait till connected for specified tablet types during Gateway initialization")
)

var configPath = flag.String("boost_configs_path", "./boost_query_configs.yaml", "path to the boost query configurations file")

var resilientServer *srvtopo.ResilientServer
var legacyHealthCheck discovery.LegacyHealthCheck

Expand Down Expand Up @@ -153,7 +155,7 @@ func main() {
vtg = vtgate.LegacyInit(context.Background(), legacyHealthCheck, resilientServer, *cell, *vtgate.RetryCount, tabletTypes)
} else {
// use new Init otherwise
vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes)
vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes, configPath)
}

servenv.OnRun(func() {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error {

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker)
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker, nil)

queryLogBufferSize := 10
vtgate.QueryLogger = streamlog.New("VTGate", queryLogBufferSize)
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vtgate/boost/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,19 @@ type PlanConfig struct {
IsBoosted bool
BoostColumns Columns
}

func NonBoostedPlanConfig() *PlanConfig {
return &PlanConfig{
IsBoosted: false,
BoostColumns: Columns{},
}
}

type QueryFilterConfig struct {
Columns []string `yaml:"columns"`
TableName string `yaml:"tableName"`
}

type QueryFilterConfigs struct {
BoostConfigs []QueryFilterConfig `yaml:"boostConfigs"`
}
43 changes: 43 additions & 0 deletions go/vt/vtgate/boost/load_configs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package boost

import (
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
)

func Load(configPath *string) (*QueryFilterConfigs, error) {
data, err := ioutil.ReadFile(*configPath)
fmt.Printf("Reading Boost Configs from: %s\n", *configPath)
if err != nil {
fmt.Printf("Error reading file: %v\n", err)
return nil, err
}

var configs QueryFilterConfigs
err = yaml.Unmarshal(data, &configs)
if err != nil {
return nil, err
}

fmt.Printf("Parsed Configs: %+v\n", configs)

rocket := `
|
/ \
/ _ \
|.'''.|
|'._.'|
|BOOST|
,'| | |\.
/ | | | \
|,-'--|--'-.|
|| || ||
`

fmt.Println(rocket)

fmt.Println("Boost enabled.")

return &configs, nil
}
65 changes: 35 additions & 30 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type Executor struct {

vm *VSchemaManager
schemaTracker SchemaInfo

queryFilterConfigs *boost.QueryFilterConfigs
}

var executorOnce sync.Once
Expand All @@ -119,18 +121,20 @@ func NewExecutor(
streamSize int,
cacheCfg *cache.Config,
schemaTracker SchemaInfo,
queryFilterConfigs *boost.QueryFilterConfigs,
) *Executor {
e := &Executor{
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
queryFilterConfigs: queryFilterConfigs,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1202,7 +1206,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
}
ignoreMaxMemoryRows := sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt)
vcursor.SetIgnoreMaxMemoryRows(ignoreMaxMemoryRows)
var boostPlanConfig *boost.PlanConfig
var boostPlanConfig = boost.NonBoostedPlanConfig()

// Normalize if possible and retry.
if (e.normalize && sqlparser.CanNormalize(stmt)) || sqlparser.MustRewriteAST(stmt) {
Expand All @@ -1214,7 +1218,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
statement = result.AST
bindVarNeeds = result.BindVarNeeds
query = sqlparser.String(statement)
boostPlanConfig = configForBoost(result.Columns, "<TODO>")
boostPlanConfig = configForBoost(e.queryFilterConfigs, result.Columns, vcursor.vschema.UniqueTables)
}

if logStats != nil {
Expand Down Expand Up @@ -1242,30 +1246,31 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
return plan, nil
}

// TODO
func configForBoost(columns boost.Columns, table string) *boost.PlanConfig {
configColumns := map[string]string{
"user_id": "1337",
func configForBoost(configs *boost.QueryFilterConfigs, columns boost.Columns, inputMap map[string]*vindexes.Table) *boost.PlanConfig {
if configs == nil || columns == nil {
return boost.NonBoostedPlanConfig()
}

//todo compare sets for ordering
if !keysMatch(columns, configColumns) {
return &boost.PlanConfig{}
}
for tableName, _ := range inputMap {
for _, filterConfig := range configs.BoostConfigs {
if tableName == filterConfig.TableName {

return &boost.PlanConfig{
IsBoosted: true,
BoostColumns: columns,
if keysMatch(columns, filterConfig.Columns) {
return &boost.PlanConfig{
IsBoosted: true,
BoostColumns: columns,
}
}
}
}
}
}

func keysMatch(map1, map2 map[string]string) bool {
if len(map1) != len(map2) {
return false
}
return boost.NonBoostedPlanConfig()
}

for k := range map1 {
if _, exists := map2[k]; !exists {
func keysMatch(columns map[string]string, keys []string) bool {
for _, k := range keys {
if _, exists := columns[k]; !exists {
return false
}
}
Expand Down
18 changes: 9 additions & 9 deletions go/vt/vtgate/vindexes/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const (
// used for building routing plans.
type VSchema struct {
RoutingRules map[string]*RoutingRule `json:"routing_rules"`
uniqueTables map[string]*Table
UniqueTables map[string]*Table
uniqueVindexes map[string]Vindex
Keyspaces map[string]*KeyspaceSchema `json:"keyspaces"`
}
Expand Down Expand Up @@ -165,7 +165,7 @@ type AutoIncrement struct {
func BuildVSchema(source *vschemapb.SrvVSchema) (vschema *VSchema) {
vschema = &VSchema{
RoutingRules: make(map[string]*RoutingRule),
uniqueTables: make(map[string]*Table),
UniqueTables: make(map[string]*Table),
uniqueVindexes: make(map[string]Vindex),
Keyspaces: make(map[string]*KeyspaceSchema),
}
Expand All @@ -189,7 +189,7 @@ func BuildKeyspaceSchema(input *vschemapb.Keyspace, keyspace string) (*KeyspaceS
},
}
vschema := &VSchema{
uniqueTables: make(map[string]*Table),
UniqueTables: make(map[string]*Table),
uniqueVindexes: make(map[string]Vindex),
Keyspaces: make(map[string]*KeyspaceSchema),
}
Expand Down Expand Up @@ -335,10 +335,10 @@ func buildTables(ks *vschemapb.Keyspace, vschema *VSchema, ksvschema *KeyspaceSc
// Add the table to the map entries.
// If the keyspace requires explicit routing, don't include it in global routing
if !ks.RequireExplicitRouting {
if _, ok := vschema.uniqueTables[tname]; ok {
vschema.uniqueTables[tname] = nil
if _, ok := vschema.UniqueTables[tname]; ok {
vschema.UniqueTables[tname] = nil
} else {
vschema.uniqueTables[tname] = t
vschema.UniqueTables[tname] = t
}
}
ksvschema.Tables[tname] = t
Expand All @@ -362,7 +362,7 @@ func resolveAutoIncrement(source *vschemapb.SrvVSchema, vschema *VSchema) {
if err != nil {
// Better to remove the table than to leave it partially initialized.
delete(ksvschema.Tables, tname)
delete(vschema.uniqueTables, tname)
delete(vschema.UniqueTables, tname)
ksvschema.Error = fmt.Errorf("cannot resolve sequence %s: %v", table.AutoIncrement.Sequence, err)
continue
}
Expand Down Expand Up @@ -391,7 +391,7 @@ func addDual(vschema *VSchema) {
// the keyspaces. For consistency, we'll always use the
// first keyspace by lexical ordering.
first = ksname
vschema.uniqueTables["dual"] = t
vschema.UniqueTables["dual"] = t
}
}
}
Expand Down Expand Up @@ -464,7 +464,7 @@ func (vschema *VSchema) FindTable(keyspace, tablename string) (*Table, error) {
// findTable is like FindTable, but does not return an error if a table is not found.
func (vschema *VSchema) findTable(keyspace, tablename string) (*Table, error) {
if keyspace == "" {
table, ok := vschema.uniqueTables[tablename]
table, ok := vschema.UniqueTables[tablename]
if table == nil {
if ok {
return nil, fmt.Errorf("ambiguous table reference: %s", tablename)
Expand Down
24 changes: 12 additions & 12 deletions go/vt/vtgate/vindexes/vschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func TestUnshardedVSchema(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": t1,
"dual": dual,
},
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestVSchemaColumns(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": t1,
"dual": dual,
},
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestVSchemaColumnListAuthoritative(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": t1,
"dual": dual,
},
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestVSchemaPinned(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": t1,
"dual": dual,
},
Expand Down Expand Up @@ -530,7 +530,7 @@ func TestShardedVSchemaOwned(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": t1,
"dual": dual,
},
Expand Down Expand Up @@ -778,7 +778,7 @@ func TestVSchemaRoutingRules(t *testing.T) {
Error: errors.New("table t2 not found"),
},
},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": t1,
"t2": t2,
"dual": dual1,
Expand Down Expand Up @@ -1195,7 +1195,7 @@ func TestShardedVSchemaMultiColumnVindex(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": t1,
"dual": dual,
},
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func TestShardedVSchemaNotOwned(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": t1,
"dual": dual,
},
Expand Down Expand Up @@ -1425,7 +1425,7 @@ func TestBuildVSchemaDupSeq(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": nil,
"dual": duala,
},
Expand Down Expand Up @@ -1498,7 +1498,7 @@ func TestBuildVSchemaDupTable(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": nil,
"dual": duala,
},
Expand Down Expand Up @@ -1632,7 +1632,7 @@ func TestBuildVSchemaDupVindex(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"t1": nil,
"dual": duala,
},
Expand Down Expand Up @@ -1949,7 +1949,7 @@ func TestSequence(t *testing.T) {
}
want := &VSchema{
RoutingRules: map[string]*RoutingRule{},
uniqueTables: map[string]*Table{
UniqueTables: map[string]*Table{
"seq": seq,
"t1": t1,
"t2": t2,
Expand Down
Loading

0 comments on commit fe4f52b

Please sign in to comment.