forked from epandurski/cmbarter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
process_emails.py
executable file
·327 lines (256 loc) · 11.5 KB
/
process_emails.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
#! /usr/bin/env python
## The author disclaims copyright to this source code. In place of
## a legal notice, here is a poem:
##
## "Metaphysics"
##
## Matter: is the music
## of the space.
## Music: is the matter
## of the soul.
##
## Soul: is the space
## of God.
## Space: is the soul
## of logic.
##
## Logic: is the god
## of the mind.
## God: is the logic
## of bliss.
##
## Bliss: is a mind
## of music.
## Mind: is the bliss
## of the matter.
##
######################################################################
## This file implements the email processing.
##
from __future__ import with_statement
import sys, os, getopt, base64, datetime, re, time
import smtplib
import pytz
from cmbarter.settings import CMBARTER_HOST, CMBARTER_DSN
from cmbarter.modules import curiousorm
from cmbarter.modules import messages
from cmbarter.modules.utils import send_email, get_ugettext, wrap_text
# Command-line help text.  parse_args() prints it for -h/--help and whenever
# the command line cannot be parsed or contains positional arguments.
USAGE = """Usage: process_emails.py [OPTIONS]
Fetches pending messages to the outgoing e-mail server.
-h, --help display this help and exit
--smtp-host=HOST[:PORT] supply SMTP server name and optional port
If omitted, the value of the SMTP_HOST
environment variable is used. If it is
empty -- "localhost" is presumed.
--smtp-username=USERNAME supply SMTP login name
If omitted, the value of the SMTP_USERNAME
environment variable is used. If it is
empty -- no authentication is presumed.
--smtp-password=PASSWORD supply SMTP password
If omitted, the value of the SMTP_PASSWORD
environment variable is used.
-s, --ssl use an SSL/TLS connection
-S, --starttls use opportunistic TLS connection (STARTTLS)
--dsn=DSN give explicitly the database source name
--site-domain=DOMAINNAME give explicitly the site domainname
Example:
$ ./process_emails.py --smtp-host=mail.foo.com --smtp-username=cmbarter --smtp-password='mypassword'
"""
# Wall-clock limit for the whole run: the script stops itself at most
# 10 minutes (600 seconds) after this module was loaded.
deadline = time.time() + 600.0


def exit_if_deadline_has_been_passed():
    """Terminate the script via sys.exit() once ``deadline`` is behind us.

    Returns None (and does nothing) while the current time has not yet
    exceeded the module-level ``deadline``.
    """
    if time.time() <= deadline:
        return
    sys.exit()
def parse_args(argv):
    """Parse the command line and store the results in module globals.

    ``argv`` is the argument list without the program name.  Every
    recognized option overwrites the corresponding global (``dsn``,
    ``smtp_host``, ``smtp_username``, ``smtp_password``, ``site_domain``,
    ``ssl``, ``starttls``); globals whose option is absent are not touched.

    Prints USAGE and exits with status 2 when an option is not recognized
    or when any positional argument is present.  Prints USAGE and exits
    with the default status on -h/--help.
    """
    global site_domain, dsn, smtp_host, smtp_username, smtp_password, ssl, starttls

    long_options = [
        'smtp-host=', 'smtp-username=', 'smtp-password=',
        'ssl', 'starttls',
        'site-domain=', 'dsn=', 'help']
    try:
        options, positional = getopt.gnu_getopt(argv, 'hsS', long_options)
    except getopt.GetoptError:
        print(USAGE)
        sys.exit(2)

    # The script takes options only; stray arguments are a usage error.
    if positional:
        print(USAGE)
        sys.exit(2)

    for name, value in options:
        if name == '-h' or name == '--help':
            print(USAGE)
            sys.exit()
        elif name == '-s' or name == '--ssl':
            ssl = True
        elif name == '-S' or name == '--starttls':
            starttls = True
        elif name == '--smtp-host':
            smtp_host = value
        elif name == '--smtp-username':
            smtp_username = value
        elif name == '--smtp-password':
            smtp_password = value
        elif name == '--site-domain':
            site_domain = value
        elif name == '--dsn':
            dsn = value
