Skip to content

Commit

Permalink
add ID to all primitives
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Sep 11, 2024
1 parent e0a5069 commit 53b6626
Show file tree
Hide file tree
Showing 55 changed files with 139 additions and 35 deletions.
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/concatenate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var _ Primitive = (*Concatenate)(nil)

// Concatenate specified the parameter for concatenate primitive
type Concatenate struct {
identifiablePrimitive
Sources []Primitive

// These column offsets do not need to be typed checked - they usually contain weight_string()
Expand Down
13 changes: 7 additions & 6 deletions go/vt/vtgate/engine/dbddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type DBDDLPlugin interface {
// DBDDL is just a container around custom database provisioning plugins
// The default behaviour is to just return an error
type DBDDL struct {
identifiablePrimitive
noInputs
noTxNeeded

Expand Down Expand Up @@ -125,9 +126,9 @@ func (c *DBDDL) createDatabase(ctx context.Context, vcursor VCursor, plugin DBDD
break
}
select {
case <-ctx.Done(): //context cancelled
case <-ctx.Done(): // context cancelled
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate create database: destination not resolved")
case <-time.After(500 * time.Millisecond): //timeout
case <-time.After(500 * time.Millisecond): // timeout
}
}
var queries []*querypb.BoundQuery
Expand All @@ -146,9 +147,9 @@ func (c *DBDDL) createDatabase(ctx context.Context, vcursor VCursor, plugin DBDD
if err != nil {
noErr = false
select {
case <-ctx.Done(): //context cancelled
case <-ctx.Done(): // context cancelled
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate create database: tablets not healthy")
case <-time.After(500 * time.Millisecond): //timeout
case <-time.After(500 * time.Millisecond): // timeout
}
break
}
Expand All @@ -167,9 +168,9 @@ func (c *DBDDL) dropDatabase(ctx context.Context, vcursor VCursor, plugin DBDDLP
}
for vcursor.KeyspaceAvailable(c.name) {
select {
case <-ctx.Done(): //context cancelled
case <-ctx.Done(): // context cancelled
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate drop database: keyspace still available in vschema")
case <-time.After(500 * time.Millisecond): //timeout
case <-time.After(500 * time.Millisecond): // timeout
}
}

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var _ Primitive = (*DDL)(nil)

// DDL represents a DDL statement, either normal or online DDL
type DDL struct {
identifiablePrimitive
noTxNeeded
noInputs

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var _ Primitive = (*Distinct)(nil)
type (
// Distinct Primitive is used to uniqueify results
Distinct struct {
identifiablePrimitive
Source Primitive
CheckCols []CheckCol
Truncate int
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

// DML contains the common elements between Update and Delete plans
type DML struct {
identifiablePrimitive
txNeeded

// Query specifies the query to be executed.
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/dml_with_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const DmlVals = "dml_vals"

// DMLWithInput represents the instructions to perform a DML operation based on the input result.
type DMLWithInput struct {
identifiablePrimitive
txNeeded

Input Primitive
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/exec_prepared_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
var _ Primitive = (*ExecStmt)(nil)

type ExecStmt struct {
identifiablePrimitive
Params []*sqlparser.Variable
Input Primitive
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/fake_primitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
// returns sendErr. For streaming calls, it sends the field info
// first and two rows at a time till all rows are sent.
type fakePrimitive struct {
identifiablePrimitive
results []*sqltypes.Result
curResult int
// sendErr is sent at the end of the stream if it's set.
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var _ Primitive = (*Filter)(nil)

// Filter is a primitive that performs the FILTER operation.
type Filter struct {
identifiablePrimitive
noTxNeeded

Predicate evalengine.Expr
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/fk_cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type NonLiteralUpdateInfo struct {
// FkCascade is a primitive that implements foreign key cascading using Selection as values required to execute the FkChild Primitives.
// On success, it executes the Parent Primitive.
type FkCascade struct {
identifiablePrimitive
txNeeded

// Selection is the Primitive that is used to find the rows that are going to be modified in the child tables.
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/fk_verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Verify struct {
// FkVerify is a primitive that verifies that the foreign key constraints in parent tables are satisfied.
// It does this by executing a select distinct query on the parent table with the values that are being inserted/updated.
type FkVerify struct {
identifiablePrimitive
txNeeded

Verify []*Verify
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
// Then the RHS is fetched, and we can check if the rows from the RHS matches any from the LHS.
// When they match by hash code, we double-check that we are not working with a false positive by comparing the values.
HashJoin struct {
identifiablePrimitive
Opcode JoinOpcode

// Left and Right are the LHS and RHS primitives
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var _ Primitive = (*Insert)(nil)

// Insert represents the instructions to perform an insert operation.
type Insert struct {
identifiablePrimitive
noInputs
InsertCommon

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

type (
InsertCommon struct {
identifiablePrimitive
// Insert needs tx handling
txNeeded

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var _ Primitive = (*Join)(nil)

// Join specifies the parameters for a join primitive.
type Join struct {
identifiablePrimitive
Opcode JoinOpcode
// Left and Right are the LHS and RHS primitives
// of the Join. They can be any primitive.
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var _ Primitive = (*Limit)(nil)

// Limit is a primitive that performs the LIMIT operation.
type Limit struct {
identifiablePrimitive
Count evalengine.Expr
Offset evalengine.Expr
Input Primitive
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var _ Primitive = (*Lock)(nil)

// Lock primitive will execute sql containing lock functions
type Lock struct {
identifiablePrimitive
noInputs
noTxNeeded

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/memory_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var _ Primitive = (*MemorySort)(nil)

// MemorySort is a primitive that performs in-memory sorting.
type MemorySort struct {
identifiablePrimitive
UpperLimit evalengine.Expr
OrderBy evalengine.Comparison
Input Primitive
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var _ Primitive = (*MergeSort)(nil)
// be used like other Primitives in VTGate. However, it satisfies the Primitive API
// so that vdiff can use it. In that situation, only StreamExecute is used.
type MergeSort struct {
identifiablePrimitive
noInputs
noTxNeeded

Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/engine/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
// authoritative primitive and, based on whether a die-roll exceeds a
// percentage, to also execute a target Primitive.
percentBasedMirror struct {
identifiablePrimitive
percent float32
primitive Primitive
target Primitive
Expand All @@ -46,7 +47,7 @@ var _ Primitive = (*percentBasedMirror)(nil)

// NewPercentBasedMirror creates a Mirror.
func NewPercentBasedMirror(percentage float32, primitive Primitive, target Primitive) Primitive {
return &percentBasedMirror{percentage, primitive, target}
return &percentBasedMirror{percent: percentage, primitive: primitive, target: target}
}

func (m *percentBasedMirror) RouteType() string {
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/mstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var _ Primitive = (*MStream)(nil)

// MStream is an operator for message streaming from specific keyspace, destination
type MStream struct {
identifiablePrimitive
noTxNeeded
noInputs

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var _ Primitive = (*OnlineDDL)(nil)

// OnlineDDL represents the instructions to perform an online schema change via vtctld
type OnlineDDL struct {
identifiablePrimitive
noTxNeeded
noInputs

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var _ Primitive = (*OrderedAggregate)(nil)
// is that the underlying primitive is a scatter select with pre-sorted
// rows.
type OrderedAggregate struct {
identifiablePrimitive
// Aggregates specifies the aggregation parameters for each
// aggregation function: function opcode and input column number.
Aggregates []*AggregateParams
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vtgate/engine/plan_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type PrimitiveDescription struct {
TargetTabletType topodatapb.TabletType
Other map[string]any

ID PrimitiveID
InputName string
Inputs []PrimitiveDescription
}
Expand All @@ -57,6 +58,12 @@ func (pd PrimitiveDescription) MarshalJSON() ([]byte, error) {
buf.WriteString("{")

prepend := ""
if pd.ID > 0 {
if err := marshalAdd(prepend, buf, "ID", int(pd.ID)); err != nil {
return nil, err
}
prepend = ","
}
if pd.InputName != "" {
if err := marshalAdd(prepend, buf, "InputName", pd.InputName); err != nil {
return nil, err
Expand Down Expand Up @@ -184,6 +191,9 @@ func marshalAdd(prepend string, buf *bytes.Buffer, name string, obj any) error {
// PrimitiveToPlanDescription transforms a primitive tree into a corresponding PlanDescription tree
func PrimitiveToPlanDescription(in Primitive) PrimitiveDescription {
this := in.description()
if id := in.GetID(); id > 0 {
this.ID = id
}

inputs, infos := in.Inputs()
for idx, input := range inputs {
Expand Down
25 changes: 25 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ type (
// description is the description, sans the inputs, of this Primitive.
// to get the plan description with all children, use PrimitiveToPlanDescription()
description() PrimitiveDescription

GetID() PrimitiveID
SetID(PrimitiveID)
}

// noInputs default implementation for Primitives that are leaves
Expand All @@ -262,6 +265,12 @@ type (

// txNeeded is a default implementation for Primitives that need transaction handling
txNeeded struct{}

PrimitiveID int

identifiablePrimitive struct {
id PrimitiveID
}
)

// Find will return the first Primitive that matches the evaluate function. If no match is found, nil will be returned
Expand All @@ -279,6 +288,14 @@ func Find(isMatch Match, start Primitive) Primitive {
return nil
}

func PreOrderTraverse(p Primitive, f func(Primitive)) {
f(p)
inputs, _ := p.Inputs()
for _, input := range inputs {
PreOrderTraverse(input, f)
}
}

// Exists traverses recursively down the Primitive tree structure, and returns true when Match returns true
func Exists(m Match, p Primitive) bool {
return Find(m, p) != nil
Expand All @@ -296,3 +313,11 @@ func (noTxNeeded) NeedsTransaction() bool {
func (txNeeded) NeedsTransaction() bool {
return true
}

func (i *identifiablePrimitive) GetID() PrimitiveID {
return i.id
}

func (i *identifiablePrimitive) SetID(id PrimitiveID) {
i.id = id
}
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var _ Primitive = (*Projection)(nil)

// Projection can evaluate expressions and project the results
type Projection struct {
identifiablePrimitive
noTxNeeded

Cols []string
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/recurse_cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
// The values being sent to the Term side are stored in the Vars map -
// the key is the bindvar name and the value is the index of the column in the recursive result
type RecurseCTE struct {
identifiablePrimitive
Seed, Term Primitive

Vars map[string]int
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/rename_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var _ Primitive = (*RenameFields)(nil)

// RenameFields is a primitive that renames the fields
type RenameFields struct {
identifiablePrimitive
noTxNeeded

Cols []string
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/replace_variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var _ Primitive = (*ReplaceVariables)(nil)

// ReplaceVariables is used in SHOW VARIABLES statements so that it replaces the values for vitess-aware variables
type ReplaceVariables struct {
identifiablePrimitive
noTxNeeded
Input Primitive
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/revert_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var _ Primitive = (*RevertMigration)(nil)

// RevertMigration represents the instructions to perform an online schema change via vtctld
type RevertMigration struct {
identifiablePrimitive
noTxNeeded
noInputs

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
// Route represents the instructions to route a read query to
// one or many vttablets.
type Route struct {
identifiablePrimitive
// Route does not take inputs
noInputs

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var _ Primitive = (*Rows)(nil)

// Rows simply returns a number or rows
type Rows struct {
identifiablePrimitive
noInputs
noTxNeeded

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/scalar_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var _ Primitive = (*ScalarAggregate)(nil)

// ScalarAggregate is a primitive used to do aggregations without grouping keys
type ScalarAggregate struct {
identifiablePrimitive
// Aggregates specifies the aggregation parameters for each
// aggregation function: function opcode and input column number.
Aggregates []*AggregateParams
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/semi_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var _ Primitive = (*SemiJoin)(nil)

// SemiJoin specifies the parameters for a SemiJoin primitive.
type SemiJoin struct {
identifiablePrimitive
// Left and Right are the LHS and RHS primitives
// of the SemiJoin. They can be any primitive.
Left, Right Primitive
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var _ Primitive = (*Send)(nil)

// Send is an operator to send query to the specific keyspace, tabletType and destination
type Send struct {
identifiablePrimitive
noInputs

// Keyspace specifies the keyspace to send the query to.
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/sequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
// Sequential Primitive is used to execute DML statements in a fixed order.
// Any failure, stops the execution and returns.
type Sequential struct {
identifiablePrimitive
txNeeded
Sources []Primitive
}
Expand Down
Loading

0 comments on commit 53b6626

Please sign in to comment.