diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 70799b0f6bc..287bbd19dc0 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -46,7 +46,6 @@ import ( "github.com/google/safehtml/template" "github.com/google/safehtml/template/uncheckedconversions" "github.com/spf13/pflag" - "golang.org/x/sync/semaphore" "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/netutil" @@ -92,9 +91,6 @@ var ( // refreshKnownTablets tells us whether to process all tablets or only new tablets. refreshKnownTablets = true - // healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000. - healthCheckDialConcurrency int64 = 1024 - // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond @@ -177,7 +173,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") - fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") ParseTabletURLTemplateFromFlag() } @@ -297,8 +292,6 @@ type HealthCheckImpl struct { subscribers map[chan *TabletHealth]struct{} // loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted loadTabletsTrigger chan struct{} - // healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once. 
- healthCheckDialSem *semaphore.Weighted } // NewHealthCheck creates a new HealthCheck object. @@ -333,7 +326,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur cell: localCell, retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, - healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency), healthByAlias: make(map[tabletAliasString]*tabletHealthCheck), healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth), healthy: make(map[KeyspaceShardTabletType][]*TabletHealth), @@ -844,7 +836,7 @@ func (hc *HealthCheckImpl) TabletConnection(ctx context.Context, alias *topodata // TODO: test that throws this error return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } - return thc.Connection(ctx, hc), nil + return thc.Connection(ctx), nil } // getAliasByCell should only be called while holding hc.mu diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index 64450f4c8c6..ecadeefdf78 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -19,7 +19,6 @@ package discovery import ( "context" "fmt" - "net" "strings" "sync" "sync/atomic" @@ -34,16 +33,12 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" - "google.golang.org/grpc" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/topodata" ) -// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options. -var withDialerContextOnce sync.Once - // tabletHealthCheck maintains the health status of a tablet. A map of this // structure is maintained in HealthCheck. type tabletHealthCheck struct { @@ -127,8 +122,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) { } // stream streams healthcheck responses to callback. 
-func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error { - conn := thc.Connection(ctx, hc) +func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error { + conn := thc.Connection(ctx) if conn == nil { // This signals the caller to retry return nil @@ -141,34 +136,14 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, c return err } -func (thc *tabletHealthCheck) Connection(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService { +func (thc *tabletHealthCheck) Connection(ctx context.Context) queryservice.QueryService { thc.connMu.Lock() defer thc.connMu.Unlock() - return thc.connectionLocked(ctx, hc) -} - -func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) { - return func(ctx context.Context, addr string) (net.Conn, error) { - // Limit the number of healthcheck connections opened in parallel to avoid high OS-thread - // usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens, - // etc). Without this limit it is possible for vtgates watching >10k tablets to hit - // the panic: 'runtime: program exceeds 10000-thread limit'. 
- if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil { - return nil, err - } - defer hc.healthCheckDialSem.Release(1) - var dialer net.Dialer - return dialer.DialContext(ctx, "tcp", addr) - } + return thc.connectionLocked(ctx) } -func (thc *tabletHealthCheck) connectionLocked(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService { +func (thc *tabletHealthCheck) connectionLocked(ctx context.Context) queryservice.QueryService { if thc.Conn == nil { - withDialerContextOnce.Do(func() { - grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { - return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil - }) - }) conn, err := tabletconn.GetDialer()(ctx, thc.Tablet, grpcclient.FailFast(true)) if err != nil { thc.LastError = err @@ -297,7 +272,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { }() // Read stream health responses. - err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error { + err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error { // We received a message. Reset the back-off. retryDelay = hc.retryDelay // Don't block on send to avoid deadlocks. diff --git a/go/vt/grpcclient/client.go b/go/vt/grpcclient/client.go index b8a8847ac4f..d46712de42c 100644 --- a/go/vt/grpcclient/client.go +++ b/go/vt/grpcclient/client.go @@ -21,12 +21,14 @@ package grpcclient import ( "context" "crypto/tls" + "net" "sync" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/spf13/pflag" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -46,6 +48,10 @@ var ( initialConnWindowSize int initialWindowSize int + // `dialConcurrencyLimit` tells us how many tablet grpc connections can be dialed concurrently. 
+ // This should be less than the golang max thread limit of 10000. + dialConcurrencyLimit int64 = 1024 + // every vitess binary that makes grpc client-side calls. grpcclientBinaries = []string{ "mysqlctld", @@ -74,9 +80,24 @@ func RegisterFlags(fs *pflag.FlagSet) { fs.StringVar(&credsFile, "grpc_auth_static_client_creds", credsFile, "When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.") } +func RegisterDialConcurrencyFlagsHealthcheck(fs *pflag.FlagSet) { + // TODO: Deprecate this and rename it to `grpc-dial-concurrency-limit` + fs.Int64Var(&dialConcurrencyLimit, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") +} + +func RegisterDialConcurrencyFlags(fs *pflag.FlagSet) { + fs.Int64Var(&dialConcurrencyLimit, "grpc-dial-concurrency-limit", 1024, "Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000.") +} + func init() { for _, cmd := range grpcclientBinaries { servenv.OnParseFor(cmd, RegisterFlags) + + if cmd == "vtgate" || cmd == "vtcombo" || cmd == "vtctld" { + servenv.OnParseFor(cmd, RegisterDialConcurrencyFlagsHealthcheck) + } else { + servenv.OnParseFor(cmd, RegisterDialConcurrencyFlags) + } } } @@ -129,6 +150,10 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ... newopts = append(newopts, grpc.WithInitialWindowSize(int32(initialWindowSize))) } + if dialConcurrencyLimit > 0 { + newopts = append(newopts, dialConcurrencyLimitOption()) + } + newopts = append(newopts, opts...) var err error grpcDialOptionsMu.Lock() @@ -175,6 +200,35 @@ func SecureDialOption(cert, key, ca, crl, name string) (grpc.DialOption, error) return grpc.WithTransportCredentials(creds), nil } +var dialConcurrencyLimitOpt grpc.DialOption + +// dialConcurrencyLimitOnce ensures the grpc.WithContextDialer() option and its semaphore are created only once. 
+var dialConcurrencyLimitOnce sync.Once + +func dialConcurrencyLimitOption() grpc.DialOption { + dialConcurrencyLimitOnce.Do(func() { + // This semaphore is used to limit how many grpc connections can be dialed to tablets simultaneously. + // This does not limit how many tablet connections can be open at the same time. + sem := semaphore.NewWeighted(dialConcurrencyLimit) + + dialConcurrencyLimitOpt = grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + // Limit the number of grpc connections opened in parallel to avoid high OS-thread + // usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens, + // etc). Without this limit it is possible for vtgates watching >10k tablets to hit + // the panic: 'runtime: program exceeds 10000-thread limit'. NOTE(review): the dial below hard-codes the "tcp" network; confirm no caller needs non-TCP targets (e.g. unix sockets). + if err := sem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer sem.Release(1) + + var dialer net.Dialer + return dialer.DialContext(ctx, "tcp", addr) + }) + }) + + return dialConcurrencyLimitOpt +} + // Allows for building a chain of interceptors without knowing the total size up front type clientInterceptorBuilder struct { unaryInterceptors []grpc.UnaryClientInterceptor