Skip to content

Commit

Permalink
[release-18.0] SHOW VITESS_REPLICATION_STATUS: Only use replication t…
Browse files Browse the repository at this point in the history
…racker when it's enabled (#15348) (#15361)

Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
vitess-bot[bot] and mattlord authored Feb 26, 2024
1 parent a27e3d4 commit bfda4ee
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 30 deletions.
46 changes: 41 additions & 5 deletions go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ import (
"testing"
"time"

"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/proto/topodata"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils"
"vitess.io/vitess/go/vt/proto/topodata"
)

func TestVtgateHealthCheck(t *testing.T) {
Expand All @@ -59,7 +58,7 @@ func TestVtgateReplicationStatusCheck(t *testing.T) {
time.Sleep(2 * time.Second)
verifyVtgateVariables(t, clusterInstance.VtgateProcess.VerifyURL)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
conn, err := mysql.Connect(ctx, &vtParams) // VTGate
require.NoError(t, err)
defer conn.Close()

Expand All @@ -68,6 +67,38 @@ func TestVtgateReplicationStatusCheck(t *testing.T) {
expectNumRows := 2
numRows := len(qr.Rows)
assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows))

// Disable VTOrc(s) recoveries so that it doesn't immediately repair/restart replication.
for _, vtorcProcess := range clusterInstance.VTOrcProcesses {
vtorcutils.DisableGlobalRecoveries(t, vtorcProcess)
}
// Re-enable recoveries afterward as the cluster is re-used.
defer func() {
for _, vtorcProcess := range clusterInstance.VTOrcProcesses {
vtorcutils.EnableGlobalRecoveries(t, vtorcProcess)
}
}()
// Stop replication on the non-PRIMARY tablets.
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave")
require.NoError(t, err)
// Restart replication afterward as the cluster is re-used.
defer func() {
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave")
require.NoError(t, err)
}()
time.Sleep(2 * time.Second) // Build up some replication lag
res, err := conn.ExecuteFetch("show vitess_replication_status", 2, false)
require.NoError(t, err)
expectNumRows = 2
numRows = len(qr.Rows)
assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status, expected %d, got %d", expectNumRows, numRows))
rawLag := res.Named().Rows[0]["ReplicationLag"] // Let's just look at the first row
lagInt, _ := rawLag.ToInt64() // Don't check the error as the value could be "NULL"
assert.True(t, rawLag.IsNull() || lagInt > 0, "replication lag should be NULL or greater than 0 but was: %s", rawLag.ToString())
}

func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) {
Expand All @@ -90,6 +121,11 @@ func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) {
rdOnlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly()
err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE)
require.NoError(t, err)
// Change it back to RDONLY afterward as the cluster is re-used.
defer func() {
err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_RDONLY)
require.NoError(t, err)
}()

// Only returns rows for REPLICA and RDONLY tablets -- so should be 1 of them since we updated 1 to spare
qr = utils.Exec(t, conn, "show vitess_replication_status like '%'")
Expand Down
16 changes: 16 additions & 0 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,3 +1121,19 @@ func PrintVTOrcLogsOnFailure(t *testing.T, clusterInstance *cluster.LocalProcess
log.Errorf("%s", string(content))
}
}

// EnableGlobalRecoveries enables global recoveries for the given VTOrc.
func EnableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) {
status, resp, err := MakeAPICall(t, vtorc, "/api/enable-global-recoveries")
require.NoError(t, err)
assert.Equal(t, 200, status)
assert.Equal(t, "Global recoveries enabled\n", resp)
}

// DisableGlobalRecoveries disables global recoveries for the given VTOrc.
func DisableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) {
status, resp, err := MakeAPICall(t, vtorc, "/api/disable-global-recoveries")
require.NoError(t, err)
assert.Equal(t, 200, status)
assert.Equal(t, "Global recoveries disabled\n", resp)
}
8 changes: 4 additions & 4 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

