-
Notifications
You must be signed in to change notification settings - Fork 53
/
delivery.py
199 lines (174 loc) · 8.33 KB
/
delivery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# Test case delivery classes
import smtplib
import mailbox
from time import sleep
from logging import CSVLogger, DummyLogger
from tests.discovery import get_evasions
# Base class
class DeliveryBase:
def __init__(self, target, sender, recipients, args):
"""Initializes target for test case delivery, e.g. connection to server"""
self.target = target
self.sender = sender
self.recipients = recipients
self.args = args
self.testcase_selection = args.testcases
# Initiate logging
if args.log:
self.logger = CSVLogger(args.log)
else:
self.logger = DummyLogger()
# Build evasion dict with initialized evasion factories
evasions = get_evasions()
self.evasions = {
evasion.identifier: evasion(evasion.identifier in args.evasion)
for evasion in evasions
}
def deliver_testcase(self, testcase, recipient):
"""Delivers test case to target"""
pass
def deliver_testcases(self, test):
"""Deliver all test cases of a test suite to the target"""
self.recipient_index = 1
for recipient in self.recipients:
self.recipient = recipient
self.testcase_index = 1
self.testcases = test(self.sender, recipient, self.evasions, self.args)
try:
testcase_set = self.testcase_selection[test.identifier]
except KeyError: # Definition was given, but not for this test
testcase_set = {}
except TypeError: # No definition was given at all
testcase_set = None
for testcase in self.testcases:
if testcase_set is None or self.testcase_index in testcase_set:
self.deliver_testcase(testcase, recipient)
self.testcase_index += 1
self.recipient_index += 1
def close(self):
"""Finalizing delivery, e.g. for cleanup or freeing resources"""
self.logger.close()
class DelayMixin:
"""Implements delaying of test cases and automatic delay incremention. Should be mixed in for all network-based delivery classes."""
def __init__(self, target, sender, recipients, args):
super().__init__(target, sender, recipients, args)
self.delay = self.args.delay or 0
self.auto_delay = self.args.auto_delay
self.delay_step = self.args.delay_step
self.delay_max = self.args.delay_max
self.allow_delay_increase()
def allow_delay_increase(self):
"""Allow automatic increase of delay after it was increased. This prevents multiple increments per test case on multiple errors."""
self.allow_increase_delay = True
def increase_delay(self):
"""Increase the send delay automatically as configured in arguments. Disable delay increase until next call of .allow_delay_increase()"""
if self.auto_delay and self.allow_increase_delay:
old_delay = self.delay
self.delay += self.delay_step
if self.delay > self.delay_max:
self.delay = self.delay_max
print("! Increased delay from {:0.1f} to {:0.1f}".format(old_delay, self.delay))
self.allow_increase_delay = False
def do_delay(self):
"""Sleep for the amount currently set as delay."""
sleep(self.delay)
def deliver_testcase(self, *args, **kwargs):
"""Add delay to each test case"""
super().deliver_testcase(*args, **kwargs)
self.do_delay()
class SMTPDelivery(DelayMixin, DeliveryBase):
"""Deliver test cases to a SMTP server"""
delay_increasing_status = range(400, 500)
def __init__(self, target, sender, recipients, args):
super().__init__(target, sender, recipients, args)
self.target = target
self.smtp = smtplib.SMTP(target)
def deliver_testcase(self, testcase, recipient):
super().deliver_testcase(testcase, recipient)
print("Sending test case {} from test '{}' to {}".format(self.testcase_index, self.testcases.name, recipient))
self.allow_delay_increase()
try:
try:
if testcase.delivery_sender:
sender = self.sender
else:
sender = None
except AttributeError:
sender = None
try:
if testcase.delivery_recipient:
recipient = recipient
else:
recipient = None
except AttributeError:
recipient = None
result = self.smtp.send_message(testcase, sender, recipient)
if result:
for failed_recipient, (code, message) in result.items():
print("! Sending to recipient {} failed with error code {}: {}".format(failed_recipient, code, message))
self.logger.log(self.testcases.identifier, self.testcase_index, self.recipient, False, code, message)
if code in self.delay_increasing_status:
self.increase_delay()
else:
self.logger.log(self.testcases.identifier, self.testcase_index, self.recipient)
except smtplib.SMTPRecipientsRefused as e:
print("! Reciepent refused")
for failed_recipient, (code, message) in e.recipients.items():
print("! Sending to recipient {} failed with error code {}: {}".format(failed_recipient, code, str(message, "iso-8859-1")))
self.logger.log(self.testcases.identifier, self.testcase_index, failed_recipient, False, code, str(message, "iso-8859-1"))
if code in self.delay_increasing_status:
self.increase_delay()
except smtplib.SMTPHeloError as e:
print("! SMTP error while HELO: " + str(e))
if e.smtp_code in self.delay_increasing_status:
self.increase_delay()
except smtplib.SMTPSenderRefused as e:
print("! SMTP server rejected sender address: " + str(e))
self.logger.log(self.testcases.identifier, self.testcase_index, self.recipient, False, e.smtp_code, e.smtp_error)
if e.smtp_code in self.delay_increasing_status:
self.increase_delay()
except smtplib.SMTPDataError as e:
print("! Unexpected SMTP error: " + str(e))
self.logger.log(self.testcases.identifier, self.testcase_index, self.recipient, False, e.smtp_code, e.smtp_error)
if e.smtp_code in self.delay_increasing_status:
self.increase_delay()
except smtplib.SMTPNotSupportedError as e:
print("! SMTP server doesn't supports: " + str(e))
self.logger.log(self.testcases.identifier, self.testcase_index, self.recipient, False, -1, str(e))
except smtplib.SMTPServerDisconnected as e:
self.logger.log(self.testcases.identifier, self.testcase_index, self.recipient, False, -2, str(e))
print("! SMTP server disconnected unexpected - reconnecting: " + str(e))
self.smtp = smtplib.SMTP(self.target)
def close(self):
super().close()
self.smtp.quit()
class FileDelivery(DeliveryBase):
"""Dumps each test case into separate plain file"""
def deliver_testcase(self, testcase, recipient):
outname = "{}/{}-{:04d}-{:02d}.msg".format(
self.target,
self.testcases.identifier,
self.testcase_index,
self.recipient_index
)
try:
out = open(outname, "wb")
out.write(testcase.as_bytes())
out.close()
except ( IOError, OSError ) as e:
print("! Error while creation of test file {}: {}".format(outname, str(e)))
class MailboxDeliveryBase(DeliveryBase):
"""Base class for delivery methods using the mailbox module"""
mailbox_class = mailbox.Mailbox
def __init__(self, target, sender, recipients, args):
super().__init__(target, sender, recipients, args)
self.mailbox = self.mailbox_class(target)
def deliver_testcase(self, testcase, recipient):
self.mailbox.add(testcase)
def close(self):
super().close()
self.mailbox.close()
class MBoxDelivery(MailboxDeliveryBase):
mailbox_class = mailbox.mbox
class MaildirDelivery(MailboxDeliveryBase):
mailbox_class = mailbox.Maildir