Skip to content

Commit

Permalink
client pool
Browse files Browse the repository at this point in the history
client pool
6718ea378b80e1066aa30589885e3e728910c1c1
  • Loading branch information
oke11o committed Mar 6, 2024
1 parent 4af0e09 commit a0052dd
Show file tree
Hide file tree
Showing 18 changed files with 245 additions and 107 deletions.
4 changes: 3 additions & 1 deletion .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"components/guns/grpc/scenario/templater.go":"load/projects/pandora/components/guns/grpc/scenario/templater.go",
"components/guns/grpc/scenario/templater_text.go":"load/projects/pandora/components/guns/grpc/scenario/templater_text.go",
"components/guns/grpc/scenario/templater_text_test.go":"load/projects/pandora/components/guns/grpc/scenario/templater_text_test.go",
"components/guns/grpc/shared_deps.go":"load/projects/pandora/components/guns/grpc/shared_deps.go",
"components/guns/http/base.go":"load/projects/pandora/components/guns/http/base.go",
"components/guns/http/base_test.go":"load/projects/pandora/components/guns/http/base_test.go",
"components/guns/http/client.go":"load/projects/pandora/components/guns/http/client.go",
Expand Down Expand Up @@ -164,6 +165,7 @@
"core/aggregator/reporter.go":"load/projects/pandora/core/aggregator/reporter.go",
"core/aggregator/reporter_test.go":"load/projects/pandora/core/aggregator/reporter_test.go",
"core/aggregator/test.go":"load/projects/pandora/core/aggregator/test.go",
"core/clientpool/pool.go":"load/projects/pandora/core/clientpool/pool.go",
"core/config/config.go":"load/projects/pandora/core/config/config.go",
"core/config/config_test.go":"load/projects/pandora/core/config/config_test.go",
"core/config/doc.go":"load/projects/pandora/core/config/doc.go",
Expand Down Expand Up @@ -356,7 +358,7 @@
"script/coverage.sh":"load/projects/pandora/script/coverage.sh",
"tests/acceptance/common.go":"load/projects/pandora/tests/acceptance/common.go",
"tests/acceptance/config_model.go":"load/projects/pandora/tests/acceptance/config_model.go",
"tests/acceptance/custom_reflection_grpc_test.go":"load/projects/pandora/tests/acceptance/custom_reflection_grpc_test.go",
"tests/acceptance/grpc_test.go":"load/projects/pandora/tests/acceptance/grpc_test.go",
"tests/acceptance/http_test.go":"load/projects/pandora/tests/acceptance/http_test.go",
"tests/acceptance/testdata/grpc/base.yaml":"load/projects/pandora/tests/acceptance/testdata/grpc/base.yaml",
"tests/acceptance/testdata/grpc/grpc.payload":"load/projects/pandora/tests/acceptance/testdata/grpc/grpc.payload",
Expand Down
77 changes: 62 additions & 15 deletions components/guns/grpc/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -17,6 +18,7 @@ import (
ammo "github.com/yandex/pandora/components/providers/grpc"
"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/aggregator/netsample"
"github.com/yandex/pandora/core/clientpool"
"github.com/yandex/pandora/core/warmup"
"github.com/yandex/pandora/lib/answlog"
"go.uber.org/zap"
Expand Down Expand Up @@ -47,6 +49,7 @@ type GunConfig struct {
TLS bool `config:"tls"`
DialOptions GrpcDialOptions `config:"dial_options"`
AnswLog AnswLogConfig `config:"answlog"`
PoolSize int `config:"pool-size"`
}

type AnswLogConfig struct {
Expand All @@ -57,7 +60,6 @@ type AnswLogConfig struct {

type Gun struct {
DebugLog bool
Client *grpc.ClientConn
Conf GunConfig
Aggr core.Aggregator
core.GunDeps
Expand All @@ -79,9 +81,27 @@ func DefaultGunConfig() GunConfig {
}
}

func (g *Gun) WarmUp(opts *warmup.Options) (interface{}, error) {
target := replacePort(g.Conf.Target, g.Conf.ReflectPort)
conn, err := MakeGRPCConnect(target, g.Conf.TLS, g.Conf.DialOptions)
func (g *Gun) WarmUp(opts *warmup.Options) (any, error) {
return g.createSharedDeps(opts)
}

func (g *Gun) createSharedDeps(opts *warmup.Options) (*SharedDeps, error) {
services, err := g.prepareMethodList(opts)
if err != nil {
return nil, err
}
clientPool, err := g.prepareClientPool()
if err != nil {
return nil, err
}
return &SharedDeps{
services: services,
clientPool: clientPool,
}, nil
}

func (g *Gun) prepareMethodList(opts *warmup.Options) (map[string]desc.MethodDescriptor, error) {
conn, err := g.makeReflectionConnect()
if err != nil {
return nil, fmt.Errorf("failed to connect to target: %w", err)
}
Expand Down Expand Up @@ -113,13 +133,22 @@ func (g *Gun) WarmUp(opts *warmup.Options) (interface{}, error) {
return services, nil
}

func (g *Gun) AcceptWarmUpResult(i interface{}) error {
services, ok := i.(map[string]desc.MethodDescriptor)
if !ok {
return fmt.Errorf("grpc WarmUp result should be services: map[string]desc.MethodDescriptor")
func (g *Gun) prepareClientPool() (*clientpool.Pool[grpcdynamic.Stub], error) {
if g.Conf.PoolSize <= 0 {
return nil, nil
}
g.Services = services
return nil
clientPool, err := clientpool.New[grpcdynamic.Stub](g.Conf.PoolSize)
if err != nil {
return nil, fmt.Errorf("create clientpool err: %w", err)
}
for i := 0; i < g.Conf.PoolSize; i++ {
conn, err := g.makeConnect()
if err != nil {
return nil, fmt.Errorf("makeGRPCConnect fail %w", err)
}
clientPool.Add(grpcdynamic.NewStub(conn))
}
return clientPool, nil
}

func NewGun(conf GunConfig) *Gun {
Expand All @@ -128,14 +157,23 @@ func NewGun(conf GunConfig) *Gun {
}

func (g *Gun) Bind(aggr core.Aggregator, deps core.GunDeps) error {
conn, err := MakeGRPCConnect(g.Conf.Target, g.Conf.TLS, g.Conf.DialOptions)
if err != nil {
return fmt.Errorf("makeGRPCConnect fail %w", err)
sharedDeps, ok := deps.Shared.(*SharedDeps)
if !ok {
return errors.New("grpc WarmUp result should be struct: *SharedDeps")
}
g.Services = sharedDeps.services
if sharedDeps.clientPool != nil {
g.Stub = sharedDeps.clientPool.Next()
} else {
conn, err := g.makeConnect()
if err != nil {
return fmt.Errorf("makeGRPCConnect fail %w", err)
}
g.Stub = grpcdynamic.NewStub(conn)
}
g.Client = conn

g.Aggr = aggr
g.GunDeps = deps
g.Stub = grpcdynamic.NewStub(conn)

if ent := deps.Log.Check(zap.DebugLevel, "Gun bind"); ent != nil {
deps.Log.Warn("Deprecation Warning: log level: debug doesn't produce request/response logs anymore. Please use AnswLog option instead:\nanswlog:\n enabled: true\n filter: all|warning|error\n path: answ.log")
Expand Down Expand Up @@ -217,6 +255,15 @@ func (g *Gun) AnswLogging(logger *zap.Logger, method *desc.MethodDescriptor, req
logger.Debug("Response:", zap.Stringer("resp", response), zap.Error(grpcErr))
}

func (g *Gun) makeConnect() (conn *grpc.ClientConn, err error) {
return MakeGRPCConnect(g.Conf.Target, g.Conf.TLS, g.Conf.DialOptions)
}

func (g *Gun) makeReflectionConnect() (conn *grpc.ClientConn, err error) {
target := replacePort(g.Conf.Target, g.Conf.ReflectPort)
return MakeGRPCConnect(target, g.Conf.TLS, g.Conf.DialOptions)
}

func MakeGRPCConnect(target string, isTLS bool, dialOptions GrpcDialOptions) (conn *grpc.ClientConn, err error) {
opts := []grpc.DialOption{}
if isTLS {
Expand Down
4 changes: 0 additions & 4 deletions components/guns/grpc/scenario/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ func (g *Gun) WarmUp(opts *warmup.Options) (interface{}, error) {
return g.gun.WarmUp(opts)
}

func (g *Gun) AcceptWarmUpResult(i interface{}) error {
return g.gun.AcceptWarmUpResult(i)
}

func (g *Gun) Bind(aggr core.Aggregator, deps core.GunDeps) error {
return g.gun.Bind(aggr, deps)
}
Expand Down
12 changes: 12 additions & 0 deletions components/guns/grpc/shared_deps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package grpc

import (
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic/grpcdynamic"
"github.com/yandex/pandora/core/clientpool"
)

type SharedDeps struct {
services map[string]desc.MethodDescriptor
clientPool *clientpool.Pool[grpcdynamic.Stub]
}
33 changes: 33 additions & 0 deletions core/clientpool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package clientpool

import (
"errors"
"sync/atomic"
)

func New[T any](size int) (*Pool[T], error) {
if size <= 0 {
return nil, errors.New("pool size must be greater than zero")
}
return &Pool[T]{
pool: make([]T, 0, size),
}, nil
}

type Pool[T any] struct {
pool []T
i atomic.Uint64
}

func (p *Pool[T]) Add(conn T) {
p.pool = append(p.pool, conn)
}

func (p *Pool[T]) Next() T {
if len(p.pool) == 0 {
var zero T
return zero
}
i := p.i.Add(1)
return p.pool[int(i)%len(p.pool)]
}
2 changes: 2 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type GunDeps struct {
InstanceID int
PoolID string

Shared any

// TODO(skipor): https://github.com/yandex/pandora/issues/71
// Pass parallelism value. InstanceId MUST be -1 if parallelism > 1.
}
Expand Down
11 changes: 4 additions & 7 deletions core/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ func (e *Engine) Wait() {

func newPool(log *zap.Logger, m Metrics, onWaitDone func(), conf InstancePoolConfig) *instancePool {
log = log.With(zap.String("pool", conf.ID))
return &instancePool{log, m, onWaitDone, conf, nil}
return &instancePool{log: log, metrics: m, onWaitDone: onWaitDone, InstancePoolConfig: conf}
}

type instancePool struct {
log *zap.Logger
metrics Metrics
onWaitDone func()
InstancePoolConfig
gunWarmUpResult interface{}
sharedGunDeps any
}

// Run start instance pool. Run blocks until fail happen, or all instances finish.
Expand Down Expand Up @@ -169,10 +169,7 @@ func (p *instancePool) warmUpGun(ctx context.Context) error {
return fmt.Errorf("can't initiate a gun: %w", err)
}
if gunWithWarmUp, ok := gun.(warmup.WarmedUp); ok {
p.gunWarmUpResult, err = gunWithWarmUp.WarmUp(&warmup.Options{
Log: p.log,
Ctx: ctx,
})
p.sharedGunDeps, err = gunWithWarmUp.WarmUp(&warmup.Options{Log: p.log, Ctx: ctx})
if err != nil {
return fmt.Errorf("gun warm up failed: %w", err)
}
Expand Down Expand Up @@ -362,7 +359,7 @@ func (p *instancePool) startInstances(
instanceSharedDeps: instanceSharedDeps{
provider: p.Provider,
metrics: p.metrics,
gunWarmUpResult: p.gunWarmUpResult,
gunDeps: p.sharedGunDeps,
aggregator: p.Aggregator,
discardOverflow: p.DiscardOverflow,
},
Expand Down
14 changes: 4 additions & 10 deletions core/engine/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package engine

import (
"context"
"fmt"
"io"

"github.com/pkg/errors"
"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/aggregator/netsample"
"github.com/yandex/pandora/core/coreutil"
"github.com/yandex/pandora/core/warmup"
"github.com/yandex/pandora/lib/tag"
"go.uber.org/zap"
)
Expand All @@ -24,7 +22,7 @@ type instance struct {

func newInstance(ctx context.Context, log *zap.Logger, poolID string, id int, deps instanceDeps) (*instance, error) {
log = log.With(zap.Int("instance", id))
gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: poolID, InstanceID: id}
gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: poolID, InstanceID: id, Shared: deps.gunDeps}
sched, err := deps.newSchedule()
if err != nil {
return nil, err
Expand All @@ -33,16 +31,12 @@ func newInstance(ctx context.Context, log *zap.Logger, poolID string, id int, de
if err != nil {
return nil, err
}
if warmedUp, ok := gun.(warmup.WarmedUp); ok {
if err := warmedUp.AcceptWarmUpResult(deps.gunWarmUpResult); err != nil {
return nil, fmt.Errorf("gun failed to accept warmup result: %w", err)
}
}

err = gun.Bind(deps.aggregator, gunDeps)
if err != nil {
return nil, err
}
inst := &instance{log, id, gun, sched, deps.instanceSharedDeps}
inst := &instance{log: log, id: id, gun: gun, schedule: sched, instanceSharedDeps: deps.instanceSharedDeps}
return inst, nil
}

Expand All @@ -55,7 +49,7 @@ type instanceDeps struct {
type instanceSharedDeps struct {
provider core.Provider
metrics Metrics
gunWarmUpResult interface{}
gunDeps any
aggregator core.Aggregator
discardOverflow bool
}
Expand Down
16 changes: 7 additions & 9 deletions core/engine/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ func Test_Instance(t *testing.T) {

var justBeforeEach = func() {
deps := instanceDeps{

newSchedule,
newGun,
instanceSharedDeps{
provider,
metrics,
nil,
aggregator,
false,
newSchedule: newSchedule,
newGun: newGun,
instanceSharedDeps: instanceSharedDeps{
provider: provider,
metrics: metrics,
aggregator: aggregator,
discardOverflow: false,
},
}
ins, insCreateErr = newInstance(ctx, newNopLogger(), "pool_0", 0, deps)
Expand Down
1 change: 0 additions & 1 deletion core/warmup/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@ package warmup

type WarmedUp interface {
WarmUp(*Options) (interface{}, error)
AcceptWarmUpResult(interface{}) error
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/net v0.19.0
golang.org/x/net v0.20.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
google.golang.org/grpc v1.60.1
google.golang.org/protobuf v1.32.0
Expand Down Expand Up @@ -64,6 +64,7 @@ require (
github.com/zclconf/go-cty v1.13.2 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -360,8 +360,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
Loading

0 comments on commit a0052dd

Please sign in to comment.