diff --git a/go.mod b/go.mod index 9963f901765..f0fc8237ed8 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/planetscale/pargzip v0.0.0-20201116224723-90c7fc03ea8a github.com/planetscale/vtprotobuf v0.5.0 github.com/prometheus/client_golang v1.18.0 - github.com/prometheus/common v0.46.0 // indirect + github.com/prometheus/common v0.46.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/sjmudd/stopwatch v0.1.1 github.com/soheilhy/cmux v0.1.5 diff --git a/go/cmd/vtadmin/main.go b/go/cmd/vtadmin/main.go index 2548986c2ba..6cc3b9065b5 100644 --- a/go/cmd/vtadmin/main.go +++ b/go/cmd/vtadmin/main.go @@ -221,8 +221,6 @@ func main() { if err := rootCmd.Execute(); err != nil { log.Fatal(err) } - - log.Flush() } type noopCloser struct{} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 867685b8fec..520214d65fd 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -30,15 +30,12 @@ import ( "github.com/spf13/pflag" - "vitess.io/vitess/go/cache/theine" - "vitess.io/vitess/go/streamlog" - "vitess.io/vitess/go/vt/vtenv" - "vitess.io/vitess/go/vt/vthash" - "vitess.io/vitess/go/acl" + "vitess.io/vitess/go/cache/theine" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/key" @@ -53,6 +50,7 @@ import ( "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/sysvars" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -62,6 +60,7 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vtgate/vschemaacl" "vitess.io/vitess/go/vt/vtgate/vtgateservice" + "vitess.io/vitess/go/vt/vthash" ) var ( diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index dd51fb6c042..d9b68d052c3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -186,7 +186,6 @@ func TestPlayerInvisibleColumns(t *testing.T) { output := qh.Expect(tcases.output) expectNontxQueries(t, output) time.Sleep(1 * time.Second) - log.Flush() if tcases.table != "" { expectData(t, tcases.table, tcases.data) } @@ -3094,7 +3093,6 @@ func TestPlayerNoBlob(t *testing.T) { output := qh.Expect(tcases.output) expectNontxQueries(t, output) time.Sleep(1 * time.Second) - log.Flush() if tcases.table != "" { expectData(t, tcases.table, tcases.data) } @@ -3333,7 +3331,6 @@ func TestPlayerBatchMode(t *testing.T) { } expectNontxQueries(t, output) time.Sleep(1 * time.Second) - log.Flush() if tcase.table != "" { expectData(t, tcase.table, tcase.data) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go new file mode 100644 index 00000000000..cc586756fe8 --- /dev/null +++ b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go @@ -0,0 +1,589 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vstreamer + +// This file contains the test framework for testing the event generation logic in vstreamer. +// The test framework is designed to be used in the following way: +// 1. Define a TestSpec with the following fields: +// - ddls: a list of create table statements for the tables to be used in the test +// - tests: a list of test cases, each test case is a list of TestQuery +// - options: test-specific options, if any +// 2. Call ts.Init() to initialize the test. +// 3. Call ts.Run() to run the test. This will run the queries and validate the events. +// 4. Call ts.Close() to clean up the tables created in the test. +// The test framework will take care of creating the tables, running the queries, and validating the events for +// simpler cases. For more complex cases, the test framework provides hooks to customize the event generation. + +// Note: To simplify the initial implementation, the test framework is designed to be used in the vstreamer package only. +// It makes several assumptions about how the test cases are written. For example, queries are expected to +// use single quotes for string literals, for example: +// `"insert into t1 values (1, 'blob1', 'aaa')"`. +// The test framework will not work if the queries use double quotes for string literals at the moment. + +import ( + "context" + "fmt" + "slices" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/schemadiff" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" +) + +const ( + lengthInt = 11 + lengthBlob = 65535 + lengthText = 262140 + lengthSet = 56 +) + +func getDefaultCollationID() int64 { + return 45 +} + +var ( + // noEvents is used to indicate that a query is expected to generate no events. + noEvents = []TestRowEvent{} +) + +// TestColumn has all the attributes of a column required for the test cases. +type TestColumn struct { + name, dataType, colType string + len, collationID int64 + dataTypeLowered string + skip bool + collationName string +} + +// TestFieldEvent has all the attributes of a table required for creating a field event. +type TestFieldEvent struct { + table, db string + cols []*TestColumn +} + +// TestQuery represents a database query and the expected events it generates. +type TestQuery struct { + query string + events []TestRowEvent +} + +// TestRowChange represents the before and after state of a row due to a dml +type TestRowChange struct { + before []string + after []string +} + +// TestRowEventSpec is used for defining a custom row event. +type TestRowEventSpec struct { + table string + changes []TestRowChange +} + +// Generates a string representation for a custom row event. +func (s *TestRowEventSpec) String() string { + ev := &binlogdata.RowEvent{ + TableName: s.table, + } + var rowChanges []*binlogdata.RowChange + if s.changes != nil && len(s.changes) > 0 { + for _, c := range s.changes { + rowChange := binlogdata.RowChange{} + if c.before != nil && len(c.before) > 0 { + rowChange.Before = &query.Row{} + for _, val := range c.before { + rowChange.Before.Lengths = append(rowChange.Before.Lengths, int64(len(val))) + rowChange.Before.Values = append(rowChange.Before.Values, []byte(val)...) + } + } + if c.after != nil && len(c.after) > 0 { + rowChange.After = &query.Row{} + for _, val := range c.after { + rowChange.After.Lengths = append(rowChange.After.Lengths, int64(len(val))) + rowChange.After.Values = append(rowChange.After.Values, []byte(val)...) + } + } + rowChanges = append(rowChanges, &rowChange) + } + ev.RowChanges = rowChanges + } + vEvent := &binlogdata.VEvent{ + Type: binlogdata.VEventType_ROW, + RowEvent: ev, + } + return vEvent.String() +} + +// TestRowEvent is used to define either the actual row event string (the `event` field) or a custom row event +// (the `spec` field). Only one should be specified. If a test validates `flags` of a RowEvent then it is set. +type TestRowEvent struct { + event string + spec *TestRowEventSpec + flags int +} + +// TestSpecOptions has any non-standard test-specific options which can modify the event generation behaviour. +type TestSpecOptions struct { + noblob bool + filter *binlogdata.Filter +} + +// TestSpec is defined one per unit test. +type TestSpec struct { + // test=specific parameters + t *testing.T + ddls []string // create table statements + tests [][]*TestQuery // list of input queries and expected events for each query + options *TestSpecOptions // test-specific options + + // internal state + inited bool // whether the test has been initialized + tables []string // list of tables in the schema (created in `ddls`) + pkColumns map[string][]string // map of table name to primary key columns + schema *schemadiff.Schema // parsed schema from `ddls` using `schemadiff` + fieldEvents map[string]*TestFieldEvent // map of table name to field event for the table + fieldEventsSent map[string]bool // whether the field event has been sent for the table in the test + state map[string]*query.Row // last row inserted for each table. Useful to generate events only for inserts + metadata map[string][]string // list of enum/set values for enum/set columns +} + +func (ts *TestSpec) getCurrentState(table string) *query.Row { + return ts.state[table] +} + +func (ts *TestSpec) setCurrentState(table string, row *query.Row) { + ts.state[table] = row +} + +// Init() initializes the test. It creates the tables and sets up the internal state. +func (ts *TestSpec) Init() error { + var err error + if ts.inited { + return nil + } + defer func() { ts.inited = true }() + if ts.options == nil { + ts.options = &TestSpecOptions{} + } + ts.schema, err = schemadiff.NewSchemaFromQueries(schemadiff.NewTestEnv(), ts.ddls) + if err != nil { + return err + } + ts.fieldEvents = make(map[string]*TestFieldEvent) + ts.fieldEventsSent = make(map[string]bool) + ts.state = make(map[string]*query.Row) + ts.metadata = make(map[string][]string) + ts.pkColumns = make(map[string][]string) + // create tables + require.Equal(ts.t, len(ts.ddls), len(ts.schema.Tables()), "number of tables in ddls and schema do not match") + for i, t := range ts.schema.Tables() { + execStatement(ts.t, ts.ddls[i]) + fe := ts.getFieldEvent(t) + ts.fieldEvents[t.Name()] = fe + + var pkColumns []string + var hasPK bool + for _, index := range t.TableSpec.Indexes { + require.NotNil(ts.t, index.Info, "index.Info is nil") + if index.Info.Type == sqlparser.IndexTypePrimary { + for _, col := range index.Columns { + pkColumns = append(pkColumns, col.Column.String()) + } + hasPK = true + } + } + if !hasPK { + // add all columns as pk columns + for _, col := range t.TableSpec.Columns { + pkColumns = append(pkColumns, col.Name.String()) + } + } + ts.pkColumns[t.Name()] = pkColumns + } + engine.se.Reload(context.Background()) + return nil +} + +// Close() should be called (via defer) at the end of the test to clean up the tables created in the test. +func (ts *TestSpec) Close() { + dropStatement := fmt.Sprintf("drop tables %s", strings.Join(ts.schema.TableNames(), ", ")) + execStatement(ts.t, dropStatement) +} + +func (ts *TestSpec) getBindVarsForInsert(stmt sqlparser.Statement) (string, map[string]string) { + bv := make(map[string]string) + ins := stmt.(*sqlparser.Insert) + tn, err := ins.Table.TableName() + require.NoError(ts.t, err) + table := tn.Name.String() + fe := ts.fieldEvents[table] + vals, ok := ins.Rows.(sqlparser.Values) + require.True(ts.t, ok, "insert statement does not have values") + for _, val := range vals { + for i, v := range val { + bufV := sqlparser.NewTrackedBuffer(nil) + v.Format(bufV) + s := bufV.String() + switch fe.cols[i].dataTypeLowered { + case "varchar", "char", "binary", "varbinary", "blob", "text": + s = strings.Trim(s, "'") + case "set", "enum": + s = ts.getMetadataMap(table, fe.cols[i], s) + } + bv[fe.cols[i].name] = s + } + } + return table, bv +} + +func (ts *TestSpec) getBindVarsForUpdate(stmt sqlparser.Statement) (string, map[string]string) { + bv := make(map[string]string) + upd := stmt.(*sqlparser.Update) + //buf := sqlparser.NewTrackedBuffer(nil) + table := sqlparser.String(upd.TableExprs[0].(*sqlparser.AliasedTableExpr).Expr) + //upd.TableExprs[0].(*sqlparser.AliasedTableExpr).Expr.Format(buf) + //table := buf.String() + fe, ok := ts.fieldEvents[table] + require.True(ts.t, ok, "field event for table %s not found", table) + index := int64(0) + state := ts.getCurrentState(table) + for i, col := range fe.cols { + bv[col.name] = string(state.Values[index : index+state.Lengths[i]]) + index += state.Lengths[i] + } + for _, expr := range upd.Exprs { + bufV := sqlparser.NewTrackedBuffer(nil) + bufN := sqlparser.NewTrackedBuffer(nil) + expr.Expr.Format(bufV) + expr.Name.Format(bufN) + bv[bufN.String()] = strings.Trim(bufV.String(), "'") + } + return table, bv +} + +// Run() runs the test. It first initializes the test, then runs the queries and validates the events. +func (ts *TestSpec) Run() { + require.NoError(ts.t, engine.se.Reload(context.Background())) + if !ts.inited { + require.NoError(ts.t, ts.Init()) + } + var testcases []testcase + for _, t := range ts.tests { + var tc testcase + var input []string + var output []string + for _, tq := range t { + var table string + input = append(input, tq.query) + switch { + case tq.events != nil && len(tq.events) == 0: // when an input query is expected to generate no events + continue + case tq.events != nil && // when we define the actual events either as a serialized string or as a TestRowEvent + (len(tq.events) > 0 && + !(len(tq.events) == 1 && tq.events[0].event == "" && tq.events[0].spec == nil)): + for _, e := range tq.events { + if e.event != "" { + output = append(output, e.event) + } else if e.spec != nil { + output = append(output, e.spec.String()) + } else { + panic("invalid event") + } + } + continue + default: + // when we don't define the actual events, we generate them based on the input query + flags := 0 + if len(tq.events) == 1 { + flags = tq.events[0].flags + } + stmt, err := sqlparser.NewTestParser().Parse(tq.query) + require.NoError(ts.t, err) + bv := make(map[string]string) + isRowEvent := false + switch stmt.(type) { + case *sqlparser.Begin: + output = append(output, "begin") + case *sqlparser.Commit: + output = append(output, "gtid", "commit") + case *sqlparser.Insert: + isRowEvent = true + table, bv = ts.getBindVarsForInsert(stmt) + case *sqlparser.Update: + isRowEvent = true + table, bv = ts.getBindVarsForUpdate(stmt) + case *sqlparser.Delete: + isRowEvent = true + del := stmt.(*sqlparser.Delete) + table = del.TableExprs[0].(*sqlparser.AliasedTableExpr).As.String() + default: + require.FailNowf(ts.t, "unsupported statement type", "stmt: %s", stmt) + } + if isRowEvent { + fe := ts.fieldEvents[table] + if fe == nil { + require.FailNowf(ts.t, "field event for table %s not found", table) + } + if !ts.fieldEventsSent[table] { + output = append(output, fe.String()) + ts.fieldEventsSent[table] = true + } + output = append(output, ts.getRowEvent(table, bv, fe, stmt, uint32(flags))) + } + } + + } + tc.input = input + tc.output = append(tc.output, output) + testcases = append(testcases, tc) + } + runCases(ts.t, ts.options.filter, testcases, "current", nil) +} + +func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFieldEvent { + var tfe TestFieldEvent + tfe.table = table.Name() + tfe.db = testenv.DBName + for _, col := range table.TableSpec.Columns { + tc := TestColumn{} + tc.name = col.Name.String() + sqlType := col.Type.SQLType() + tc.dataType = sqlType.String() + tc.dataTypeLowered = strings.ToLower(tc.dataType) + tc.collationName = col.Type.Options.Collate + switch tc.dataTypeLowered { + case "int32": + tc.len = lengthInt + tc.collationID = collations.CollationBinaryID + tc.colType = "int(11)" + case "varchar", "varbinary", "char", "binary": + l := *col.Type.Length + switch tc.dataTypeLowered { + case "binary", "varbinary": + tc.len = int64(l) + tc.collationID = collations.CollationBinaryID + default: + tc.len = 4 * int64(l) + tc.collationID = getDefaultCollationID() + if tc.dataTypeLowered == "char" && strings.Contains(tc.collationName, "bin") { + tc.dataType = "BINARY" + } + } + tc.colType = fmt.Sprintf("%s(%d)", tc.dataTypeLowered, l) + case "blob": + tc.len = lengthBlob + tc.collationID = collations.CollationBinaryID + tc.colType = "blob" + case "text": + tc.len = lengthText + tc.collationID = getDefaultCollationID() + tc.colType = "text" + case "set": + tc.len = lengthSet + tc.collationID = getDefaultCollationID() + tc.colType = fmt.Sprintf("%s(%s)", tc.dataTypeLowered, strings.Join(col.Type.EnumValues, ",")) + ts.metadata[getMetadataKey(table.Name(), tc.name)] = col.Type.EnumValues + case "enum": + tc.len = int64(len(col.Type.EnumValues) + 1) + tc.collationID = getDefaultCollationID() + tc.colType = fmt.Sprintf("%s(%s)", tc.dataTypeLowered, strings.Join(col.Type.EnumValues, ",")) + ts.metadata[getMetadataKey(table.Name(), tc.name)] = col.Type.EnumValues + default: + log.Infof(fmt.Sprintf("unknown sqlTypeString %s", tc.dataTypeLowered)) + } + tfe.cols = append(tfe.cols, &tc) + } + return &tfe +} + +func getMetadataKey(table, col string) string { + return fmt.Sprintf("%s:%s", table, col) +} + +func (ts *TestSpec) setMetadataMap(table, col, value string) { + values := strings.Split(value, ",") + valuesReversed := slices.Clone(values) + slices.Reverse(valuesReversed) + ts.metadata[getMetadataKey(table, col)] = valuesReversed +} + +func (ts *TestSpec) getMetadataMap(table string, col *TestColumn, value string) string { + var bits int64 + value = strings.Trim(value, "'") + meta := ts.metadata[getMetadataKey(table, col.name)] + values := strings.Split(value, ",") + for _, v := range values { + v2 := strings.Trim(v, "'") + for i, m := range meta { + m2 := strings.Trim(m, "'") + if m2 == v2 { + switch col.dataTypeLowered { + case "set": + bits |= 1 << uint(i) + case "enum": + bits = int64(i) + 1 + } + } + } + } + return strconv.FormatInt(bits, 10) +} + +func (ts *TestSpec) getRowEvent(table string, bv map[string]string, fe *TestFieldEvent, stmt sqlparser.Statement, flags uint32) string { + ev := &binlogdata.RowEvent{ + TableName: table, + RowChanges: []*binlogdata.RowChange{ + { + Before: nil, + After: nil, + }, + }, + Flags: flags, + } + var row query.Row + for i, col := range fe.cols { + if fe.cols[i].skip { + continue + } + if col.dataTypeLowered == "binary" { + bv[col.name] = strings.TrimSuffix(bv[col.name], "\\0") + } + val := []byte(bv[col.name]) + l := int64(len(val)) + if col.dataTypeLowered == "binary" { + for l < col.len { + val = append(val, "\x00"...) + l++ + } + } + row.Values = append(row.Values, val...) + row.Lengths = append(row.Lengths, l) + } + ev.RowChanges = ts.getRowChanges(table, stmt, &row) + vEvent := &binlogdata.VEvent{ + Type: binlogdata.VEventType_ROW, + RowEvent: ev, + } + return vEvent.String() +} + +func (ts *TestSpec) getRowChanges(table string, stmt sqlparser.Statement, row *query.Row) []*binlogdata.RowChange { + var rowChanges []*binlogdata.RowChange + var rowChange binlogdata.RowChange + switch stmt.(type) { + case *sqlparser.Insert: + rowChange.After = row + ts.setCurrentState(table, row) + case *sqlparser.Update: + rowChange = *ts.getRowChangeForUpdate(table, row) + ts.setCurrentState(table, row) + case *sqlparser.Delete: + rowChange.Before = row + ts.setCurrentState(table, nil) + } + rowChanges = append(rowChanges, &rowChange) + return rowChanges +} + +func (ts *TestSpec) getRowChangeForUpdate(table string, newState *query.Row) *binlogdata.RowChange { + var rowChange binlogdata.RowChange + var bitmap byte + var before, after query.Row + + currentState := ts.getCurrentState(table) + if currentState == nil { + return nil + } + var currentValueIndex int64 + var hasSkip bool + for i, l := range currentState.Lengths { + skip := false + isPKColumn := false + for _, pkColumn := range ts.pkColumns[table] { + if pkColumn == ts.fieldEvents[table].cols[i].name { + isPKColumn = true + break + } + } + if ts.options.noblob { + switch ts.fieldEvents[table].cols[i].dataTypeLowered { + case "blob", "text": + currentValue := currentState.Values[currentValueIndex : currentValueIndex+l] + newValue := newState.Values[currentValueIndex : currentValueIndex+l] + if string(currentValue) == string(newValue) { + skip = true + hasSkip = true + } + } + } + if skip && !isPKColumn { + before.Lengths = append(before.Lengths, -1) + } else { + before.Values = append(before.Values, currentState.Values[currentValueIndex:currentValueIndex+l]...) + before.Lengths = append(before.Lengths, l) + } + if skip { + after.Lengths = append(after.Lengths, -1) + } else { + after.Values = append(after.Values, newState.Values[currentValueIndex:currentValueIndex+l]...) + after.Lengths = append(after.Lengths, l) + bitmap |= 1 << uint(i) + } + currentValueIndex += l + } + rowChange.Before = &before + rowChange.After = &after + if hasSkip { + rowChange.DataColumns = &binlogdata.RowChange_Bitmap{ + Count: int64(len(currentState.Lengths)), + Cols: []byte{bitmap}, + } + } + return &rowChange +} + +func (ts *TestSpec) getBefore(table string) *query.Row { + currentState := ts.getCurrentState(table) + if currentState == nil { + return nil + } + var row query.Row + var currentValueIndex int64 + for i, l := range currentState.Lengths { + dataTypeIsRedacted := false + switch ts.fieldEvents[table].cols[i].dataTypeLowered { + case "blob", "text": + dataTypeIsRedacted = true + } + if ts.options.noblob && dataTypeIsRedacted { + row.Lengths = append(row.Lengths, -1) + } else { + row.Values = append(row.Values, currentState.Values[currentValueIndex:currentValueIndex+l]...) + row.Lengths = append(row.Lengths, l) + } + currentValueIndex += l + } + return &row +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go index c056ef1d7e1..9c77ca18594 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go +++ b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go @@ -41,6 +41,8 @@ import ( vttestpb "vitess.io/vitess/go/vt/proto/vttest" ) +const DBName = "vttest" + // Env contains all the env vars for a test against a mysql instance. type Env struct { cluster *vttest.LocalCluster @@ -65,7 +67,7 @@ type Env struct { // Init initializes an Env. func Init(ctx context.Context) (*Env, error) { te := &Env{ - KeyspaceName: "vttest", + KeyspaceName: DBName, ShardName: "0", Cells: []string{"cell1"}, } @@ -89,7 +91,7 @@ func Init(ctx context.Context) (*Env, error) { Shards: []*vttestpb.Shard{ { Name: "0", - DbNameOverride: "vttest", + DbNameOverride: DBName, }, }, }, diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index ea6e0fb76aa..c515357a4ec 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -71,11 +71,11 @@ const ( numInitialRows = 10 ) -type state struct { +type TestState struct { tables []string } -var testState = &state{} +var testState = &TestState{} var positions map[string]string var allEvents []*binlogdatapb.VEvent diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go similarity index 82% rename from go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go rename to go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 0eda0d6c52e..5a1a786513f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -26,6 +26,8 @@ import ( "testing" "time" + "github.com/prometheus/common/version" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" @@ -57,22 +59,15 @@ func checkIfOptionIsSupported(t *testing.T, variable string) bool { return false } -type TestColumn struct { - name, dataType, colType string - len, charset int64 -} - -type TestFieldEvent struct { - table, db string - cols []*TestColumn -} - func (tfe *TestFieldEvent) String() string { s := fmt.Sprintf("type:FIELD field_event:{table_name:\"%s\"", tfe.table) fld := "" for _, col := range tfe.cols { + if col.skip { + continue + } fld += fmt.Sprintf(" fields:{name:\"%s\" type:%s table:\"%s\" org_table:\"%s\" database:\"%s\" org_name:\"%s\" column_length:%d charset:%d", - col.name, col.dataType, tfe.table, tfe.table, tfe.db, col.name, col.len, col.charset) + col.name, col.dataType, tfe.table, tfe.table, tfe.db, col.name, col.len, col.collationID) if col.colType != "" { fld += fmt.Sprintf(" column_type:\"%s\"", col.colType) } @@ -97,163 +92,94 @@ func TestNoBlob(t *testing.T) { engine = oldEngine env = oldEnv }() - execStatements(t, []string{ - "create table t1(id int, blb blob, val varchar(4), primary key(id))", - "create table t2(id int, txt text, val varchar(4), unique key(id, val))", - }) - defer execStatements(t, []string{ - "drop table t1", - "drop table t2", - }) - engine.se.Reload(context.Background()) - queries := []string{ - "begin", - "insert into t1 values (1, 'blob1', 'aaa')", - "update t1 set val = 'bbb'", - "commit", - "begin", - "insert into t2 values (1, 'text1', 'aaa')", - "update t2 set val = 'bbb'", - "commit", - } - fe1 := &TestFieldEvent{ - table: "t1", - db: "vttest", - cols: []*TestColumn{ - {name: "id", dataType: "INT32", colType: "int(11)", len: 11, charset: 63}, - {name: "blb", dataType: "BLOB", colType: "blob", len: 65535, charset: 63}, - {name: "val", dataType: "VARCHAR", colType: "varchar(4)", len: 16, charset: 45}, + ts := &TestSpec{ + t: t, + ddls: []string{ + // t1 has a blob column and a primary key. The blob column will not be in update row events. + "create table t1(id int, blb blob, val varchar(4), primary key(id))", + // t2 has a text column and no primary key. The text column will be in update row events. + "create table t2(id int, txt text, val varchar(4), unique key(id, val))", + // t3 has a text column and a primary key. The text column will not be in update row events. + "create table t3(id int, txt text, val varchar(4), primary key(id))", }, - } - fe2 := &TestFieldEvent{ - table: "t2", - db: "vttest", - cols: []*TestColumn{ - {name: "id", dataType: "INT32", colType: "int(11)", len: 11, charset: 63}, - {name: "txt", dataType: "TEXT", colType: "text", len: 262140, charset: 45}, - {name: "val", dataType: "VARCHAR", colType: "varchar(4)", len: 16, charset: 45}, + options: &TestSpecOptions{ + noblob: true, }, } - - testcases := []testcase{{ - input: queries, - output: [][]string{{ - "begin", - fe1.String(), - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:5 lengths:3 values:"1blob1aaa"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:-1 lengths:3 values:"1aaa"} after:{lengths:1 lengths:-1 lengths:3 values:"1bbb"} data_columns:{count:3 cols:"\x05"}}}`, - "gtid", - "commit", - }, { - "begin", - fe2.String(), - `type:ROW row_event:{table_name:"t2" row_changes:{after:{lengths:1 lengths:5 lengths:3 values:"1text1aaa"}}}`, - `type:ROW row_event:{table_name:"t2" row_changes:{before:{lengths:1 lengths:5 lengths:3 values:"1text1aaa"} after:{lengths:1 lengths:-1 lengths:3 values:"1bbb"} data_columns:{count:3 cols:"\x05"}}}`, - "gtid", - "commit", - }}, + defer ts.Close() + require.NoError(t, ts.Init()) + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 'blob1', 'aaa')", nil}, + {"update t1 set val = 'bbb'", nil}, + {"commit", nil}, + }, {{"begin", nil}, + {"insert into t2 values (1, 'text1', 'aaa')", nil}, + {"update t2 set val = 'bbb'", nil}, + {"commit", nil}, + }, {{"begin", nil}, + {"insert into t3 values (1, 'text1', 'aaa')", nil}, + {"update t3 set val = 'bbb'", nil}, + {"commit", nil}, }} - runCases(t, nil, testcases, "current", nil) + ts.Run() } +// TestSetAndEnum confirms that the events for set and enum columns are correct. func TestSetAndEnum(t *testing.T) { - execStatements(t, []string{ - "create table t1(id int, val binary(4), color set('red','green','blue'), size enum('S','M','L'), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) - queries := []string{ - "begin", - "insert into t1 values (1, 'aaa', 'red,blue', 'S')", - "insert into t1 values (2, 'bbb', 'green', 'M')", - "insert into t1 values (3, 'ccc', 'red,blue,green', 'L')", - "commit", - } - - fe := &TestFieldEvent{ - table: "t1", - db: "vttest", - cols: []*TestColumn{ - {name: "id", dataType: "INT32", colType: "int(11)", len: 11, charset: 63}, - {name: "val", dataType: "BINARY", colType: "binary(4)", len: 4, charset: 63}, - {name: "color", dataType: "SET", colType: "set('red','green','blue')", len: 56, charset: 45}, - {name: "size", dataType: "ENUM", colType: "enum('S','M','L')", len: 4, charset: 45}, + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id int, val binary(4), color set('red','green','blue'), size enum('S','M','L'), primary key(id))", }, } - - testcases := []testcase{{ - input: queries, - output: [][]string{{ - `begin`, - fe.String(), - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:4 lengths:1 lengths:1 values:"1aaa\x0051"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:4 lengths:1 lengths:1 values:"2bbb\x0022"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:4 lengths:1 lengths:1 values:"3ccc\x0073"}}}`, - `gtid`, - `commit`, - }}, + defer ts.Close() + require.NoError(t, ts.Init()) + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 'aaa', 'red,blue', 'S')", nil}, + {"insert into t1 values (2, 'bbb', 'green', 'M')", nil}, + {"insert into t1 values (3, 'ccc', 'red,blue,green', 'L')", nil}, + {"commit", nil}, }} - runCases(t, nil, testcases, "current", nil) + ts.Run() } +// TestCellValuePadding tests that the events are correctly padded for binary columns. func TestCellValuePadding(t *testing.T) { - - execStatements(t, []string{ - "create table t1(id int, val binary(4), primary key(val))", - "create table t2(id int, val char(4), primary key(val))", - "create table t3(id int, val char(4) collate utf8mb4_bin, primary key(val))", - }) - defer execStatements(t, []string{ - "drop table t1", - "drop table t2", - "drop table t3", - }) - engine.se.Reload(context.Background()) - queries := []string{ - "begin", - "insert into t1 values (1, 'aaa\000')", - "insert into t1 values (2, 'bbb\000')", - "update t1 set id = 11 where val = 'aaa\000'", - "insert into t2 values (1, 'aaa')", - "insert into t2 values (2, 'bbb')", - "update t2 set id = 11 where val = 'aaa'", - "insert into t3 values (1, 'aaa')", - "insert into t3 values (2, 'bb')", - "update t3 set id = 11 where val = 'aaa'", - "commit", - } - - testcases := []testcase{{ - input: queries, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"t1" fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:BINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:4 charset:63 column_type:"binary(4)"}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:4 values:"1aaa\x00"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:4 values:"2bbb\x00"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:4 values:"1aaa\x00"} after:{lengths:2 lengths:4 values:"11aaa\x00"}}}`, - `type:FIELD field_event:{table_name:"t2" fields:{name:"id" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:CHAR table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:16 charset:45 column_type:"char(4)"}}`, - `type:ROW row_event:{table_name:"t2" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:ROW row_event:{table_name:"t2" row_changes:{after:{lengths:1 lengths:3 values:"2bbb"}}}`, - `type:ROW row_event:{table_name:"t2" row_changes:{before:{lengths:1 lengths:3 values:"1aaa"} after:{lengths:2 lengths:3 values:"11aaa"}}}`, - `type:FIELD field_event:{table_name:"t3" fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:BINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:16 charset:45 column_type:"char(4)"}}`, - `type:ROW row_event:{table_name:"t3" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:ROW row_event:{table_name:"t3" row_changes:{after:{lengths:1 lengths:2 values:"2bb"}}}`, - `type:ROW row_event:{table_name:"t3" row_changes:{before:{lengths:1 lengths:3 values:"1aaa"} after:{lengths:2 lengths:3 values:"11aaa"}}}`, - `gtid`, - `commit`, + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id int, val binary(4), primary key(val))", + "create table t2(id int, val char(4), primary key(val))", + "create table t3(id int, val char(4) collate utf8mb4_bin, primary key(val))"}, + } + defer ts.Close() + require.NoError(t, ts.Init()) + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 'aaa\000')", nil}, + {"insert into t1 values (2, 'bbb\000')", nil}, + {"update t1 set id = 11 where val = 'aaa\000'", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"1", "aaa\x00"}, after: []string{"11", "aaa\x00"}}}}}, }}, + {"insert into t2 values (1, 'aaa')", nil}, + {"insert into t2 values (2, 'bbb')", nil}, + {"update t2 set id = 11 where val = 'aaa'", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t2", changes: []TestRowChange{{before: []string{"1", "aaa"}, after: []string{"11", "aaa"}}}}}, + }}, + {"insert into t3 values (1, 'aaa')", nil}, + {"insert into t3 values (2, 'bb')", nil}, + {"update t3 set id = 11 where val = 'aaa'", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t3", changes: []TestRowChange{{before: []string{"1", "aaa"}, after: []string{"11", "aaa"}}}}}, + }}, + {"commit", nil}, }} - runCases(t, nil, testcases, "current", nil) + ts.Run() } func TestSetStatement(t *testing.T) { - - if testing.Short() { - t.Skip() - } if !checkIfOptionIsSupported(t, "log_builtin_as_identified_by_password") { // the combination of setting this option and support for "set password" only works on a few flavors log.Info("Cannot test SetStatement on this flavor") @@ -296,45 +222,25 @@ func TestSetForeignKeyCheck(t *testing.T) { testRowEventFlags = true defer func() { testRowEventFlags = false }() - execStatements(t, []string{ - "create table t1(id int, val binary(4), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) - queries := []string{ - "begin", - "insert into t1 values (1, 'aaa')", - "set @@session.foreign_key_checks=1", - "insert into t1 values (2, 'bbb')", - "set @@session.foreign_key_checks=0", - "insert into t1 values (3, 'ccc')", - "commit", - } - - fe := &TestFieldEvent{ - table: "t1", - db: "vttest", - cols: []*TestColumn{ - {name: "id", dataType: "INT32", colType: "int(11)", len: 11, charset: 63}, - {name: "val", dataType: "BINARY", colType: "binary(4)", len: 4, charset: 63}, + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id int, val binary(4), primary key(id))", }, } - - testcases := []testcase{{ - input: queries, - output: [][]string{{ - `begin`, - fe.String(), - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:4 values:"1aaa\x00"}} flags:1}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:4 values:"2bbb\x00"}} flags:1}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:4 values:"3ccc\x00"}} flags:3}`, - `gtid`, - `commit`, - }}, + defer ts.Close() + require.NoError(t, ts.Init()) + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 'aaa')", []TestRowEvent{{flags: 1}}}, + {"set @@session.foreign_key_checks=1", noEvents}, + {"insert into t1 values (2, 'bbb')", []TestRowEvent{{flags: 1}}}, + {"set @@session.foreign_key_checks=0", noEvents}, + {"insert into t1 values (3, 'ccc')", []TestRowEvent{{flags: 3}}}, + {"commit", nil}, }} - runCases(t, nil, testcases, "current", nil) + ts.Run() + } func TestStmtComment(t *testing.T) { @@ -711,225 +617,179 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { } } +// TestFilteredVarBinary confirms that adding a filter using a varbinary column results in the correct set of events. func TestFilteredVarBinary(t *testing.T) { - if testing.Short() { - t.Skip() + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id1 int, val varbinary(128), primary key(id1))", + }, + options: &TestSpecOptions{ + filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select id1, val from t1 where val = 'newton'", + }}, + }, + }, } - - execStatements(t, []string{ - "create table t1(id1 int, val varbinary(128), primary key(id1))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) - - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select id1, val from t1 where val = 'newton'", + defer ts.Close() + require.NoError(t, ts.Init()) + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 'kepler')", noEvents}, + {"insert into t1 values (2, 'newton')", nil}, + {"insert into t1 values (3, 'newton')", nil}, + {"insert into t1 values (4, 'kepler')", noEvents}, + {"insert into t1 values (5, 'newton')", nil}, + {"update t1 set val = 'newton' where id1 = 1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"1", "newton"}}}}}, }}, - } - - testcases := []testcase{{ - input: []string{ - "begin", - "insert into t1 values (1, 'kepler')", - "insert into t1 values (2, 'newton')", - "insert into t1 values (3, 'newton')", - "insert into t1 values (4, 'kepler')", - "insert into t1 values (5, 'newton')", - "update t1 set val = 'newton' where id1 = 1", - "update t1 set val = 'kepler' where id1 = 2", - "update t1 set val = 'newton' where id1 = 2", - "update t1 set val = 'kepler' where id1 = 1", - "delete from t1 where id1 in (2,3)", - "commit", - }, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"t1" fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:6 values:"2newton"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:6 values:"3newton"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:6 values:"5newton"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:6 values:"1newton"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:6 values:"2newton"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:6 values:"2newton"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:6 values:"1newton"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:6 values:"2newton"}} row_changes:{before:{lengths:1 lengths:6 values:"3newton"}}}`, - `gtid`, - `commit`, + {"update t1 set val = 'kepler' where id1 = 2", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"2", "newton"}}}}}, + }}, + {"update t1 set val = 'newton' where id1 = 2", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"2", "newton"}}}}}, + }}, + {"update t1 set val = 'kepler' where id1 = 1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"1", "newton"}}}}}, }}, + {"delete from t1 where id1 in (2,3)", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"2", "newton"}}, {before: []string{"3", "newton"}}}}}, + }}, + {"commit", nil}, }} - runCases(t, filter, testcases, "", nil) + ts.Run() } +// TestFilteredInt confirms that adding a filter using an int column results in the correct set of events. func TestFilteredInt(t *testing.T) { - if testing.Short() { - t.Skip() + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))", + }, + options: &TestSpecOptions{ + filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select id1, val from t1 where id2 = 200", + }}, + }, + }, } - engine.se.Reload(context.Background()) - - execStatements(t, []string{ - "create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) - - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select id1, val from t1 where id2 = 200", + defer ts.Close() + require.NoError(t, ts.Init()) + ts.fieldEvents["t1"].cols[1].skip = true + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 100, 'aaa')", noEvents}, + {"insert into t1 values (2, 200, 'bbb')", nil}, + {"insert into t1 values (3, 100, 'ccc')", noEvents}, + {"insert into t1 values (4, 200, 'ddd')", nil}, + {"insert into t1 values (5, 200, 'eee')", nil}, + {"update t1 set val = 'newddd' where id1 = 4", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"4", "ddd"}, after: []string{"4", "newddd"}}}}}, }}, - } - - testcases := []testcase{{ - input: []string{ - "begin", - "insert into t1 values (1, 100, 'aaa')", - "insert into t1 values (2, 200, 'bbb')", - "insert into t1 values (3, 100, 'ccc')", - "insert into t1 values (4, 200, 'ddd')", - "insert into t1 values (5, 200, 'eee')", - "update t1 set val = 'newddd' where id1 = 4", - "update t1 set id2 = 200 where id1 = 1", - "update t1 set id2 = 100 where id1 = 2", - "update t1 set id2 = 100 where id1 = 1", - "update t1 set id2 = 200 where id1 = 2", - "commit", - }, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"t1" fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:3 values:"2bbb"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:3 values:"4ddd"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:3 values:"5eee"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:3 values:"4ddd"} after:{lengths:1 lengths:6 values:"4newddd"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:3 values:"2bbb"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:3 values:"2bbb"}}}`, - `gtid`, - `commit`, + {"update t1 set id2 = 200 where id1 = 1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"1", "aaa"}}}}}, }}, + {"update t1 set id2 = 100 where id1 = 2", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"2", "bbb"}}}}}, + }}, + {"update t1 set id2 = 100 where id1 = 1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"1", "aaa"}}}}}, + }}, + {"update t1 set id2 = 200 where id1 = 2", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"2", "bbb"}}}}}, + }}, + {"commit", nil}, }} - runCases(t, filter, testcases, "", nil) + ts.Run() } +// TestSavepoint confirms that rolling back to a savepoint drops the dmls that were executed during the savepoint. func TestSavepoint(t *testing.T) { - if testing.Short() { - t.Skip() - } - - execStatements(t, []string{ - "create table stream1(id int, val varbinary(128), primary key(id))", - "create table stream2(id int, val varbinary(128), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table stream1", - "drop table stream2", - }) - engine.se.Reload(context.Background()) - testcases := []testcase{{ - input: []string{ - "begin", - "insert into stream1 values (1, 'aaa')", - "savepoint a", - "insert into stream1 values (2, 'aaa')", - "rollback work to savepoint a", - "savepoint b", - "update stream1 set val='bbb' where id = 1", - "release savepoint b", - "commit", + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table stream1(id int, val varbinary(128), primary key(id))", }, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"stream1" fields:{name:"id" type:INT32 table:"stream1" org_table:"stream1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"stream1" org_table:"stream1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"stream1" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:ROW row_event:{table_name:"stream1" row_changes:{before:{lengths:1 lengths:3 values:"1aaa"} after:{lengths:1 lengths:3 values:"1bbb"}}}`, - `gtid`, - `commit`, + } + defer ts.Close() + require.NoError(t, ts.Init()) + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into stream1 values (1, 'aaa')", nil}, + {"savepoint a", noEvents}, + {"insert into stream1 values (2, 'aaa')", noEvents}, + {"rollback work to savepoint a", noEvents}, + {"savepoint b", noEvents}, + {"update stream1 set val='bbb' where id = 1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "stream1", changes: []TestRowChange{{before: []string{"1", "aaa"}, after: []string{"1", "bbb"}}}}}, }}, + {"release savepoint b", noEvents}, + {"commit", nil}, }} - runCases(t, nil, testcases, "current", nil) + ts.Run() } +// TestSavepointWithFilter tests that using savepoints with both filtered and unfiltered tables works as expected. func TestSavepointWithFilter(t *testing.T) { - if testing.Short() { - t.Skip() - } - - execStatements(t, []string{ - "create table stream1(id int, val varbinary(128), primary key(id))", - "create table stream2(id int, val varbinary(128), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table stream1", - "drop table stream2", - }) - engine.se.Reload(context.Background()) - testcases := []testcase{{ - input: []string{ - "begin", - "insert into stream1 values (1, 'aaa')", - "savepoint a", - "insert into stream1 values (2, 'aaa')", - "savepoint b", - "insert into stream1 values (3, 'aaa')", - "savepoint c", - "insert into stream1 values (4, 'aaa')", - "savepoint d", - "commit", - - "begin", - "insert into stream1 values (5, 'aaa')", - "savepoint d", - "insert into stream1 values (6, 'aaa')", - "savepoint c", - "insert into stream1 values (7, 'aaa')", - "savepoint b", - "insert into stream1 values (8, 'aaa')", - "savepoint a", - "commit", - - "begin", - "insert into stream1 values (9, 'aaa')", - "savepoint a", - "insert into stream2 values (1, 'aaa')", - "savepoint b", - "insert into stream1 values (10, 'aaa')", - "savepoint c", - "insert into stream2 values (2, 'aaa')", - "savepoint d", - "commit", + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table stream1(id int, val varbinary(128), primary key(id))", + "create table stream2(id int, val varbinary(128), primary key(id))", }, - output: [][]string{{ - `begin`, - `gtid`, - `commit`, - }, { - `begin`, - `gtid`, - `commit`, - }, { - `begin`, - `type:FIELD field_event:{table_name:"stream2" fields:{name:"id" type:INT32 table:"stream2" org_table:"stream2" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"stream2" org_table:"stream2" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"stream2" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:ROW row_event:{table_name:"stream2" row_changes:{after:{lengths:1 lengths:3 values:"2aaa"}}}`, - `gtid`, - `commit`, + options: &TestSpecOptions{ + filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "stream2", + Filter: "select * from stream2", + }}, + }, + }, + } + defer ts.Close() + require.NoError(t, ts.Init()) + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into stream1 values (1, 'aaa')", noEvents}, + {"savepoint a", noEvents}, + {"insert into stream1 values (2, 'aaa')", noEvents}, + {"savepoint b", noEvents}, + {"insert into stream1 values (3, 'aaa')", noEvents}, + {"savepoint c", noEvents}, + {"insert into stream1 values (4, 'aaa')", noEvents}, + {"savepoint d", noEvents}, + {"commit", nil}, + }, { + {"begin", nil}, + {"insert into stream1 values (5, 'aaa')", noEvents}, + {"savepoint d", noEvents}, + {"insert into stream1 values (6, 'aaa')", noEvents}, + {"savepoint c", noEvents}, + {"insert into stream1 values (7, 'aaa')", noEvents}, + {"savepoint b", noEvents}, + {"insert into stream1 values (8, 'aaa')", noEvents}, + {"savepoint a", noEvents}, + {"commit", nil}, + }, { + {"begin", nil}, + {"insert into stream1 values (9, 'aaa')", noEvents}, + {"savepoint a", noEvents}, + {"insert into stream2 values (1, 'aaa')", nil}, + {"savepoint b", noEvents}, + {"insert into stream1 values (10, 'aaa')", noEvents}, + {"savepoint c", noEvents}, + {"insert into stream2 values (2, 'aaa')", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "stream2", changes: []TestRowChange{{after: []string{"2", "aaa"}}}}}, }}, + {"savepoint d", noEvents}, + {"commit", nil}, }} - - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "stream2", - Filter: "select * from stream2", - }}, - } - runCases(t, filter, testcases, "current", nil) + ts.Run() } func TestStatements(t *testing.T) { @@ -1685,6 +1545,9 @@ func TestBestEffortNameInFieldEvent(t *testing.T) { // test that vstreamer ignores tables created by OnlineDDL func TestInternalTables(t *testing.T) { + if version.GoOS == "darwin" { + t.Skip("internal online ddl table matching doesn't work on Mac because it is case insensitive") + } if testing.Short() { t.Skip() } @@ -2158,11 +2021,11 @@ func TestGeneratedColumns(t *testing.T) { table: "t1", db: "vttest", cols: []*TestColumn{ - {name: "id", dataType: "INT32", colType: "int(11)", len: 11, charset: 63}, - {name: "val", dataType: "VARBINARY", colType: "varbinary(6)", len: 6, charset: 63}, - {name: "val2", dataType: "VARBINARY", colType: "varbinary(6)", len: 6, charset: 63}, - {name: "val3", dataType: "VARBINARY", colType: "varbinary(6)", len: 6, charset: 63}, - {name: "id2", dataType: "INT32", colType: "int(11)", len: 11, charset: 63}, + {name: "id", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "val", dataType: "VARBINARY", colType: "varbinary(6)", len: 6, collationID: 63}, + {name: "val2", dataType: "VARBINARY", colType: "varbinary(6)", len: 6, collationID: 63}, + {name: "val3", dataType: "VARBINARY", colType: "varbinary(6)", len: 6, collationID: 63}, + {name: "id2", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, }, } @@ -2205,8 +2068,8 @@ func TestGeneratedInvisiblePrimaryKey(t *testing.T) { table: "t1", db: "vttest", cols: []*TestColumn{ - {name: "my_row_id", dataType: "UINT64", colType: "bigint unsigned", len: 20, charset: 63}, - {name: "val", dataType: "VARBINARY", colType: "varbinary(6)", len: 6, charset: 63}, + {name: "my_row_id", dataType: "UINT64", colType: "bigint unsigned", len: 20, collationID: 63}, + {name: "val", dataType: "VARBINARY", colType: "varbinary(6)", len: 6, collationID: 63}, }, } @@ -2288,10 +2151,16 @@ func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlog break } } + + numEventsToMatch := len(evs) if len(wantset) != len(evs) { - t.Fatalf("%v: evs\n%v, want\n%v, >> got length %d, wanted length %d", input, evs, wantset, len(evs), len(wantset)) + log.Warningf("%v: evs\n%v, want\n%v, >> got length %d, wanted length %d", input, evs, wantset, len(evs), len(wantset)) + if len(wantset) < len(evs) { + numEventsToMatch = len(wantset) + } } - for i, want := range wantset { + for i := 0; i < numEventsToMatch; i++ { + want := wantset[i] // CurrentTime is not testable. evs[i].CurrentTime = 0 evs[i].Keyspace = "" @@ -2350,6 +2219,9 @@ func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlog } } } + if len(wantset) != len(evs) { + t.Fatalf("%v: evs\n%v, want\n%v, got length %d, wanted length %d", input, evs, wantset, len(evs), len(wantset)) + } } } @@ -2385,7 +2257,7 @@ func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogda timer := time.NewTimer(2 * time.Second) defer timer.Stop() - t.Logf("Received events: %v", evs) + log.Infof("Received events: %v", evs) select { case ch <- evs: case <-ctx.Done(): diff --git a/tools/rowlog/rowlog.go b/tools/rowlog/rowlog.go index 475006b2b59..8092159c6b6 100644 --- a/tools/rowlog/rowlog.go +++ b/tools/rowlog/rowlog.go @@ -71,7 +71,6 @@ func usage() { func main() { usage() - defer log.Flush() ctx := context.Background() config := parseCommandLine() if !config.Validate() {