Skip to content

Commit

Permalink
Views: VTGate changes for seamless integration (#17439)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Jan 21, 2025
1 parent 9b57daf commit 78f633c
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 30 deletions.
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ Flags:
--enable-partial-keyspace-migration (Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. (default false)
--enable-per-workload-table-metrics If true, query counts and query error metrics include a label that identifies the workload
--enable-tx-throttler Synonym to -enable_tx_throttler
--enable-views Enable views support in vtgate.
--enable-views Enable views support in vtgate. (default true)
--enable_buffer Enable buffering (stalling) of primary traffic during failovers.
--enable_buffer_dry_run Detect and log failover events, but do not actually buffer requests.
--enable_consolidator This option enables the query consolidator. (default true)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Flags:
--emit_stats If set, emit stats to push-based monitoring and stats backends
--enable-balancer Enable the tablet balancer to evenly spread query load for a given tablet type
--enable-partial-keyspace-migration (Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. (default false)
--enable-views Enable views support in vtgate.
--enable-views Enable views support in vtgate. (default true)
--enable_buffer Enable buffering (stalling) of primary traffic during failovers.
--enable_buffer_dry_run Detect and log failover events, but do not actually buffer requests.
--enable_direct_ddl Allow users to submit direct DDL statements (default true)
Expand Down
11 changes: 11 additions & 0 deletions go/test/vschemawrapper/vschema_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,17 @@ func (vw *VSchemaWrapper) FindView(tab sqlparser.TableName) sqlparser.TableState
return vw.V.FindView(destKeyspace, tab.Name.String())
}

func (vw *VSchemaWrapper) FindViewTarget(name sqlparser.TableName) (*vindexes.Keyspace, error) {
destKeyspace, _, _, err := topoproto.ParseDestination(name.Qualifier.String(), topodatapb.TabletType_PRIMARY)
if err != nil {
return nil, err
}
if ks, ok := vw.V.Keyspaces[destKeyspace]; ok {
return ks.Keyspace, nil
}
return nil, nil
}

func (vw *VSchemaWrapper) FindTableOrVindex(tab sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodatapb.TabletType, key.Destination, error) {
return vw.Vcursor.FindTableOrVindex(tab)
}
Expand Down
24 changes: 13 additions & 11 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1277,17 +1277,19 @@ func TestExecutorDDL(t *testing.T) {
}

for _, stmt := range stmts2 {
sbc1.ExecCount.Store(0)
sbc2.ExecCount.Store(0)
sbclookup.ExecCount.Store(0)
_, err := executor.Execute(ctx, nil, "TestExecute", econtext.NewSafeSession(&vtgatepb.Session{TargetString: ""}), stmt.input, nil)
if stmt.hasErr {
require.EqualError(t, err, econtext.ErrNoKeyspace.Error(), "expect query to fail")
testQueryLog(t, executor, logChan, "TestExecute", "", stmt.input, 0)
} else {
require.NoError(t, err)
testQueryLog(t, executor, logChan, "TestExecute", "DDL", stmt.input, 8)
}
t.Run(stmt.input, func(t *testing.T) {
sbc1.ExecCount.Store(0)
sbc2.ExecCount.Store(0)
sbclookup.ExecCount.Store(0)
_, err := executor.Execute(ctx, nil, "TestExecute", econtext.NewSafeSession(&vtgatepb.Session{TargetString: ""}), stmt.input, nil)
if stmt.hasErr {
assert.EqualError(t, err, econtext.ErrNoKeyspace.Error(), "expect query to fail")
testQueryLog(t, executor, logChan, "TestExecute", "", stmt.input, 0)
} else {
assert.NoError(t, err)
testQueryLog(t, executor, logChan, "TestExecute", "DDL", stmt.input, 8)
}
})
}
}

Expand Down
29 changes: 23 additions & 6 deletions go/vt/vtgate/executorcontext/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (vc *VCursorImpl) StartPrimitiveTrace() func() engine.Stats {
// FindTable finds the specified table. If the keyspace what specified in the input, it gets used as qualifier.
// Otherwise, the keyspace from the request is used, if one was provided.
func (vc *VCursorImpl) FindTable(name sqlparser.TableName) (*vindexes.Table, string, topodatapb.TabletType, key.Destination, error) {
destKeyspace, destTabletType, dest, err := vc.ParseDestinationTarget(name.Qualifier.String())
destKeyspace, destTabletType, dest, err := vc.parseDestinationTarget(name.Qualifier.String())
if err != nil {
return nil, "", destTabletType, nil, err
}
Expand All @@ -407,7 +407,7 @@ func (vc *VCursorImpl) FindTable(name sqlparser.TableName) (*vindexes.Table, str
}

func (vc *VCursorImpl) FindView(name sqlparser.TableName) sqlparser.TableStatement {
ks, _, _, err := vc.ParseDestinationTarget(name.Qualifier.String())
ks, _, _, err := vc.parseDestinationTarget(name.Qualifier.String())
if err != nil {
return nil
}
Expand All @@ -418,7 +418,7 @@ func (vc *VCursorImpl) FindView(name sqlparser.TableName) sqlparser.TableStateme
}

func (vc *VCursorImpl) FindRoutedTable(name sqlparser.TableName) (*vindexes.Table, error) {
destKeyspace, destTabletType, _, err := vc.ParseDestinationTarget(name.Qualifier.String())
destKeyspace, destTabletType, _, err := vc.parseDestinationTarget(name.Qualifier.String())
if err != nil {
return nil, err
}
Expand All @@ -442,7 +442,7 @@ func (vc *VCursorImpl) FindTableOrVindex(name sqlparser.TableName) (*vindexes.Ta
return vc.getDualTable()
}

destKeyspace, destTabletType, dest, err := ParseDestinationTarget(name.Qualifier.String(), vc.tabletType, vc.vschema)
destKeyspace, destTabletType, dest, err := vc.parseDestinationTarget(name.Qualifier.String())
if err != nil {
return nil, nil, "", destTabletType, nil, err
}
Expand All @@ -456,7 +456,24 @@ func (vc *VCursorImpl) FindTableOrVindex(name sqlparser.TableName) (*vindexes.Ta
return table, vindex, destKeyspace, destTabletType, dest, nil
}

func (vc *VCursorImpl) ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.Destination, error) {
// FindViewTarget finds the specified view's target keyspace.
func (vc *VCursorImpl) FindViewTarget(name sqlparser.TableName) (*vindexes.Keyspace, error) {
destKeyspace, _, _, err := vc.parseDestinationTarget(name.Qualifier.String())
if err != nil {
return nil, err
}
if destKeyspace != "" {
return vc.FindKeyspace(destKeyspace)
}

tbl, err := vc.vschema.FindRoutedTable("", name.Name.String(), vc.tabletType)
if err != nil || tbl == nil {
return nil, err
}
return tbl.Keyspace, nil
}

func (vc *VCursorImpl) parseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.Destination, error) {
return ParseDestinationTarget(targetString, vc.tabletType, vc.vschema)
}

Expand Down Expand Up @@ -1319,7 +1336,7 @@ func (vc *VCursorImpl) GetAggregateUDFs() []string {
// FindMirrorRule finds the mirror rule for the requested table name and
// VSchema tablet type.
func (vc *VCursorImpl) FindMirrorRule(name sqlparser.TableName) (*vindexes.MirrorRule, error) {
destKeyspace, destTabletType, _, err := vc.ParseDestinationTarget(name.Qualifier.String())
destKeyspace, destTabletType, _, err := vc.parseDestinationTarget(name.Qualifier.String())
if err != nil {
return nil, err
}
Expand Down
58 changes: 54 additions & 4 deletions go/vt/vtgate/planbuilder/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ func buildCreateViewCommon(
ddlSelect sqlparser.TableStatement,
ddl sqlparser.DDLStatement,
) (key.Destination, *vindexes.Keyspace, error) {
if vschema.IsViewsEnabled() {
return createViewEnabled(vschema, reservedVars, ddlSelect, ddl)
}

// For Create View, we require that the keyspace exist and the select query can be satisfied within the keyspace itself
// We should remove the keyspace name from the table name, as the database name in MySQL might be different than the keyspace name
destination, keyspace, err := findTableDestinationAndKeyspace(vschema, ddl)
Expand Down Expand Up @@ -228,9 +232,6 @@ func buildCreateViewCommon(

sqlparser.RemoveKeyspace(ddl)

if vschema.IsViewsEnabled() {
return destination, keyspace, nil
}
isRoutePlan, opCode := tryToGetRoutePlan(selectPlan.primitive)
if !isRoutePlan {
return nil, nil, vterrors.VT12001(ViewComplex)
Expand All @@ -241,14 +242,63 @@ func buildCreateViewCommon(
return destination, keyspace, nil
}

func createViewEnabled(vschema plancontext.VSchema, reservedVars *sqlparser.ReservedVars, ddlSelect sqlparser.TableStatement, ddl sqlparser.DDLStatement) (key.Destination, *vindexes.Keyspace, error) {
// For Create View, we require that the keyspace exist and the select query can be satisfied within the keyspace itself
// We should remove the keyspace name from the table name, as the database name in MySQL might be different than the keyspace name
destination, keyspace, err := findTableDestinationAndKeyspace(vschema, ddl)
if err != nil {
return nil, nil, err
}

// views definition with `select *` should not be expanded as schema tracker might not be up-to-date
// We copy the expressions and restore them after the planning context is created
var expressions []sqlparser.SelectExprs
_ = sqlparser.VisitAllSelects(ddlSelect, func(p *sqlparser.Select, idx int) error {
expressions = append(expressions, sqlparser.Clone(p.SelectExprs))
return nil
})

pCtx, err := plancontext.CreatePlanningContext(ddlSelect, reservedVars, vschema, Gen4)
if err != nil {
return nil, nil, err
}

var tblKs string
for _, tbl := range pCtx.SemTable.Tables {
vTbl := tbl.GetVindexTable()
if vTbl == nil {
continue
}
if tblKs == "" {
tblKs = vTbl.Keyspace.Name
}
if tblKs != vTbl.Keyspace.Name {
return nil, nil, vterrors.VT12001(ViewComplex)
}
}

if tblKs != keyspace.Name {
return nil, nil, vterrors.VT12001(ViewDifferentKeyspace)
}

_ = sqlparser.VisitAllSelects(ddlSelect, func(p *sqlparser.Select, idx int) error {
p.SelectExprs = expressions[idx]
return nil
})

sqlparser.RemoveKeyspace(ddl)

return destination, keyspace, nil
}

func buildDropView(vschema plancontext.VSchema, ddlStatement sqlparser.DDLStatement) (key.Destination, *vindexes.Keyspace, error) {
if !vschema.IsViewsEnabled() {
return buildDropTable(vschema, ddlStatement)
}
var ks *vindexes.Keyspace
viewMap := make(map[string]any)
for _, tbl := range ddlStatement.GetFromTables() {
_, ksForView, _, err := vschema.TargetDestination(tbl.Qualifier.String())
ksForView, err := vschema.FindViewTarget(tbl)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/planbuilder/plancontext/planning_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ func createPlanContext(st *semantics.SemTable) *PlanningContext {

type vschema struct{}

func (v *vschema) FindViewTarget(name sqlparser.TableName) (*vindexes.Keyspace, error) {
// TODO implement me
panic("implement me")
}

func (v *vschema) FindTable(tablename sqlparser.TableName) (*vindexes.Table, string, topodatapb.TabletType, key.Destination, error) {
// TODO implement me
panic("implement me")
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/plancontext/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type PlannerVersion = querypb.ExecuteOptions_PlannerVersion
type VSchema interface {
FindTable(tablename sqlparser.TableName) (*vindexes.Table, string, topodatapb.TabletType, key.Destination, error)
FindView(name sqlparser.TableName) sqlparser.TableStatement
// FindViewTarget finds the target keyspace for the view table provided.
FindViewTarget(name sqlparser.TableName) (*vindexes.Keyspace, error)
FindTableOrVindex(tablename sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodatapb.TabletType, key.Destination, error)

// SelectedKeyspace returns the current keyspace if set, otherwise returns an error
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/vindexes/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
TypeTable = ""
TypeSequence = "sequence"
TypeReference = "reference"
TypeView = "view"
)

// VSchema represents the denormalized version of SrvVSchema,
Expand Down Expand Up @@ -439,7 +440,7 @@ func (vschema *VSchema) AddView(ksname, viewName, query string, parser *sqlparse
}
ks.Views[viewName] = selectStmt
t := &Table{
Type: "View",
Type: TypeView,
Name: sqlparser.NewIdentifierCS(viewName),
Keyspace: ks.Keyspace,
ColumnListAuthoritative: true,
Expand Down
23 changes: 18 additions & 5 deletions go/vt/vtgate/vschema_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,21 +204,34 @@ func (vm *VSchemaManager) buildAndEnhanceVSchema(v *vschemapb.SrvVSchema) *vinde

func (vm *VSchemaManager) updateFromSchema(vschema *vindexes.VSchema) {
for ksName, ks := range vschema.Keyspaces {
vm.updateTableInfo(vschema, ks, ksName)
vm.updateViewInfo(ks, ksName)
vm.updateTableInfo(vschema, ks, ksName)
vm.updateUDFsInfo(ks, ksName)
}
}

func (vm *VSchemaManager) updateViewInfo(ks *vindexes.KeyspaceSchema, ksName string) {
views := vm.schema.Views(ksName)
if views != nil {
ks.Views = make(map[string]sqlparser.TableStatement, len(views))
for name, def := range views {
ks.Views[name] = sqlparser.Clone(def)
if views == nil {
return
}
ks.Views = make(map[string]sqlparser.TableStatement, len(views))
for name, def := range views {
ks.Views[name] = sqlparser.Clone(def)
vTbl, ok := ks.Tables[name]
if ok {
vTbl.Type = vindexes.TypeView
} else {
// Adding view to the VSchema as a table.
ks.Tables[name] = &vindexes.Table{
Type: vindexes.TypeView,
Name: sqlparser.NewIdentifierCS(name),
Keyspace: ks.Keyspace,
}
}
}
}

func (vm *VSchemaManager) updateTableInfo(vschema *vindexes.VSchema, ks *vindexes.KeyspaceSchema, ksName string) {
m := vm.schema.Tables(ksName)
// Before we add the foreign key definitions in the tables, we need to make sure that all the tables
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ var (

// schema tracking flags
enableSchemaChangeSignal = true
enableViews bool
enableViews = true
enableUdfs bool

// vtgate views flags
Expand Down

0 comments on commit 78f633c

Please sign in to comment.