diff --git a/server/server.go b/server/server.go index a3e108b6..5d4776a6 100644 --- a/server/server.go +++ b/server/server.go @@ -101,6 +101,7 @@ type Config struct { GPRCServerMaxRecvMsgSize int `yaml:"grpc_server_max_recv_msg_size"` GRPCServerMaxSendMsgSize int `yaml:"grpc_server_max_send_msg_size"` GPRCServerMaxConcurrentStreams uint `yaml:"grpc_server_max_concurrent_streams"` + GPRCServerNumStreamWorkers uint `yaml:"grpc_server_num_stream_workers"` GRPCServerMaxConnectionIdle time.Duration `yaml:"grpc_server_max_connection_idle"` GRPCServerMaxConnectionAge time.Duration `yaml:"grpc_server_max_connection_age"` GRPCServerMaxConnectionAgeGrace time.Duration `yaml:"grpc_server_max_connection_age_grace"` @@ -159,6 +160,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.GPRCServerMaxRecvMsgSize, "server.grpc-max-recv-msg-size-bytes", 4*1024*1024, "Limit on the size of a gRPC message this server can receive (bytes).") f.IntVar(&cfg.GRPCServerMaxSendMsgSize, "server.grpc-max-send-msg-size-bytes", 4*1024*1024, "Limit on the size of a gRPC message this server can send (bytes).") f.UintVar(&cfg.GPRCServerMaxConcurrentStreams, "server.grpc-max-concurrent-streams", 100, "Limit on the number of concurrent streams for gRPC calls (0 = unlimited)") + f.UintVar(&cfg.GPRCServerNumStreamWorkers, "server.grpc_server-num-stream-workers", 0, "Number of worker goroutines that should be used to process incoming streams.Setting this 0 (default) will disable workers and spawn a new goroutine for each stream.") f.DurationVar(&cfg.GRPCServerMaxConnectionIdle, "server.grpc.keepalive.max-connection-idle", infinty, "The duration after which an idle connection should be closed. Default: infinity") f.DurationVar(&cfg.GRPCServerMaxConnectionAge, "server.grpc.keepalive.max-connection-age", infinty, "The duration for the maximum amount of time a connection may exist before it will be closed. Default: infinity") f.DurationVar(&cfg.GRPCServerMaxConnectionAgeGrace, "server.grpc.keepalive.max-connection-age-grace", infinty, "An additive period after max-connection-age after which the connection will be forcibly closed. Default: infinity") @@ -354,6 +356,7 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) { grpcOptions := []grpc.ServerOption{ grpc.ChainUnaryInterceptor(grpcMiddleware...), grpc.ChainStreamInterceptor(grpcStreamMiddleware...), + grpc.NumStreamWorkers(uint32(cfg.GPRCServerNumStreamWorkers)), grpc.KeepaliveParams(grpcKeepAliveOptions), grpc.KeepaliveEnforcementPolicy(grpcKeepAliveEnforcementPolicy), grpc.MaxRecvMsgSize(cfg.GPRCServerMaxRecvMsgSize),