This repository has been archived by the owner on May 23, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
/
liqi.py
126 lines (115 loc) · 4.64 KB
/
liqi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# Capture WebSocket data and parse the semantics of Mahjong Soul (雀魂) "actions" into JSON
import json
from struct import unpack
import base64
from enum import Enum
from typing import List, Dict
from google.protobuf.json_format import MessageToDict
from proto import liqi_pb2 as pb
class MsgType(Enum):
    """Kind of a WebSocket frame, taken from the frame's first byte."""

    # Frame parsed without a message id.
    Notify = 1
    # Request frame; carries a message id that the response will reuse.
    Req = 2
    # Response frame; matched to its request through the message id.
    Res = 3
class LiqiProto:
    """Stateful parser for the ordered WebSocket frames of one game session.

    Every frame starts with a one-byte ``MsgType``. A request frame records,
    under its message id, the response message class it expects, so that the
    matching response frame can be decoded later. Frames must therefore be
    parsed in the order in which they were exchanged.
    """

    def __init__(self):
        # Number of frames parsed so far; also used as the id of Notify frames.
        self.tot = 0
        # Requests still waiting for their response:
        # msg_id (int) -> (method_name, response message class from ``pb``).
        self.res_type = {}
        # Descriptor used to look up request/response type names per RPC.
        # Opened with ``with`` so the file handle is closed promptly instead
        # of being leaked; JSON is read as UTF-8 rather than with the
        # platform's locale encoding.
        with open('./proto/liqi.json', 'r', encoding='utf-8') as proto_file:
            self.jsonProto = json.load(proto_file)

    @staticmethod
    def _to_dict(proto_obj):
        """Convert a protobuf message to a dict, keeping proto field names and default values."""
        return MessageToDict(
            proto_obj,
            preserving_proto_field_name=True,
            including_default_value_fields=True,
        )

    def parse(self, flow_msg) -> Dict:
        """Parse one WebSocket frame; frames must be supplied in order.

        Args:
            flow_msg: object whose ``content`` attribute holds the raw frame
                bytes.

        Returns:
            A dict with the keys ``'id'`` (message id; for Notify frames the
            running frame counter), ``'type'`` (a ``MsgType``), ``'method'``
            (the dotted method name) and ``'data'`` (the decoded message as a
            dict).

        Raises:
            ValueError: if the first byte is not a known ``MsgType`` or the
                method name does not have the expected number of dotted parts.
            AssertionError: if a request/response frame violates the expected
                structure (e.g. a response whose id has no pending request).
        """
        buf = flow_msg.content
        msg_type = MsgType(buf[0])  # the first byte selects the frame kind

        if msg_type == MsgType.Notify:
            # Notify frames carry no message id: the wrapper starts at byte 1.
            msg_block = fromProtobuf(buf[1:])
            method_name = msg_block[0]['data'].decode()
            _, _, message_name = method_name.split('.')
            notify_cls = getattr(pb, message_name)
            dict_obj = self._to_dict(notify_cls.FromString(msg_block[1]['data']))
            if 'data' in dict_obj:
                # Nested payload: base64 text -> masked bytes -> the message
                # class named by the sibling 'name' field.
                raw = base64.b64decode(dict_obj['data'])
                action_cls = getattr(pb, dict_obj['name'])
                dict_obj['data'] = self._to_dict(
                    action_cls.FromString(decode(raw)))
            msg_id = self.tot
        else:
            # Bytes 1-2 hold the message id as a little-endian unsigned
            # 16-bit integer; the protobuf wrapper follows from byte 3.
            msg_id = unpack('<H', buf[1:3])[0]
            msg_block = fromProtobuf(buf[3:])
            # NOTE(review): the checks below are ``assert`` statements, so
            # they are skipped when Python runs with -O.
            if msg_type == MsgType.Req:
                assert msg_id < 1 << 16
                assert len(msg_block) == 2
                assert msg_id not in self.res_type
                method_name = msg_block[0]['data'].decode()
                _, lq, service, rpc = method_name.split('.')
                proto_domain = self.jsonProto['nested'][lq]['nested'][service]['methods'][rpc]
                request_cls = getattr(pb, proto_domain['requestType'])
                dict_obj = self._to_dict(
                    request_cls.FromString(msg_block[1]['data']))
                # Remember how to decode the response that reuses this id.
                self.res_type[msg_id] = (
                    method_name, getattr(pb, proto_domain['responseType']))
            elif msg_type == MsgType.Res:
                # A response carries an empty method name; the real one is
                # recovered from the pending request with the same id.
                assert len(msg_block[0]['data']) == 0
                assert msg_id in self.res_type
                method_name, response_cls = self.res_type.pop(msg_id)
                dict_obj = self._to_dict(
                    response_cls.FromString(msg_block[1]['data']))

        result = {'id': msg_id, 'type': msg_type,
                  'method': method_name, 'data': dict_obj}
        self.tot += 1
        return result
