forked from fffonion/MAClient
-
Notifications
You must be signed in to change notification settings - Fork 0
/
maclient_plugin.py
186 lines (172 loc) · 6.58 KB
/
maclient_plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python
# coding:utf-8
# maclient plugin loader and hooker
# Contributor:
# fffonion <[email protected]>
import os, os.path as opath
import sys
import glob
from cross_platform import *
# for plugin use
# Bind the name ``urllib2`` on both interpreter families; PYTHON3 comes from
# the star import of cross_platform above.
if PYTHON3:
    import urllib.request as urllib2
else:
    import urllib2
# Make the 'plugins' directory importable so that plugin modules can be loaded
# by bare module name (the loader below uses __import__(name)).
sys.path.append(opath.join(getPATH0, 'plugins'))
# os.chdir(opath.join(getPATH0(),'plugins'))
# sys.path[0]=os.path.abspath(opath.join(getPATH0,'plugins'))
# Prefixes joined with an action name to form a hook key, e.g. 'ENTER_explore'
# for the hook fired before 'explore' and 'EXIT_explore' for the one after it.
PREF_ENTER = 'ENTER_'
PREF_EXIT = 'EXIT_'
class plugins():
    """Plugin loader and hook dispatcher.

    Plugin modules are imported by name from the 'plugins' directory. A module
    may expose:

    * ``plugin``    - a class; one instance per module is kept and its
      attributes named like the hook key (e.g. ``ENTER_explore``) are called.
    * ``hooks``     - mapping of hook key -> priority (higher runs first).
    * ``extra_cmd`` - mapping of CLI command name -> name of a module-level
      handler attribute.
    * ``__tip__``   - optional text printed once when hooks are scanned.
    """

    def __init__(self, logger, show_tip=True):
        """Load every plugin found on disk.

        :param logger: object providing ``warning(msg)``.
        :param show_tip: print each plugin's ``__tip__`` on the first scan.
        """
        self.logger = logger
        # whether plugin tips are printed
        self.show_tip = show_tip
        self.has_shown_tips = False
        # all plugin module objects, keyed by module name
        self.plugins = {}
        # the ``plugin()`` instance of every plugin module (None if absent)
        self.plugins_instance = {}
        # additional CLI commands contributed by plugins
        self.extra_cmd = {}
        # variables mapped in from the maclient instance
        self.val_dict = {}
        # Start with an empty registry so hooked functions can run before
        # scan_hooks() has been called (it used to be created only there).
        self.hook_reg = {}
        self.load_plugins()
        # self.scan_hooks()
        self.enable = True

    def scan_hooks(self):
        """Rebuild the hook registry and collect extra CLI commands.

        ``self.hook_reg`` maps every '<prefix><action>' key to a dict of
        ``{plugin name: priority}``. ``self.extra_cmd`` is updated in place
        (not cleared). Plugin tips are printed only on the first scan.
        """
        self.hook_reg = {}
        ALL_ACTIONS = ['tasker', 'auto_check', 'check_strict_bc', 'set_card', 'red_tea', 'green_tea',
                       'explore', '_explore_floor', 'gacha', 'select_card_sell', 'fairy_battle_loop',
                       'fairy_select', '_fairy_battle', 'like', 'friends', 'reward_box',
                       'point_setting', 'factor_battle', 'invoke_autoset']
        # scan plugin hooks
        for p in self.plugins:
            if self.show_tip and not self.has_shown_tips:
                try:
                    print('%s:%s' % (p, du8(self.plugins[p].__tip__)))
                except AttributeError:
                    # the plugin simply has no tip
                    pass
            # extra cmd: command name -> name of the handler attribute
            ecmd = self._get_module_meta(p, 'extra_cmd')
            for e in ecmd:
                hdl = self._get_module_meta(p, ecmd[e])
                if hdl:
                    self.extra_cmd[e] = hdl
            # function hook: look the table up once per plugin so that a
            # missing 'hooks' attribute is warned about a single time
            hooks = self._get_module_meta(p, 'hooks')
            for act in ALL_ACTIONS:
                for method in (PREF_ENTER, PREF_EXIT):  # enter, exit
                    key = '%s%s' % (method, act)
                    if key not in self.hook_reg:
                        self.hook_reg[key] = {}
                    if key in hooks:  # add to hook reg
                        # priority record
                        self.hook_reg[key][p] = hooks[key]
        self.has_shown_tips = True

    # def set_enable(self,lst):
    #     pass

    def set_maclient_val(self, val_dict):
        """Store the dict that is handed to extra-command handlers."""
        self.val_dict = val_dict

    def do_extra_cmd(self, cmd):
        """Run a plugin-provided CLI command.

        The first space-separated word of ``cmd`` selects the handler; the
        handler is called with ``self.val_dict`` and the callable it returns
        is called with the remainder of the line.

        :returns: the inner callable's result, or None (after a warning) when
            plugins are disabled.
        :raises KeyError: if the command name is not registered.
        """
        name, _, rest = cmd.partition(' ')
        if self.enable:
            return self.extra_cmd[name](self.val_dict)(rest)
        else:
            self.logger.warning('Plugins not enabled.')

    def set_disable(self, lst):
        """Drop the named plugin modules; empty or unknown names are ignored."""
        for p in lst:
            if p and (p in self.plugins):
                del self.plugins[p]

    def _get_module_meta(self, mod, key):
        """Return attribute ``key`` of plugin module ``mod``.

        Logs a warning and returns an empty list when the attribute is absent.
        """
        # module.xxx
        try:
            return getattr(self.plugins[mod], key)
        except AttributeError:
            self.logger.warning('"%s" not found in module "%s"' % (key, mod))
            return []

    def _get_plugin_attr(self, mod, attr):
        """Return attribute ``attr`` of the plugin instance of ``mod``.

        Logs a warning and returns an empty list when the attribute is absent
        (which includes the case where the stored instance is None).
        """
        # module.plugin.xxx
        try:
            return getattr(self.plugins_instance[mod], attr)
        except AttributeError:
            self.logger.warning('Get "%s" failed from "%s" ' % (attr, mod))
            return []

    def _do_hook(self, action, *args, **kwargs):
        """Run every plugin hook registered for ``action``.

        Hooks run in descending priority. A hook returning a truthy value must
        return an ``(args, kwargs)`` pair, which replaces the arguments seen by
        later hooks and by the caller.

        :returns: ``(args, kwargs)`` with the first positional argument removed.
        """
        if action in self.hook_reg:
            # high priority first
            ranked = sorted(self.hook_reg[action].items(), key=lambda d: d[1], reverse=True)
            for name, _priority in ranked:
                f = self._get_plugin_attr(name, action)
                if f:
                    ret = f(*args, **kwargs)
                    if ret:  # has mod on params
                        args, kwargs = ret
        # cut off caller instance variable
        # NOTE(review): the original indentation of this statement was lost;
        # it is applied unconditionally here - confirm against upstream.
        args = args[1:]
        return args, kwargs

    def load_plugins(self):
        """Import every module in the 'plugins' directory not loaded yet.

        Files whose name starts with '_' are skipped. A module that raises
        ImportError is skipped with a warning. For each imported module,
        ``module.plugin()`` is instantiated; if that raises AttributeError
        (e.g. there is no ``plugin`` class) None is stored instead.
        """
        import glob
        plugin_dir = opath.abspath(opath.join(getPATH0, 'plugins'))
        mods = []
        for pattern in ('*.pyd', '*.py', '*.pyc', '*.pyo'):
            mods.extend(glob.glob(opath.join(plugin_dir, pattern)))
        for path in mods:
            m = opath.splitext(opath.split(path)[1])[0]
            # the same module may appear under several extensions
            if m.startswith('_') or m in self.plugins:
                continue
            # module object
            try:
                self.plugins[m] = __import__(m)
            except ImportError:
                self.logger.warning('%s is disabled due to an Import Error' % m)
                continue
            # plugin instance
            try:
                self.plugins_instance[m] = self.plugins[m].plugin()
            except AttributeError:
                # no plugin() class
                self.plugins_instance[m] = None

    def _line_tracer(self):
        """Unfinished line-by-line tracing decorator (not working yet).

        NOTE(review): ``traced_func``, ``func`` and ``logging_tracer`` are not
        defined anywhere in the visible source, so calling this is expected to
        raise NameError - confirm before using.
        """
        # originally from http://stackoverflow.com/questions/19227636/decorator-to-log-function-execution-line-by-line
        # it works almost the same as module 'memory_profiler'
        traced_func.add(func.func_code)

        def _wrapper(*args, **kwargs):  # need a wrap
            old_trace_function = sys.gettrace()
            sys.settrace(logging_tracer)
            try:
                return func(*args, **kwargs)
            finally:
                # always restore the previous tracer
                sys.settrace(old_trace_function)
        return _wrapper

    def func_hook(self, func):
        """Decorator that fires ENTER_/EXIT_ hooks around ``func``.

        When plugins are enabled, the 'ENTER_<name>' hooks run first and the
        ``(args, kwargs)`` they return are what ``func`` is called with; the
        'EXIT_<name>' hooks then run with those same arguments and their
        result is ignored. When disabled, ``func`` is called directly.

        :returns: the wrapper, carrying ``func``'s name and docstring.
        """
        import functools

        # keep func.__name__ on the wrapper: hook keys are derived from it
        @functools.wraps(func)
        def do(*args, **kwargs):
            if not self.enable:
                return func(*args, **kwargs)  # passby
            args, kwargs = self._do_hook('%s%s' % (PREF_ENTER, func.__name__), *args, **kwargs)
            ret = func(*args, **kwargs)
            self._do_hook('%s%s' % (PREF_EXIT, func.__name__), *args, **kwargs)
            return ret
        return do

    def line_hook(self):
        """Placeholder; does nothing."""
        pass

    # def __call__(self,func):
    #     return self.func_hook(func)
if __name__ == '__main__':
    # Manual smoke test: load the plugins and call one hooked function.
    import maclient_logging
    p = plugins(maclient_logging.Logging('logging'))
    # p.set_disable(['hehe'])
    # Build the hook registry before any hooked call: the registry attribute
    # is created by scan_hooks(), which the constructor does not invoke.
    p.scan_hooks()
    # print p.plugins,p.hook_reg

    class a():
        # NOTE(review): the hook wrapper appears to strip the first positional
        # argument (the caller instance) before calling the function, hence
        # no `self` parameter here - confirm.
        @p.func_hook
        def explore(a, b, c, d=1):
            print(1)
    aa = a()
    aa.explore(1, 2, 3, d=3)