diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index e52229dc84c..44732ba12d0 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -201,9 +201,11 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) { require.NoError(t, err) // We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate. time.Sleep(1 * time.Second) - // Finally, we'll now simulate the 2 shards being healthy again by setting them back to read-write. - utils.RunSQL(context.Background(), t, "set global read_only=0", shards[0].Vttablets[0]) - utils.RunSQL(context.Background(), t, "set global read_only=0", shards[1].Vttablets[0]) + // Finally, we'll now make the 2 shards healthy again by running PRS. + err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias) + require.NoError(t, err) + err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias) + require.NoError(t, err) // Wait for all the writes to have succeeded. wg.Wait() } diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index 4532724f8b8..c5ddf75d667 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -84,6 +84,8 @@ func SetupShardedReparentCluster(t *testing.T, durability string) *cluster.Local clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--lock_tables_timeout", "5s", + // Fast health checks help find corner cases. + "--health_check_interval", "1s", "--track_schema_versions=true", "--queryserver_enable_online_ddl=false") clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 036d4f3ad14..8332e99679b 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -171,8 +171,12 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool { } type shardState struct { - target *querypb.Target - serving bool + target *querypb.Target + serving bool + // waitForReparent is used to tell the keyspace event watcher + // that this shard should be marked serving only after a reparent + // operation has succeeded. + waitForReparent bool externallyReparented int64 currentPrimary *topodatapb.TabletAlias } @@ -361,8 +365,32 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) { // if the shard went from serving to not serving, or the other way around, the keyspace // is undergoing an availability event if sstate.serving != th.Serving { - sstate.serving = th.Serving kss.consistent = false + switch { + case th.Serving && sstate.waitForReparent: + // While waiting for a reparent, if we receive a serving primary, + // we should check if the primary term start time is greater than the externally reparented time. + // We mark the shard serving only if it is. This is required so that we don't prematurely stop + // buffering for PRS, or TabletExternallyReparented, after seeing a serving healthcheck from the + // same old primary tablet that has already been turned read-only. + if th.PrimaryTermStartTime > sstate.externallyReparented { + sstate.waitForReparent = false + sstate.serving = true + } + case th.Serving && !sstate.waitForReparent: + sstate.serving = true + case !th.Serving: + sstate.serving = false + // Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait + // for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop + // buffering when we receive a serving healthcheck from the primary that is being demoted. + // However, if we receive a non-serving check, then we know that we won't receive any more serving + // healthchecks anymore until reparent finishes. Specifically, this helps us when PRS fails, but + // stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote + // the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop + // buffering for that case. + sstate.waitForReparent = false + } } // if the primary for this shard has been externally reparented, we're undergoing a failover, @@ -777,3 +805,36 @@ func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, } } } + +// MarkShardNotServing marks the given shard not serving. +// We use this when we start buffering for a given shard. This helps +// coordinate between the sharding logic and the keyspace event watcher. +// We take in a boolean as well to tell us whether this error is because +// a reparent is ongoing. If it is, we also mark the shard to wait for a reparent. +// The return argument is whether the shard was found and marked not serving successfully or not. +func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspace string, shard string, isReparentErr bool) bool { + kss := kew.getKeyspaceStatus(ctx, keyspace) + if kss == nil { + // Only happens if the keyspace was deleted. + return false + } + kss.mu.Lock() + defer kss.mu.Unlock() + sstate := kss.shards[shard] + if sstate == nil { + // This only happens if the shard is deleted, or if + // the keyspace event watcher hasn't seen the shard at all. + return false + } + // Mark the keyspace inconsistent and the shard not serving. + kss.consistent = false + sstate.serving = false + if isReparentErr { + // If the error was triggered because a reparent operation has started. + // We mark the shard to wait for a reparent to finish before marking it serving. + // This is required to prevent premature stopping of buffering if we receive + // a serving healthcheck from a primary that is being demoted. + sstate.waitForReparent = true + } + return true +} diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go index 260fb272544..126a3a8826d 100644 --- a/go/vt/vtgate/buffer/buffer.go +++ b/go/vt/vtgate/buffer/buffer.go @@ -94,6 +94,18 @@ func CausedByFailover(err error) bool { return isFailover } +// IsErrorDueToReparenting is a stronger check than CausedByFailover, meant to return +// if the failure is caused because of a reparent. +func IsErrorDueToReparenting(err error) bool { + if vterrors.Code(err) != vtrpcpb.Code_CLUSTER_EVENT { + return false + } + if strings.Contains(err.Error(), ClusterEventReshardingInProgress) { + return false + } + return true +} + // for debugging purposes func getReason(err error) string { for _, ce := range ClusterEvents { @@ -175,7 +187,7 @@ func (b *Buffer) GetConfig() *Config { // It returns an error if buffering failed (e.g. buffer full). // If it does not return an error, it may return a RetryDoneFunc which must be // called after the request was retried. -func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, err error) (RetryDoneFunc, error) { +func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, kev *discovery.KeyspaceEventWatcher, err error) (RetryDoneFunc, error) { // If an err is given, it must be related to a failover. // We never buffer requests with other errors. if err != nil && !CausedByFailover(err) { @@ -192,7 +204,7 @@ func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, requestsSkipped.Add([]string{keyspace, shard, skippedDisabled}, 1) return nil, nil } - return sb.waitForFailoverEnd(ctx, keyspace, shard, err) + return sb.waitForFailoverEnd(ctx, keyspace, shard, kev, err) } func (b *Buffer) HandleKeyspaceEvent(ksevent *discovery.KeyspaceEvent) { diff --git a/go/vt/vtgate/buffer/buffer_helper_test.go b/go/vt/vtgate/buffer/buffer_helper_test.go index 2deb460fc39..1276f0cd751 100644 --- a/go/vt/vtgate/buffer/buffer_helper_test.go +++ b/go/vt/vtgate/buffer/buffer_helper_test.go @@ -50,7 +50,7 @@ func issueRequestAndBlockRetry(ctx context.Context, t *testing.T, b *Buffer, err bufferingStopped := make(chan error) go func() { - retryDone, err := b.WaitForFailoverEnd(ctx, keyspace, shard, failoverErr) + retryDone, err := b.WaitForFailoverEnd(ctx, keyspace, shard, nil, failoverErr) if err != nil { bufferingStopped <- err } diff --git a/go/vt/vtgate/buffer/buffer_test.go b/go/vt/vtgate/buffer/buffer_test.go index 7f32364d57f..2973ef2dfb9 100644 --- a/go/vt/vtgate/buffer/buffer_test.go +++ b/go/vt/vtgate/buffer/buffer_test.go @@ -107,7 +107,7 @@ func testBuffering1(t *testing.T, fail failover) { } // Subsequent requests with errors not related to the failover are not buffered. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nonFailoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nonFailoverErr); err != nil || retryDone != nil { t.Fatalf("requests with non-failover errors must never be buffered. err: %v retryDone: %v", err, retryDone) } @@ -155,7 +155,7 @@ func testBuffering1(t *testing.T, fail failover) { } // Second failover: Buffering is skipped because last failover is too recent. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("subsequent failovers must be skipped due to -buffer_min_time_between_failovers setting. err: %v retryDone: %v", err, retryDone) } if got, want := requestsSkipped.Counts()[statsKeyJoinedLastFailoverTooRecent], int64(1); got != want { @@ -213,7 +213,7 @@ func testDryRun1(t *testing.T, fail failover) { b := New(cfg) // Request does not get buffered. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests must not be buffered during dry-run. err: %v retryDone: %v", err, retryDone) } // But the internal state changes though. @@ -259,10 +259,10 @@ func testPassthrough1(t *testing.T, fail failover) { b := New(cfg) - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nil); err != nil || retryDone != nil { t.Fatalf("requests with no error must never be buffered. err: %v retryDone: %v", err, retryDone) } - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nonFailoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nonFailoverErr); err != nil || retryDone != nil { t.Fatalf("requests with non-failover errors must never be buffered. err: %v retryDone: %v", err, retryDone) } @@ -298,7 +298,7 @@ func testLastReparentTooRecentBufferingSkipped1(t *testing.T, fail failover) { now = now.Add(1 * time.Second) fail(b, newPrimary, keyspace, shard, now) - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests where the failover end was recently detected before the start must not be buffered. err: %v retryDone: %v", err, retryDone) } if err := waitForPoolSlots(b, cfg.Size); err != nil { @@ -395,10 +395,10 @@ func testPassthroughDuringDrain1(t *testing.T, fail failover) { } // Requests during the drain will be passed through and not buffered. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nil); err != nil || retryDone != nil { t.Fatalf("requests with no error must not be buffered during a drain. err: %v retryDone: %v", err, retryDone) } - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests with failover errors must not be buffered during a drain. err: %v retryDone: %v", err, retryDone) } @@ -430,7 +430,7 @@ func testPassthroughIgnoredKeyspaceOrShard1(t *testing.T, fail failover) { b := New(cfg) ignoredKeyspace := "ignored_ks" - if retryDone, err := b.WaitForFailoverEnd(context.Background(), ignoredKeyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), ignoredKeyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests for ignored keyspaces must not be buffered. err: %v retryDone: %v", err, retryDone) } statsKeyJoined := strings.Join([]string{ignoredKeyspace, shard, skippedDisabled}, ".") @@ -439,7 +439,7 @@ func testPassthroughIgnoredKeyspaceOrShard1(t *testing.T, fail failover) { } ignoredShard := "ff-" - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, ignoredShard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, ignoredShard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests for ignored shards must not be buffered. err: %v retryDone: %v", err, retryDone) } if err := waitForPoolSlots(b, cfg.Size); err != nil { @@ -621,7 +621,7 @@ func testEvictionNotPossible1(t *testing.T, fail failover) { // Newer requests of the second failover cannot evict anything because // they have no entries buffered. - retryDone, bufferErr := b.WaitForFailoverEnd(context.Background(), keyspace, shard2, failoverErr) + retryDone, bufferErr := b.WaitForFailoverEnd(context.Background(), keyspace, shard2, nil, failoverErr) if bufferErr == nil || retryDone != nil { t.Fatalf("buffer should have returned an error because it's full: err: %v retryDone: %v", bufferErr, retryDone) } diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index a58b86f670a..34fa919cfac 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -136,7 +136,7 @@ func (sb *shardBuffer) disabled() bool { return sb.mode == bufferModeDisabled } -func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard string, err error) (RetryDoneFunc, error) { +func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard string, kev *discovery.KeyspaceEventWatcher, err error) (RetryDoneFunc, error) { // We assume if err != nil then it's always caused by a failover. // Other errors must be filtered at higher layers. failoverDetected := err != nil @@ -210,7 +210,11 @@ func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard s return nil, nil } - sb.startBufferingLocked(err) + // Try to start buffering. If we're unsuccessful, then we exit early. + if !sb.startBufferingLocked(ctx, kev, err) { + sb.mu.Unlock() + return nil, nil + } } if sb.mode == bufferModeDryRun { @@ -254,7 +258,16 @@ func (sb *shardBuffer) shouldBufferLocked(failoverDetected bool) bool { panic("BUG: All possible states must be covered by the switch expression above.") } -func (sb *shardBuffer) startBufferingLocked(err error) { +func (sb *shardBuffer) startBufferingLocked(ctx context.Context, kev *discovery.KeyspaceEventWatcher, err error) bool { + if kev != nil { + if !kev.MarkShardNotServing(ctx, sb.keyspace, sb.shard, IsErrorDueToReparenting(err)) { + // We failed to mark the shard as not serving. Do not buffer the request. + // This can happen if the keyspace has been deleted or if the keyspace even watcher + // hasn't yet seen the shard. Keyspace event watcher might not stop buffering for this + // request at all until it times out. It's better to not buffer this request. + return false + } + } // Reset monitoring data from previous failover. lastRequestsInFlightMax.Set(sb.statsKey, 0) lastRequestsDryRunMax.Set(sb.statsKey, 0) @@ -280,6 +293,7 @@ func (sb *shardBuffer) startBufferingLocked(err error) { sb.buf.config.MaxFailoverDuration, errorsanitizer.NormalizeError(err.Error()), ) + return true } // logErrorIfStateNotLocked logs an error if the current state is not "state". diff --git a/go/vt/vtgate/buffer/variables_test.go b/go/vt/vtgate/buffer/variables_test.go index a0640bde9e4..30d2426c639 100644 --- a/go/vt/vtgate/buffer/variables_test.go +++ b/go/vt/vtgate/buffer/variables_test.go @@ -51,7 +51,7 @@ func TestVariablesAreInitialized(t *testing.T) { // Create a new buffer and make a call which will create the shardBuffer object. // After that, the variables should be initialized for that shard. b := New(NewDefaultConfig()) - _, err := b.WaitForFailoverEnd(context.Background(), "init_test", "0", nil /* err */) + _, err := b.WaitForFailoverEnd(context.Background(), "init_test", "0", nil, nil) if err != nil { t.Fatalf("buffer should just passthrough and not return an error: %v", err) } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 21087fe5370..2a7be6595bb 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -273,7 +273,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, // b) no transaction was created yet. if gw.buffer != nil && !bufferedOnce && !inTransaction && target.TabletType == topodatapb.TabletType_PRIMARY { // The next call blocks if we should buffer during a failover. - retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, err) + retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, gw.kev, err) // Request may have been buffered. if retryDone != nil {