#! /usr/bin/env python3
import argparse
import asyncio
import logging
import os
import pathlib
import threading
from argparse import ArgumentDefaultsHelpFormatter
from datetime import datetime
from time import sleep
from urllib.parse import urlparse

import pyppeteer

import witnessme.stats as stats
from witnessme.database import ScanDatabase
from witnessme.parsers import AutomaticTargetGenerator
from witnessme.utils import patch_pyppeteer, resolve_host, is_ipaddress

logging.basicConfig(format="%(asctime)s [%(levelname)s] - %(filename)s: %(funcName)s - %(message)s", level=logging.INFO)
logging.getLogger('asyncio').setLevel(logging.ERROR)
logging.getLogger('sqlite3').setLevel(logging.ERROR)
logging.getLogger('aiosqlite').setLevel(logging.ERROR)
logging.getLogger('asyncio.coroutines').setLevel(logging.ERROR)
logging.getLogger('websockets').setLevel(logging.ERROR)
logging.getLogger('websockets.server').setLevel(logging.ERROR)
logging.getLogger('websockets.protocol').setLevel(logging.ERROR)
logging.getLogger('pyppeteer').setLevel(logging.INFO)


# Event hooks for page instrumentation; the corresponding page.on(...)
# registrations in worker() are currently commented out.
async def on_request(request):
    pass
    # logging.info(f"on_request() called: url: {request.url}")


async def on_response(response):
    pass
    # logging.info(f"on_response() called, url: {response.url}")


async def on_requestfinished(request):
    pass
    # logging.info(f"on_requestfinished() called, url: {request.url}")


async def screenshot(url, page):
    """
    Navigate to the URL and capture a full-page screenshot of the rendered page.

    The page.goto() options might need to be tweaked depending on testing in real environments:
    https://github.com/GoogleChrome/puppeteer/blob/master/docs/api.md#pagewaitfornavigationoptions

    - load: consider navigation finished when the load event is fired.
    - domcontentloaded: consider navigation finished when the DOMContentLoaded event is fired.
    - networkidle0: consider navigation finished when there are no more than 0 network connections for at least 500 ms.
    - networkidle2: consider navigation finished when there are no more than 2 network connections for at least 500 ms.

    An alternative configuration is sketched in the comment below.
    """
    url = urlparse(url)
    response = await page.goto(
        url.geturl(),
        options={"waitUntil": "networkidle0"}
    )

    if is_ipaddress(url.hostname):
        hostname = await asyncio.wait_for(resolve_host(url.hostname), timeout=3)
    else:
        hostname = url.hostname

    # If no port was given in the URL, recover it from the response.
    if not url.port:
        url = url._replace(netloc=f"{url.hostname}:{response.remotePort}")

    screenshot_name = f"{url.scheme}_{url.hostname}_{url.port}.png"
    screenshot_path = str(pathlib.Path(f"./{report_folder}/{screenshot_name}").absolute())
    await page.screenshot(
        {
            "path": screenshot_path,
            "fullPage": True
        }
    )

    return {
        "ip": response.remoteIPAddress,
        "hostname": hostname,
        "url": url.geturl(),
        "screenshot": screenshot_name,
        "port": url.port,
        "scheme": url.scheme,
        "title": await page.title(),  # equivalent to page.evaluate('document.title')
        "server": response.headers.get("server"),
        "headers": response.headers,
        "body": await response.text()
    }


def task_watch():
    # Runs in a daemon thread; logs scan progress every 5 seconds.
    while True:
        sleep(5)
        logging.info(f"total: {stats.inputs}, done: {stats.execs}, pending: {stats.inputs - stats.execs}")


async def worker(context, queue):
    # Each worker task handles exactly one URL per start_scan() batch.
    url = await queue.get()

    page = await context.newPage()
    page.setDefaultNavigationTimeout(args.timeout * 1000)  # setDefaultNavigationTimeout() accepts milliseconds

    # page.on('request', lambda req: asyncio.create_task(on_request(req)))
    # page.on('requestfinished', lambda req: asyncio.create_task(on_requestfinished(req)))
    # page.on('response', lambda resp: asyncio.create_task(on_response(resp)))

    try:
        r = await asyncio.wait_for(screenshot(url, page), timeout=args.timeout)
        logging.debug(r)
        async with ScanDatabase(report_folder) as db:
            await db.add_host_and_service(**r)
        logging.info(f"Took screenshot of {url}")
    except asyncio.TimeoutError:
        logging.info(f"Task for url {url} timed out")
    except Exception as e:
        # if not any(err in str(e) for err in ['ERR_ADDRESS_UNREACHABLE', 'ERR_CONNECTION_REFUSED', 'ERR_CONNECTION_TIMED_OUT']):
        logging.error(f"Error taking screenshot: {e}")
    finally:
        stats.execs += 1
        await page.close()
        queue.task_done()


async def producer(queue):
    # Expand the raw targets (IPs, ranges, CIDRs, hostnames) into URLs and queue them.
    with AutomaticTargetGenerator(args.target) as generated_targets:
        for url in generated_targets:
            stats.inputs += 1
            await queue.put(url)


async def start_scan(queue, n_urls: int):
    logging.info("Starting headless browser")
    # --no-sandbox is required to make Chrome/Chromium run under root (see the note after this function).
    browser = await pyppeteer.launch(headless=True, ignoreHTTPSErrors=True, args=['--no-sandbox'])
    context = await browser.createIncognitoBrowserContext()
    try:
        worker_tasks = [asyncio.create_task(worker(context, queue)) for _ in range(n_urls)]
        logging.info(f"Using {len(worker_tasks)} worker task(s)")
        await asyncio.gather(*worker_tasks, return_exceptions=True)
    finally:
        await context.close()
        logging.info("Killing headless browser")
        await browser.close()
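
# Sandbox note: the --no-sandbox launch flag above is only needed when
# Chromium runs as root. A minimal sketch (an assumption, not how this script
# ships): when scanning as an unprivileged user, the sandbox can stay enabled
# by dropping the flag:
#
#   browser = await pyppeteer.launch(headless=True, ignoreHTTPSErrors=True)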


async def main():
    await ScanDatabase.create_db_and_schema(report_folder)

    queue = asyncio.Queue()
    asyncio.create_task(producer(queue))

    logging.info("Waiting for queue to populate...")
    while queue.qsize() == 0:
        await asyncio.sleep(0.1)

    t = threading.Thread(target=task_watch, daemon=True)
    t.start()

    # Drain the queue in batches of at most args.threads concurrent workers.
    while queue.qsize() > 0:
        await start_scan(
            queue=queue,
            n_urls=min(args.threads, queue.qsize()),
        )


if __name__ == '__main__':
    parser = argparse.ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
    parser.add_argument("target", nargs='+', type=str, help='The target IP(s), range(s), CIDR(s) or hostname(s)')
    parser.add_argument("-p", "--ports", nargs='+', default=[80, 8080, 443, 8443], type=int, help="Ports to scan if an IP range/CIDR is provided")
    parser.add_argument('--threads', default=25, type=int, help='Number of concurrent tasks')
    parser.add_argument('--timeout', default=35, type=int, help='Timeout for each connection attempt in seconds')
    args = parser.parse_args()

    patch_pyppeteer()

    time = datetime.now().strftime("%Y_%m_%d_%H%M%S")
    report_folder = f"scan_{time}"
    os.mkdir(report_folder)

    asyncio.run(main())
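
# Example invocations (illustrative; the targets shown are hypothetical):
#   ./witnessme.py 192.168.1.0/24
#   ./witnessme.py https://example.com 10.0.0.1 --threads 50 --timeout 20
#   ./witnessme.py 10.0.0.0/24 -p 80 443 8080 8443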