-
Notifications
You must be signed in to change notification settings - Fork 4
/
heartbleed_scan.py
365 lines (313 loc) · 11.7 KB
/
heartbleed_scan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#!/usr/bin/env python
# Usage example: python ssltest.py example.com
import sys
import struct
import socket
import time
import select
import re
import threading
import netaddr
import json
import os
import datetime
import signal
from optparse import OptionParser
from collections import defaultdict
from multiprocessing.dummy import Pool
host_status = {}
hosts_to_skip = []
counter = defaultdict(int)
lock = threading.Lock()
options = OptionParser(usage='%prog <network> [network2] [network3] ...', description='Test for SSL heartbleed vulnerability (CVE-2014-0160) on multiple domains')
options.add_option('--port', '-p', dest="port", default=443, help="Port to scan on all hosts or networks, default 443")
options.add_option('--input', '-i', dest="input_file", default=[], action="append", help="Optional input file of networks or ip addresses, one address per line")
options.add_option('--logfile', '-o', dest="log_file", default="results.txt", help="Optional logfile destination")
options.add_option('--resume', dest="resume", action="store_true", default=False, help="Do not rescan hosts that are already in the logfile")
options.add_option('--timeout', '-t', dest="timeout", default=5, help="How long to wait for remote host to respond before timing out")
options.add_option('--threads', dest="threads", default=100, help="If specific, run X concurrent threads")
options.add_option('--json', dest="json_file", default=None, help="Save data as json into this file")
options.add_option('--only-vulnerable', dest="only_vulnerable", action="store_true", default=False, help="Only scan hosts that have been scanned before and were vulnerable")
options.add_option('--only-unscanned', dest="only_unscanned", action="store_true", default=False, help="Only scan hosts that appear in the json file but have not been scanned")
options.add_option('--summary', dest="summary", action="store_true", default=False, help="Useful with --json. Don't scan, just print old results")
options.add_option('--verbose', dest="verbose", action="store_true", default=False, help="Print verbose information to screen")
options.add_option('--max', dest="max", default=None, help="Exit program after scanning X hosts. Useful with --only-unscanned")
opts, args = options.parse_args()
threadpool = Pool(processes=int(opts.threads))
def h2bin(x):
return x.replace(' ', '').replace('\n', '').decode('hex')
hello = h2bin('''
16 03 03 00 dc 01 00 00 d8 03 03 53
43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf
bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00
00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88
00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c
c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09
c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44
c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c
c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11
00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04
03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19
00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08
00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13
00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00
00 0f 00 01 01
''')
def recvall(s, length, timeout=5):
endtime = time.time() + timeout
rdata = ''
remain = length
while remain > 0:
rtime = endtime - time.time()
if rtime < 0:
return None
r, w, e = select.select([s], [], [], 5)
if s in r:
try:
data = s.recv(remain)
except Exception, e:
return None
# EOF?
if not data:
return None
rdata += data
remain -= len(data)
return rdata
def recvmsg(s):
hdr = recvall(s, 5)
if hdr is None:
return None, None, None
typ, ver, ln = struct.unpack('>BHH', hdr)
pay = recvall(s, ln, 10)
if pay is None:
return None, None, None
return typ, ver, pay
def hit_hb(s):
while True:
typ, ver, pay = recvmsg(s)
if typ is None:
return False
if typ == 24:
return True
if typ == 21:
return False
def unpack_handshake(pay):
"""
Unpack the SSL handshake in Multiple Handshake Message
"""
paylen = len(pay)
offset = 0
payarr = []
while offset < paylen:
h = pay[offset:offset + 4]
t, l24 = struct.unpack('>B3s', h)
l = struct.unpack('>I', '\x00' + l24)[0]
payarr.append((
t,
l,
pay[offset+4:offset+4+l]
))
offset = offset+l+4
return payarr
def is_vulnerable(host, timeout, port=443):
""" Check if remote host is vulnerable to heartbleed
Returns:
None -- If remote host has no ssl
False -- Remote host has ssl but likely not vulnerable
True -- Remote host might be vulnerable
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(int(timeout))
try:
s.connect((host, int(port)))
except Exception, e:
return None
s.send(hello)
while True:
typ, ver, pay = recvmsg(s)
if typ is None:
return None
if typ == 22:
payarr = unpack_handshake(pay)
# Look for server hello done message.
finddone = [t for t, l, p in payarr if t == 14]
if len(finddone) > 0:
break
# construct heartbeat request packet
ver_chr = chr(ver&0xff)
hb = h2bin("18 03") + ver_chr + h2bin("40 00 01 3f fd") + "\x01"*16381
hb += h2bin("18 03") + ver_chr + h2bin("00 03 01 00 00")
s.send(hb)
return hit_hb(s)
def store_results(host_name, current_status):
current_time = time.time()
with lock:
counter[current_status] += 1
counter["Total"] += 1
if host_name not in host_status:
host_status[host_name] = {}
host = host_status[host_name]
# Make a note when this host was last scanned
host['last_scan'] = current_time
# Make a note if this host has never been scanned before
if 'first_scan' not in host:
host['first_scan'] = current_time
elif host.get('status', 'never been scanned') != current_status:
# If it has a different check result from before
host['changelog'] = host.get('changelog', [])
changelog_entry = [current_time, current_status]
host['changelog'].append(changelog_entry)
host['status'] = current_status
with open(opts.log_file, 'a') as f:
message = "{current_time} {host} {current_status}".format(**locals())
f.write(message + "\n")
return message
def scan_host(host):
""" Scans a single host, logs into
Returns:
list(timestamp, ipaddress, vulnerabilitystatus)
"""
if opts.max and int(opts.max) >= counter["Total"]:
return
host = str(host)
if host in hosts_to_skip:
return
result = is_vulnerable(host, opts.timeout, opts.port)
message = store_results(host, result)
if opts.verbose:
print message
return message
def scan_hostlist(hostlist, threads=5):
""" Iterates through hostlist and scans them
Arguments:
hostlist -- Iterable with ip addresses
threads -- If specified, run in multithreading mode
"""
task = threadpool.map_async(scan_host, hostlist)
while True:
print counter['Total'], "hosts done"
task.wait(1)
if task.ready() or hasattr(threadpool, 'done'):
return
threadpool.close()
threadpool.join()
def clean_hostlist(args):
""" Returns list of iterables
Examples:
>>> hostlist = ["127.0.0.1", "127.0.0.2"]
>>> clean_hostlist(hostlist)
"""
hosts = []
networks = []
for i in args:
# If it contains any alphanumerics, it might be a domain name
if any(c.isalpha() for c in i):
# Special hack, because alexa top x list is kind of weird
i = i.split('/')[0]
hosts.append(i)
# If arg contains a / we assume its a network name
elif '/' in i:
networks.append(netaddr.IPNetwork(i))
else:
hosts.append(i)
result = []
for network in networks:
if network.size >= opts.threads:
result.append(network)
else:
for i in network:
hosts.append(str(i))
if hosts:
result.append(hosts)
return result
def import_json(filename):
""" Reads heartbleed data in json format from this file """
with open(filename) as f:
json_data = f.read()
data = json.loads(json_data)
for k, v in data.items():
host_status[k] = v
def export_json(filename):
""" Save scan results into filename as json data
"""
json_data = json.dumps(host_status, indent=4)
with open(filename, 'w') as f:
f.write(json_data)
def print_summary():
""" Print summary of previously stored json data to screen """
if not opts.json_file:
pass
#options.error("You need to provide --json with --summary")
else:
import_json(opts.json_file)
counter = defaultdict(int)
for host, data in host_status.items():
friendly_status = "unknown"
status = data.get('status', "Not scanned")
if status is None:
friendly_status = "SSL Connection Failed"
elif status is True:
friendly_status = "Vulnerable"
elif status is False:
friendly_status = "Not Vulnerable"
else:
friendly_status = str(status)
last_scan = int(float(data.get('last_scan',0)))
last_scan = datetime.datetime.fromtimestamp(last_scan).strftime('%Y-%m-%d %H:%M:%S')
counter[friendly_status] += 1
counter['Total'] += 1
if opts.only_vulnerable and not status:
continue
elif opts.only_unscanned and 'status' in data:
continue
print "%s %-20s %5s" % (last_scan, host, friendly_status)
print "------------ summary -----------"
for k,v in counter.items():
print "%-7s %s" % (v, k)
return
def signal_handler(signal, frame):
print "Ctrl+C pressed.. aborting..."
threadpool.terminate()
threadpool.done = True
def main():
if opts.summary:
print_summary()
return
if not args and not opts.input_file and not opts.json_file:
options.print_help()
return
# If any input files were provided, parse through them and add all addresses to "args"
for input_file in opts.input_file:
with open(input_file) as f:
for line in f:
words = line.split()
if not words:
continue
# If input file is in masscan's portscan format
if line.startswith("Discovered open port"):
args.append(words.pop())
elif len(words) == 1:
args.append(words[0])
else:
print "Skipping invalid input line: " % line
continue
if opts.json_file:
try:
import_json(opts.json_file)
except IOError:
print opts.json_file, "not found. Not importing any data"
for host_name, data in host_status.items():
if opts.only_unscanned and 'status' in data:
continue
if data.get('status', None) is True or not opts.only_vulnerable:
args.append(host_name)
# For every network in args, convert it to a netaddr network, so we can iterate through each host
remote_networks = clean_hostlist(args)
for network in remote_networks:
scan_hostlist(network, threads=opts.threads)
if opts.json_file:
export_json(opts.json_file)
print_summary()
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler)
main()