forked from pubnative/mysqldriver-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.go
139 lines (119 loc) · 2.93 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package mysqldriver
import (
"errors"
"strings"
)
// ErrClosedDB is the error GetConn returns once the DB's pool has been closed.
var ErrClosedDB = errors.New("mysqldriver: can't get connection from the closed DB")
// DB manages a pool of connections.
//
// Idle connections are kept in a channel whose capacity is the pool size
// passed to NewDB; the string fields hold the parsed data source and are
// handed to NewConn whenever a new connection has to be dialed.
type DB struct {
// OnDial, when non-nil, is invoked by dial on every freshly created
// connection; the error it returns is passed back to the caller.
OnDial func(conn *Conn) error // called when new connection is established
// conns holds the idle connections waiting to be reused.
conns chan *Conn
// The fields below are the components of the data source given to NewDB.
username string
password string
protocol string
address string
database string
}
// NewDB prepares a connection pool for the given data source. No connection
// is established here; connections are dialed lazily by GetConn.
//
// The pool size is fixed at construction time and cannot be changed later.
// The dataSource parameter has the following format:
//
//	[username[:password]@][protocol[(address)]]/dbname
func NewDB(dataSource string, pool int) *DB {
	db := new(DB)
	// Parse first, then allocate the pool channel, same order as before.
	db.username, db.password, db.protocol, db.address, db.database = parseDataSource(dataSource)
	db.conns = make(chan *Conn, pool)
	return db
}
// GetConn hands out an idle connection from the pool when one is available
// and dials a new one otherwise. It never blocks waiting for the pool, so a
// connection is returned regardless of the pool size. Once the DB has been
// closed it returns ErrClosedDB.
func (db *DB) GetConn() (*Conn, error) {
	select {
	case pooled, open := <-db.conns:
		if open {
			return pooled, nil
		}
		// A receive on the closed pool channel reports open == false.
		return nil, ErrClosedDB
	default:
	}
	// Nothing idle in the pool: establish a fresh connection.
	return db.dial()
}
// PutConn gives a connection back to the pool. When the pool is already
// full, the connection is closed instead and will not be reused.
// A connection that is marked invalid is closed, and one that is already
// closed is silently discarded, so it is safe to return either to the pool.
// If anything below panics (for example sending on the pool channel after
// the DB was closed), the panic is recovered and the connection is closed.
func (db *DB) PutConn(conn *Conn) (err error) {
	defer func() {
		if recover() == nil {
			return
		}
		err = conn.Close()
	}()

	switch {
	case !conn.valid:
		// broken connection shouldn't be in a pool
		return conn.Close()
	case conn.closed:
		return nil
	}

	conn.conn.ResetStats()

	select {
	case db.conns <- conn:
		return nil
	default:
		// Pool is at capacity; drop the surplus connection.
		return conn.Close()
	}
}
// Close closes the pool channel and then closes every idle connection that
// was still waiting in it. After Close, GetConn reports ErrClosedDB instead
// of handing out connections.
//
// It returns the errors produced while closing individual connections, or a
// nil slice when there were none. Close must be called at most once: closing
// the already-closed pool channel panics.
func (db *DB) Close() []error {
	close(db.conns)

	var errs []error
	// Ranging over the closed channel drains what is buffered and then stops.
	for conn := range db.conns {
		if err := conn.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}
// dial establishes a brand-new connection from the data source stored on db
// and, when an OnDial hook is set, runs the hook on that connection.
//
// If NewConn fails, its results are returned unchanged. If the hook returns
// an error, the just-opened connection is closed and nil is returned together
// with the hook's error, so that a caller which only inspects the error does
// not leak an open connection.
func (db *DB) dial() (*Conn, error) {
	conn, err := NewConn(db.username, db.password, db.protocol, db.address, db.database)
	if err != nil {
		return conn, err
	}
	if db.OnDial == nil {
		return conn, nil
	}
	if err := db.OnDial(conn); err != nil {
		// The hook's error is the one worth reporting; a failure while
		// closing the rejected connection is deliberately ignored.
		_ = conn.Close()
		return nil, err
	}
	return conn, nil
}
// parseDataSource splits a data source name of the form
//
//	[username[:password]@][protocol[(address)]]/dbname
//
// into its components. Every component is optional: a missing one is returned
// as an empty string, and no input makes the function panic.
//
// Parsing rules:
//   - the database name is everything after the last '/', so an address such
//     as a unix socket path may itself contain '/'; a '/' that is still
//     followed by ')' belongs to the address and is not a separator;
//   - the credentials end at the last '@' of what remains, so the password
//     may contain '@';
//   - the password is everything after the first ':' of the credentials, so
//     it may contain ':';
//   - the address is the text after the first '(' with one trailing ')'
//     removed.
func parseDataSource(dataSource string) (username, password, protocol, address, database string) {
	rest := dataSource

	// Split off the database name. Without a separator the whole string
	// describes the server and no database is selected.
	if slash := strings.LastIndex(rest, "/"); slash >= 0 && !strings.Contains(rest[slash:], ")") {
		database = rest[slash+1:]
		rest = rest[:slash]
	}

	// Split off the optional "username[:password]@" prefix.
	if at := strings.LastIndex(rest, "@"); at >= 0 {
		credentials := rest[:at]
		rest = rest[at+1:]

		username = credentials
		if colon := strings.Index(credentials, ":"); colon >= 0 {
			username = credentials[:colon]
			password = credentials[colon+1:]
		}
	}

	// What is left is "protocol[(address)]".
	protocol = rest
	if paren := strings.Index(rest, "("); paren >= 0 {
		protocol = rest[:paren]
		address = strings.TrimSuffix(rest[paren+1:], ")")
	}

	return username, password, protocol, address, database
}