-
Notifications
You must be signed in to change notification settings - Fork 4
/
dbutil.py
157 lines (132 loc) · 5.13 KB
/
dbutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python
'''
Code to work with python DB API v2.0 Connection and Cursor objects. (http://www.python.org/dev/peps/pep-0249/)
No RDBMS specific code should be in here.
No Orchestra specific code.
No application (rodeo, roundup, genotator, etc.) specific code.
Things this module does not do:
Get connections, since that is specific to the database server being used (MySQL, Oracle, etc.)
Generate SQL, since that can be database-specific too. (some exceptions might apply.)
'''
import contextlib
import logging
class Reuser(object):
'''
An instance is a callable object that returns an open connection. Multiple
calls will return the same connection, if the connection is still "live".
It will open a new connection if no connection is open yet or the existing
connection is closed (or otherwise fails to be pinged.)
This object is meant to handle two common scenarios with one API.
* Opening and closing a connection very rapidly can cause a database to
balk. Reuser allows a process to open one connection and reuse it.
* A connection kept open a long time can be closed by the database.
Reuser will open a new connection when the existing connection can
not be reused.
'''
def __init__(self, open_conn):
'''
open_conn: function that returns an open connection
'''
self.open_conn = open_conn
self.conn = None
def __call__(self):
'''
returns: an open db connection, opening one if necessary.
'''
if not self._ping():
self.conn = self.open_conn()
return self.conn
def _ping(self):
'''
returns: True if there is an active connection. False otherwise.
'''
if self.conn:
try:
selectSQL(self.conn, 'SELECT 1')
return True
except Exception as e:
# OperationalError 2006 happens when the db connection times
# out.
if not e.args or e.args[0] != 2006:
# only log non-2006 execptions
logging.exception('Exception encountered when pinging connection in Reuser._ping.')
return False
else:
return False
@contextlib.contextmanager
def doTransaction(conn, start=True, startSQL='START TRANSACTION'):
'''
wrap a connection in a transaction. starts a transaction, yields the conn, and then if an exception occurs, calls rollback(). otherwise calls commit().
start: if True, executes 'START TRANSACTION' sql before yielding conn. Useful for connections that are autocommit by default.
startSQL: override if 'START TRANSACTION' does not work for your db server.
'''
try:
if start:
executeSQL(conn, startSQL)
yield conn
except:
conn.rollback()
raise
else:
conn.commit()
@contextlib.contextmanager
def doCursor(conn):
'''
create and yield a cursor, closing it when done.
'''
cursor = conn.cursor()
try:
yield cursor
finally:
cursor.close()
def selectSQL(conn, sql, args=None):
'''
sql: a select statement
args: if sql has parameters defined with either %s or %(key)s then args should be a either list or dict of parameter
values respectively.
returns a tuple of rows, each of which is a tuple.
'''
with doCursor(conn) as cursor:
cursor.execute(sql, args)
results = cursor.fetchall()
return results
def insertSQL(conn, sql, args=None):
'''
args: if sql has parameters defined with either %s or %(key)s then args should be a either list or dict of parameter
values respectively.
returns the insert id
'''
with doCursor(conn) as cursor:
cursor.execute(sql, args)
id = conn.insert_id()
return id
def updateSQL(conn, sql, args=None):
'''
args: if sql has parameters defined with either %s or %(key)s then args should be a either list or dict of parameter
values respectively.
returns the number of rows affected by the sql statement
'''
with doCursor(conn) as cursor:
numRowsAffected = cursor.execute(sql, args)
return numRowsAffected
def executeSQL(conn, sql, args=None):
'''
args: if sql has parameters defined with either %s or %(key)s then args should be a either list or dict of parameter
values respectively.
executes sql statement. useful for executing statements like CREATE TABLE or RENAME TABLE,
which do not have an result like insert id or a rowset.
returns: the number of rows affected by the sql statement if any.
'''
with doCursor(conn) as cursor:
numRowsAffected = cursor.execute(sql, args)
return numRowsAffected
def executeManySQL(conn, sql, args=None):
'''
args: list of groups of arguments. if sql has parameters defined with either %s or %(key)s then groups should be a either lists or dicts of parameter
values respectively.
returns: not sure. perhaps number of rows affected.
'''
with doCursor(conn) as cursor:
retval = cursor.executemany(sql, args)
return retval
# last line