diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index d79e2964f3e..38bc5f34df9 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -495,3 +495,76 @@ func TestDurabilityPolicySetLater(t *testing.T) { assert.NotNil(t, primary, "should have elected a primary") utils.CheckReplication(t, newCluster, primary, shard0.Vttablets, 10*time.Second) } + +func TestFullStatusConnectionPooling(t *testing.T) { + defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance) + defer cluster.PanicHandler(t) + utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 4, 0, []string{ + "--tablet_manager_grpc_concurrency=1", + }, cluster.VTOrcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1, "") + keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] + shard0 := &keyspace.Shards[0] + vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0] + + // find primary from topo + curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) + assert.NotNil(t, curPrimary, "should have elected a primary") + vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0] + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1) + utils.WaitForSuccessfulPRSCount(t, vtOrcProcess, keyspace.Name, shard0.Name, 1) + + // Kill the current primary. + _ = curPrimary.VttabletProcess.Kill() + + // Wait until VTOrc notices some problems + status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response == "null" + }) + assert.Equal(t, 200, status) + assert.Contains(t, resp, "UnreachablePrimary") + + time.Sleep(1 * time.Minute) + + // Change the primaries ports and restart it. 
+ curPrimary.VttabletProcess.Port = clusterInfo.ClusterInstance.GetAndReservePort() + curPrimary.VttabletProcess.GrpcPort = clusterInfo.ClusterInstance.GetAndReservePort() + err := curPrimary.VttabletProcess.Setup() + require.NoError(t, err) + + // See that VTOrc eventually reports no errors. + // Wait until there are no problems and the api endpoint returns null + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response != "null" + }) + assert.Equal(t, 200, status) + assert.Equal(t, "null", resp) + + // REPEATED + // Kill the current primary. + _ = curPrimary.VttabletProcess.Kill() + + // Wait until VTOrc notices some problems + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response == "null" + }) + assert.Equal(t, 200, status) + assert.Contains(t, resp, "UnreachablePrimary") + + time.Sleep(1 * time.Minute) + + // Change the primaries ports back to original and restart it. + curPrimary.VttabletProcess.Port = curPrimary.HTTPPort + curPrimary.VttabletProcess.GrpcPort = curPrimary.GrpcPort + err = curPrimary.VttabletProcess.Setup() + require.NoError(t, err) + + // See that VTOrc eventually reports no errors. 
+ // Wait until there are no problems and the api endpoint returns null + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response != "null" + }) + assert.Equal(t, 200, status) + assert.Equal(t, "null", resp) +} diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index dca2c7b1e26..00f75740338 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -733,7 +733,7 @@ func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status // The function provided takes in the status and response and returns if we should continue to retry or not func MakeAPICallRetry(t *testing.T, vtorc *cluster.VTOrcProcess, url string, retry func(int, string) bool) (status int, response string) { t.Helper() - timeout := time.After(10 * time.Second) + timeout := time.After(30 * time.Second) for { select { case <-timeout: diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index d8ae032bd74..6589a45924c 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -45,6 +45,17 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +type DialPoolGroup int + +const ( + // DialPoolGroupDefault is the default group for dialing. + DialPoolGroupDefault DialPoolGroup = iota + DialPoolGroupThrottler + DialPoolGroupVTOrc +) + +type invalidatorFunc func(error) + var ( concurrency = 8 cert string @@ -92,14 +103,17 @@ type tmc struct { client tabletmanagerservicepb.TabletManagerClient } +// rpcClientMap maps an address to a tmc +type rpcClientMap map[string](chan *tmc) + // grpcClient implements both dialer and poolDialer. type grpcClient struct { // This cache of connections is to maximize QPS for ExecuteFetchAs{Dba,App}, // CheckThrottler and FullStatus. Note we'll keep the clients open and close them upon Close() only. 
// But that's OK because usually the tasks that use them are one-purpose only. // The map is protected by the mutex. - mu sync.Mutex - rpcClientMap map[string]chan *tmc + mu sync.Mutex + rpcClientMaps map[DialPoolGroup]rpcClientMap } type dialer interface { @@ -108,7 +122,7 @@ type poolDialer interface { - dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) + dialPool(ctx context.Context, dialPoolGroup DialPoolGroup, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, invalidatorFunc, error) } // Client implements tmclient.TabletManagerClient. @@ -152,53 +166,75 @@ func (client *grpcClient) dial(ctx context.Context, tablet *topodatapb.Tablet) ( return tabletmanagerservicepb.NewTabletManagerClient(cc), cc, nil } -func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) { +func (client *grpcClient) dialPool(ctx context.Context, dialPoolGroup DialPoolGroup, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, invalidatorFunc, error) { addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"])) opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name) if err != nil { - return nil, err + return nil, nil, err } client.mu.Lock() - if client.rpcClientMap == nil { - client.rpcClientMap = make(map[string]chan *tmc) + defer client.mu.Unlock() + + if client.rpcClientMaps == nil { + client.rpcClientMaps = make(map[DialPoolGroup]rpcClientMap) } - c, ok := client.rpcClientMap[addr] + m, ok := client.rpcClientMaps[dialPoolGroup] if !ok { - c = make(chan *tmc, concurrency) - client.rpcClientMap[addr] = c - client.mu.Unlock() + m = make(rpcClientMap) + client.rpcClientMaps[dialPoolGroup] = m + } + invalidator := func(err error) { + if err == nil { + return + } + client.mu.Lock() + defer client.mu.Unlock() + for tm := range m[addr] { + tm.cc.Close() + } + 
close(m[addr]) + delete(m, addr) + } - for i := 0; i < cap(c); i++ { + if _, ok := m[addr]; !ok { + tmcCount := concurrency + if dialPoolGroup != DialPoolGroupDefault { + // Specialized pools do not need "concurrency" and can use a single client + tmcCount = 1 + } + c := make(chan *tmc, tmcCount) + for range cap(c) { cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt) if err != nil { - return nil, err + return nil, nil, err } c <- &tmc{ cc: cc, client: tabletmanagerservicepb.NewTabletManagerClient(cc), } } - } else { - client.mu.Unlock() + m[addr] = c } - - result := <-c - c <- result - return result.client, nil + c := m[addr] + tm := <-c + c <- tm + return tm.client, invalidator, nil } // Close is part of the tmclient.TabletManagerClient interface. func (client *grpcClient) Close() { client.mu.Lock() defer client.mu.Unlock() - for _, c := range client.rpcClientMap { - close(c) - for ch := range c { - ch.cc.Close() + for _, m := range client.rpcClientMaps { + for _, c := range m { + for tm := range c { + tm.cc.Close() + } + close(c) } } - client.rpcClientMap = nil + client.rpcClientMaps = nil } // @@ -472,7 +508,7 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb. var err error if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, DialPoolGroupDefault, tablet) if err != nil { return nil, err } @@ -508,7 +544,7 @@ func (client *Client) ExecuteMultiFetchAsDba(ctx context.Context, tablet *topoda var err error if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, DialPoolGroupDefault, tablet) if err != nil { return nil, err } @@ -564,7 +600,7 @@ func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb. 
var err error if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, DialPoolGroupDefault, tablet) if err != nil { return nil, err } @@ -611,9 +647,10 @@ func (client *Client) ReplicationStatus(ctx context.Context, tablet *topodatapb. // and dialing the other tablet every time is not practical. func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.FullStatus, error) { var c tabletmanagerservicepb.TabletManagerClient + var invalidator invalidatorFunc var err error if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, DialPoolGroupVTOrc, tablet) if err != nil { return nil, err } @@ -629,6 +666,9 @@ func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) } response, err := c.FullStatus(ctx, &tabletmanagerdatapb.FullStatusRequest{}) + if invalidator != nil { + invalidator(err) + } if err != nil { return nil, err } @@ -1101,9 +1141,10 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req // and dialing the other tablet every time is not practical. func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) { var c tabletmanagerservicepb.TabletManagerClient + var invalidator invalidatorFunc var err error if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, DialPoolGroupThrottler, tablet) if err != nil { return nil, err } @@ -1119,6 +1160,9 @@ func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tab } response, err := c.CheckThrottler(ctx, req) + if invalidator != nil { + invalidator(err) + } if err != nil { return nil, err }