// NewLeaderParticipation is part of the topo.Server interface
func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) {
if c.closed {
if c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -72,7 +72,7 @@ type cLeaderParticipation struct {

// WaitForLeadership is part of the topo.LeaderParticipation interface.
func (mp *cLeaderParticipation) WaitForLeadership() (context.Context, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func (mp *cLeaderParticipation) Stop() {

// GetCurrentLeaderID is part of the topo.LeaderParticipation interface
func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return "", ErrConnectionClosed
}

Expand All @@ -139,7 +139,7 @@ func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string,

// WaitForNewLeader is part of the topo.LeaderParticipation interface
func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (ld *memoryTopoLockDescriptor) Unlock(ctx context.Context) error {
}

func (c *Conn) unlock(ctx context.Context, dirPath string) error {
if c.closed {
if c.closed.Load() {
return ErrConnectionClosed
}

Expand Down
7 changes: 4 additions & 3 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"math/rand"
"strings"
"sync"
"sync/atomic"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -123,13 +124,13 @@ type Conn struct {
factory *Factory
cell string
serverAddr string
closed bool
closed atomic.Bool
}

// dial returns immediately, unless the Conn points to the sentinel
// UnreachableServerAddr, in which case it will block until the context expires.
func (c *Conn) dial(ctx context.Context) error {
if c.closed {
if c.closed.Load() {
return ErrConnectionClosed
}
if c.serverAddr == UnreachableServerAddr {
Expand All @@ -141,7 +142,7 @@ func (c *Conn) dial(ctx context.Context) error {

// Close is part of the topo.Conn interface.
func (c *Conn) Close() {
c.closed = true
c.closed.Store(true)
}

type watch struct {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

// Watch is part of the topo.Conn interface.
func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) {
if c.closed {
if c.closed.Load() {
return nil, nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -75,7 +75,7 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c

// WatchRecursive is part of the topo.Conn interface.
func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) {
if c.closed {
if c.closed.Load() {
return nil, nil, ErrConnectionClosed
}

Expand Down
32 changes: 26 additions & 6 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,15 +911,15 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
tabletHostPort := ts.GetTabletHostPort()
throttlerStatus, err := getTabletThrottlerStatus(tabletHostPort)
if err != nil {
log.Warningf("Could not get throttler status from %s: %v", tabletHostPort, err)
log.Warningf("Could not get throttler status from %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err)
}

replSourceHost := ""
replSourcePort := int64(0)
replIOThreadHealth := ""
replSQLThreadHealth := ""
replLastError := ""
replLag := int64(-1)
replLag := "-1" // A string to support NULL as a value
sql := "show slave status"
results, err := e.txConn.tabletGateway.Execute(ctx, ts.Target, sql, nil, 0, 0, nil)
if err != nil || results == nil {
Expand All @@ -930,8 +930,25 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
replIOThreadHealth = row["Slave_IO_Running"].ToString()
replSQLThreadHealth = row["Slave_SQL_Running"].ToString()
replLastError = row["Last_Error"].ToString()
if ts.Stats != nil {
replLag = int64(ts.Stats.ReplicationLagSeconds)
// We cannot check the tablet's tabletenv config from here so
// we only use the tablet's stat -- which is managed by the
// ReplicationTracker -- if we can tell that it's enabled,
// meaning that it has a non-zero value. If it's actually
// enabled AND zero (rather than the zeroval), then mysqld
// should also return 0 so in this case the value is correct
// and equivalent either way. The only reason that we would
// want to use the ReplicationTracker based value, when we
// can, is because the polling method allows us to get the
// estimated lag value when replication is not running (based
// on how long we've seen that it's not been running).
if ts.Stats != nil && ts.Stats.ReplicationLagSeconds > 0 { // Use the value we get from the ReplicationTracker
replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds)
} else { // Use the value from mysqld
if row["Seconds_Behind_Master"].IsNull() {
replLag = strings.ToUpper(sqltypes.NullStr) // Uppercase to match mysqld's output in SHOW REPLICA STATUS
} else {
replLag = row["Seconds_Behind_Master"].ToString()
}
}
}
replicationHealth := fmt.Sprintf("{\"EventStreamRunning\":\"%s\",\"EventApplierRunning\":\"%s\",\"LastError\":\"%s\"}", replIOThreadHealth, replSQLThreadHealth, replLastError)
Expand All @@ -944,7 +961,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
ts.Tablet.Hostname,
fmt.Sprintf("%s:%d", replSourceHost, replSourcePort),
replicationHealth,
fmt.Sprintf("%d", replLag),
replLag,
throttlerStatus,
))
}
Expand Down Expand Up @@ -1457,11 +1474,14 @@ func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.P
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "plan includes scatter, which is disallowed using the `no_scatter` command line argument")
}

// getTabletThrottlerStatus uses HTTP to get the throttler status
// on a tablet. It uses HTTP because the CheckThrottler RPC is a
// tmclient RPC and you cannot use tmclient outside of a tablet.
func getTabletThrottlerStatus(tabletHostPort string) (string, error) {
client := http.Client{
Timeout: 100 * time.Millisecond,
}
resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=vtgate", tabletHostPort))
resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort))
if err != nil {
return "", err
}
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ import (
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgate/buffer"
Expand All @@ -55,6 +50,12 @@ import (
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vtgate/vschemaacl"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

func TestExecutorResultsExceeded(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"time"

"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/mysqlctl"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var replicationLagSeconds = stats.NewGauge("replicationLagSec", "replication lag in seconds")
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/heartbeat"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var (
Expand Down

0 comments on commit bfda4ee

Please sign in to comment.