diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index af93ac40726..74923cf6a6c 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -54,8 +54,9 @@ var ( sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName)) mainClusterConfig *ClusterConfig externalClusterConfig *ClusterConfig - extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr, - "--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr} + extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDuration.String(), + "--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(), + "--buffer_drain_concurrency", "10"} extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"} // This variable can be used within specific tests to alter vttablet behavior extraVTTabletArgs = []string{} diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 445a15f7767..8c96d06226a 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -18,16 +18,17 @@ package vreplication import ( "context" - "crypto/rand" "encoding/hex" "encoding/json" "fmt" "io" + "math/rand" "net/http" "os/exec" "regexp" "sort" "strings" + "sync" "sync/atomic" "testing" "time" @@ -75,9 +76,10 @@ func execQuery(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { func getConnection(t *testing.T, hostname string, port int) *mysql.Conn { vtParams := mysql.ConnParams{ - Host: hostname, - Port: port, - Uname: "vt_dba", + Host: hostname, + Port: port, + Uname: "vt_dba", + ConnectTimeoutMs: 1000, } ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) @@ -714,31 +716,35 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool } const ( - loadTestBufferingWindowDurationStr = "30s" - loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr - loadTestWaitForCancel = 30 * time.Second - loadTestWaitBetweenQueries = 2 * time.Millisecond + loadTestBufferingWindowDuration = 10 * time.Second + loadTestAvgWaitBetweenQueries = 500 * time.Microsecond + loadTestDefaultConnections = 100 ) type loadGenerator struct { - t *testing.T - vc *VitessCluster - ctx context.Context - cancel context.CancelFunc + t *testing.T + vc *VitessCluster + ctx context.Context + cancel context.CancelFunc + connections int + wg sync.WaitGroup } func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator { return &loadGenerator{ - t: t, - vc: vc, + t: t, + vc: vc, + connections: loadTestDefaultConnections, } } func (lg *loadGenerator) stop() { - time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched + // Wait for buffering to stop and additional records to be inserted by start + // after traffic is switched. + time.Sleep(loadTestBufferingWindowDuration * 2) log.Infof("Canceling load") lg.cancel() - time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect + lg.wg.Wait() log.Flush() } @@ -746,62 +752,77 @@ func (lg *loadGenerator) stop() { func (lg *loadGenerator) start() { t := lg.t lg.ctx, lg.cancel = context.WithCancel(context.Background()) + var connectionCount atomic.Int64 var id int64 - log.Infof("startLoad: starting") + log.Infof("loadGenerator: starting") queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')" var totalQueries, successfulQueries int64 var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64 + lg.wg.Add(1) defer func() { - - log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", + defer lg.wg.Done() + log.Infof("loadGenerator: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) }() - logOnce := true for { select { case <-lg.ctx.Done(): - log.Infof("startLoad: context cancelled") - log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", + log.Infof("loadGenerator: context cancelled") + log.Infof("loadGenerator: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) require.Equal(t, int64(0), deniedErrors) require.Equal(t, int64(0), otherErrors) + require.Equal(t, int64(0), reshardedErrors) require.Equal(t, totalQueries, successfulQueries) return default: - go func() { - conn := vc.GetVTGateConn(t) - defer conn.Close() - atomic.AddInt64(&id, 1) - query := fmt.Sprintf(queryTemplate, id, id) - _, err := conn.ExecuteFetch(query, 1, false) - atomic.AddInt64(&totalQueries, 1) - if err != nil { - sqlErr := err.(*sqlerror.SQLError) - if strings.Contains(strings.ToLower(err.Error()), "denied tables") { - log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err) - atomic.AddInt64(&deniedErrors, 1) - } else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") { - // this can happen when a second keyspace is setup with the same tables, but there are no routing rules - // set yet by MoveTables. So we ignore these errors. - atomic.AddInt64(&ambiguousErrors, 1) - } else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") { - atomic.AddInt64(&reshardedErrors, 1) - } else if strings.Contains(strings.ToLower(err.Error()), "not found") { - atomic.AddInt64(&tableNotFoundErrors, 1) - } else { - if logOnce { - log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err) - logOnce = false + if int(connectionCount.Load()) < lg.connections { + connectionCount.Add(1) + lg.wg.Add(1) + go func() { + defer lg.wg.Done() + defer connectionCount.Add(-1) + conn := vc.GetVTGateConn(t) + defer conn.Close() + for { + select { + case <-lg.ctx.Done(): + return + default: + } + newID := atomic.AddInt64(&id, 1) + query := fmt.Sprintf(queryTemplate, newID, newID) + _, err := conn.ExecuteFetch(query, 1, false) + atomic.AddInt64(&totalQueries, 1) + if err != nil { + sqlErr := err.(*sqlerror.SQLError) + if strings.Contains(strings.ToLower(err.Error()), "denied tables") { + if debugMode { + t.Logf("loadGenerator: denied tables error executing query: %d:%v", sqlErr.Number(), err) + } + atomic.AddInt64(&deniedErrors, 1) + } else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") { + // This can happen when a second keyspace is setup with the same tables, but + // there are no routing rules set yet by MoveTables. So we ignore these errors. + atomic.AddInt64(&ambiguousErrors, 1) + } else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") { + atomic.AddInt64(&reshardedErrors, 1) + } else if strings.Contains(strings.ToLower(err.Error()), "not found") { + atomic.AddInt64(&tableNotFoundErrors, 1) + } else { + if debugMode { + t.Logf("loadGenerator: error executing query: %d:%v", sqlErr.Number(), err) + } + atomic.AddInt64(&otherErrors, 1) + } + } else { + atomic.AddInt64(&successfulQueries, 1) } - atomic.AddInt64(&otherErrors, 1) + time.Sleep(time.Duration(int64(float64(loadTestAvgWaitBetweenQueries.Microseconds()) * rand.Float64()))) } - time.Sleep(loadTestWaitBetweenQueries) - } else { - atomic.AddInt64(&successfulQueries, 1) - } - }() - time.Sleep(loadTestWaitBetweenQueries) + }() + } } } } diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index 3171c35e35b..9b9b1c69163 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -2,6 +2,7 @@ package vreplication import ( "testing" + "time" "github.com/stretchr/testify/require" @@ -34,8 +35,12 @@ func TestMoveTablesBuffering(t *testing.T) { catchup(t, targetTab2, workflowName, "MoveTables") vdiffSideBySide(t, ksWorkflow, "") waitForLowLag(t, "customer", workflowName) - tstWorkflowSwitchReads(t, "", "") - tstWorkflowSwitchWrites(t) + for i := 0; i < 10; i++ { + tstWorkflowSwitchReadsAndWrites(t) + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) + tstWorkflowReverseReadsAndWrites(t) + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) + } log.Infof("SwitchWrites done") lg.stop() diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 40bb585495e..62d9a0dcafa 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" "testing" + "time" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -64,10 +65,12 @@ func testCancel(t *testing.T) { mt.SwitchReadsAndWrites() checkDenyList(targetKeyspace, false) checkDenyList(sourceKeyspace, true) + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) mt.ReverseReadsAndWrites() checkDenyList(targetKeyspace, true) checkDenyList(sourceKeyspace, false) + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) mt.Cancel() checkDenyList(targetKeyspace, false) @@ -108,6 +111,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { // sharded customer keyspace. createMoveTablesWorkflow(t, "customer,loadtest,customer2") tstWorkflowSwitchReadsAndWrites(t) + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) tstWorkflowComplete(t) emptyGlobalRoutingRules := "{}\n" @@ -218,6 +222,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", targetKs, wfName, shard, shard) require.Equal(t, expectedSwitchOutput, lastOutput) + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) // Confirm global routing rules -- everything should still be routed // to the source side, customer, globally. @@ -296,6 +301,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", targetKs, wfName) require.Equal(t, expectedSwitchOutput, lastOutput) + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) // Confirm global routing rules: everything should still be routed // to the source side, customer, globally. diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 014284ed5ee..9fa457c1589 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/key" @@ -93,18 +94,8 @@ func NewKeyspaceEventWatcher(ctx context.Context, topoServer srvtopo.Server, hc return kew } -type MoveTablesStatus int - -const ( - MoveTablesUnknown MoveTablesStatus = iota - // MoveTablesSwitching is set when the write traffic is the middle of being switched from the source to the target - MoveTablesSwitching - // MoveTablesSwitched is set when write traffic has been completely switched to the target - MoveTablesSwitched -) - // keyspaceState is the internal state for all the keyspaces that the KEW is -// currently watching +// currently watching. type keyspaceState struct { kew *KeyspaceEventWatcher keyspace string @@ -120,7 +111,7 @@ type keyspaceState struct { moveTablesState *MoveTablesState } -// Format prints the internal state for this keyspace for debug purposes +// Format prints the internal state for this keyspace for debug purposes. func (kss *keyspaceState) Format(f fmt.State, verb rune) { kss.mu.Lock() defer kss.mu.Unlock() @@ -137,9 +128,9 @@ func (kss *keyspaceState) Format(f fmt.State, verb rune) { fmt.Fprintf(f, "]\n") } -// beingResharded returns whether this keyspace is thought to be in the middle of a resharding -// operation. currentShard is the name of the shard that belongs to this keyspace and which -// we are trying to access. currentShard can _only_ be a primary shard. +// beingResharded returns whether this keyspace is thought to be in the middle of a +// resharding operation. currentShard is the name of the shard that belongs to this +// keyspace and which we are trying to access. currentShard can _only_ be a primary shard. func (kss *keyspaceState) beingResharded(currentShard string) bool { kss.mu.Lock() defer kss.mu.Unlock() @@ -179,11 +170,19 @@ type shardState struct { currentPrimary *topodatapb.TabletAlias } -// Subscribe returns a channel that will receive any KeyspaceEvents for all keyspaces in the current cell +// Subscribe returns a channel that will receive any KeyspaceEvents for all keyspaces in the +// current cell. func (kew *KeyspaceEventWatcher) Subscribe() chan *KeyspaceEvent { kew.subsMu.Lock() defer kew.subsMu.Unlock() - c := make(chan *KeyspaceEvent, 2) + // Use a decent size buffer to: + // 1. Avoid blocking the KEW + // 2. While not losing/missing any events + // 3. And processing them in the order received + // TODO: do we care about intermediate events? + // If not, then we could instead e.g. pull the first/oldest event + // from the channel, discard it, and add the current/latest. + c := make(chan *KeyspaceEvent, 10) kew.subs[c] = struct{}{} return c } @@ -195,14 +194,11 @@ func (kew *KeyspaceEventWatcher) Unsubscribe(c chan *KeyspaceEvent) { delete(kew.subs, c) } -func (kew *KeyspaceEventWatcher) broadcast(th *KeyspaceEvent) { +func (kew *KeyspaceEventWatcher) broadcast(ev *KeyspaceEvent) { kew.subsMu.Lock() defer kew.subsMu.Unlock() for c := range kew.subs { - select { - case c <- th: - default: - } + c <- ev } } @@ -240,7 +236,8 @@ func (kew *KeyspaceEventWatcher) run(ctx context.Context) { } // ensureConsistentLocked checks if the current keyspace has recovered from an availability -// event, and if so, returns information about the availability event to all subscribers +// event, and if so, returns information about the availability event to all subscribers. +// Note: you MUST be holding the ks.mu when calling this function. func (kss *keyspaceState) ensureConsistentLocked() { // if this keyspace is consistent, there's no ongoing availability event if kss.consistent { @@ -285,7 +282,8 @@ func (kss *keyspaceState) ensureConsistentLocked() { } } - // clone the current moveTablesState, if any, to handle race conditions where it can get updated while we're broadcasting + // Clone the current moveTablesState, if any, to handle race conditions where it can get + // updated while we're broadcasting. var moveTablesState MoveTablesState if kss.moveTablesState != nil { moveTablesState = *kss.moveTablesState @@ -312,8 +310,8 @@ func (kss *keyspaceState) ensureConsistentLocked() { Serving: sstate.serving, }) - log.Infof("keyspace event resolved: %s/%s is now consistent (serving: %v)", - sstate.target.Keyspace, sstate.target.Keyspace, + log.Infof("keyspace event resolved: %s is now consistent (serving: %t)", + topoproto.KeyspaceShardString(sstate.target.Keyspace, sstate.target.Shard), sstate.serving, ) @@ -325,9 +323,10 @@ func (kss *keyspaceState) ensureConsistentLocked() { kss.kew.broadcast(ksevent) } -// onHealthCheck is the callback that updates this keyspace with event data from the HealthCheck stream. -// the HealthCheck stream applies to all the keyspaces in the cluster and emits TabletHealth events to our -// parent KeyspaceWatcher, which will mux them into their corresponding keyspaceState +// onHealthCheck is the callback that updates this keyspace with event data from the HealthCheck +// stream. The HealthCheck stream applies to all the keyspaces in the cluster and emits +// TabletHealth events to our parent KeyspaceWatcher, which will mux them into their +// corresponding keyspaceState. func (kss *keyspaceState) onHealthCheck(th *TabletHealth) { // we only care about health events on the primary if th.Target.TabletType != topodatapb.TabletType_PRIMARY { @@ -371,6 +370,17 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) { kss.ensureConsistentLocked() } +type MoveTablesStatus int + +const ( + MoveTablesUnknown MoveTablesStatus = iota + // MoveTablesSwitching is set when the write traffic is the middle of being switched from + // the source to the target. + MoveTablesSwitching + // MoveTablesSwitched is set when write traffic has been completely switched to the target. + MoveTablesSwitched +) + type MoveTablesType int const ( @@ -384,33 +394,66 @@ type MoveTablesState struct { State MoveTablesStatus } +func (mts MoveTablesState) String() string { + var typ, state string + switch mts.Typ { + case MoveTablesRegular: + typ = "Regular" + case MoveTablesShardByShard: + typ = "ShardByShard" + default: + typ = "None" + } + switch mts.State { + case MoveTablesSwitching: + state = "Switching" + case MoveTablesSwitched: + state = "Switched" + default: + state = "Unknown" + } + return fmt.Sprintf("{Type: %s, State: %s}", typ, state) +} + func (kss *keyspaceState) getMoveTablesStatus(vs *vschemapb.SrvVSchema) (*MoveTablesState, error) { mtState := &MoveTablesState{ Typ: MoveTablesNone, State: MoveTablesUnknown, } - // if there are no routing rules defined, then movetables is not in progress, exit early + // If there are no routing rules defined, then movetables is not in progress, exit early. if len(vs.GetRoutingRules().GetRules()) == 0 && len(vs.GetShardRoutingRules().GetRules()) == 0 { return mtState, nil } shortCtx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer cancel() - ts, _ := kss.kew.ts.GetTopoServer() - - // collect all current shard information from the topo + ts, err := kss.kew.ts.GetTopoServer() + if err != nil { + return mtState, err + } + // Collect all current shard information from the topo. var shardInfos []*topo.ShardInfo + mu := sync.Mutex{} + eg, ectx := errgroup.WithContext(shortCtx) for _, sstate := range kss.shards { - si, err := ts.GetShard(shortCtx, kss.keyspace, sstate.target.Shard) - if err != nil { - return nil, err - } - shardInfos = append(shardInfos, si) + eg.Go(func() error { + si, err := ts.GetShard(ectx, kss.keyspace, sstate.target.Shard) + if err != nil { + return err + } + mu.Lock() + defer mu.Unlock() + shardInfos = append(shardInfos, si) + return nil + }) + } + if err := eg.Wait(); err != nil { + return mtState, err } - // check if any shard has denied tables and if so, record one of these to check where it currently points to - // using the (shard) routing rules + // Check if any shard has denied tables and if so, record one of these to check where it + // currently points to using the (shard) routing rules. var shardsWithDeniedTables []string var oneDeniedTable string for _, si := range shardInfos { @@ -425,11 +468,11 @@ func (kss *keyspaceState) getMoveTablesStatus(vs *vschemapb.SrvVSchema) (*MoveTa return mtState, nil } - // check if a shard by shard migration is in progress and if so detect if it has been switched - isPartialTables := vs.ShardRoutingRules != nil && len(vs.ShardRoutingRules.Rules) > 0 + // Check if a shard by shard migration is in progress and if so detect if it has been switched. + isPartialTables := vs.GetShardRoutingRules() != nil && len(vs.GetShardRoutingRules().GetRules()) > 0 if isPartialTables { - srr := topotools.GetShardRoutingRulesMap(vs.ShardRoutingRules) + srr := topotools.GetShardRoutingRulesMap(vs.GetShardRoutingRules()) mtState.Typ = MoveTablesShardByShard mtState.State = MoveTablesSwitched for _, shard := range shardsWithDeniedTables { @@ -440,31 +483,32 @@ func (kss *keyspaceState) getMoveTablesStatus(vs *vschemapb.SrvVSchema) (*MoveTa break } } - log.Infof("getMoveTablesStatus: keyspace %s declaring partial move tables %v", kss.keyspace, mtState) + log.Infof("getMoveTablesStatus: keyspace %s declaring partial move tables %s", kss.keyspace, mtState.String()) return mtState, nil } - // it wasn't a shard by shard migration, but since we have denied tables it must be a regular MoveTables + // It wasn't a shard by shard migration, but since we have denied tables it must be a + // regular MoveTables. mtState.Typ = MoveTablesRegular mtState.State = MoveTablesSwitching - rr := topotools.GetRoutingRulesMap(vs.RoutingRules) + rr := topotools.GetRoutingRulesMap(vs.GetRoutingRules()) if rr != nil { r, ok := rr[oneDeniedTable] - // if a rule exists for the table and points to the target keyspace, writes have been switched + // If a rule exists for the table and points to the target keyspace, writes have been switched. if ok && len(r) > 0 && r[0] != fmt.Sprintf("%s.%s", kss.keyspace, oneDeniedTable) { mtState.State = MoveTablesSwitched log.Infof("onSrvKeyspace:: keyspace %s writes have been switched for table %s, rule %v", kss.keyspace, oneDeniedTable, r[0]) } } - log.Infof("getMoveTablesStatus: keyspace %s declaring regular move tables %v", kss.keyspace, mtState) + log.Infof("getMoveTablesStatus: keyspace %s declaring regular move tables %s", kss.keyspace, mtState.String()) return mtState, nil } -// onSrvKeyspace is the callback that updates this keyspace with fresh topology data from our topology server. -// this callback is called from a Watcher in the topo server whenever a change to the topology for this keyspace -// occurs. this watcher is dedicated to this keyspace, and will only yield topology metadata changes for as -// long as we're interested on this keyspace. +// onSrvKeyspace is the callback that updates this keyspace with fresh topology data from our +// topology server. this callback is called from a Watcher in the topo server whenever a change to +// the topology for this keyspace occurs. This watcher is dedicated to this keyspace, and will +// only yield topology metadata changes for as long as we're interested on this keyspace. func (kss *keyspaceState) onSrvKeyspace(newKeyspace *topodatapb.SrvKeyspace, newError error) bool { kss.mu.Lock() defer kss.mu.Unlock() @@ -478,23 +522,25 @@ func (kss *keyspaceState) onSrvKeyspace(newKeyspace *topodatapb.SrvKeyspace, new return false } - // if there's another kind of error while watching this keyspace, we assume it's temporary and related - // to the topology server, not to the keyspace itself. we'll keep waiting for more topology events. + // If there's another kind of error while watching this keyspace, we assume it's temporary and + // related to the topology server, not to the keyspace itself. we'll keep waiting for more + // topology events. if newError != nil { kss.lastError = newError log.Errorf("error while watching keyspace %q: %v", kss.keyspace, newError) return true } - // if the topology metadata for our keyspace is identical to the last one we saw there's nothing to do - // here. this is a side-effect of the way ETCD watchers work. + // If the topology metadata for our keyspace is identical to the last one we saw there's nothing to + // do here. this is a side-effect of the way ETCD watchers work. if proto.Equal(kss.lastKeyspace, newKeyspace) { // no changes return true } - // we only mark this keyspace as inconsistent if there has been a topology change in the PRIMARY for - // this keyspace, but we store the topology metadata for both primary and replicas for future-proofing. + // we only mark this keyspace as inconsistent if there has been a topology change in the PRIMARY + // for this keyspace, but we store the topology metadata for both primary and replicas for + // future-proofing. var oldPrimary, newPrimary *topodatapb.SrvKeyspace_KeyspacePartition if kss.lastKeyspace != nil { oldPrimary = topoproto.SrvKeyspaceGetPartition(kss.lastKeyspace, topodatapb.TabletType_PRIMARY) @@ -525,20 +571,24 @@ func (kss *keyspaceState) isServing() bool { // onSrvVSchema is called from a Watcher in the topo server whenever the SrvVSchema is updated by Vitess. // For the purposes here, we are interested in updates to the RoutingRules or ShardRoutingRules. -// In addition, the traffic switcher updates SrvVSchema when the DeniedTables attributes in a Shard record is -// modified. +// In addition, the traffic switcher updates SrvVSchema when the DeniedTables attributes in a Shard +// record is modified. func (kss *keyspaceState) onSrvVSchema(vs *vschemapb.SrvVSchema, err error) bool { - // the vschema can be nil if the server is currently shutting down + // The vschema can be nil if the server is currently shutting down. if vs == nil { return true } kss.mu.Lock() defer kss.mu.Unlock() - kss.moveTablesState, _ = kss.getMoveTablesStatus(vs) + var kerr error + if kss.moveTablesState, kerr = kss.getMoveTablesStatus(vs); err != nil { + log.Errorf("onSrvVSchema: keyspace %s failed to get move tables status: %v", kss.keyspace, kerr) + } if kss.moveTablesState != nil && kss.moveTablesState.Typ != MoveTablesNone { - // mark the keyspace as inconsistent. ensureConsistentLocked() checks if the workflow is switched, - // and if so, it will send an event to the buffering subscribers to indicate that buffering can be stopped. + // Mark the keyspace as inconsistent. ensureConsistentLocked() checks if the workflow is + // switched, and if so, it will send an event to the buffering subscribers to indicate that + // buffering can be stopped. kss.consistent = false kss.ensureConsistentLocked() } @@ -560,8 +610,9 @@ func newKeyspaceState(ctx context.Context, kew *KeyspaceEventWatcher, cell, keys return kss } -// processHealthCheck is the callback that is called by the global HealthCheck stream that was initiated -// by this KeyspaceEventWatcher. it redirects the TabletHealth event to the corresponding keyspaceState +// processHealthCheck is the callback that is called by the global HealthCheck stream that was +// initiated by this KeyspaceEventWatcher. It redirects the TabletHealth event to the +// corresponding keyspaceState. func (kew *KeyspaceEventWatcher) processHealthCheck(ctx context.Context, th *TabletHealth) { kss := kew.getKeyspaceStatus(ctx, th.Target.Keyspace) if kss == nil { @@ -571,8 +622,8 @@ func (kew *KeyspaceEventWatcher) processHealthCheck(ctx context.Context, th *Tab kss.onHealthCheck(th) } -// getKeyspaceStatus returns the keyspaceState object for the corresponding keyspace, allocating it -// if we've never seen the keyspace before. +// getKeyspaceStatus returns the keyspaceState object for the corresponding keyspace, allocating +// it if we've never seen the keyspace before. func (kew *KeyspaceEventWatcher) getKeyspaceStatus(ctx context.Context, keyspace string) *keyspaceState { kew.mu.Lock() defer kew.mu.Unlock() @@ -612,15 +663,15 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(ctx context.Context, tar } // PrimaryIsNotServing checks if the reason why the given target is not accessible right now is -// that the primary tablet for that shard is not serving. This is possible during a Planned Reparent Shard -// operation. Just as the operation completes, a new primary will be elected, and it will send its own healthcheck -// stating that it is serving. We should buffer requests until that point. -// There are use cases where people do not run with a Primary server at all, so we must verify that -// we only start buffering when a primary was present, and it went not serving. -// The shard state keeps track of the current primary and the last externally reparented time, which we can use -// to determine that there was a serving primary which now became non serving. This is only possible in a DemotePrimary -// RPC which are only called from ERS and PRS. So buffering will stop when these operations succeed. -// We return the tablet alias of the primary if it is serving. +// that the primary tablet for that shard is not serving. This is possible during a Planned +// Reparent Shard operation. Just as the operation completes, a new primary will be elected, and +// it will send its own healthcheck stating that it is serving. We should buffer requests until +// that point. There are use cases where people do not run with a Primary server at all, so we must +// verify that we only start buffering when a primary was present, and it went not serving. +// The shard state keeps track of the current primary and the last externally reparented time, which +// we can use to determine that there was a serving primary which now became non serving. This is +// only possible in a DemotePrimary RPC which are only called from ERS and PRS. So buffering will +// stop when these operations succeed. We return the tablet alias of the primary if it is serving. func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) { if target.TabletType != topodatapb.TabletType_PRIMARY { return nil, false @@ -632,7 +683,8 @@ func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target ks.mu.Lock() defer ks.mu.Unlock() if state, ok := ks.shards[target.Shard]; ok { - // If the primary tablet was present then externallyReparented will be non-zero and currentPrimary will be not nil + // If the primary tablet was present then externallyReparented will be non-zero and + // currentPrimary will be not nil. return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil } return nil, false diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go index 43af4bf49de..e9406ff1de2 100644 --- a/go/vt/discovery/keyspace_events_test.go +++ b/go/vt/discovery/keyspace_events_test.go @@ -19,6 +19,8 @@ package discovery import ( "context" "encoding/hex" + "sync" + "sync/atomic" "testing" "time" @@ -60,6 +62,67 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) { require.True(t, kss.onSrvKeyspace(nil, nil)) } +// TestKeyspaceEventConcurrency confirms that the keyspace event watcher +// does not fail to broadcast received keyspace events to subscribers. +// This verifies that no events are lost when there's a high number of +// concurrent keyspace events. +func TestKeyspaceEventConcurrency(t *testing.T) { + cell := "cell1" + factory := faketopo.NewFakeTopoFactory() + factory.AddCell(cell) + sts := &fakeTopoServer{} + hc := NewFakeHealthCheck(make(chan *TabletHealth)) + defer hc.Close() + kew := &KeyspaceEventWatcher{ + hc: hc, + ts: sts, + localCell: cell, + keyspaces: make(map[string]*keyspaceState), + subs: make(map[chan *KeyspaceEvent]struct{}), + } + + // Subscribe to the watcher's broadcasted keyspace events. + receiver := kew.Subscribe() + + updates := atomic.Uint32{} + updates.Store(0) + wg := sync.WaitGroup{} + concurrency := 100 + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + go func() { + for { + select { + case <-ctx.Done(): + return + case <-receiver: + updates.Add(1) + } + } + }() + // Start up concurent go-routines that will broadcast keyspace events. + for i := 1; i <= concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + kew.broadcast(&KeyspaceEvent{}) + }() + } + wg.Wait() + for { + select { + case <-ctx.Done(): + require.Equal(t, concurrency, int(updates.Load()), "expected %d updates, got %d", concurrency, updates.Load()) + return + default: + if int(updates.Load()) == concurrency { // Pass + cancel() + return + } + } + } +} + // TestKeyspaceEventTypes confirms that the keyspace event watcher determines // that the unavailability event is caused by the correct scenario. We should // consider it to be caused by a resharding operation when the following @@ -309,6 +372,26 @@ func (f *fakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace stri return ks, nil } +// GetSrvVSchema returns the SrvVSchema for a cell. +func (f *fakeTopoServer) GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) { + vs := &vschemapb.SrvVSchema{ + Keyspaces: map[string]*vschemapb.Keyspace{ + "ks1": { + Sharded: true, + }, + }, + RoutingRules: &vschemapb.RoutingRules{ + Rules: []*vschemapb.RoutingRule{ + { + FromTable: "db1.t1", + ToTables: []string{"db1.t1"}, + }, + }, + }, + } + return vs, nil +} + func (f *fakeTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) { ks, err := f.GetSrvKeyspace(ctx, cell, keyspace) callback(ks, err) @@ -318,5 +401,6 @@ func (f *fakeTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace st // the provided cell. It will call the callback when // a new value or an error occurs. func (f *fakeTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) { - + sv, err := f.GetSrvVSchema(ctx, cell) + callback(sv, err) } diff --git a/go/vt/srvtopo/watch_srvvschema.go b/go/vt/srvtopo/watch_srvvschema.go index 1b5536e623d..c758211375d 100644 --- a/go/vt/srvtopo/watch_srvvschema.go +++ b/go/vt/srvtopo/watch_srvvschema.go @@ -21,8 +21,9 @@ import ( "time" "vitess.io/vitess/go/stats" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" "vitess.io/vitess/go/vt/topo" + + vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) type SrvVSchemaWatcher struct { diff --git a/go/vt/topo/etcd2topo/watch.go b/go/vt/topo/etcd2topo/watch.go index cdc9be44b21..2fc58d437ff 100644 --- a/go/vt/topo/etcd2topo/watch.go +++ b/go/vt/topo/etcd2topo/watch.go @@ -51,7 +51,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < } wd := &topo.WatchData{ Contents: initial.Kvs[0].Value, - Version: EtcdVersion(initial.Kvs[0].ModRevision), + Version: EtcdVersion(initial.Kvs[0].Version), } // Create an outer context that will be canceled on return and will cancel all inner watches. @@ -76,7 +76,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < defer close(notifications) defer outerCancel() - var currVersion = initial.Header.Revision + var rev = initial.Header.Revision var watchRetries int for { select { @@ -107,9 +107,9 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < // Cancel inner context on retry and create new one. watchCancel() watchCtx, watchCancel = context.WithCancel(ctx) - newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion)) + newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(rev)) if newWatcher == nil { - log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion) + log.Warningf("watch %v failed and get a nil channel returned, rev: %v", nodePath, rev) } else { watcher = newWatcher } @@ -126,7 +126,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < return } - currVersion = wresp.Header.GetRevision() + rev = wresp.Header.GetRevision() for _, ev := range wresp.Events { switch ev.Type { @@ -174,7 +174,7 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa var wd topo.WatchDataRecursive wd.Path = string(kv.Key) wd.Contents = kv.Value - wd.Version = EtcdVersion(initial.Kvs[0].ModRevision) + wd.Version = EtcdVersion(initial.Kvs[0].Version) initialwd = append(initialwd, &wd) } @@ -200,7 +200,7 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa defer close(notifications) defer outerCancel() - var currVersion = initial.Header.Revision + var rev = initial.Header.Revision var watchRetries int for { select { @@ -228,9 +228,9 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa watchCancel() watchCtx, watchCancel = context.WithCancel(ctx) - newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion), clientv3.WithPrefix()) + newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(rev), clientv3.WithPrefix()) if newWatcher == nil { - log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion) + log.Warningf("watch %v failed and get a nil channel returned, rev: %v", nodePath, rev) } else { watcher = newWatcher } @@ -247,7 +247,7 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa return } - currVersion = wresp.Header.GetRevision() + rev = wresp.Header.GetRevision() for _, ev := range wresp.Events { switch ev.Type { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 43d1f1a2b05..3f6297d345f 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2891,7 +2891,9 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, err } if hasReplica || hasRdonly { - if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, timeout, false, direction); err != nil { + // If we're going to switch writes immediately after then we don't need to + // rebuild the SrvVSchema here as we will do it after switching writes. + if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, !hasPrimary /* rebuildSrvVSchema */, direction); err != nil { return nil, err } log.Infof("Switch Reads done for workflow %s.%s", req.Keyspace, req.Workflow) @@ -2945,7 +2947,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor } // switchReads is a generic way of switching read traffic for a workflow. -func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, state *State, timeout time.Duration, cancel bool, direction TrafficSwitchDirection) (*[]string, error) { +func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, state *State, rebuildSrvVSchema bool, direction TrafficSwitchDirection) (*[]string, error) { var roTabletTypes []topodatapb.TabletType // When we are switching all traffic we also get the primary tablet type, which we need to // filter out for switching reads. @@ -3032,7 +3034,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { if ts.isPartialMigration { ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.") - } else if err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, direction); err != nil { + } else if err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, rebuildSrvVSchema, direction); err != nil { return handleError("failed to switch read traffic for the tables", err) } return sw.logs(), nil diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go index 0cbdce164dc..d7690458439 100644 --- a/go/vt/vtctl/workflow/switcher.go +++ b/go/vt/vtctl/workflow/switcher.go @@ -66,8 +66,8 @@ func (r *switcher) switchShardReads(ctx context.Context, cells []string, servedT return r.ts.switchShardReads(ctx, cells, servedTypes, direction) } -func (r *switcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { - return r.ts.switchTableReads(ctx, cells, servedTypes, direction) +func (r *switcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error { + return r.ts.switchTableReads(ctx, cells, servedTypes, rebuildSrvVSchema, direction) } func (r *switcher) startReverseVReplication(ctx context.Context) error { diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go index 1c8a05e00c2..60213194071 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run.go +++ b/go/vt/vtctl/workflow/switcher_dry_run.go @@ -76,7 +76,7 @@ func (dr *switcherDryRun) switchShardReads(ctx context.Context, cells []string, return nil } -func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { +func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error { ks := dr.ts.TargetKeyspaceName() if direction == DirectionBackward { ks = dr.ts.SourceKeyspaceName() @@ -88,6 +88,9 @@ func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string, tables := strings.Join(dr.ts.Tables(), ",") dr.drLog.Logf("Switch reads for tables [%s] to keyspace %s for tablet types [%s]", tables, ks, strings.Join(tabletTypes, ",")) dr.drLog.Logf("Routing rules for tables [%s] will be updated", tables) + if rebuildSrvVSchema { + dr.drLog.Logf("Serving VSchema will be rebuilt for the %s keyspace", ks) + } return nil } diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go index 8d0f9e847be..9af9ff49f2f 100644 --- a/go/vt/vtctl/workflow/switcher_interface.go +++ b/go/vt/vtctl/workflow/switcher_interface.go @@ -36,7 +36,7 @@ type iswitcher interface { changeRouting(ctx context.Context) error streamMigraterfinalize(ctx context.Context, ts *trafficSwitcher, workflows []string) error startReverseVReplication(ctx context.Context) error - switchTableReads(ctx context.Context, cells []string, servedType []topodatapb.TabletType, direction TrafficSwitchDirection) error + switchTableReads(ctx context.Context, cells []string, servedType []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error switchShardReads(ctx context.Context, cells []string, servedType []topodatapb.TabletType, direction TrafficSwitchDirection) error validateWorkflowHasCompleted(ctx context.Context) error removeSourceTables(ctx context.Context, removalType TableRemovalType) error diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index a102c3162f3..66bae0942d0 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -573,7 +573,7 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, return nil } -func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { +func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error { log.Infof("switchTableReads: cells: %s, tablet types: %+v, direction %d", strings.Join(cells, ","), servedTypes, direction) rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { @@ -605,7 +605,10 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, if err := topotools.SaveRoutingRules(ctx, ts.TopoServer(), rules); err != nil { return err } - return ts.TopoServer().RebuildSrvVSchema(ctx, cells) + if rebuildSrvVSchema { + return ts.TopoServer().RebuildSrvVSchema(ctx, cells) + } + return nil } func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error { diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go index 622bb03b082..260fb272544 100644 --- a/go/vt/vtgate/buffer/buffer.go +++ b/go/vt/vtgate/buffer/buffer.go @@ -164,6 +164,10 @@ func New(cfg *Config) *Buffer { } } +func (b *Buffer) GetConfig() *Config { + return b.config +} + // WaitForFailoverEnd blocks until a pending buffering due to a failover for // keyspace/shard is over. // If there is no ongoing failover, "err" is checked. If it's caused by a diff --git a/go/vt/vtgate/buffer/flags.go b/go/vt/vtgate/buffer/flags.go index a17cc09ccc3..19f30c64ed8 100644 --- a/go/vt/vtgate/buffer/flags.go +++ b/go/vt/vtgate/buffer/flags.go @@ -70,6 +70,9 @@ func verifyFlags() error { if bufferSize < 1 { return fmt.Errorf("--buffer_size must be >= 1 (specified value: %d)", bufferSize) } + if bufferMinTimeBetweenFailovers < 1*time.Second { + return fmt.Errorf("--buffer_min_time_between_failovers must be >= 1s (specified value: %v)", bufferMinTimeBetweenFailovers) + } if bufferDrainConcurrency < 1 { return fmt.Errorf("--buffer_drain_concurrency must be >= 1 (specified value: %d)", bufferDrainConcurrency) diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index ae33aabb399..a58b86f670a 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -24,14 +24,13 @@ import ( "time" "vitess.io/vitess/go/vt/discovery" - - "vitess.io/vitess/go/vt/vtgate/errorsanitizer" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/errorsanitizer" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // bufferState represents the different states a shardBuffer object can be in. diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 2d3a82f0b9b..319c2507d13 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1408,7 +1408,7 @@ func TestExecutorAlterVSchemaKeyspace(t *testing.T) { session := NewSafeSession(&vtgatepb.Session{TargetString: "@primary", Autocommit: true}) vschemaUpdates := make(chan *vschemapb.SrvVSchema, 2) - executor.serv.WatchSrvVSchema(ctx, "aa", func(vschema *vschemapb.SrvVSchema, err error) bool { + executor.serv.WatchSrvVSchema(ctx, executor.cell, func(vschema *vschemapb.SrvVSchema, err error) bool { vschemaUpdates <- vschema return true }) diff --git a/go/vt/vtgate/executor_vschema_ddl_test.go b/go/vt/vtgate/executor_vschema_ddl_test.go index 1c2813a33c4..1c912ed0d62 100644 --- a/go/vt/vtgate/executor_vschema_ddl_test.go +++ b/go/vt/vtgate/executor_vschema_ddl_test.go @@ -17,26 +17,23 @@ limitations under the License. package vtgate import ( - "context" "reflect" "slices" "testing" "time" - "vitess.io/vitess/go/test/utils" - - "vitess.io/vitess/go/vt/callerid" - querypb "vitess.io/vitess/go/vt/proto/query" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/utils" + "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/vtgate/vschemaacl" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - + querypb "vitess.io/vitess/go/vt/proto/query" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) func waitForVindex(t *testing.T, ks, name string, watch chan *vschemapb.SrvVSchema, executor *Executor) (*vschemapb.SrvVSchema, *vschemapb.Vindex) { @@ -426,9 +423,7 @@ func TestExecutorDropSequenceDDL(t *testing.T) { _, err = executor.Execute(ctx, nil, "TestExecute", session, stmt, nil) require.NoError(t, err) - ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if !waitForNewerVSchema(ctxWithTimeout, executor, ts) { + if !waitForNewerVSchema(ctx, executor, ts, 5*time.Second) { t.Fatalf("vschema did not drop the sequene 'test_seq'") } @@ -464,9 +459,7 @@ func TestExecutorDropAutoIncDDL(t *testing.T) { stmt = "alter vschema on test_table add auto_increment id using `db-name`.`test_seq`" _, err = executor.Execute(ctx, nil, "TestExecute", session, stmt, nil) require.NoError(t, err) - ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if !waitForNewerVSchema(ctxWithTimeout, executor, ts) { + if !waitForNewerVSchema(ctx, executor, ts, 5*time.Second) { t.Fatalf("vschema did not update with auto_increment for 'test_table'") } ts = executor.VSchema().GetCreated() @@ -480,9 +473,7 @@ func TestExecutorDropAutoIncDDL(t *testing.T) { _, err = executor.Execute(ctx, nil, "TestExecute", session, stmt, nil) require.NoError(t, err) - ctxWithTimeout, cancel2 := context.WithTimeout(ctx, 5*time.Second) - defer cancel2() - if !waitForNewerVSchema(ctxWithTimeout, executor, ts) { + if !waitForNewerVSchema(ctx, executor, ts, 5*time.Second) { t.Fatalf("vschema did not drop the auto_increment for 'test_table'") } if executor.vm.GetCurrentSrvVschema().Keyspaces[ks].Tables["test_table"].AutoIncrement != nil { diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 5d2414ac275..73ad109ef37 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -24,20 +24,20 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" - querypb "vitess.io/vitess/go/vt/proto/query" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/logstats" "vitess.io/vitess/go/vt/vtgate/vtgateservice" + + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type planExec func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, startTime time.Time) error type txResult func(sqlparser.StatementType, *sqltypes.Result) error -func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated time.Time) bool { - timeout := 30 * time.Second +func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated time.Time, timeout time.Duration) bool { pollingInterval := 10 * time.Millisecond waitCtx, cancel := context.WithTimeout(ctx, timeout) ticker := time.NewTicker(pollingInterval) @@ -48,7 +48,7 @@ func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated ti case <-waitCtx.Done(): return false case <-ticker.C: - if e.VSchema().GetCreated().After(lastVSchemaCreated) { + if e.VSchema() != nil && e.VSchema().GetCreated().After(lastVSchemaCreated) { return true } } @@ -64,11 +64,11 @@ func (e *Executor) newExecute( logStats *logstats.LogStats, execPlan planExec, // used when there is a plan to execute recResult txResult, // used when it's something simple like begin/commit/rollback/savepoint -) error { - // 1: Prepare before planning and execution +) (err error) { + // 1: Prepare before planning and execution. // Start an implicit transaction if necessary. - err := e.startTxIfNecessary(ctx, safeSession) + err = e.startTxIfNecessary(ctx, safeSession) if err != nil { return err } @@ -79,21 +79,35 @@ func (e *Executor) newExecute( query, comments := sqlparser.SplitMarginComments(sql) - // 2: Parse and Validate query + // 2: Parse and Validate query. stmt, reservedVars, err := parseAndValidateQuery(query) if err != nil { return err } - var lastVSchemaCreated time.Time - vs := e.VSchema() - lastVSchemaCreated = vs.GetCreated() + var ( + vs = e.VSchema() + lastVSchemaCreated = vs.GetCreated() + result *sqltypes.Result + plan *engine.Plan + ) + for try := 0; try < MaxBufferingRetries; try++ { - if try > 0 && !vs.GetCreated().After(lastVSchemaCreated) { - // There is a race due to which the executor's vschema may not have been updated yet. - // Without a wait we fail non-deterministically since the previous vschema will not have the updated routing rules - if waitForNewerVSchema(ctx, e, lastVSchemaCreated) { + if try > 0 && !vs.GetCreated().After(lastVSchemaCreated) { // We need to wait for a vschema update + // Without a wait we fail non-deterministically since the previous vschema will not have + // the updated routing rules. + // We retry MaxBufferingRetries-1 (2) times before giving up. How long we wait before each retry + // -- IF we don't see a newer vschema come in -- affects how long we retry in total and how quickly + // we retry the query and (should) succeed when the traffic switch fails or we otherwise hit the + // max buffer failover time without resolving the keyspace event and marking it as consistent. + // This calculation attemps to ensure that we retry at a sensible interval and number of times + // based on the buffering configuration. This way we should be able to perform the max retries + // within the given window of time for most queries and we should not end up waiting too long + // after the traffic switch fails or the buffer window has ended, retrying old queries. + timeout := e.resolver.scatterConn.gateway.buffer.GetConfig().MaxFailoverDuration / (MaxBufferingRetries - 1) + if waitForNewerVSchema(ctx, e, lastVSchemaCreated, timeout) { vs = e.VSchema() + lastVSchemaCreated = vs.GetCreated() } } @@ -102,16 +116,13 @@ func (e *Executor) newExecute( return err } - // 3: Create a plan for the query + // 3: Create a plan for the query. // If we are retrying, it is likely that the routing rules have changed and hence we need to // replan the query since the target keyspace of the resolved shards may have changed as a - // result of MoveTables. So we cannot reuse the plan from the first try. - // When buffering ends, many queries might be getting planned at the same time. Ideally we - // should be able to reuse plans once the first drained query has been planned. For now, we - // punt on this and choose not to prematurely optimize since it is not clear how much caching - // will help and if it will result in hard-to-track edge cases. - - var plan *engine.Plan + // result of MoveTables SwitchTraffic which does a RebuildSrvVSchema which in turn causes + // the vtgate to clear the cached plans when processing the new serving vschema. + // When buffering ends, many queries might be getting planned at the same time and we then + // take full advatange of the cached plan. plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats) execStart := e.logPlanningFinished(logStats, plan) @@ -124,12 +135,12 @@ func (e *Executor) newExecute( safeSession.ClearWarnings() } - // add any warnings that the planner wants to add + // Add any warnings that the planner wants to add. for _, warning := range plan.Warnings { safeSession.RecordWarning(warning) } - result, err := e.handleTransactions(ctx, mysqlCtx, safeSession, plan, logStats, vcursor, stmt) + result, err = e.handleTransactions(ctx, mysqlCtx, safeSession, plan, logStats, vcursor, stmt) if err != nil { return err } @@ -137,14 +148,14 @@ func (e *Executor) newExecute( return recResult(plan.Type, result) } - // 4: Prepare for execution + // 4: Prepare for execution. err = e.addNeededBindVars(vcursor, plan.BindVarNeeds, bindVars, safeSession) if err != nil { logStats.Error = err return err } - // 5: Execute the plan and retry if needed + // 5: Execute the plan. if plan.Instructions.NeedsTransaction() { err = e.insideTransaction(ctx, safeSession, logStats, func() error { @@ -158,10 +169,39 @@ func (e *Executor) newExecute( return err } + // 6: Retry if needed. rootCause := vterrors.RootCause(err) if rootCause != nil && strings.Contains(rootCause.Error(), "enforce denied tables") { log.V(2).Infof("Retry: %d, will retry query %s due to %v", try, query, err) - lastVSchemaCreated = vs.GetCreated() + if try == 0 { // We are going to retry at least once + defer func() { + // Prevent any plan cache pollution from queries planned against the wrong keyspace during a MoveTables + // traffic switching operation. + if err != nil { // The error we're checking here is the return value from the newExecute function + cause := vterrors.RootCause(err) + if cause != nil && strings.Contains(cause.Error(), "enforce denied tables") { + // The executor's VSchemaManager clears the plan cache when it receives a new vschema via its + // SrvVSchema watcher (it calls executor.SaveVSchema() in its watch's subscriber callback). This + // happens concurrently with the KeyspaceEventWatcher also receiving the new vschema in its + // SrvVSchema watcher and in its subscriber callback processing it (which includes getting info + // on all shards from the topo), and eventually determining that the keyspace is consistent and + // ending the buffering window. So there's race with query retries such that a query could be + // planned against the wrong side just as the keyspace event is getting resolved and the buffers + // drained. Then that bad plan is the cached plan for the query until you do another + // topo.RebuildSrvVSchema/vtctldclient RebuildVSchemaGraph which then causes the VSchemaManager + // to clear the plan cache. It's essentially a race between the two SrvVSchema watchers and the + // work they do when a new one is received. If we DID a retry AND the last time we retried + // still encountered the error, we know that the plan used was 1) not valid/correct and going to + // the wrong side of the traffic switch as it failed with the denied tables error and 2) it will + // remain the plan in the cache if we do not clear the plans after it was added to to the cache. + // So here we clear the plan cache in order to prevent this scenario where the bad plan is + // cached indefinitely and re-used after the buffering window ends and the keyspace event is + // resolved. + e.ClearPlans() + } + } + }() + } continue }