From 2a296cf505857959a315390caaa7f4876de73442 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Mon, 26 Feb 2024 13:24:14 -0500 Subject: [PATCH 1/3] SHOW VITESS_REPLICATION_STATUS: Only use replication tracker when it's enabled (#15348) Signed-off-by: Matt Lord --- go/test/endtoend/tabletgateway/vtgate_test.go | 46 +++++++++++++++++-- go/vt/vtgate/executor.go | 32 ++++++++++--- go/vt/vtgate/executor_test.go | 11 +++-- .../tabletserver/repltracker/poller.go | 4 +- .../tabletserver/repltracker/repltracker.go | 5 +- 5 files changed, 78 insertions(+), 20 deletions(-) diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index be227927981..c48aa6c2131 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -28,15 +28,14 @@ import ( "testing" "time" - "vitess.io/vitess/go/test/endtoend/utils" - "vitess.io/vitess/go/vt/proto/topodata" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" + vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils" + "vitess.io/vitess/go/vt/proto/topodata" ) func TestVtgateHealthCheck(t *testing.T) { @@ -59,7 +58,7 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { time.Sleep(2 * time.Second) verifyVtgateVariables(t, clusterInstance.VtgateProcess.VerifyURL) ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) + conn, err := mysql.Connect(ctx, &vtParams) // VTGate require.NoError(t, err) defer conn.Close() @@ -68,6 +67,38 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { expectNumRows := 2 numRows := len(qr.Rows) assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows)) + + // Disable VTOrc(s) recoveries so that it doesn't immediately repair/restart replication. + for _, vtorcProcess := range clusterInstance.VTOrcProcesses { + vtorcutils.DisableGlobalRecoveries(t, vtorcProcess) + } + // Re-enable recoveries afterward as the cluster is re-used. + defer func() { + for _, vtorcProcess := range clusterInstance.VTOrcProcesses { + vtorcutils.EnableGlobalRecoveries(t, vtorcProcess) + } + }() + // Stop replication on the non-PRIMARY tablets. + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave") + require.NoError(t, err) + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave") + require.NoError(t, err) + // Restart replication afterward as the cluster is re-used. + defer func() { + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave") + require.NoError(t, err) + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave") + require.NoError(t, err) + }() + time.Sleep(2 * time.Second) // Build up some replication lag + res, err := conn.ExecuteFetch("show vitess_replication_status", 2, false) + require.NoError(t, err) + expectNumRows = 2 + numRows = len(qr.Rows) + assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status, expected %d, got %d", expectNumRows, numRows)) + rawLag := res.Named().Rows[0]["ReplicationLag"] // Let's just look at the first row + lagInt, _ := rawLag.ToInt64() // Don't check the error as the value could be "NULL" + assert.True(t, rawLag.IsNull() || lagInt > 0, "replication lag should be NULL or greater than 0 but was: %s", rawLag.ToString()) } func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) { @@ -90,6 +121,11 @@ func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) { rdOnlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE) require.NoError(t, err) + // Change it back to RDONLY afterward as the cluster is re-used. + defer func() { + err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_RDONLY) + require.NoError(t, err) + }() // Only returns rows for REPLICA and RDONLY tablets -- so should be 1 of them since we updated 1 to spare qr = utils.Exec(t, conn, "show vitess_replication_status like '%'") diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index d259ac560ea..880e3851f19 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -911,7 +911,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp tabletHostPort := ts.GetTabletHostPort() throttlerStatus, err := getTabletThrottlerStatus(tabletHostPort) if err != nil { - log.Warningf("Could not get throttler status from %s: %v", tabletHostPort, err) + log.Warningf("Could not get throttler status from %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err) } replSourceHost := "" @@ -919,7 +919,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp replIOThreadHealth := "" replSQLThreadHealth := "" replLastError := "" - replLag := int64(-1) + replLag := "-1" // A string to support NULL as a value sql := "show slave status" results, err := e.txConn.tabletGateway.Execute(ctx, ts.Target, sql, nil, 0, 0, nil) if err != nil || results == nil { @@ -930,8 +930,25 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp replIOThreadHealth = row["Slave_IO_Running"].ToString() replSQLThreadHealth = row["Slave_SQL_Running"].ToString() replLastError = row["Last_Error"].ToString() - if ts.Stats != nil { - replLag = int64(ts.Stats.ReplicationLagSeconds) + // We cannot check the tablet's tabletenv config from here so + // we only use the tablet's stat -- which is managed by the + // ReplicationTracker -- if we can tell that it's enabled, + // meaning that it has a non-zero value. If it's actually + // enabled AND zero (rather than the zeroval), then mysqld + // should also return 0 so in this case the value is correct + // and equivalent either way. The only reason that we would + // want to use the ReplicationTracker based value, when we + // can, is because the polling method allows us to get the + // estimated lag value when replication is not running (based + // on how long we've seen that it's not been running). + if ts.Stats != nil && ts.Stats.ReplicationLagSeconds > 0 { // Use the value we get from the ReplicationTracker + replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds) + } else { // Use the value from mysqld + if row["Seconds_Behind_Master"].IsNull() { + replLag = strings.ToUpper(sqltypes.NullStr) // Uppercase to match mysqld's output in SHOW REPLICA STATUS + } else { + replLag = row["Seconds_Behind_Master"].ToString() + } } } replicationHealth := fmt.Sprintf("{\"EventStreamRunning\":\"%s\",\"EventApplierRunning\":\"%s\",\"LastError\":\"%s\"}", replIOThreadHealth, replSQLThreadHealth, replLastError) @@ -944,7 +961,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp ts.Tablet.Hostname, fmt.Sprintf("%s:%d", replSourceHost, replSourcePort), replicationHealth, - fmt.Sprintf("%d", replLag), + replLag, throttlerStatus, )) } @@ -1457,11 +1474,14 @@ func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.P return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "plan includes scatter, which is disallowed using the `no_scatter` command line argument") } +// getTabletThrottlerStatus uses HTTP to get the throttler status +// on a tablet. It uses HTTP because the CheckThrottler RPC is a +// tmclient RPC and you cannot use tmclient outside of a tablet. func getTabletThrottlerStatus(tabletHostPort string) (string, error) { client := http.Client{ Timeout: 100 * time.Millisecond, } - resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=vtgate", tabletHostPort)) + resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort)) if err != nil { return "", err } diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index c0c52fa7377..0b086021d88 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -42,11 +42,6 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/discovery" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate/buffer" @@ -55,6 +50,12 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vtgate/vschemaacl" "vitess.io/vitess/go/vt/vtgate/vtgateservice" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) func TestExecutorResultsExceeded(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/repltracker/poller.go b/go/vt/vttablet/tabletserver/repltracker/poller.go index ace01dffb2d..6fc964bef57 100644 --- a/go/vt/vttablet/tabletserver/repltracker/poller.go +++ b/go/vt/vttablet/tabletserver/repltracker/poller.go @@ -21,10 +21,10 @@ import ( "time" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/mysqlctl" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var replicationLagSeconds = stats.NewGauge("replicationLagSec", "replication lag in seconds") diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker.go b/go/vt/vttablet/tabletserver/repltracker/repltracker.go index 5ab44eb774e..d14bec8b708 100644 --- a/go/vt/vttablet/tabletserver/repltracker/repltracker.go +++ b/go/vt/vttablet/tabletserver/repltracker/repltracker.go @@ -23,10 +23,11 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vttablet/tabletserver/heartbeat" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( From 38addd91926fa3479c091ded0f26e04874092b16 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 26 Feb 2024 14:21:07 -0500 Subject: [PATCH 2/3] Add missing e2e test util functions Signed-off-by: Matt Lord --- go/test/endtoend/vtorc/utils/utils.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 07b5b016fcc..eeea70df443 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -1121,3 +1121,19 @@ func PrintVTOrcLogsOnFailure(t *testing.T, clusterInstance *cluster.LocalProcess log.Errorf("%s", string(content)) } } + +// EnableGlobalRecoveries enables global recoveries for the given VTOrc. +func EnableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) { + status, resp, err := MakeAPICall(t, vtorc, "/api/enable-global-recoveries") + require.NoError(t, err) + assert.Equal(t, 200, status) + assert.Equal(t, "Global recoveries enabled\n", resp) +} + +// DisableGlobalRecoveries disables global recoveries for the given VTOrc. +func DisableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) { + status, resp, err := MakeAPICall(t, vtorc, "/api/disable-global-recoveries") + require.NoError(t, err) + assert.Equal(t, 200, status) + assert.Equal(t, "Global recoveries disabled\n", resp) +} From c40b26229c1a0d887843868692508e6a4aafc9d6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 26 Feb 2024 15:27:51 -0500 Subject: [PATCH 3/3] Address data race on memorytopo.closed Signed-off-by: Matt Lord --- go/vt/topo/memorytopo/election.go | 8 ++++---- go/vt/topo/memorytopo/lock.go | 2 +- go/vt/topo/memorytopo/memorytopo.go | 7 ++++--- go/vt/topo/memorytopo/watch.go | 4 ++-- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index 868a2c53287..ad173695099 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -26,7 +26,7 @@ import ( // NewLeaderParticipation is part of the topo.Server interface func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { - if c.closed { + if c.closed.Load() { return nil, ErrConnectionClosed } @@ -72,7 +72,7 @@ type cLeaderParticipation struct { // WaitForLeadership is part of the topo.LeaderParticipation interface. func (mp *cLeaderParticipation) WaitForLeadership() (context.Context, error) { - if mp.c.closed { + if mp.c.closed.Load() { return nil, ErrConnectionClosed } @@ -120,7 +120,7 @@ func (mp *cLeaderParticipation) Stop() { // GetCurrentLeaderID is part of the topo.LeaderParticipation interface func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) { - if mp.c.closed { + if mp.c.closed.Load() { return "", ErrConnectionClosed } @@ -139,7 +139,7 @@ func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, // WaitForNewLeader is part of the topo.LeaderParticipation interface func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) { - if mp.c.closed { + if mp.c.closed.Load() { return nil, ErrConnectionClosed } diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index c15fb9099bb..5c2a2462495 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -112,7 +112,7 @@ func (ld *memoryTopoLockDescriptor) Unlock(ctx context.Context) error { } func (c *Conn) unlock(ctx context.Context, dirPath string) error { - if c.closed { + if c.closed.Load() { return ErrConnectionClosed } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index f24b2f6c89e..b881be1b785 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -25,6 +25,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -123,13 +124,13 @@ type Conn struct { factory *Factory cell string serverAddr string - closed bool + closed atomic.Bool } // dial returns immediately, unless the Conn points to the sentinel // UnreachableServerAddr, in which case it will block until the context expires. func (c *Conn) dial(ctx context.Context) error { - if c.closed { + if c.closed.Load() { return ErrConnectionClosed } if c.serverAddr == UnreachableServerAddr { @@ -141,7 +142,7 @@ func (c *Conn) dial(ctx context.Context) error { // Close is part of the topo.Conn interface. func (c *Conn) Close() { - c.closed = true + c.closed.Store(true) } type watch struct { diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 73b2d248434..0f245c95b5f 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -25,7 +25,7 @@ import ( // Watch is part of the topo.Conn interface. func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { - if c.closed { + if c.closed.Load() { return nil, nil, ErrConnectionClosed } @@ -75,7 +75,7 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c // WatchRecursive is part of the topo.Conn interface. func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { - if c.closed { + if c.closed.Load() { return nil, nil, ErrConnectionClosed }