forked from alibaba/jvm-sandbox-repeater
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DefaultEventListener.java
333 lines (314 loc) · 11.7 KB
/
DefaultEventListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
package com.alibaba.jvm.sandbox.repeater.plugin.core.impl.api;
import com.alibaba.jvm.sandbox.api.ProcessControlException;
import com.alibaba.jvm.sandbox.api.event.*;
import com.alibaba.jvm.sandbox.api.event.Event.Type;
import com.alibaba.jvm.sandbox.api.listener.EventListener;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationListener;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationProcessor;
import com.alibaba.jvm.sandbox.repeater.plugin.core.bridge.ClassloaderBridge;
import com.alibaba.jvm.sandbox.repeater.plugin.core.cache.RecordCache;
import com.alibaba.jvm.sandbox.repeater.plugin.core.cache.RepeatCache;
import com.alibaba.jvm.sandbox.repeater.plugin.core.model.ApplicationModel;
import com.alibaba.jvm.sandbox.repeater.plugin.core.serialize.SerializeException;
import com.alibaba.jvm.sandbox.repeater.plugin.core.trace.SequenceGenerator;
import com.alibaba.jvm.sandbox.repeater.plugin.core.trace.TraceContext;
import com.alibaba.jvm.sandbox.repeater.plugin.core.trace.Tracer;
import com.alibaba.jvm.sandbox.repeater.plugin.core.wrapper.SerializerWrapper;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.Invocation;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
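/**
 * {@link DefaultEventListener} is the default event listener implementation.
 * <p>
 * It handles the events dispatched by the sandbox.
 * <p>
 * For recording a single ordinary method (capturing the arguments and the return value from that method's
 * around events: before/return/throw), this class covers the requirement as is.
 * <p>
 * For a combination of several entry methods (for example, onRequest supplies the arguments and onResponse
 * supplies the return value), override doBefore/doReturn/doThrow to control the flow yourself.
 * </p>
 *
 * @author zhaoyb1990
 */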
public class DefaultEventListener implements EventListener {
/** Logger used by this listener. */
protected static Logger log = LoggerFactory.getLogger(DefaultEventListener.class);
/** Invoke type this listener is bound to; stamped on every invocation it records. */
protected final InvokeType invokeType;
/** Whether this listener is an entrance; only entrance listeners start/end the trace context and compute sampling. */
protected final boolean entrance;
/** Receives each finished invocation through {@code onInvocation}. */
protected final InvocationListener listener;
/** Assembles identity/request/response/throwable from events, filters events and performs mocking. */
protected final InvocationProcessor processor;
/**
 * Event offset bookkeeping. Java methods nest, and within one invoke only the outermost intercepted
 * method is recorded; the recursive call tree below it is not handled.
 * <p>
 * The decision is made through this per-thread offset, which starts at 0.
 */
private ThreadLocal<AtomicInteger> eventOffset = new ThreadLocal<AtomicInteger>() {
@Override
protected AtomicInteger initialValue() {
return new AtomicInteger(0);
}
};
/**
 * Creates a listener for one invoke type.
 *
 * @param invokeType invoke type stamped on the invocations this listener records
 * @param entrance   whether this listener acts as an entrance (manages the trace context and sampling)
 * @param listener   callback that receives each finished invocation
 * @param processor  processor used to assemble invocation data, filter events and mock
 */
public DefaultEventListener(InvokeType invokeType, boolean entrance,
                            InvocationListener listener,
                            InvocationProcessor processor) {
    this.processor = processor;
    this.listener = listener;
    this.entrance = entrance;
    this.invokeType = invokeType;
}
/**
 * Entry point for every event dispatched to this listener.
 * <p>
 * The event passes, in order, the top-event filter, trace-context initialization, the access check,
 * the sampling check and the processor's ignore filter; only then is it dispatched to
 * {@code doBefore}/{@code doReturn}/{@code doThrow} according to its type.
 *
 * @param event the dispatched event
 * @throws Throwable a {@link ProcessControlException} raised while handling is rethrown; any other
 *                   throwable raised inside the try block is logged and reported to
 *                   {@code ApplicationModel.exceptionOverflow} instead of being propagated
 */
@Override
public void onEvent(Event event) throws Throwable {
try {
/*
 * Event filtering: a single listener only handles its top (outermost) events.
 */
if (!isTopEvent(event)) {
if (log.isDebugEnabled()) {
log.debug("not top event ,type={},event={},offset={}", invokeType, event, eventOffset.get().get());
}
return;
}
/*
 * Initialize the Tracer.
 */
initContext(event);
/*
 * Run the basic access filter.
 */
if (!access(event)) {
if (log.isDebugEnabled()) {
log.debug("event access failed,type={},event={}", invokeType, event);
}
return;
}
/*
 * Run the sampling decision (only the entrance plugin computes sampling; sub-invocation plugins do not).
 */
if (!sample(event)) {
if (log.isDebugEnabled()) {
log.debug("event missing sample rule,type={},event={}", invokeType, event);
}
return;
}
/*
 * processor filter
 */
if (processor != null && processor.ignoreEvent((InvokeEvent) event)) {
if (log.isDebugEnabled()) {
log.debug("event is ignore by processor,type={},event={},processor={}", invokeType, event, processor);
}
return;
}
/*
 * Dispatch the event. This works as is when a single around event yields both the arguments and the
 * return value; flows that must gather them from several before events need their own handling.
 */
switch (event.type) {
case BEFORE:
doBefore((BeforeEvent) event);
break;
case RETURN:
doReturn((ReturnEvent) event);
break;
case THROWS:
doThrow((ThrowsEvent) event);
break;
default:
break;
}
} catch (ProcessControlException pe) {
/*
 * Sandbox process-control intervention.
 */
// Process control interrupts the event flow, so no return/throw event will follow;
// the offset therefore has to be cleared here.
eventOffset.remove();
throw pe;
} catch (Throwable throwable) {
// uncaught exception
log.error("[Error-0000]-uncaught exception occurred when dispatch event,type={},event={}", invokeType, event, throwable);
ApplicationModel.instance().exceptionOverflow(throwable);
} finally {
/*
 * Entrance plugin && finish event.
 */
// NOTE(review): this finally also runs on every early return above (including non-top events),
// so clearContext is evaluated for each event this listener receives - confirm that is intended.
clearContext(event);
}
}
/**
 * Handles a before event.
 * <p>
 * Repeat (replay) traffic is handed to the processor's mock logic and nothing is recorded. For any
 * other traffic a new {@link Invocation} is filled from the event and cached under the event's
 * {@code invokeId}, where {@code doReturn}/{@code doThrow} look it up.
 *
 * @param event the before event
 * @throws ProcessControlException sandbox flow control raised while handling the event
 */
protected void doBefore(BeforeEvent event) throws ProcessControlException {
    // Repeat traffic: per the original note, entrances give up and sub-invocations are mocked;
    // the entrance flag is passed to the processor's doMock.
    final boolean repeating = RepeatCache.isRepeatFlow(Tracer.getTraceId());
    if (repeating) {
        processor.doMock(event, entrance, invokeType);
        return;
    }

    final Invocation recording = initInvocation(event);
    recording.setStart(System.currentTimeMillis());
    recording.setTraceId(Tracer.getTraceId());
    // Entrances always take index 0; sub-invocations draw the next sequence number of the trace.
    recording.setIndex(entrance ? 0 : SequenceGenerator.generate(Tracer.getTraceId()));
    recording.setIdentity(processor.assembleIdentity(event));
    recording.setEntrance(entrance);
    recording.setType(invokeType);
    recording.setProcessId(event.processId);
    recording.setInvokeId(event.invokeId);
    recording.setRequest(processor.assembleRequest(event));
    recording.setResponse(processor.assembleResponse(event));
    recording.setSerializeToken(ClassloaderBridge.instance().encode(event.javaClassLoader));

    try {
        // fix issue#14 : useGeneratedKeys
        if (processor.inTimeSerializeRequest(recording, event)) {
            SerializerWrapper.inTimeSerialize(recording);
        }
    } catch (SerializeException serializeFailure) {
        // A request that cannot be serialized marks the current trace as not sampled.
        Tracer.getContext().setSampled(false);
        log.error("Error occurred serialize", serializeFailure);
    }

    RecordCache.cacheInvocation(event.invokeId, recording);
}
/**
 * Creates the {@link Invocation} that {@code doBefore} goes on to populate.
 * <p>
 * Left open for plugins to override so that they can supply their own invocation description type;
 * the module itself is not aware of the plugin's type.
 *
 * @param beforeEvent the before event
 * @return a new invocation
 */
protected Invocation initInvocation(BeforeEvent beforeEvent) {
return new Invocation();
}
/**
 * Decides whether the event is sampled.
 * <p>
 * An entrance listener computes the sampling decision on its before event; every other case just
 * reads the decision already stored in the current trace context (not sampled when there is none).
 *
 * @param event the event
 * @return {@code true} when the event is sampled
 */
protected boolean sample(Event event) {
    final boolean entranceBefore = entrance && event.type == Type.BEFORE;
    if (entranceBefore) {
        return Tracer.getContext().inTimeSample(invokeType);
    }
    final TraceContext current = Tracer.getContext();
    if (current == null) {
        return false;
    }
    return current.isSampled();
}
/**
 * Handles a return event.
 * <p>
 * Does nothing for repeat traffic. Otherwise the invocation cached under the event's
 * {@code invokeId} receives its response and end timestamp and is handed to the listener.
 *
 * @param event the return event
 */
protected void doReturn(ReturnEvent event) {
    final boolean repeating = RepeatCache.isRepeatFlow(Tracer.getTraceId());
    if (repeating) {
        return;
    }
    final Invocation cached = RecordCache.getInvocation(event.invokeId);
    if (cached != null) {
        cached.setResponse(processor.assembleResponse(event));
        cached.setEnd(System.currentTimeMillis());
        listener.onInvocation(cached);
    } else {
        log.debug("no valid invocation found in return,type={},traceId={}", invokeType, Tracer.getTraceId());
    }
}
/**
 * Handles a throws event.
 * <p>
 * Does nothing for repeat traffic. Otherwise the invocation cached under the event's
 * {@code invokeId} receives its throwable and end timestamp and is handed to the listener.
 *
 * @param event the throws event
 */
protected void doThrow(ThrowsEvent event) {
    final boolean repeating = RepeatCache.isRepeatFlow(Tracer.getTraceId());
    if (repeating) {
        return;
    }
    final Invocation cached = RecordCache.getInvocation(event.invokeId);
    if (cached != null) {
        cached.setThrowable(processor.assembleThrowable(event));
        cached.setEnd(System.currentTimeMillis());
        listener.onInvocation(cached);
    } else {
        log.debug("no valid invocation found in throw,type={},traceId={}", invokeType, Tracer.getTraceId());
    }
}
/**
 * Tells whether the event belongs to the outermost call seen by this listener on the current thread.
 * <p>
 * A before event is top when the offset was 0 before being incremented; a return/throws event is
 * top when the offset reaches 0 after being decremented. Any other event type is never top.
 *
 * @param event the event
 * @return {@code true} when the event should be handled
 */
protected boolean isTopEvent(Event event) {
    boolean top = false;
    switch (event.type) {
        case BEFORE:
            top = eventOffset.get().getAndIncrement() == 0;
            break;
        case RETURN:
        case THROWS:
            top = eventOffset.get().decrementAndGet() == 0;
            break;
        default:
            break;
    }
    // An offset of 0 means this was the thread's last return/throw event,
    // so the thread-local is released proactively.
    final AtomicInteger depth = eventOffset.get();
    if (depth.get() == 0) {
        eventOffset.remove();
    }
    return top;
}
/**
 * Initializes the trace context.
 * <p>
 * Only the entrance plugin initializes and clears the context. Sub-invocations do not need to care
 * about the trace context (per the original note: with multiple threads ttl takes care of copy and
 * restore, with a single thread the entrance manages it).
 *
 * @param event the event
 */
protected void initContext(Event event) {
    if (!entrance || !isEntranceBegin(event)) {
        return;
    }
    Tracer.start();
}
/**
 * Tells whether the event marks the beginning of entrance handling.
 *
 * @param event the event
 * @return {@code true} for a before event, {@code false} otherwise
 */
protected boolean isEntranceBegin(Event event) {
    final Type eventType = event.type;
    return eventType == Type.BEFORE;
}
/**
 * Clears the trace context once an entrance listener sees its finishing event.
 *
 * @param event the event
 */
private void clearContext(Event event) {
    final boolean finished = entrance && isEntranceFinish(event);
    if (finished) {
        Tracer.end();
    }
}
/**
 * Tells whether entrance handling is finished; must be overridden for non-around events.
 * <p>
 * A before event never finishes the entrance. Any other event finishes it only when a trace context
 * exists and was opened by this listener's invoke type (the type that started the trace is the one
 * responsible for clearing it).
 *
 * @param event the event
 * @return {@code true} when the trace context should be cleared
 */
protected boolean isEntranceFinish(Event event) {
    if (event.type == Type.BEFORE) {
        return false;
    }
    // Tracer.getContext() may be null (sample() guards the same call). This method is reached from
    // the finally block of onEvent, so dereferencing a null context here would throw an NPE out of
    // onEvent instead of being logged. With no context there is nothing to clear.
    final TraceContext context = Tracer.getContext();
    return context != null && context.getInvokeType() == invokeType;
}
/**
 * Tells whether the event may pass the basic access check.
 * <p>
 * The application must be working; once it is degraded, only repeat traffic may pass.
 *
 * @param event the event
 * @return {@code true} when the event may pass
 */
protected boolean access(Event event) {
    if (!ApplicationModel.instance().isWorkingOn()) {
        return false;
    }
    if (!ApplicationModel.instance().isDegrade()) {
        return true;
    }
    // Degraded: only repeat traffic is let through.
    return RepeatCache.isRepeatFlow(Tracer.getTraceId());
}
/**
 * Describes this listener by its invoke type and entrance flag.
 *
 * @return a string of the form {@code DefaultEventListener:invokeType=...;entrance=...}
 */
@Override
public String toString() {
    String description = "DefaultEventListener:invokeType=" + invokeType;
    description += ";entrance=" + entrance;
    return description;
}
}