def process_email_validations(db):
    """Queue an address-verification e-mail for each pending verification.

    Reads the rows of "email_verification" that have no verification code
    yet and whose trader still has a positive
    "max_email_verification_count".  For every such row:

    1. In one transaction, calls acquire_email_verification_rights();
       rows for which it returns a false value are skipped.
    2. Generates a random URL-safe secret and composes the message in the
       trader's last request language.
    3. In a second transaction, stores the secret with
       update_email_verification_code() and, only if that succeeds,
       inserts the message into "outgoing_email".

    ``db`` is the connection used for the transactions.  The deadline is
    checked before each row is processed.
    """
    pending = curiousorm.Cursor(dsn, """
SELECT ev.trader_id, ev.email, ts.last_request_language_code
FROM email_verification ev, trader_status ts
WHERE
ev.email_verification_code IS NULL AND
ts.trader_id=ev.trader_id AND
ts.max_email_verification_count> 0
""", dictrows=True)

    for trader_id, email, lang_code in pending:
        exit_if_deadline_has_been_passed()

        with db.Transaction() as trx:
            trx.set_asynchronous_commit()
            rights_acquired = trx.acquire_email_verification_rights(trader_id)
        if not rights_acquired:
            continue

        # 15 random bytes encode to a 20-character URL-safe secret.
        secret = base64.urlsafe_b64encode(os.urandom(15)).decode('ascii')

        # Build the localized message that carries the secret.
        translate = get_ugettext(lang_code)
        subject = translate(messages.ADDRESS_VERIFICATION_SUBJECT)
        template = translate(messages.ADDRESS_VERIFICATION_CONTENT)
        content = template % {
            "site_domain": site_domain,
            "traderid": str(trader_id).zfill(9),
            "email": email,
            "secret_code": secret}
        orig_date = datetime.datetime.now(pytz.utc)

        with db.Transaction() as trx:
            trx.set_asynchronous_commit()
            code_stored = trx.update_email_verification_code(trader_id, email, secret)
            if code_stored:
                # The message is queued only after the secret has been
                # written to the user's profile.
                trx.insert_outgoing_email(
                    subject, wrap_text(content), orig_date,
                    "noreply@%s" % site_domain, site_domain,  # From
                    email, '',  # To
                    '', '', '', '')
def process_outgoing_customer_broadcasts(db):
    """Turn every pending customer broadcast into per-recipient e-mails.

    For each row of "outgoing_customer_broadcast", one transaction deletes
    the row and, if the deletion succeeded, inserts one "outgoing_email"
    record per recipient returned by get_broadcast_recipient_list().  Each
    message is the wrapped broadcast text followed by a signature
    localized to the recipient's last request language.

    Afterwards acquire_received_email_rights() is called for every
    recipient, each in its own transaction.

    ``db`` is the connection used for the transactions.  The deadline is
    checked before each broadcast is processed.
    """
    pending_broadcasts = curiousorm.Cursor(dsn, """
SELECT id, trader_id, from_mailbox, subject, content, insertion_ts
FROM outgoing_customer_broadcast
""", buffer_size=100, dictrows=True)

    for broadcast_id, issuer_id, issuer_mailbox, subject, content, orig_date in pending_broadcasts:
        exit_if_deadline_has_been_passed()

        # Stays empty when the broadcast record could not be deleted.
        recipients = []
        with db.Transaction() as trx:
            trx.set_asynchronous_commit()

            # Remove the broadcast record first; only then fan the message
            # out to the individual recipients.
            if trx.delete_outgoing_customer_broadcast(broadcast_id):
                recipients = trx.get_broadcast_recipient_list(issuer_id)
                content = wrap_text(content)  # Transform the message to 72-columns
                for recipient in recipients:
                    translate = get_ugettext(recipient['last_request_language_code'])
                    template = translate(messages.CUSTOMER_BROADCAST_SIGNATURE)
                    signature = template % {
                        "site_domain": site_domain,
                        "partner_name": recipient['issuer_display_name'],
                        "traderid": str(recipient['trader_id']).zfill(9),
                        "secret_code": recipient['email_cancellation_code']}
                    trx.insert_outgoing_email(
                        subject,
                        "%s\n\n-- \n%s" % (content, wrap_text(signature)),
                        orig_date,
                        "noreply@%s" % site_domain, recipient['issuer_display_name'],  # From
                        recipient['mailbox'], '',  # To
                        '', '',  # Reply-To
                        '', '')  # Sender

        # Admittedly ugly: the right to send is acquired after the message
        # has already been queued.  This is acceptable because
        # get_broadcast_recipient_list() is expected to leave out traders
        # that have exceeded their receive-limit, so the call below only
        # serves to decrease the "max_received_email_count" counter.
        for recipient in recipients:
            with db.Transaction() as trx:
                trx.set_asynchronous_commit()
                trx.acquire_received_email_rights(recipient['trader_id'])
