Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-18.0] SHOW VITESS_REPLICATION_STATUS: Only use replication tracker when it's enabled (#15348) #15361

Merged
merged 3 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ import (
"testing"
"time"

"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/proto/topodata"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils"
"vitess.io/vitess/go/vt/proto/topodata"
)

func TestVtgateHealthCheck(t *testing.T) {
Expand All @@ -59,7 +58,7 @@ func TestVtgateReplicationStatusCheck(t *testing.T) {
time.Sleep(2 * time.Second)
verifyVtgateVariables(t, clusterInstance.VtgateProcess.VerifyURL)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
conn, err := mysql.Connect(ctx, &vtParams) // VTGate
require.NoError(t, err)
defer conn.Close()

Expand All @@ -68,6 +67,38 @@ func TestVtgateReplicationStatusCheck(t *testing.T) {
expectNumRows := 2
numRows := len(qr.Rows)
assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows))

// Disable VTOrc(s) recoveries so that it doesn't immediately repair/restart replication.
for _, vtorcProcess := range clusterInstance.VTOrcProcesses {
vtorcutils.DisableGlobalRecoveries(t, vtorcProcess)
}
// Re-enable recoveries afterward as the cluster is re-used.
defer func() {
for _, vtorcProcess := range clusterInstance.VTOrcProcesses {
vtorcutils.EnableGlobalRecoveries(t, vtorcProcess)
}
}()
// Stop replication on the non-PRIMARY tablets.
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave")
require.NoError(t, err)
// Restart replication afterward as the cluster is re-used.
defer func() {
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave")
require.NoError(t, err)
}()
time.Sleep(2 * time.Second) // Build up some replication lag
res, err := conn.ExecuteFetch("show vitess_replication_status", 2, false)
require.NoError(t, err)
expectNumRows = 2
numRows = len(qr.Rows)
assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status, expected %d, got %d", expectNumRows, numRows))
rawLag := res.Named().Rows[0]["ReplicationLag"] // Let's just look at the first row
lagInt, _ := rawLag.ToInt64() // Don't check the error as the value could be "NULL"
assert.True(t, rawLag.IsNull() || lagInt > 0, "replication lag should be NULL or greater than 0 but was: %s", rawLag.ToString())
}

func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) {
Expand All @@ -90,6 +121,11 @@ func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) {
rdOnlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly()
err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE)
require.NoError(t, err)
// Change it back to RDONLY afterward as the cluster is re-used.
defer func() {
err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_RDONLY)
require.NoError(t, err)
}()

// Only returns rows for REPLICA and RDONLY tablets -- so should be 1 of them since we updated 1 to spare
qr = utils.Exec(t, conn, "show vitess_replication_status like '%'")
Expand Down
16 changes: 16 additions & 0 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,3 +1121,19 @@ func PrintVTOrcLogsOnFailure(t *testing.T, clusterInstance *cluster.LocalProcess
log.Errorf("%s", string(content))
}
}

// EnableGlobalRecoveries enables global recoveries for the given VTOrc.
func EnableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) {
status, resp, err := MakeAPICall(t, vtorc, "/api/enable-global-recoveries")
require.NoError(t, err)
assert.Equal(t, 200, status)
assert.Equal(t, "Global recoveries enabled\n", resp)
}

// DisableGlobalRecoveries disables global recoveries for the given VTOrc.
func DisableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) {
status, resp, err := MakeAPICall(t, vtorc, "/api/disable-global-recoveries")
require.NoError(t, err)
assert.Equal(t, 200, status)
assert.Equal(t, "Global recoveries disabled\n", resp)
}
8 changes: 4 additions & 4 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

