Skip to content

Commit

Permalink
feat: 连接池重构
Browse files Browse the repository at this point in the history
  • Loading branch information
dayueba committed Oct 25, 2022
1 parent 6089ed1 commit ee5a3a9
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 346 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
called "msgpack rpc" or "my rpc"

## docs
- [WIP]连接池设计: [docs](https://xjip3se76o.feishu.cn/wiki/wikcnhhKMKTjAtiv1VCFqwD7fYt)
- 限流: [docs](https://xjip3se76o.feishu.cn/wiki/wikcnx5mMBOXaGYIeeM0uTXriTh)
- 熔断: [docs](https://xjip3se76o.feishu.cn/wiki/wikcnawR2Gn782uhDUtinYUizNQ)
- 负载均衡: [docs](https://xjip3se76o.feishu.cn/wiki/wikcnP8GuEVxgNl2qfa38GnSSCb)

## benckmark
```
Expand All @@ -22,7 +24,6 @@ INFO msg=throughput (TPS) : 16956
- [ ] errors
- [ ] 日志
- [ ] 配置文件格式
- [ ] 熔断
- [ ] 负载均衡
- [ ] 服务注册与发现 重构
- [ ] 性能优化
- [ ] examples 完善
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/vmihailenco/msgpack/v5 v5.3.5
go.uber.org/zap v1.23.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)

require (
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
133 changes: 133 additions & 0 deletions pool/connpool/channel_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package connpool

import (
"context"
"errors"
"io"
"net"
"sync"
"time"
"sync/atomic"
)

var oneByte = make([]byte, 1)

type channelPool struct {
initCap int
maxCap int
maxIdle int
idleTimeout time.Duration
dialTimeout time.Duration
Dial func(context.Context) (net.Conn, error)
conns chan *Conn
mu sync.Mutex
inflight int32
}

func (c *channelPool) Get(ctx context.Context) (*Conn, error) {
if c.conns == nil {
return nil, ErrConnClosed
}
select {
case conn := <-c.conns:
if conn == nil {
return nil, ErrConnClosed
}

if conn.unusable {
return nil, ErrConnClosed
}

return conn, nil
default:
// if inflight > maxCap
// conn := <- c.conns
// else inflight ++ con := c.Dial(ctx)
conn, err := c.Dial(ctx)
if err != nil {
return nil, err
}
atomic.AddInt32(&c.inflight, 1)
return c.wrapConn(conn), nil
}
}

func (c *channelPool) Put(conn *Conn) error {
if conn == nil {
return errors.New("connection closed")
}
c.mu.Lock()
defer c.mu.Unlock()
if c.conns == nil {
conn.MarkUnusable()
conn.Close()
}

select {
case c.conns <- conn:
return nil
default:
return conn.Conn.Close()
}
}

func (c *channelPool) wrapConn(conn net.Conn) *Conn {
p := &Conn{
c: c,
t: time.Now(),
dialTimeout: c.dialTimeout,
}
p.Conn = conn
return p
}

func (c *channelPool) RegisterChecker(internal time.Duration, checker func(conn *Conn) bool) {
if internal <= 0 || checker == nil {
return
}

go func() {
for {
time.Sleep(internal)
length := len(c.conns)
for i := 0; i < length; i++ {
select {
case pc := <-c.conns:
if !checker(pc) {
pc.MarkUnusable()
pc.Close()
} else {
c.Put(pc)
}
default:
}
}
}
}()
}

// 负责校验连接是否存活
func (c *channelPool) Checker(pc *Conn) bool {
// check timeout
if pc.t.Add(c.idleTimeout).Before(time.Now()) {
return false
}

// check conn is alive or not
if !isConnAlive(pc.Conn) {
return false
}

return true
}

func isConnAlive(conn net.Conn) bool {
conn.SetReadDeadline(time.Now().Add(time.Millisecond))

if n, err := conn.Read(oneByte); n > 0 || err == io.EOF {
return false
}

conn.SetReadDeadline(time.Time{})
return true
}
70 changes: 70 additions & 0 deletions pool/connpool/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package connpool

import (
"errors"
"net"
"sync"
"time"
)

var ErrConnClosed = errors.New("connection closed")

var _ net.Conn = (*Conn)(nil)

type Conn struct {
net.Conn
c *channelPool
unusable bool
mu sync.Mutex
t time.Time // 该连接的空闲时间
dialTimeout time.Duration // connection timeout duration
}

func (p *Conn) Close() error {
p.mu.Lock()
defer p.mu.Unlock()

if p.unusable {
if p.Conn != nil {
return p.Conn.Close()
}
}

// reset connection deadline
p.Conn.SetDeadline(time.Time{})

// 如果连接正常 则放回连接池
return p.c.Put(p)
}

func (p *Conn) MarkUnusable() {
p.mu.Lock()
p.unusable = true
p.mu.Unlock()
}

func (p *Conn) Read(b []byte) (int, error) {
// 判断该连接状态
if p.unusable {
return 0, ErrConnClosed
}
n, err := p.Conn.Read(b)
if err != nil {
p.MarkUnusable()
// 关闭连接
p.Conn.Close()
}
return n, err
}

func (p *Conn) Write(b []byte) (int, error) {
if p.unusable {
return 0, ErrConnClosed
}
n, err := p.Conn.Write(b)
if err != nil {
p.MarkUnusable()
p.Conn.Close()
}
return n, err
}
Loading

0 comments on commit ee5a3a9

Please sign in to comment.