def process_notifications(db):
    """Convert pending notifications into queued e-mail messages.

    Reads every "outgoing_notification" row together with the trader's
    last request language, composes the localized notification message,
    and then, in one transaction, deletes the notification and inserts
    the message into "outgoing_email".  The message is inserted only when
    delete_outgoing_notification() reports success.

    ``db`` is the connection used for the transactions.  The deadline is
    checked before each notification is processed.
    """
    pending = curiousorm.Cursor(dsn, """
SELECT
n.id, n.trader_id, n.to_mailbox, n.email_cancellation_code,
ts.last_request_language_code
FROM outgoing_notification n, trader_status ts
WHERE ts.trader_id=n.trader_id
""", dictrows=True)

    for record in pending:
        notification_id, trader_id, to_mailbox, email_cancellation_code, lang_code = record
        exit_if_deadline_has_been_passed()

        # Build the localized message before opening the transaction.
        translate = get_ugettext(lang_code)
        subject = translate(messages.NOTIFICATION_SUBJECT)
        template = translate(messages.NOTIFICATION_CONTENT)
        content = template % {
            "site_domain": site_domain,
            "traderid": str(trader_id).zfill(9),
            "secret_code": email_cancellation_code}
        orig_date = datetime.datetime.now(pytz.utc)

        with db.Transaction() as trx:
            trx.set_asynchronous_commit()
            deleted = trx.delete_outgoing_notification(notification_id)
            if deleted:
                # The notification record existed and is now gone, so the
                # composed message can be queued.
                trx.insert_outgoing_email(
                    subject, wrap_text(content), orig_date,
                    "noreply@%s" % site_domain, site_domain,  # From
                    to_mailbox, '',  # To
                    '', '', '', '')
def send_outgoing_emails(db, ssl=False, starttls=False):
    """Send every message queued in "outgoing_email" and remove it.

    Opens one SMTP connection to the module-level ``smtp_host`` and logs
    in when ``smtp_username`` is non-empty.  Each queued row is passed to
    send_email() and then deleted from "outgoing_email" in its own
    transaction.  A message whose sender or recipients the server refuses
    is deleted as well, so it is not retried.

    Parameters:
      db       -- connection used for the delete transactions.
      ssl      -- when true, connect with smtplib.SMTP_SSL instead of
                  smtplib.SMTP.
      starttls -- when true and ``ssl`` is false, issue STARTTLS right
                  after connecting.

    The deadline is checked before each message is sent.  The connection
    is always shut down on the way out; any other SMTP or database error
    propagates to the caller.
    """
    outgoing_emails = curiousorm.Cursor(dsn, """
SELECT
id, subject, content, orig_date,
from_mailbox, from_display_name,
to_mailbox, to_display_name,
reply_to_mailbox, reply_to_display_name,
sender_mailbox, sender_display_name
FROM outgoing_email
""", buffer_size=100, dictrows=True)

    connect = smtplib.SMTP_SSL if ssl else smtplib.SMTP
    smtp_connection = connect(smtp_host)
    try:
        if not ssl and starttls:
            smtp_connection.starttls()
        if smtp_username:
            smtp_connection.login(smtp_username, smtp_password)

        for m in outgoing_emails:
            exit_if_deadline_has_been_passed()
            try:
                send_email(smtp_connection, m)
            except (smtplib.SMTPRecipientsRefused, smtplib.SMTPSenderRefused):
                # This should never happen, but anyway, it does not break
                # anything: the message is simply dropped below.
                pass
            with db.Transaction() as trx:
                trx.set_asynchronous_commit()
                trx.delete_outgoing_email(m['id'])
    finally:
        try:
            smtp_connection.quit()
        except smtplib.SMTPServerDisconnected:
            # The connection is already gone, so there is nothing left to
            # close.  Swallowing this keeps a failed QUIT from hiding the
            # error (if any) that is propagating from the block above.
            pass
if __name__ == "__main__":
    # Defaults come from the environment and the site settings; parse_args()
    # overrides them with whatever was given on the command line.
    #
    # A set-but-empty SMTP_HOST falls back to "localhost" as well, as the
    # usage text promises; an empty host string would otherwise leave the
    # SMTP object unconnected.
    smtp_host = os.environ.get('SMTP_HOST') or 'localhost'
    smtp_username = os.environ.get('SMTP_USERNAME', '')
    smtp_password = os.environ.get('SMTP_PASSWORD', '')
    ssl = False
    starttls = False
    site_domain = CMBARTER_HOST
    dsn = CMBARTER_DSN
    parse_args(sys.argv[1:])

    db = curiousorm.Connection(dsn, dictrows=True)
    try:
        # We must ensure that at most one process is running at a time, so
        # we try to obtain an advisory database lock. This lock is held by
        # execute_turn.py, so we are guaranteed that we will not take
        # precious system resources while a turn is running.
        if db.pg_try_advisory_lock(1):
            try:
                process_email_validations(db)
                process_outgoing_customer_broadcasts(db)
                process_notifications(db)
                send_outgoing_emails(db, ssl=ssl, starttls=starttls)
            finally:
                db.pg_advisory_unlock(1)
    finally:
        # Close the connection even when a step above raised or the
        # deadline triggered sys.exit().
        db.close()