// NewLeaderParticipation is part of the topo.Server interface
func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) {
if c.closed {
if c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -72,7 +72,7 @@ type cLeaderParticipation struct {

// WaitForLeadership is part of the topo.LeaderParticipation interface.
func (mp *cLeaderParticipation) WaitForLeadership() (context.Context, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func (mp *cLeaderParticipation) Stop() {

// GetCurrentLeaderID is part of the topo.LeaderParticipation interface
func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return "", ErrConnectionClosed
}

Expand All @@ -139,7 +139,7 @@ func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string,

// WaitForNewLeader is part of the topo.LeaderParticipation interface
func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (ld *memoryTopoLockDescriptor) Unlock(ctx context.Context) error {
}

func (c *Conn) unlock(ctx context.Context, dirPath string) error {
if c.closed {
if c.closed.Load() {
return ErrConnectionClosed
}

Expand Down
7 changes: 4 additions & 3 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"math/rand"
"strings"
"sync"
"sync/atomic"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -123,13 +124,13 @@ type Conn struct {
factory *Factory
cell string
serverAddr string
closed bool
closed atomic.Bool
}

// dial returns immediately, unless the Conn points to the sentinel
// UnreachableServerAddr, in which case it will block until the context expires.
func (c *Conn) dial(ctx context.Context) error {
if c.closed {
if c.closed.Load() {
return ErrConnectionClosed
}
if c.serverAddr == UnreachableServerAddr {
Expand All @@ -141,7 +142,7 @@ func (c *Conn) dial(ctx context.Context) error {

// Close is part of the topo.Conn interface.
func (c *Conn) Close() {
c.closed = true
c.closed.Store(true)
}

type watch struct {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

// Watch is part of the topo.Conn interface.
func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) {
if c.closed {
if c.closed.Load() {
return nil, nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -75,7 +75,7 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c

// WatchRecursive is part of the topo.Conn interface.
func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) {
if c.closed {
if c.closed.Load() {
return nil, nil, ErrConnectionClosed
}

Expand Down
32 changes: 26 additions & 6 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,15 +911,15 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
tabletHostPort := ts.GetTabletHostPort()
throttlerStatus, err := getTabletThrottlerStatus(tabletHostPort)
if err != nil {
log.Warningf("Could not get throttler status from %s: %v", tabletHostPort, err)
log.Warningf("Could not get throttler status from %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err)
}

replSourceHost := ""
replSourcePort := int64(0)
replIOThreadHealth := ""
replSQLThreadHealth := ""
replLastError := ""
replLag := int64(-1)
replLag := "-1" // A string to support NULL as a value
sql := "show slave status"
results, err := e.txConn.tabletGateway.Execute(ctx, ts.Target, sql, nil, 0, 0, nil)
if err != nil || results == nil {
Expand All @@ -930,8 +930,25 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
replIOThreadHealth = row["Slave_IO_Running"].ToString()
replSQLThreadHealth = row["Slave_SQL_Running"].ToString()
replLastError = row["Last_Error"].ToString()
if ts.Stats != nil {
replLag = int64(ts.Stats.ReplicationLagSeconds)
// We cannot check the tablet's tabletenv config from here so
// we only use the tablet's stat -- which is managed by the
// ReplicationTracker -- if we can tell that it's enabled,
// meaning that it has a non-zero value. If it's actually
// enabled AND zero (rather than the zeroval), then mysqld
// should also return 0 so in this case the value is correct
// and equivalent either way. The only reason that we would
// want to use the ReplicationTracker based value, when we
// can, is because the polling method allows us to get the
// estimated lag value when replication is not running (based
// on how long we've seen that it's not been running).
if ts.Stats != nil && ts.Stats.ReplicationLagSeconds > 0 { // Use the value we get from the ReplicationTracker
replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds)
} else { // Use the value from mysqld
if row["Seconds_Behind_Master"].IsNull() {
replLag = strings.ToUpper(sqltypes.NullStr) // Uppercase to match mysqld's output in SHOW REPLICA STATUS
} else {
replLag = row["Seconds_Behind_Master"].ToString()
}
}
}
replicationHealth := fmt.Sprintf("{\"EventStreamRunning\":\"%s\",\"EventApplierRunning\":\"%s\",\"LastError\":\"%s\"}", replIOThreadHealth, replSQLThreadHealth, replLastError)
Expand All @@ -944,7 +961,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
ts.Tablet.Hostname,
fmt.Sprintf("%s:%d", replSourceHost, replSourcePort),
replicationHealth,
fmt.Sprintf("%d", replLag),
replLag,
throttlerStatus,
))
}
Expand Down Expand Up @@ -1457,11 +1474,14 @@ func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.P
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "plan includes scatter, which is disallowed using the `no_scatter` command line argument")
}

// getTabletThrottlerStatus uses HTTP to get the throttler status
// on a tablet. It uses HTTP because the CheckThrottler RPC is a
// tmclient RPC and you cannot use tmclient outside of a tablet.
func getTabletThrottlerStatus(tabletHostPort string) (string, error) {
client := http.Client{
Timeout: 100 * time.Millisecond,
}
resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=vtgate", tabletHostPort))
resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort))
if err != nil {
return "", err
}
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ import (
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgate/buffer"
Expand All @@ -55,6 +50,12 @@ import (
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vtgate/vschemaacl"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

func TestExecutorResultsExceeded(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"time"

"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/mysqlctl"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var replicationLagSeconds = stats.NewGauge("replicationLagSec", "replication lag in seconds")
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/heartbeat"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var (
Expand Down
Loading