forked from egrcc/zhihu-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
auth.py
240 lines (204 loc) · 8.41 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Build-in / Std
import os, sys, time, platform, random
import re, json, cookielib
# requirements
import requests, termcolor
requests = requests.Session()
requests.cookies = cookielib.LWPCookieJar('cookies')
try:
requests.cookies.load(ignore_discard=True)
except:
pass
class Logging:
flag = True
@staticmethod
def error(msg):
if Logging.flag == True:
print "".join( [ termcolor.colored("ERROR", "red"), ": ", termcolor.colored(msg, "white") ] )
@staticmethod
def warn(msg):
if Logging.flag == True:
print "".join( [ termcolor.colored("WARN", "yellow"), ": ", termcolor.colored(msg, "white") ] )
@staticmethod
def info(msg):
# attrs=['reverse', 'blink']
if Logging.flag == True:
print "".join( [ termcolor.colored("INFO", "magenta"), ": ", termcolor.colored(msg, "white") ] )
@staticmethod
def debug(msg):
if Logging.flag == True:
print "".join( [ termcolor.colored("DEBUG", "magenta"), ": ", termcolor.colored(msg, "white") ] )
@staticmethod
def success(msg):
if Logging.flag == True:
print "".join( [ termcolor.colored("SUCCES", "green"), ": ", termcolor.colored(msg, "white") ] )
# Setting Logging
Logging.flag = True
class LoginPasswordError(Exception):
def __init__(self, message):
if type(message) != type("") or message == "": self.message = u"帐号密码错误"
else: self.message = message
Logging.error(self.message)
class NetworkError(Exception):
def __init__(self, message):
if type(message) != type("") or message == "": self.message = u"网络异常"
else: self.message = message
Logging.error(self.message)
class AccountError(Exception):
def __init__(self, message):
if type(message) != type("") or message == "": self.message = u"帐号类型错误"
else: self.message = message
Logging.error(self.message)
def download_captcha():
url = "http://www.zhihu.com/captcha.gif"
r = requests.get(url, params={"r": random.random()} )
if int(r.status_code) != 200:
raise NetworkError(u"验证码请求失败")
image_name = u"verify." + r.headers['content-type'].split("/")[1]
open( image_name, "wb").write(r.content)
"""
System platform: https://docs.python.org/2/library/platform.html
"""
Logging.info(u"正在调用外部程序渲染验证码 ... ")
if platform.system() == "Linux":
Logging.info(u"Command: xdg-open %s &" % image_name )
os.system("xdg-open %s &" % image_name )
elif platform.system() == "Darwin":
Logging.info(u"Command: open %s &" % image_name )
os.system("open %s &" % image_name )
elif platform.system() == "SunOS":
os.system("open %s &" % image_name )
elif platform.system() == "FreeBSD":
os.system("open %s &" % image_name )
elif platform.system() == "Unix":
os.system("open %s &" % image_name )
elif platform.system() == "OpenBSD":
os.system("open %s &" % image_name )
elif platform.system() == "NetBSD":
os.system("open %s &" % image_name )
elif platform.system() == "Windows":
os.system("open %s &" % image_name )
else:
Logging.info(u"我们无法探测你的作业系统,请自行打开验证码 %s 文件,并输入验证码。" % os.path.join(os.getcwd(), image_name) )
captcha_code = raw_input( termcolor.colored("请输入验证码: ", "cyan") )
return captcha_code
def search_xsrf():
url = "http://www.zhihu.com/"
r = requests.get(url)
if int(r.status_code) != 200:
raise NetworkError(u"验证码请求失败")
results = re.compile(r"\<input\stype=\"hidden\"\sname=\"_xsrf\"\svalue=\"(\S+)\"", re.DOTALL).findall(r.text)
if len(results) < 1:
Logging.info(u"提取XSRF 代码失败" )
return None
return results[0]
def build_form(account, password):
if re.match(r"^1\d{10}$", account): account_type = "phone_num"
elif re.match(r"^\S+\@\S+\.\S+$", account): account_type = "email"
else: raise AccountError(u"帐号类型错误")
form = {account_type: account, "password": password, "remember_me": True }
form['_xsrf'] = search_xsrf()
form['captcha'] = download_captcha()
return form
def upload_form(form):
if "email" in form: url = "http://www.zhihu.com/login/email"
elif "phone_num" in form: url = "http://www.zhihu.com/login/phone_num"
else: raise ValueError(u"账号类型错误")
headers = {
'User-Agent': "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36",
'Host': "www.zhihu.com",
'Origin': "http://www.zhihu.com",
'Pragma': "no-cache",
'Referer': "http://www.zhihu.com/",
'X-Requested-With': "XMLHttpRequest"
}
r = requests.post(url, data=form, headers=headers)
if int(r.status_code) != 200:
raise NetworkError(u"表单上传失败!")
if r.headers['content-type'].lower() == "application/json":
try:
# 修正 justkg 提出的问题: https://github.com/egrcc/zhihu-python/issues/30
result = json.loads(r.content)
except Exception as e:
Logging.error(u"JSON解析失败!")
Logging.debug(e)
Logging.debug(r.content)
result = {}
if result["r"] == 0:
Logging.success(u"登录成功!" )
return {"result": True}
elif result["r"] == 1:
Logging.success(u"登录失败!" )
return {"error": {"code": int(result['errcode']), "message": result['msg'], "data": result['data'] } }
else:
Logging.warn(u"表单上传出现未知错误: \n \t %s )" % ( str(result) ) )
return {"error": {"code": -1, "message": u"unknow error"} }
else:
Logging.warn(u"无法解析服务器的响应内容: \n \t %s " % r.text )
return {"error": {"code": -2, "message": u"parse error"} }
def islogin():
# check session
url = "http://www.zhihu.com/settings/profile"
r = requests.get(url, allow_redirects=False)
status_code = int(r.status_code)
if status_code == 301 or status_code == 302:
# 未登录
return False
elif status_code == 200:
return True
else:
Logging.warn(u"网络故障")
return None
def read_account_from_config_file(config_file="config.ini"):
# NOTE: The ConfigParser module has been renamed to configparser in Python 3.
# The 2to3 tool will automatically adapt imports when converting your sources to Python 3.
# https://docs.python.org/2/library/configparser.html
from ConfigParser import ConfigParser
cf = ConfigParser()
if os.path.exists(config_file) and os.path.isfile(config_file):
Logging.info(u"正在加载配置文件 ...")
cf.read(config_file)
email = cf.get("info", "email")
password = cf.get("info", "password")
if email == "" or password == "":
Logging.warn(u"帐号信息无效")
return (None, None)
else: return (email, password)
else:
Logging.error(u"配置文件加载失败!")
return (None, None)
def login(account=None, password=None):
if islogin() == True:
Logging.success(u"你已经登录过咯")
return True
if account == None:
(account, password) = read_account_from_config_file()
if account == None:
account = raw_input("请输入登录帐号: ")
password = raw_input("请输入登录密码: ")
form_data = build_form(account, password)
"""
result:
{"result": True}
{"error": {"code": 19855555, "message": "unknow.", "data": "data" } }
{"error": {"code": -1, "message": u"unknow error"} }
"""
result = upload_form(form_data)
if "error" in result:
if result["error"]['code'] == 1991829:
# 验证码错误
Logging.error(u"验证码输入错误,请准备重新输入。" )
return login()
else:
Logging.warn(u"unknow error." )
return False
elif "result" in result and result['result'] == True:
# 登录成功
Logging.success(u"登录成功!" )
requests.cookies.save()
return True
if __name__ == "__main__":
# login(account="[email protected]", password="xxxxx")
login()