forked from fffonion/MAClient
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmaclient_network.py
288 lines (258 loc) · 12.3 KB
/
maclient_network.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
#!/usr/bin/env python
# coding:utf-8
# maclient network utility
# Contributor:
# fffonion <[email protected]>
import os
import sys
import time
import base64
import socket
import urllib
import maclient_smart
from cross_platform import *
if PYTHON3:
import http.client as httplib
xrange = range
else:
import httplib
try:
import httplib2
except ImportError:
print('httplib2 not found in python libs. You can download it here: https://github.com/fffonion/httplib2-plus')
try:
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
SLOW_MODE = False
except ImportError:
import pyaes as AES
SLOW_MODE=True
serv = {'cn':'http://game1-CBT.ma.sdo.com:10001/connect/app/',
'cn2':'http://game2-CBT.ma.sdo.com:10001/connect/app/',
'cn3':'http://game3-CBT.ma.sdo.com:10001/connect/app/',
'tw':'http://game.ma.mobimon.com.tw:10001/connect/app/',
'jp':'http://web.million-arthurs.com/connect/app/', 'jp_data':'',
'kr':'http://ma.actoz.com:10001/connect/app/', 'kr_data':''}
serv['cn_data'] = serv['cn2_data'] = serv['cn3_data'] = 'http://MA.webpatch.sdg-china.com/'
headers_main = {'User-Agent': 'Million/%d (GT-I9100; GT-I9100; 2.3.4) samsung/GT-I9100/GT-I9100:2.3.4/GRJ22/eng.build.20120314.185218:eng/release-keys', 'Connection': 'Keep-Alive', 'Accept-Encoding':'gzip,deflate'}
headers_post = {'Content-Type': 'application/x-www-form-urlencoded'}
pad = lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16)
unpad = lambda s : s[0:-ord(s[-1])]
b2u = PYTHON3 and (lambda x:x.decode(encoding = 'utf-8')) or (lambda x:x)
MOD_AES, MOD_AES_RANDOM, MOD_RSA_AES_RANDOM = 0, 1, 2
class Crypt():
def __init__(self,loc):
self.init_cipher(loc=loc)
self.random_cipher_plain=''
if loc[:2]=='cn':
self.gen_rsa_pubkey()
def gen_cipher_with_uid(self, uid, loc):
pass
def gen_rsa_pubkey(self):
#pk="""MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAM5U06JAbYWdRBrnMdE2bEuDmWgUav7xNKm7i8s1Uy/\nfvpvfxLeoWowLGIBKz0kDLIvhuLV8Lv4XV0+aXdl2j4kCAwEAAQ=="""
pk = maclient_smart.key_rsa_pool[2]
#pk = 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A' + ''.join(pk[1:-2])
pk = [pk[i:i+64] for i in range(0, len(pk), 64)]
pk = '-----BEGIN PUBLIC KEY-----\n' + '\n'.join(pk) + '\n-----END PUBLIC KEY-----'
#print pk
self.rsa = PKCS1_v1_5.new(RSA.importKey(pk))
#bio = BIO.MemoryBuffer(pk) # pk is ASCII, don't encode
#self.rsa = RSA.load_pub_key_bio(bio)
def gen_random_cipher(self):
self.random_cipher_plain=os.urandom(16)
self.random_cipher = self._gen_cipher(self.random_cipher_plain)
def _gen_cipher(self,plain):
return AES.new(plain, AES.MODE_ECB)
def init_cipher(self,loc = 'cn', uid = None):
_key = getattr(maclient_smart, 'key_%s' % loc[:2])
if loc == 'jp':
if not uid:
uid = '0'
_key['crypt'] = '%s%s%s' % (_key['crypt'], uid, '0' * (32 - len(_key['crypt'] + uid)))
print(_key['crypt'])
if sys.platform == 'cli':
pass # import clr
# clr.AddReference("IronPyCrypto.dll")
self.cipher_data = self._gen_cipher(_key['crypt'])
self.cipher_res = self._gen_cipher(_key['res'])
def decode_res(self, bytein):
return cipher.decrypt(bytein)
def decode_data(self, bytein):
if len(bytein) == 0:
return ''
else:
return unpad(b2u(self.cipher_data.decrypt(bytein)))
def decode_data64(self, strin):
return self.decode_data(base64.decodestring(self.urlescape(strin)))
def encode_data(self, bytein, mode):
if mode==MOD_AES:
return self.cipher_data.encrypt(pad(bytein))
elif mode >= MOD_AES_RANDOM:
return self.random_cipher.encrypt(pad(bytein))
def encode_rsa_64(self, strin):
return base64.encodestring(self.rsa.encrypt(strin))
def encode_data64(self, bytein, mode):
res=b2u(base64.encodestring(self.encode_data(bytein, mode))).strip('\n')
# if mode != MOD_RSA_AES_RANDOM:
# return self.urlunescape(res)
return res
def encode_param(self, param, mode=MOD_AES):
p = param.split('&')
if mode == MOD_RSA_AES_RANDOM:
_m=self.encode_rsa_64
else:
_m=lambda x:x
p_enc = '%0A&'.join(['%s=%s' % (p[i].split('=')[0], self.urlunescape(_m(self.encode_data64(p[i].split('=')[1], mode)))) for i in xrange(len(p))])
# print p_enc
return p_enc.replace('\n', '')
def decode_param(self, param_enc):
p_enc = param_enc.split('&')
p = '%0A&'.join(['%s=%s' % (p_enc[i].split('=')[0], self.decode_data64(p_enc[i].split('=')[1])) for i in xrange(len(p_enc))])
return p
#modified '&' to '\n'
def urlunescape(self, url):
return url.replace('=', '%3D').replace('\n', '%0A').replace('/', '%2F').replace('+', '%2B')
def urlescape(self, url):
return url.replace('%3D', '=').replace('%0A', '\n').replace('%2F', '/').replace('%2B', '+')
def decrypt_file(self, filein, fileout, ext = 'png'):
fileout = '%s.%s' % (fileout, ext)
if not os.path.exists(os.path.split(fileout)[0]):
os.makedirs(os.path.split(fileout)[0])
try:
if not os.path.exists(fileout):
open(fileout, 'wb').write(decode_res(open(filein, 'rb').read()))
except ValueError:
print('skip', filein)
else:
pass
#ht = httplib2.Http(timeout = 15,proxy_info = httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP_NO_TUNNEL, "192.168.124.1", 23300))
ht = httplib2.Http(timeout = 15)
class poster():
def __init__(self, loc, logger, ua):
self.cookie = ''
# self.maClientInstance=mac
# ironpython版的httplib2的iri2uri中用utf-8代替了idna,因此手动变回来
self.rollback_utf8 = sys.platform.startswith('cli') and \
(lambda dt:dt.decode('utf-8')) or\
(lambda dt:dt)
self.logger = logger
self.load_svr(loc, ua)
if SLOW_MODE:
self.logger.warning(du8('post:没有安装pycrypto库,可能将额外耗费大量时间'))
self.issavetraffic = False
def set_cookie(self, cookie):
self.cookie = cookie
def enable_savetraffic(self):
self.issavetraffic = True
def gen_2nd_key(self, uid, loc='jp'):
self.crypt.gen_cipher_with_uid(uid, loc)
def load_svr(self, loc, ua=''):
self.servloc = loc
self.shortloc = loc[:2]
self.header = dict(headers_main)
self.header.update(headers_post)
if self.shortloc in ['cn','kr']:
ht.add_credentials("iW7B5MWJ", "8KdtjVfX")
if ua != '':
if '%d' in ua: # formatted ua
self.header['User-Agent'] = ua % getattr(maclient_smart, 'app_ver_%s' % self.shortloc)
else:
self.header['User-Agent'] = ua
else:
self.header['User-Agent'] = self.header['User-Agent'] % getattr(maclient_smart, 'app_ver_%s' % self.shortloc)
self.default_2ndkey = loc in ['jp','cn']
self.crypt=Crypt(self.shortloc)
def update_server(self, check_inspection_str):
#not using
if check_inspection_str:
strs = check_inspection_str.split(',')
try:
serv[self.servloc] = strs[3]
serv['%s_data' % self.servloc] = strs[2]
except KeyError:
pass
except IndexError:
self.logger.error(du8('错误的密钥?'))
raw_input()
os._exit(1)
def post(self, uri, postdata = '', usecookie = True, setcookie = True, extraheader = {'Cookie2': '$Version=1'}, noencrypt = False, savetraffic = False, no2ndkey = False):
header = {}
header.update(self.header)
header.update(extraheader)
if usecookie:
header.update({'Cookie':self.cookie})
if not noencrypt :
if self.shortloc=='cn':#pass key to server
#add sign to param
self.crypt.gen_random_cipher()
sign='K=%s'%self.crypt.urlunescape(
self.crypt.encode_rsa_64(
base64.encodestring(
self.crypt.random_cipher_plain))).rstrip('\n')
if postdata:#has real stuff
if uri in ['login','regist']:
postdata = self.crypt.encode_param(postdata.encode('utf-8'), mode=MOD_RSA_AES_RANDOM)
else:
postdata = self.crypt.encode_param(postdata, mode=MOD_AES_RANDOM)
postdata='&'.join([sign,postdata])
else:
postdata=sign
elif postdata != '':
postdata = self.crypt.encode_param(postdata)
trytime = 0
ttimes = 3
callback_hook = None
if savetraffic and self.issavetraffic:
callback_hook = lambda x:x
while trytime < ttimes:
try:
resp, content = ht.request('%s%s%s' % (serv[self.servloc], uri, not noencrypt and '?cyt=1' or ''), method = 'POST', headers = header, body = postdata, callback_hook = callback_hook, chunk_size = None)
except socket.error as e:
if e.errno == None:
err = 'Timed out'
else:
err = e.errno
self.logger.warning('post:%s got socket error:%s, retrying in %d times' % (uri, err, ttimes - trytime))
except httplib.ResponseNotReady:
# socket重置,不计入重试次数
trytime -= 1
self.logger.warning('post:socket closed, retrying in %d times' % (ttimes - trytime))
except httplib2.ServerNotFoundError:
self.logger.warning('post:no internet, retrying in %d times' % (ttimes - trytime))
except TypeError: # 使用了官方版的httplib2
if savetraffic and self.issavetraffic:
self.logger.warning(du8('你正在使用官方版的httplib2,因此省流模式将无法正常工作'))
resp, content = ht.request('%s%s%s' % (serv[self.servloc], uri, not noencrypt and '?cyt=1' or ''), method = 'POST', headers = header, body = postdata)
break
else:
if int(resp['status']) < 400:
break
self.logger.warning('post:POSTing %s, server returns code %s, retrying in %d times' % (uri, resp['status'], 3 - trytime))
resp, content = {'status':'600'}, ''
trytime += 1
time.sleep(2.718281828 * trytime)
if not 'content-length' in resp:
resp['content-length'] = str(len(content))
# 状态码判断
if int(resp['status']) > 400:
self.logger.error('post:%s %s' % (uri, ','.join([ (i in resp and (i + ':' + resp[i]) or '')for i in ['status', 'content-length', 'set-cookie']]) + du8('\n请到信号良好的地方重试【←←')))
resp.update({'error':True, 'errno':resp['status'], 'errmsg':'Client or server error.'})
return resp, content
else:
self.logger.debug('post:%s content-length:%s%s' % (uri, resp['content-length'], ('set-cookie' in resp and (' set-cookie:%s' % resp['set-cookie']) or '')))
# 省流模式
if savetraffic and self.issavetraffic:
return resp, content
# 否则解码
dec = self.rollback_utf8(self.crypt.decode_data(content))
if os.path.exists('debug'):
open('debug/%s.xml' % uri.replace('/', '#').replace('?', '~'), 'w').write(dec)
# open('debug/~%s.xml'%uri.replace('/','#').replace('?','~'),'w').write(content)
if setcookie and 'set-cookie' in resp:
self.cookie = resp['set-cookie'].split(',')[-1].rstrip('path=/').strip()
# print self.cookie
return resp, dec
if __name__ == "__main__":
p = Crypt('cn')
p.gen_random_cipher()