From 147337724e8f2c24072e2db7a784b57a09763a30 Mon Sep 17 00:00:00 2001 From: atercattus Date: Mon, 22 Jan 2024 21:31:03 +0400 Subject: [PATCH 1/5] client/pool - allow to check a connection to DB inside pool initialization --- client/pool.go | 140 +++++++++++++++++++++++++++++++++-------- client/pool_options.go | 60 ++++++++++++++++++ client/pool_test.go | 43 ++++++++++++- 3 files changed, 214 insertions(+), 29 deletions(-) create mode 100644 client/pool_options.go diff --git a/client/pool.go b/client/pool.go index 315307c5a..1077d134a 100644 --- a/client/pool.go +++ b/client/pool.go @@ -77,43 +77,45 @@ var ( MaxNewConnectionAtOnce = 5 ) -// NewPool initializes new connection pool and uses params: addr, user, password, dbName and options. -// minAlive specifies the minimum number of open connections that the pool will try to maintain. -// maxAlive specifies the maximum number of open connections (for internal reasons, -// may be greater by 1 inside newConnectionProducer). -// maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout). -func NewPool( - logFunc LogFunc, - minAlive int, - maxAlive int, - maxIdle int, +// NewPoolWithOptions initializes new connection pool and uses params: addr, user, password, dbName and options. +func NewPoolWithOptions( addr string, user string, password string, dbName string, - options ...func(conn *Conn), -) *Pool { - if minAlive > maxAlive { - minAlive = maxAlive + options ...PoolOption, +) (*Pool, error) { + po := poolOptions{ + addr: addr, + user: user, + password: password, + dbName: dbName, + } + for _, o := range options { + o(&po) + } + + if po.minAlive > po.maxAlive { + po.minAlive = po.maxAlive } - if maxIdle > maxAlive { - maxIdle = maxAlive + if po.maxIdle > po.maxAlive { + po.maxIdle = po.maxAlive } - if maxIdle <= minAlive { - maxIdle = minAlive + if po.maxIdle <= po.minAlive { + po.maxIdle = po.minAlive } pool := &Pool{ - logFunc: logFunc, - minAlive: minAlive, - maxAlive: maxAlive, - maxIdle: maxIdle, + logFunc: po.logFunc, + minAlive: po.minAlive, + maxAlive: po.maxAlive, + maxIdle: po.maxIdle, idleCloseTimeout: Timestamp(math.Ceil(DefaultIdleTimeout.Seconds())), idlePingTimeout: Timestamp(math.Ceil(MaxIdleTimeoutWithoutPing.Seconds())), connect: func() (*Conn, error) { - return Connect(addr, user, password, dbName, options...) + return Connect(addr, user, password, dbName, po.connOptions...) }, readyConnection: make(chan Connection), @@ -127,13 +129,56 @@ func NewPool( go pool.newConnectionProducer() if pool.minAlive > 0 { - pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, pool.minAlive) - pool.startNewConnections(pool.minAlive) + go pool.startNewConnections(pool.minAlive) } pool.wg.Add(1) go pool.closeOldIdleConnections() + if po.newPoolPingTimeout > 0 { + ctx, cancel := context.WithTimeout(pool.ctx, po.newPoolPingTimeout) + err := pool.checkConnection(ctx) + cancel() + if err != nil { + pool.Close() + return nil, errors.Errorf("checkConnection: %s", err) + } + } + + return pool, nil +} + +// NewPool initializes new connection pool and uses params: addr, user, password, dbName and options. +// minAlive specifies the minimum number of open connections that the pool will try to maintain. +// maxAlive specifies the maximum number of open connections (for internal reasons, +// may be greater by 1 inside newConnectionProducer). +// maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout). +// +// Deprecated: use NewPoolWithOptions +func NewPool( + logFunc LogFunc, + minAlive int, + maxAlive int, + maxIdle int, + addr string, + user string, + password string, + dbName string, + options ...func(conn *Conn), +) *Pool { + pool, err := NewPoolWithOptions( + addr, + user, + password, + dbName, + WithLogFunc(logFunc), + WithPoolLimits(minAlive, maxAlive, maxIdle), + WithConnOptions(options...), + ) + if err != nil { + pool.logFunc(`Pool: NewPool: %s`, err.Error()) + } + return pool } @@ -235,6 +280,7 @@ func (pool *Pool) putConnectionUnsafe(connection Connection) { func (pool *Pool) newConnectionProducer() { defer pool.wg.Done() + var connection Connection var err error @@ -263,10 +309,24 @@ func (pool *Pool) newConnectionProducer() { pool.synchro.stats.TotalCount-- // Bad luck, should try again pool.synchro.Unlock() - time.Sleep(time.Duration(10+rand.Intn(90)) * time.Millisecond) - continue + pool.logFunc("Cannot establish new db connection: %s", err.Error()) + + timer := time.NewTimer( + time.Duration(10+rand.Intn(90)) * time.Millisecond, + ) + + select { + case <-timer.C: + continue + case <-pool.ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return + } } } + select { case pool.readyConnection <- connection: case <-pool.ctx.Done(): @@ -309,6 +369,7 @@ func (pool *Pool) getIdleConnectionUnsafe() Connection { func (pool *Pool) closeOldIdleConnections() { defer pool.wg.Done() + var toPing []Connection ticker := time.NewTicker(5 * time.Second) @@ -468,6 +529,8 @@ func (pool *Pool) closeConn(conn *Conn) { } func (pool *Pool) startNewConnections(count int) { + pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, count) + connections := make([]Connection, 0, count) for i := 0; i < count; i++ { if conn, err := pool.createNewConnection(); err == nil { @@ -475,6 +538,8 @@ func (pool *Pool) startNewConnections(count int) { pool.synchro.stats.TotalCount++ pool.synchro.Unlock() connections = append(connections, conn) + } else { + pool.logFunc(`Pool: createNewConnection: %s`, err) } } @@ -512,3 +577,24 @@ func (pool *Pool) Close() { pool.synchro.idleConnections = nil pool.synchro.Unlock() } + +// checkConnection tries to connect and ping DB server +func (pool *Pool) checkConnection(ctx context.Context) error { + errChan := make(chan error, 1) + + go func() { + conn, err := pool.connect() + if err == nil { + err = conn.Ping() + _ = conn.Close() + } + errChan <- err + }() + + select { + case err := <-errChan: + return err + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/client/pool_options.go b/client/pool_options.go new file mode 100644 index 000000000..f47b00716 --- /dev/null +++ b/client/pool_options.go @@ -0,0 +1,60 @@ +package client + +import ( + "time" +) + +type ( + poolOptions struct { + logFunc LogFunc + + minAlive int + maxAlive int + maxIdle int + + addr string + user string + password string + dbName string + + connOptions []func(conn *Conn) + + newPoolPingTimeout time.Duration + } +) + +type ( + PoolOption func(o *poolOptions) +) + +// WithPoolLimits sets pool limits: +// - minAlive specifies the minimum number of open connections that the pool will try to maintain. +// - maxAlive specifies the maximum number of open connections (for internal reasons, +// may be greater by 1 inside newConnectionProducer). +// - maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout). +func WithPoolLimits(minAlive, maxAlive, maxIdle int) PoolOption { + return func(o *poolOptions) { + o.minAlive = minAlive + o.maxAlive = maxAlive + o.maxIdle = maxIdle + } +} + +func WithLogFunc(f LogFunc) PoolOption { + return func(o *poolOptions) { + o.logFunc = f + } +} + +func WithConnOptions(options ...func(conn *Conn)) PoolOption { + return func(o *poolOptions) { + o.connOptions = append(o.connOptions, options...) + } +} + +// WithNewPoolPingTimeout enables connect & ping to DB during the pool initialization +func WithNewPoolPingTimeout(timeout time.Duration) PoolOption { + return func(o *poolOptions) { + o.newPoolPingTimeout = timeout + } +} diff --git a/client/pool_test.go b/client/pool_test.go index 1599f9662..1540c251f 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -3,8 +3,10 @@ package client import ( "context" "fmt" + "net" "strings" "testing" + "time" "github.com/go-mysql-org/go-mysql/test_util" "github.com/siddontang/go-log/log" @@ -26,7 +28,12 @@ func TestPoolSuite(t *testing.T) { func (s *poolTestSuite) TestPool_Close() { addr := fmt.Sprintf("%s:%s", *test_util.MysqlHost, s.port) - pool := NewPool(log.Debugf, 5, 10, 5, addr, *testUser, *testPassword, "") + pool, err := NewPoolWithOptions(addr, *testUser, *testPassword, "", + WithPoolLimits(5, 10, 5), + WithLogFunc(log.Debugf), + ) + require.NoError(s.T(), err) + conn, err := pool.GetConn(context.Background()) require.NoError(s.T(), err) err = conn.Ping() @@ -35,8 +42,40 @@ func (s *poolTestSuite) TestPool_Close() { pool.Close() var poolStats ConnectionStats pool.GetStats(&poolStats) - require.Equal(s.T(), 0, poolStats.TotalCount) + require.Equal(s.T(), 0, poolStats.IdleCount) require.Len(s.T(), pool.readyConnection, 0) _, err = pool.GetConn(context.Background()) require.Error(s.T(), err) } + +func (s *poolTestSuite) TestPool_WrongPassword() { + addr := fmt.Sprintf("%s:%s", *test_util.MysqlHost, s.port) + + _, err := NewPoolWithOptions(addr, *testUser, "wrong-password", "", + WithPoolLimits(5, 10, 5), + WithLogFunc(log.Debugf), + WithNewPoolPingTimeout(time.Second), + ) + + if (err == nil) || !strings.Contains(err.Error(), "ERROR 1045 (28000): Access denied for user") { + require.FailNow(s.T(), "unexpected error", "%v", err) + } +} + +func (s *poolTestSuite) TestPool_WrongAddr() { + l, err := net.Listen("tcp4", "127.0.0.1:0") + require.NoError(s.T(), err) + + laddr, ok := l.Addr().(*net.TCPAddr) + require.True(s.T(), ok) + + _ = l.Close() + + _, err = NewPoolWithOptions(laddr.String(), *testUser, *testPassword, "", + WithPoolLimits(5, 10, 5), + WithLogFunc(log.Debugf), + WithNewPoolPingTimeout(time.Second), + ) + + require.Error(s.T(), err) +} From 8a620535f46604c41284d704bc6b88ab63e22071 Mon Sep 17 00:00:00 2001 From: atercattus Date: Mon, 22 Jan 2024 21:32:29 +0400 Subject: [PATCH 2/5] fix Makefile after PR#803 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 44a0eb2b5..8f7b42e0b 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ test-local: -v $${PWD}/docker/resources/replication.cnf:/etc/mysql/conf.d/replication.cnf \ mysql:$(MYSQL_VERSION) docker/resources/waitfor.sh 127.0.0.1 3306 \ - && go test -race -v -timeout 2m ./... -gocheck.v + && go test -race -v -timeout 2m ./client/... docker stop go-mysql-server fmt: From 77e02445e09ed6f1c8f568a808fa3b0f5614c4b8 Mon Sep 17 00:00:00 2001 From: atercattus Date: Mon, 22 Jan 2024 21:51:00 +0400 Subject: [PATCH 3/5] client/pool - allow to check a connection to DB inside pool initialization - add getDefaultPoolOptions --- client/pool.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/client/pool.go b/client/pool.go index 1077d134a..91341a537 100644 --- a/client/pool.go +++ b/client/pool.go @@ -2,6 +2,7 @@ package client import ( "context" + "log" "math" "math/rand" "sync" @@ -85,12 +86,13 @@ func NewPoolWithOptions( dbName string, options ...PoolOption, ) (*Pool, error) { - po := poolOptions{ - addr: addr, - user: user, - password: password, - dbName: dbName, - } + po := getDefaultPoolOptions() + + po.addr = addr + po.user = user + po.password = password + po.dbName = dbName + for _, o := range options { o(&po) } @@ -598,3 +600,13 @@ func (pool *Pool) checkConnection(ctx context.Context) error { return ctx.Err() } } + +// getDefaultPoolOptions returns pool config for low load services +func getDefaultPoolOptions() poolOptions { + return poolOptions{ + logFunc: log.Printf, + minAlive: 1, + maxAlive: 10, + maxIdle: 2, + } +} From 2b537748ac19278d1f95d53a58a6738b38b57fdc Mon Sep 17 00:00:00 2001 From: atercattus Date: Thu, 25 Jan 2024 11:46:33 +0400 Subject: [PATCH 4/5] client/pool - allow to check a connection to DB inside pool initialization - tests refactoring --- client/pool_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/client/pool_test.go b/client/pool_test.go index 1540c251f..d3f4ee9c0 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -57,9 +57,7 @@ func (s *poolTestSuite) TestPool_WrongPassword() { WithNewPoolPingTimeout(time.Second), ) - if (err == nil) || !strings.Contains(err.Error(), "ERROR 1045 (28000): Access denied for user") { - require.FailNow(s.T(), "unexpected error", "%v", err) - } + require.ErrorContains(s.T(), err, "ERROR 1045 (28000): Access denied for user") } func (s *poolTestSuite) TestPool_WrongAddr() { From b1d7cd8603217a890cbd85981625998be5d5f708 Mon Sep 17 00:00:00 2001 From: atercattus Date: Thu, 25 Jan 2024 12:15:11 +0400 Subject: [PATCH 5/5] fix Makefile after PR#803 - fix --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 8f7b42e0b..480b6a874 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ test-local: -v $${PWD}/docker/resources/replication.cnf:/etc/mysql/conf.d/replication.cnf \ mysql:$(MYSQL_VERSION) docker/resources/waitfor.sh 127.0.0.1 3306 \ - && go test -race -v -timeout 2m ./client/... + && go test -race -v -timeout 2m ./... docker stop go-mysql-server fmt: