Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a way to know if DemotePrimary is blocked and send it in the health stream #17289

Merged
merged 10 commits into from
Dec 26, 2024
19 changes: 19 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tabletmanager
import (
"context"
"fmt"
"runtime"
"strings"
"time"

Expand All @@ -29,6 +30,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
Expand Down Expand Up @@ -520,6 +522,23 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure
}
defer tm.unlock()

finishCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-finishCtx.Done():
// Finished running DemotePrimary. Nothing to do.
case <-time.After(10 * topo.RemoteOperationTimeout):
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
// We have waited for more than 10 times the remote operation timeout, and DemotePrimary is still not done.
// Collect more information and signal that DemotePrimary appears to be stuck indefinitely.
log.Errorf("DemotePrimary seems to be blocked. Collecting more information.")
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
tm.QueryServiceControl.SetDemotePrimaryBlocked()
buf := make([]byte, 1<<16) // 64 KB buffer size
stackSize := runtime.Stack(buf, true)
log.Errorf("Stack trace:\n%s", string(buf[:stackSize]))
}
}()

tablet := tm.Tablet()
wasPrimary := tablet.Type == topodatapb.TabletType_PRIMARY
wasServing := tm.QueryServiceControl.IsServing()
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ type Controller interface {

// WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved.
WaitForPreparedTwoPCTransactions(ctx context.Context) error

// SetDemotePrimaryBlocked marks that demote primary is blocked in the state manager.
SetDemotePrimaryBlocked()
}

// Ensure TabletServer satisfies Controller interface.
Expand Down
29 changes: 17 additions & 12 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,19 @@ type stateManager struct {
//
// If a transition fails, we set retrying to true and launch
// retryTransition which loops until the state converges.
mu sync.Mutex
wantState servingState
wantTabletType topodatapb.TabletType
state servingState
target *querypb.Target
ptsTimestamp time.Time
retrying bool
replHealthy bool
lameduck bool
alsoAllow []topodatapb.TabletType
reason string
transitionErr error
mu sync.Mutex
wantState servingState
wantTabletType topodatapb.TabletType
state servingState
target *querypb.Target
ptsTimestamp time.Time
retrying bool
replHealthy bool
demotePrimaryBlocked bool
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
lameduck bool
alsoAllow []topodatapb.TabletType
reason string
transitionErr error

rw *requestsWaiter

Expand Down Expand Up @@ -715,6 +716,10 @@ func (sm *stateManager) Broadcast() {
defer sm.mu.Unlock()

lag, err := sm.refreshReplHealthLocked()
if sm.demotePrimaryBlocked {
// If we are blocked from demoting primary, we should send an error for it.
err = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Demoting primary is blocked")
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
}
sm.hs.ChangeState(sm.target.TabletType, sm.ptsTimestamp, lag, err, sm.isServingLocked())
}

Expand Down
36 changes: 36 additions & 0 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,42 @@ func TestStateManagerNotify(t *testing.T) {
sm.StopService()
}

// TestDemotePrimaryBlocked verifies that once the state manager is flagged as
// having a blocked DemotePrimary, the next health broadcast carries the
// "Demoting primary is blocked" error in RealtimeStats.HealthError.
func TestDemotePrimaryBlocked(t *testing.T) {
	sm := newTestStateManager()

	var wg sync.WaitGroup
	// Teardown order matters: the state manager must be stopped first so that
	// the health stream below terminates, and only then may we wait for the
	// streaming goroutine. Doing both in a single deferred function guarantees
	// this order even when a require assertion aborts the test early, which
	// would otherwise leave wg.Wait blocked on a stream that never ends.
	defer func() {
		sm.StopService()
		wg.Wait()
	}()

	err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "")
	require.NoError(t, err)
	// Stopping the ticker so that we don't get unexpected health streams.
	sm.hcticks.Stop()

	ch := make(chan *querypb.StreamHealthResponse, 5)
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := sm.hs.Stream(context.Background(), func(shr *querypb.StreamHealthResponse) error {
			ch <- shr
			return nil
		})
		// ErrorContains reports a failure on a nil error instead of
		// panicking on err.Error().
		assert.ErrorContains(t, err, "tabletserver is shutdown")
	}()

	// Send a broadcast message and check we have no error there.
	sm.Broadcast()
	gotshr := <-ch
	require.Empty(t, gotshr.RealtimeStats.HealthError)

	// If demote primary is blocked, then we should get an error.
	// The flag is read by Broadcast under sm.mu, so set it under the same lock.
	sm.mu.Lock()
	sm.demotePrimaryBlocked = true
	sm.mu.Unlock()
	sm.Broadcast()
	gotshr = <-ch
	require.EqualValues(t, "Demoting primary is blocked", gotshr.RealtimeStats.HealthError)
}

func TestRefreshReplHealthLocked(t *testing.T) {
sm := newTestStateManager()
defer sm.StopService()
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,11 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e
}
}

// SetDemotePrimaryBlocked marks that demote primary is blocked in the state manager.
//
// The flag is written while holding the state manager's mutex: Broadcast reads
// demotePrimaryBlocked under that same mutex, and this method is invoked from a
// goroutine other than the one running health broadcasts, so an unguarded write
// would be a data race.
func (tsv *TabletServer) SetDemotePrimaryBlocked() {
	tsv.sm.mu.Lock()
	defer tsv.sm.mu.Unlock()
	tsv.sm.demotePrimaryBlocked = true
}

// CreateTransaction creates the metadata for a 2PC transaction.
func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) {
return tsv.execRequest(
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error
return nil
}

// SetDemotePrimaryBlocked implements tabletserver.Controller.
// The mock only records that the method was invoked.
func (tqsc *Controller) SetDemotePrimaryBlocked() {
	const method = "SetDemotePrimaryBlocked"
	tqsc.MethodCalled[method] = true
}

// EnterLameduck implements tabletserver.Controller.
func (tqsc *Controller) EnterLameduck() {
tqsc.mu.Lock()
Expand Down
Loading