This repository has been archived by the owner on May 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker.py
102 lines (73 loc) · 2.7 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import argparse
import os
import sys
import time
import traceback

from dbaccess import connect_db, State
import annotate
# Root directory under which the database and both PDF directories live.
BASE_PATH = '/usr/local/scheming'
# Database path handed to connect_db() by Worker.connect().
DATABASE = os.path.join(BASE_PATH, 'scheming.db')
# Directory where Worker.do_work() looks for the input file '<id>.pdf'.
UPLOADS = os.path.join(BASE_PATH, 'uploads/')
# Directory whose '<id>.pdf' path is passed to annotate.annotate() as the
# output file name.
RESULTS = os.path.join(BASE_PATH, 'results/')
CHECK_INTERVAL = 1 # number of seconds to wait between checking for work
def main():
    '''Command-line entry point: parse arguments, then run a Worker forever.'''
    # No options are declared; parsing is still performed so that argparse
    # handles the command line (e.g. the generated help text) itself.
    argparse.ArgumentParser(
        description='Processes work items that are added to the database by the web interface.',
    ).parse_args()

    worker = Worker()
    worker.connect()
    # work_loop() is an endless polling loop, so this call does not return.
    worker.work_loop()
class Worker(object):
    '''Polls the ``uploaded`` table for new items and annotates their PDFs.

    Call connect() once before work_loop() or try_to_work(); both use the
    connection stored in ``self.db``.
    '''

    def connect(self):
        '''Open the database at DATABASE and keep the connection in self.db.'''
        self.db = connect_db(DATABASE)

    def work_loop(self):
        '''Repeatedly try to do work items.

        Never returns normally. Sleeps CHECK_INTERVAL seconds each time
        try_to_work() reports that nothing was processed, and polls again
        immediately after an item has been processed.
        '''
        while True:
            if not self.try_to_work():
                time.sleep(CHECK_INTERVAL)

    def try_to_work(self):
        '''Try to load and execute a work item from the database. Update the
        database while we're working on it, and finally with the
        success/failure of the work operation.

        Any exception raised by do_work() is logged to stdout and stored in
        the item's ``error_msg`` column together with State.Failed.
        Exceptions that are not subclasses of Exception (KeyboardInterrupt,
        SystemExit, ...) are recorded the same way and then re-raised so the
        process can actually be stopped while an item is in progress.

        Returns True iff it has processed a work item.'''
        item_id = self._claim_next_item()
        if item_id is None:
            return False

        # Start the clock before the try block so it is always defined when
        # the result is recorded, even if the very first statement fails.
        start_time = time.time()
        try:
            print('Starting work on %s' % (item_id,))
            self.do_work(item_id)
        except BaseException as exc:
            error_msg = traceback.format_exc()
            print('Failed to process %s' % (item_id,))
            # The formatted traceback already ends with a newline.
            sys.stdout.write(error_msg)
            self._record_result(item_id, State.Failed, error_msg, start_time)
            if not isinstance(exc, Exception):
                # Interrupt / exit request: the failure is on record, now
                # let it propagate instead of silently continuing the loop.
                raise
        else:
            print('Successfully processed %s' % (item_id,))
            self._record_result(item_id, State.Succeeded, '', start_time)
        return True

    def do_work(self, id):
        '''Annotate the uploaded PDF belonging to work item ``id``.

        Reads ``<id>.pdf`` from UPLOADS and passes a path of the same file
        name under RESULTS to annotate.annotate() as the output. Whatever
        annotate.annotate() raises propagates to the caller.

        The parameter keeps its original name ``id`` (shadowing the builtin)
        so existing keyword callers continue to work.
        '''
        pdf_filename = '{}.pdf'.format(id)
        input_filename = os.path.join(UPLOADS, pdf_filename)
        output_filename = os.path.join(RESULTS, pdf_filename)
        annotate.annotate(input_filename, output_filename)

    def _claim_next_item(self):
        '''Pick one State.New item and mark it State.Working.

        Returns the claimed item's id, or None when there is nothing to do
        or when the item was taken by someone else first.
        '''
        # choose an item of work from the database
        cur = self.db.execute('select id from uploaded where state = ? limit 1', [State.New])
        # Read the row before committing so the result never depends on the
        # cursor remaining usable across the commit.
        row = cur.fetchone()
        self.db.commit()
        if row is None:
            # nothing to do
            return None
        item_id = row[0]

        # take the work for ourself; the "state = New" condition means the
        # update only matches if nobody changed the row in the meantime
        cur = self.db.execute('update uploaded set state=? where state = ? and id = ?',
                              [State.Working, State.New, item_id])
        self.db.commit()
        if cur.rowcount != 1:
            # someone got it before us
            return None
        return item_id

    def _record_result(self, item_id, new_state, error_msg, start_time):
        '''Store the final state, error text and elapsed seconds for an item.'''
        time_taken = time.time() - start_time
        self.db.execute('update uploaded set state=?, error_msg=?, time_taken=? where id=?',
                        [new_state, error_msg, time_taken, item_id])
        self.db.commit()
# Start the worker only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()