def fromProtobuf(buf) -> List[Dict]:
    """Split serialized protobuf bytes into their top-level fields.

    Only wire type 0 (varint) and wire type 2 (length-delimited) are
    supported.

    Args:
        buf: protobuf bytes.

    Returns:
        One dict per field, in order of appearance, with the keys ``'id'``
        (field number), ``'type'`` (``'varint'`` or ``'string'``), ``'data'``
        (an int for varints, a slice of ``buf`` for length-delimited fields)
        and ``'begin'`` (offset of the field's key within ``buf``).

    Raises:
        Exception: if a field uses any other wire type.
    """
    p = 0
    result = []
    size = len(buf)
    while p < size:
        block_begin = p
        # A field key is itself a varint: (field_number << 3) | wire_type.
        # Decoding it as a varint (rather than as a single byte) keeps field
        # numbers >= 16, whose keys span several bytes, correct.
        key, p = parseVarint(buf, p)
        wire_type = key & 7
        block_id = key >> 3
        if wire_type == 0:
            block_type = 'varint'
            data, p = parseVarint(buf, p)
        elif wire_type == 2:
            block_type = 'string'
            s_len, p = parseVarint(buf, p)
            data = buf[p:p + s_len]
            p += s_len
        else:
            raise Exception('unknow type:', wire_type, ' at', p)
        result.append({'id': block_id, 'type': block_type,
                       'data': data, 'begin': block_begin})
    return result
def parseVarint(buf, p):
    """Decode a protobuf base-128 varint from ``buf`` starting at offset ``p``.

    Each byte contributes its low seven bits, least-significant group first;
    a byte with the high bit clear ends the number.

    Returns:
        ``(value, next_offset)``. If the buffer ends before a terminating
        byte is seen, the bits gathered so far are returned with
        ``next_offset == len(buf)``; if ``p`` is already past the end the
        result is ``(0, p)``.
    """
    value = 0
    shift = 0
    end = len(buf)
    while p < end:
        byte = buf[p]
        p += 1
        value += (byte & 0x7F) << shift
        shift += 7
        if byte < 0x80:
            # High bit clear: this was the last byte of the varint.
            break
    return (value, p)
def decode(data: bytes):
    """XOR every byte of ``data`` with a position-dependent mask.

    The mask for index ``i`` is the low eight bits of
    ``(23 ^ len(data)) + 5 * i + keys[i % len(keys)]``. Because the mask
    depends only on the length and the index, applying the function twice
    returns the original bytes.

    Returns:
        A new ``bytes`` object of the same length as the input.
    """
    keys = (0x84, 0x5e, 0x4e, 0x42, 0x39, 0xa2, 0x1f, 0x60, 0x1c)
    masked = bytearray(data)
    key_count = len(keys)
    seed = 23 ^ len(masked)
    return bytes(
        byte ^ ((seed + 5 * idx + keys[idx % key_count]) & 255)
        for idx, byte in enumerate(masked)
    )