-
Notifications
You must be signed in to change notification settings - Fork 2
/
Report.py
311 lines (253 loc) · 13.7 KB
/
Report.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.service import Service
from selenium.common import TimeoutException
from selenium.webdriver.common.by import By
from func_timeout import func_set_timeout
from selenium import webdriver
from datetime import datetime
from src import jy_click
import func_timeout
import requests
import time
import sys
import re
import os
skip = 11
proxies = {'http': None, 'https': None}
base_dir = os.path.dirname(os.path.abspath(__file__))
uid_file = os.path.join(base_dir, '附加文件', '运行数据','uid.txt')
log_file = os.path.join(base_dir, '附加文件', '运行记录','错误记录.txt')
title_file = os.path.join(base_dir, '附加文件', '运行记录','标题记录.txt')
script_ALL = os.path.join(base_dir, '附加文件', '页面脚本', '总脚本(纯JS代码).js')
success_directory = os.path.join(base_dir, '附加文件', '成功验证码')
fail_directory = os.path.join(base_dir, '附加文件', '失败验证码')
user_data_dir = os.path.join(base_dir, '附加文件', 'User Data')
chrome_binary_path = os.path.join(base_dir, '附加文件', 'chrome-win', 'chrome.exe')
chrome_driver_path = os.path.join(base_dir, '附加文件', '运行数据','chromedriver.exe')
os.makedirs(success_directory, exist_ok=True)
def log_error(message):
with open(log_file, 'a', encoding='utf-8') as log:
timestamp = datetime.now().strftime('[%Y-%m-%d %H:%M:%S]')
log.write(f"\n\n{timestamp} {message}")
def remove_completed_uid(uid):
try:
with open(uid_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
with open(uid_file, 'w', encoding='utf-8') as f:
for line in lines:
if line.strip() != uid:
f.write(line)
print(f"删除UID: {uid}")
except Exception as e:
print(f"删除UID时发生错误: {e}")
def get_location(target):
# 获取元素在屏幕上的位置信息
location = target.location
size = target.size
height = size['height']
width = size['width']
left = location['x']
top = location['y']
right = left + width
bottom = top + height
script = f"return {{'left': {left}, 'top': {top}, 'right': {right}, 'bottom': {bottom}}};"
rect = driver.execute_script(script)
left_x = int(rect['left'])
top_y = int(rect['top'])
return left_x, top_y
def trigger_captcha(browser):
try:
# 选择分类
WebDriverWait(browser, 20, 1).until(
EC.presence_of_element_located((By.XPATH, '/html/body/div/div/div[2]/div[1]/div[2]/div[1]/div'))
).click()
# 输入举报理由
WebDriverWait(browser, 20, 1).until(
EC.presence_of_element_located((By.XPATH, '/html/body/div[1]/div/div[3]/div[2]/textarea'))
).send_keys('视频封面标题以及内容违规,推广以原神、碧蓝档案等二次元游戏人物为主角的色情视频')
while True:
# 点击确认
WebDriverWait(browser, 20, 1).until(
EC.presence_of_element_located((By.XPATH, '/html/body/div/div/div[5]/div[2]'))
).click()
# 检查元素是否存在
try:
WebDriverWait(browser, 5).until(
EC.presence_of_element_located((By.XPATH, '//*[@class="geetest_item_wrap"]'))
)
print("验证码元素已出现!")
break # 如果元素出现则退出循环
except Exception:
print("验证码元素未出现,重新点击确认...")
except Exception as e:
print(f"发生错误: {e}")
def wait_for_js_execution():
# 等待一定条件,确保 JS 执行完成
WebDriverWait(driver, 10).until(lambda d: d.execute_script('return document.readyState') == 'complete')
uids = []
with open(uid_file, 'r', encoding='utf-8') as f: # 以读取模式打开文件
for line in f:
line = line.strip() # 去掉行首尾的空白字符
if line: # 如果不是空行,则认为是UID
uids.append(line)
if not uids:
print("uid.txt 文件中没有可处理的UID,程序退出")
log_error("uid.txt 文件中没有可处理的UID,程序退出")
exit(0)
options = webdriver.ChromeOptions()
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_argument(f'--user-data-dir={user_data_dir}') # 设置用户数据目录
options.binary_location = chrome_binary_path # 指定 Chrome 浏览器的可执行文件路径
options.add_argument('--proxy-server="direct://"')
options.add_argument('--proxy-bypass-list=*')
options.add_argument("--disable-gpu")
options.add_argument("--disable-sync")
options.add_argument("disable-cache")#禁用缓存
options.add_argument("--headless")
options.add_argument('log-level=3')
service = Service(executable_path=chrome_driver_path)
driver = webdriver.Chrome(service=service, options=options) # 启动 Chrome 浏览器
driver.set_window_size(1000, 700) # 设置浏览器窗口大小(宽度, 高度)
#driver.set_window_position(-850, 775) # 设置浏览器窗口位置(x, y)
#driver.set_window_position(-850, 1355)
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
try:
for uid in uids:
try:
search_url = f'https://api.bilibili.com/x/series/recArchivesByKeywords?mid={uid}&keywords=&ps=1&orderby=views'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
response = requests.get(search_url, headers=headers, proxies=proxies, timeout=(5, 10))
response.raise_for_status()
data = response.json()
if 'data' in data and 'archives' in data['data'] and len(data['data']['archives']) > 0:
first_video = data['data']['archives'][0]
aid = first_video.get('aid')
title = first_video.get('title')
print(f"UID:{uid}, AID: {aid}, 标题: {title}")
with open(title_file, 'a', encoding='utf-8') as file:
file.write(f"\nUID:{uid}, AID: {aid}, 标题: {title}")
if skip == 11:
print("不跳过人机验证")
url = f"https://www.bilibili.com/appeal/?avid={aid}"
driver.get(url)
trigger_captcha(driver)
while True:
try:
img = WebDriverWait(driver, 5).until(
EC.presence_of_element_located(
(By.XPATH, '//*[@class="geetest_item_wrap"]'))
)
f = img.get_attribute('style')
attempt = 0 # 初始化尝试计数
while ('url("' not in f) and (attempt < 10):
f = img.get_attribute('style')
attempt += 1
time.sleep(0.5)
print(attempt)
url = re.search(r'url\("([^"]+?)\?[^"]*"\);', f).group(1)
content = requests.get(url, proxies=proxies, timeout=(5, 10)).content
plan = jy_click.JYClick().run(content)
print(plan)
a, b = get_location(img)
lan_x = 306 / 334
lan_y = 343 / 384
for crop in plan:
x1, y1, x2, y2 = crop
x, y = [(x1 + x2) / 2, (y1 + y2) / 2]
print(a + x * lan_x, b + y * lan_y, "点击坐标")
# 执行点击操作
ActionChains(driver).move_by_offset(a + x * lan_x, b + y * lan_y).click().perform()
ActionChains(driver).move_by_offset(-(a + x * lan_x),
-(b + y * lan_y)).perform() # 恢复鼠标位置
time.sleep(0.3)
try: # 执行点击确认按钮的操作
element = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CLASS_NAME, 'geetest_commit_tip'))
)
element.click() # 提交验证码
except Exception as e:
print(f"提交验证码时发生错误: {e}")
refresh_element = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CLASS_NAME, 'geetest_refresh'))
)
refresh_element.click() # 点击刷新验证按钮
print('已点击刷新按钮')
try: # 等待 'geetest_item_wrap' 元素消失,表示验证码提交成功
WebDriverWait(driver, 3).until(
EC.invisibility_of_element_located(
(By.XPATH, '//*[@class="geetest_item_wrap"]'))
)
print("验证码已消失!")
try:
WebDriverWait(driver, 5).until(EC.alert_is_present()) # 等待最多5秒,直到弹窗出现
alert = driver.switch_to.alert # 切换到弹窗
alert_text = alert.text # 获取弹窗文本
# 检查弹窗内容
if not "-352" in alert_text:
alert.accept()
print("验证码验证成功!")
success_name = url.split('/')[-1]
success_path = os.path.join(success_directory, success_name)
with open(success_path, 'wb') as file:
file.write(content)
#print(f"图片已保存至: {success_path}")
break
else:
log_error('意外情况,弹窗出现-352')
sys.exit('意外情况,弹窗出现-352')
except TimeoutException:
log_error('多次验证失败,程序退出')
sys.exit('多次验证失败,程序退出')
except Exception as e:
print(f"验证码验证失败!,错误: {e}")
os.makedirs(fail_directory, exist_ok=True)
fail_name = url.split('/')[-1]
fail_path = os.path.join(fail_directory, fail_name)
with open(fail_path, 'wb') as file:
file.write(content)
#print(f"图片已保存至: {fail_path}")
except Exception as e:
print(f"人机验证循环出错,错误: {e}")
log_error(f"人机验证循环出错,错误: {e}")
sys.exit('人机验证循环出错') # 如果发生异常也退出程序
skip = 0
else:
print("跳过人机验证")
skip = skip + 1
userurl = f"https://space.bilibili.com/{uid}"
with open(script_ALL, "r", encoding="utf-8") as file:
ALL = file.read()
@func_set_timeout(4.7)
def report_scrpit():
driver.get(userurl)
print('\n',userurl,'\n')
driver.execute_script(ALL)
time.sleep(470)
try:
report_scrpit()
print('未超时') #不可能
except func_timeout.exceptions.FunctionTimedOut:
logs = driver.get_log('browser')
warning_logs = [log for log in logs if log['level'] == 'WARNING']
for log in warning_logs:
print(log['message'])
remove_completed_uid(uid)
continue # 使用 continue 继续下一个 UID
else:
print(f"UID {uid} 没有找到视频,继续下一个 UID。")
remove_completed_uid(uid)
continue # 找不到任何视频,继续下一个 UID
except (requests.exceptions.HTTPError, requests.exceptions.RequestException) as e:
print(f"UID循环内发生错误,错误UID:{uid},错误: {e}")
log_error(f"UID循环内发生错误,错误UID:{uid},错误: {e}")
sys.exit(f"UID循环内发生错误,错误UID:{uid}")
driver.quit() # 完成所有操作后关闭浏览器
except Exception as e:
print(f"从文件获取UID时发生错误,错误: {e}")
log_error(f"从文件获取UID时发生错误,错误: {e}")
sys.exit('从文件获取UID时发生错误')
finally:
driver.quit()